-
Notifications
You must be signed in to change notification settings - Fork 6
feat(inkless:consume): add rate-limited fetch for lagging consumers #467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds rate-limiting capabilities for lagging consumers (consumers reading old data) to prevent resource exhaustion when multiple consumers fall behind. The implementation separates recent and lagging consumer data paths, with lagging consumers subjected to request-based rate limiting using the Bucket4j library.
Key changes:
- Adds a new lagging consumer thread pool and rate limiter to manage fetch requests for old data
- Introduces age-based path separation in FetchPlanner using a configurable threshold (defaulting to 2x cache expiration lifespan)
- Implements request-based rate limiting via Bucket4j with configurable limits (default 200 requests/second)
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| FetchPlanner.java | Implements age-based path separation and rate limiting logic; lagging path bypasses cache |
| Reader.java | Adds lagging consumer executor, threshold, and rate limiter initialization; updated constructors |
| InklessFetchMetrics.java | Adds metrics for tracking recent/lagging requests and rate limit wait times |
| InklessConfig.java | Defines configuration for lagging consumer thread pool size and rate limit |
| FetchHandler.java | Passes new configuration parameters to Reader constructor |
| FetchException.java | Adds constructor accepting message and cause for rate limit interruptions |
| FetchPlannerTest.java | Adds tests for lagging consumer path, recent path, and rate limiting behavior |
| ReaderTest.java | Refactors to use helper method for Reader construction with new parameters |
| build.gradle | Adds Bucket4j dependency |
| gradle/dependencies.gradle | Defines Bucket4j version 8.14.0 |
Comments suppressed due to low confidence (1)
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java:101
- The test reuses the same dataExecutor for both recent and lagging consumer paths, but doesn't verify that the lagging consumer executor is properly shut down. The testClose should verify that all three executors (metadataExecutor, dataExecutor, and laggingConsumerExecutor) are shut down, but since the lagging consumer executor is the same instance as dataExecutor in tests, this doesn't catch potential issues where the lagging consumer executor might not be shut down in production.
@Test
public void testClose() throws Exception {
final var reader = getReader();
reader.close();
verify(metadataExecutor, atLeastOnce()).shutdown();
verify(dataExecutor, atLeastOnce()).shutdown();
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/InklessFetchMetrics.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
6f06aff to
d4c4bc4
Compare
282a419 to
fc5bae4
Compare
31b9aa9 to
f6be2a5
Compare
d768ef9 to
4155e45
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java:255
- The javadoc comment states "This method is called by the cache (on dataExecutor) when a cache miss occurs" but this is no longer accurate after the lagging consumer changes. The method is now called from two different paths: (1) by the cache on recentDataExecutor for recent data (line 203), and (2) directly from the lagging consumer executor without caching for lagging data (line 217). The documentation should be updated to reflect both calling contexts.
/**
* Fetches a file extent from remote storage.
* This method is called by the cache (on dataExecutor) when a cache miss occurs.
*
* @param request the fetch request with object key and byte range
* @return the fetched file extent
* @throws FetchException if remote fetch fails
*/
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
4155e45 to
d55d019
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/consume/FetchPlannerTest.java
Outdated
Show resolved
Hide resolved
9e79e59 to
535f9cb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 10 out of 11 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
fda3624 to
0ae3314
Compare
f6be2a5 to
94ed0bf
Compare
0ae3314 to
a4f0131
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 12 out of 13 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/test/java/io/aiven/inkless/config/InklessConfigTest.java
Outdated
Show resolved
Hide resolved
2713196 to
3285a0e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Show resolved
Hide resolved
|
Applying fixup commits to simplify review. |
91d7ad4 to
5ec14d4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Show resolved
Hide resolved
5ec14d4 to
e52d6d1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/consume/InklessFetchMetrics.java
Outdated
Show resolved
Hide resolved
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
23746f2 to
7c76073
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/config/InklessConfig.java
Outdated
Show resolved
Hide resolved
Implements hot/cold path separation to protect Inkless from load spikes caused by lagging consumers while maintaining performance for recent consumers. Hot Path (Recent Data): - Uses object cache for fast repeated access - Dedicated executor pool - Prioritizes low-latency for up-to-date consumers Cold Path (Lagging Consumers): - Bypasses cache to avoid evicting hot data - Dedicated executor pool (default: half of hot-path pool) with bounded queue - Optional rate limiting (default 200 req/s) to control S3 costs and CPU usage - Separate storage client for resource isolation - AbortPolicy for backpressure: queue full → RejectedExecutionException → Kafka error handler → consumer backs off via fetch purgatory Path Selection: - Age threshold: Data older than cache TTL (default 60s) uses cold path - Configurable via fetch.lagging.consumer.threshold.ms (-1 = auto) - Based on batch timestamp (max across partitions in same object) - Handles future timestamps (clock skew) by treating as recent data Backpressure Mechanism: - Bounded queue prevents OOM (1600 tasks ≈ 3.2MB, ~8 seconds buffer) - AbortPolicy rejects tasks when full (no request handler blocking) - Only RejectedExecutionException tracked as rejections (capacity/backpressure) - InterruptedException treated as fetch operation failure, not rejection - Exceptions propagate to Kafka error handler (no custom empty responses) - Consumers receive error → fetch purgatory backoff → natural retry Fixes partial failure scenario: Reader#allOfFileExtents now handles partial failures gracefully and ensure that successful partitions are returned. Robustness Improvements: - Null safety: Validates laggingObjectFetcher is non-null when executor exists - Validation: FindBatchesJob validates response count matches request count - Edge cases: Handles future timestamps (clock skew) and empty batch coordinates - Resource management: Rate limiter only created when lagging consumer feature enabled Monitoring Metrics: - LaggingConsumerRequestRate: cold path request rate - LaggingConsumerRejectedRate: rejection events (queue full/shutdown) - LaggingRateLimitWaitTime: rate limiting latency histogram - RecentDataRequestRate: hot path request rate Configuration: - fetch.lagging.consumer.thread.pool.size (default: 16) Controls cold path concurrency - fetch.lagging.consumer.request.rate.limit (default: 200 req/s) Controls S3 request rate (set 0 to disable) - fetch.lagging.consumer.threshold.ms (default: -1 = cache TTL) Age threshold for hot/cold path selection -1 uses cache TTL automatically (validated to always be >= cache lifespan) Design Trade-offs: - Separate storage client doubles connection pools but prevents hot path degradation from cold path bursts - Cache bypass for cold path means no deduplication across lagging consumers, acceptable because objects are multi-partition blobs (future: split by partition may allow to consider enabling caching)
7c76073 to
afd9620
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 16 out of 17 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java
Show resolved
Hide resolved
Reformat to align style and reorder methods for clarity
Cover for potential bug where multiple file extents are found for the same object, and the first fails but the second succeeds, potentially returning incomplete data. This fixup includes the additional data to keep to enable an ordered response on the FetchCompleter, and properly serve up to the available data in order.
1e3036c to
5b4dfa5
Compare
This PR implements hot/cold path separation to protect Inkless from load spikes caused by lagging consumers while maintaining performance for recent consumers.
Problem
When consumers fall behind (e.g., during catchup after downtime, backfills, or replays), they create significant load spikes on the storage backend by fetching old data that's not in cache. This can:
Current mitigation attempts (increasing cache size, aggressive caching) are insufficient because:
Solution
This PR implements a two-tier fetch architecture with hot/cold path separation:
Hot Path (Recent Data)
Note: The metadata executor (
fetch.metadata.thread.pool.size) is shared between hot and cold paths. The hot/cold separation only applies to data fetching (after metadata is retrieved). For workloads with significant lagging consumer traffic, consider increasingfetch.metadata.thread.pool.sizeproportionally to the combined data thread pool sizes (fetch.data.thread.pool.size + fetch.lagging.consumer.thread.pool.size) to prevent metadata fetching from becoming a bottleneck.Cold Path (Lagging Consumers)
Path Selection Logic
Separation based on data age, not consumer lag, because:
Why Cache TTL as Default?
The default threshold of
-1(auto) uses 1x cache TTL, not 3x:Rationale:
Validation: When set to explicit value (not -1), threshold must be >= cache lifespan to avoid routing cached data to cold path. The -1 heuristic is validated to always satisfy this constraint automatically.
Fixes issue with allOf where there are partial failures
Reader#allOfFileExtents now handles partial failures gracefully and ensure that successful partitions are returned.
Key Design Decisions
1. Cache Bypass for Cold Path
Why bypass cache?
Future consideration: Split files by partition to enable cold path caching and deduplication.
2. Separate Storage Client
Why separate ObjectFetcher instance?
Trade-off: Doubles connection pools (~500KB each), but prevents hot path degradation from cold path bursts.
3. Rate Limiting
Why request-based (not bytes-based)?
Default 200 req/s rationale:
Robustness Improvements
This PR includes several defensive programming improvements:
laggingObjectFetcheris non-null when executor exists to prevent NPE ifFetchPlanneris constructed incorrectlyFindBatchesJobvalidates response count matches request count to prevent index errors and incorrect mappingConfiguration
Three new broker configs under
inkless.:fetch.lagging.consumer.thread.pool.sizefetch.lagging.consumer.threshold.ms-1= auto (uses cache TTL). Set explicit value to override. Validation: Explicit values must be >= cache lifespan.fetch.lagging.consumer.request.rate.limitThread pool sizing:
Metrics
Four new metrics for monitoring hot/cold path behavior:
RecentDataRequestRateLaggingConsumerRequestRateLaggingConsumerRejectedRateLaggingRateLimitWaitTimeMetric relationships:
LaggingRateLimitWaitTime.rate ≈ LaggingConsumerRequestRateLaggingRateLimitWaitTime.rate = 0LaggingConsumerRejectedRate→ consider increasing thread pool size or rate limitMigration Path
Feature is opt-in via configuration:
fetch.lagging.consumer.threshold.msnot setRollback plan:
fetch.lagging.consumer.thread.pool.size = 0to disable featurefetch.lagging.consumer.threshold.ms = -1with very large cache TTLPerformance benchmarks
The following OMB metrics show a before and after of this feature: