
Conversation

@jeqo
Contributor

@jeqo jeqo commented Dec 23, 2025

This PR implements hot/cold path separation to protect Inkless from load spikes caused by lagging consumers while maintaining performance for recent consumers.

Problem

When consumers fall behind (e.g., during catchup after downtime, backfills, or replays), they create significant load spikes on the storage backend by fetching old data that's not in cache. This can:

  • Overwhelm object storage with high request rates (potential Network/S3 throttling)
  • Saturate network bandwidth with cold data fetches
  • Pollute cache by evicting hot data with stale objects
  • Degrade performance for up-to-date consumers due to resource contention

Current mitigation attempts (increasing cache size, aggressive caching) are insufficient because:

  1. Objects contain data from multiple partitions (not split by partition)
  2. Caching old data evicts recent data, hurting normal consumers
  3. No mechanism exists to prioritize recent consumers over lagging ones
  4. No backpressure to prevent lagging consumer storms

Solution

This PR implements a two-tier fetch architecture with hot/cold path separation:

Hot Path (Recent Data)

  • Uses object cache for fast repeated access
  • Dedicated executor pool (32 threads)
  • Prioritizes low-latency for up-to-date consumers
  • No rate limiting

Note: The metadata executor (fetch.metadata.thread.pool.size) is shared between hot and cold paths. The hot/cold separation only applies to data fetching (after metadata is retrieved). For workloads with significant lagging consumer traffic, consider increasing fetch.metadata.thread.pool.size proportionally to the combined data thread pool sizes (fetch.data.thread.pool.size + fetch.lagging.consumer.thread.pool.size) to prevent metadata fetching from becoming a bottleneck.

Cold Path (Lagging Consumers)

  • Bypasses cache to avoid evicting hot data
  • Dedicated bounded executor pool (16 threads, 1600 queue capacity)
  • Optional rate limiting (default 200 req/s) to control S3 costs
  • Separate storage client for resource isolation
  • AbortPolicy backpressure: queue full → RejectedExecutionException → Kafka error handler → consumer backs off via fetch purgatory
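The bounded queue plus AbortPolicy combination can be sketched with a plain ThreadPoolExecutor. This is a hypothetical illustration of the defaults described above (16 threads, 16 × 100 queue slots), not the actual Inkless code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ColdPathExecutorSketch {

    // Bounded pool mirroring the described cold-path defaults:
    // 16 threads, 16 x 100 = 1600 queue slots, AbortPolicy backpressure.
    static ThreadPoolExecutor newColdPathExecutor(int threads, int queueSlotsPerThread) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(threads * queueSlotsPerThread),
                // Queue full -> RejectedExecutionException, which the Kafka
                // error handler turns into a fetch purgatory backoff.
                new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newColdPathExecutor(16, 100);
        System.out.println("queue capacity: " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```

With these defaults, the 1601st concurrent pending task is rejected immediately rather than blocking the request handler thread.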

Path Selection Logic

final long dataAge = Math.max(0, currentTime - batchTimestamp); // Handles clock skew
final boolean isLagging = laggingFeatureEnabled && (dataAge > threshold);

Separation based on data age, not consumer lag, because:

  • Batch timestamp is readily available in metadata
  • Threshold can be tuned independently from cache settings
  • Default heuristic: cache TTL (not 3x) to align with cache lifecycle
  • Ensures cache hits possible before data is considered "lagging"
  • Handles future timestamps (clock skew) by treating as recent data
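Putting the snippet above into a complete form (a sketch; the method and parameter names here are illustrative, not the actual FetchPlanner signature):

```java
public class PathSelectionSketch {

    // Returns true if the fetch should take the cold (lagging) path.
    static boolean isLagging(long currentTimeMs, long batchTimestampMs,
                             long thresholdMs, boolean laggingFeatureEnabled) {
        // Future timestamps (clock skew) clamp to age 0, i.e. "recent".
        final long dataAge = Math.max(0, currentTimeMs - batchTimestampMs);
        return laggingFeatureEnabled && dataAge > thresholdMs;
    }

    public static void main(String[] args) {
        // Batch 90s old with a 60s threshold -> cold path.
        System.out.println(isLagging(100_000, 10_000, 60_000, true));
        // Batch timestamp in the future (clock skew) -> hot path.
        System.out.println(isLagging(10_000, 20_000, 60_000, true));
    }
}
```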

Why Cache TTL as Default?

The default threshold of -1 (auto) uses 1x cache TTL, not 3x:

// If -1, use cache TTL as threshold (data stays "recent" while in cache)
long effectiveThreshold = threshold == -1 ? cache.ttlMs() : threshold;

Rationale:

  • Data in cache is "recent" by definition
  • Provides grace period for cache warm-up and consumer lag variations
  • Operators can override based on workload (set explicit ms value)
  • Aligns backpressure with cache lifecycle

Validation: When set to explicit value (not -1), threshold must be >= cache lifespan to avoid routing cached data to cold path. The -1 heuristic is validated to always satisfy this constraint automatically.
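The resolution and validation semantics can be sketched together (illustrative method names, assuming the behavior described above):

```java
public class ThresholdSketch {

    // Resolves the configured threshold against the cache TTL.
    // -1 (auto) uses the cache TTL; explicit values must be >= cache lifespan.
    static long effectiveThreshold(long configuredMs, long cacheTtlMs) {
        if (configuredMs == -1) {
            return cacheTtlMs; // auto: data stays "recent" while in cache
        }
        if (configuredMs < cacheTtlMs) {
            throw new IllegalArgumentException(
                "threshold (" + configuredMs + "ms) must be >= cache lifespan ("
                    + cacheTtlMs + "ms) to avoid routing cached data to the cold path");
        }
        return configuredMs;
    }

    public static void main(String[] args) {
        System.out.println(effectiveThreshold(-1, 60_000));      // auto
        System.out.println(effectiveThreshold(120_000, 60_000)); // explicit
    }
}
```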

Fixes an issue with allOf under partial failures

Reader#allOfFileExtents now handles partial failures gracefully and ensures that successful partitions are returned.

Key Design Decisions

1. Cache Bypass for Cold Path

Why bypass cache?

  • Objects are multi-partition blobs (not split by partition)
  • Caching a lagging consumer's request caches mostly irrelevant data for that consumer
  • Backfill consumers should use large batch sizes (read full objects, not partial ranges)
  • Caching cold data evicts hot data, degrading recent consumer performance

Future consideration: Split files by partition to enable cold path caching and deduplication.

2. Separate Storage Client

Why separate ObjectFetcher instance?

  • Connection pool isolation: Prevents lagging consumer bursts from exhausting connections for hot path
  • HTTP/2 multiplexing limits: Prevents cold path from saturating per-connection stream limits (~100 for S3)
  • Rate limiting granularity: Some backends apply rate limits per client/credential
  • Failure isolation: Cold path throttling/errors don't propagate to hot path

Trade-off: Doubles connection pools (~500KB each), but prevents hot path degradation from cold path bursts.

3. Rate Limiting

Why request-based (not bytes-based)?

  • Objects contain multi-partition data (size ≠ relevance)
  • S3 costs are per-request
  • Prevents request storms more effectively than byte limits

Default 200 req/s rationale:

  • Sustainable S3 GET rate without throttling
  • Bucket4j allows short bursts up to capacity (200 tokens)
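The PR uses Bucket4j for this; the burst-then-refill behavior it relies on can be sketched with a minimal stdlib token bucket (a hypothetical illustration of the mechanism, not the Bucket4j API):

```java
public class TokenBucketSketch {

    // Minimal token bucket: starts full (allows a burst up to capacity),
    // then refills at the configured rate. Time is a parameter for determinism.
    static final class TokenBucket {
        private final long capacity;
        private final double refillPerMs;
        private double tokens;
        private long lastRefillMs;

        TokenBucket(long capacity, long refillPerSecond, long nowMs) {
            this.capacity = capacity;
            this.refillPerMs = refillPerSecond / 1000.0;
            this.tokens = capacity; // full bucket -> short bursts allowed
            this.lastRefillMs = nowMs;
        }

        boolean tryConsume(long nowMs) {
            tokens = Math.min(capacity, tokens + (nowMs - lastRefillMs) * refillPerMs);
            lastRefillMs = nowMs;
            if (tokens >= 1.0) {
                tokens -= 1.0;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(200, 200, 0);
        int granted = 0;
        for (int i = 0; i < 250; i++) {
            if (bucket.tryConsume(0)) granted++; // burst at t=0
        }
        System.out.println("granted in burst: " + granted); // 200 of 250
    }
}
```

In the actual implementation the cold path blocks (recording the wait in LaggingRateLimitWaitTime) rather than dropping requests, but the capacity/refill arithmetic is the same idea.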

Robustness Improvements

This PR includes several defensive programming improvements:

  • Null safety: Validates laggingObjectFetcher is non-null when executor exists to prevent NPE if FetchPlanner is constructed incorrectly
  • Validation: FindBatchesJob validates response count matches request count to prevent index errors and incorrect mapping
  • Edge cases: Handles future timestamps (clock skew) and empty batch coordinates gracefully
  • Resource management: Rate limiter only created when lagging consumer feature enabled (avoids creating unused objects when feature is disabled)

Configuration

Three new broker configs under the inkless. prefix:

| Config | Default | Description |
| --- | --- | --- |
| fetch.lagging.consumer.thread.pool.size | 16 | Thread pool size for lagging consumers. Set to 0 to disable the feature entirely. |
| fetch.lagging.consumer.threshold.ms | -1 | Time threshold (ms) to distinguish recent vs lagging data. -1 = auto (uses cache TTL). Set an explicit value to override; explicit values must be >= cache lifespan. |
| fetch.lagging.consumer.request.rate.limit | 200 | Max requests/sec for lagging consumers. Set to 0 to disable rate limiting. |
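For example, a broker configuration enabling the feature with explicit values might look like this (values are illustrative, not recommendations):

```properties
# Cold path concurrency (0 disables the feature)
inkless.fetch.lagging.consumer.thread.pool.size=16
# Explicit age threshold; must be >= cache lifespan (or -1 for auto)
inkless.fetch.lagging.consumer.threshold.ms=120000
# Max cold-path requests/sec (0 disables rate limiting)
inkless.fetch.lagging.consumer.request.rate.limit=200
```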

Thread pool sizing:

  • 16 threads = half of hot path (32), adequate for cold storage access patterns
  • Bounded queue: 16 × 100 = 1600 tasks (~3.2MB, ~8 seconds buffer at 200 req/s)
  • AbortPolicy engages when queue full (no request handler blocking)

Metrics

Four new metrics for monitoring hot/cold path behavior:

| Metric | Type | Description |
| --- | --- | --- |
| RecentDataRequestRate | Meter | Requests using the hot path (recent data, with cache) |
| LaggingConsumerRequestRate | Meter | Requests using the cold path (all lagging requests, regardless of rate limiting) |
| LaggingConsumerRejectedRate | Meter | Rejection events (queue full or executor shutdown). Only RejectedExecutionException is tracked as a rejection; InterruptedException is treated as a fetch operation failure, not a rejection. |
| LaggingRateLimitWaitTime | Histogram | Rate limit wait times (includes zero-wait samples to avoid histogram bias) |

Metric relationships:

  • When rate limiting enabled: LaggingRateLimitWaitTime.rate ≈ LaggingConsumerRequestRate
  • When rate limiting disabled: LaggingRateLimitWaitTime.rate = 0
  • High LaggingConsumerRejectedRate → consider increasing thread pool size or rate limit

Migration Path

Feature is opt-in via configuration:

  • Default behavior unchanged if fetch.lagging.consumer.threshold.ms not set
  • Safe to deploy without config changes
  • Operators can enable gradually:
    1. Set threshold to very high value (only extreme lag uses cold path)
    2. Monitor metrics to understand lagging consumer patterns
    3. Tune threshold, thread pool size, and rate limit based on workload

Rollback plan:

  • Set fetch.lagging.consumer.thread.pool.size = 0 to disable feature
  • Or set fetch.lagging.consumer.threshold.ms = -1 with very large cache TTL

Performance benchmarks

The following OMB metrics show a before and after of this feature:

[OMB benchmark before/after comparison image]

Contributor

Copilot AI left a comment


Pull request overview

This PR adds rate-limiting capabilities for lagging consumers (consumers reading old data) to prevent resource exhaustion when multiple consumers fall behind. The implementation separates recent and lagging consumer data paths, with lagging consumers subjected to request-based rate limiting using the Bucket4j library.

Key changes:

  • Adds a new lagging consumer thread pool and rate limiter to manage fetch requests for old data
  • Introduces age-based path separation in FetchPlanner using a configurable threshold (defaulting to 2x cache expiration lifespan)
  • Implements request-based rate limiting via Bucket4j with configurable limits (default 200 requests/second)

Reviewed changes

Copilot reviewed 9 out of 10 changed files in this pull request and generated 10 comments.

| File | Description |
| --- | --- |
| FetchPlanner.java | Implements age-based path separation and rate limiting logic; lagging path bypasses cache |
| Reader.java | Adds lagging consumer executor, threshold, and rate limiter initialization; updated constructors |
| InklessFetchMetrics.java | Adds metrics for tracking recent/lagging requests and rate limit wait times |
| InklessConfig.java | Defines configuration for lagging consumer thread pool size and rate limit |
| FetchHandler.java | Passes new configuration parameters to Reader constructor |
| FetchException.java | Adds constructor accepting message and cause for rate limit interruptions |
| FetchPlannerTest.java | Adds tests for lagging consumer path, recent path, and rate limiting behavior |
| ReaderTest.java | Refactors to use helper method for Reader construction with new parameters |
| build.gradle | Adds Bucket4j dependency |
| gradle/dependencies.gradle | Defines Bucket4j version 8.14.0 |
Comments suppressed due to low confidence (1)

storage/inkless/src/test/java/io/aiven/inkless/consume/ReaderTest.java:101

  • The test reuses the same dataExecutor for both recent and lagging consumer paths, but doesn't verify that the lagging consumer executor is properly shut down. The testClose should verify that all three executors (metadataExecutor, dataExecutor, and laggingConsumerExecutor) are shut down, but since the lagging consumer executor is the same instance as dataExecutor in tests, this doesn't catch potential issues where the lagging consumer executor might not be shut down in production.
    @Test
    public void testClose() throws Exception {
        final var reader = getReader();
        reader.close();
        verify(metadataExecutor, atLeastOnce()).shutdown();
        verify(dataExecutor, atLeastOnce()).shutdown();
    }


@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from 6f06aff to d4c4bc4 Compare December 23, 2025 11:07
@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 282a419 to fc5bae4 Compare December 23, 2025 11:10
@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch 3 times, most recently from 31b9aa9 to f6be2a5 Compare December 23, 2025 15:16
@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch 2 times, most recently from d768ef9 to 4155e45 Compare December 24, 2025 01:52
@jeqo jeqo requested a review from Copilot December 24, 2025 01:52
Copilot AI left a comment

Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (1)

storage/inkless/src/main/java/io/aiven/inkless/consume/FetchPlanner.java:255

  • The javadoc comment states "This method is called by the cache (on dataExecutor) when a cache miss occurs" but this is no longer accurate after the lagging consumer changes. The method is now called from two different paths: (1) by the cache on recentDataExecutor for recent data (line 203), and (2) directly from the lagging consumer executor without caching for lagging data (line 217). The documentation should be updated to reflect both calling contexts.
    /**
     * Fetches a file extent from remote storage.
     * This method is called by the cache (on dataExecutor) when a cache miss occurs.
     *
     * @param request the fetch request with object key and byte range
     * @return the fetched file extent
     * @throws FetchException if remote fetch fails
     */


@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 4155e45 to d55d019 Compare December 24, 2025 02:07
@jeqo jeqo requested a review from Copilot December 24, 2025 02:08
Copilot AI left a comment

Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated 7 comments.



@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch 2 times, most recently from 9e79e59 to 535f9cb Compare December 24, 2025 08:19
@jeqo jeqo requested a review from Copilot December 24, 2025 08:29
Copilot AI left a comment

Pull request overview

Copilot reviewed 10 out of 11 changed files in this pull request and generated no new comments.



@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch 2 times, most recently from fda3624 to 0ae3314 Compare December 25, 2025 13:15
@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from f6be2a5 to 94ed0bf Compare December 29, 2025 13:30
@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 0ae3314 to a4f0131 Compare December 29, 2025 14:53
@jeqo jeqo requested a review from Copilot December 29, 2025 14:53
Copilot AI left a comment

Pull request overview

Copilot reviewed 12 out of 13 changed files in this pull request and generated 4 comments.



@jeqo jeqo force-pushed the jeqo/only-fetch-on-work-thread branch from 2713196 to 3285a0e Compare December 29, 2025 15:10
Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 1 comment.



@jeqo
Contributor Author

jeqo commented Jan 2, 2026

Applying fixup commits to simplify review.

@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 91d7ad4 to 5ec14d4 Compare January 2, 2026 20:59
@jeqo jeqo requested a review from Copilot January 2, 2026 20:59
Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.



@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 5ec14d4 to e52d6d1 Compare January 2, 2026 21:15
@jeqo jeqo requested a review from Copilot January 2, 2026 21:16
Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.



Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 1 comment.



Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.



@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 23746f2 to 7c76073 Compare January 2, 2026 23:33
@jeqo jeqo requested a review from Copilot January 2, 2026 23:33
Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 2 comments.



Implements hot/cold path separation to protect Inkless from load spikes
caused by lagging consumers while maintaining performance for recent
consumers.

Hot Path (Recent Data):
- Uses object cache for fast repeated access
- Dedicated executor pool
- Prioritizes low-latency for up-to-date consumers

Cold Path (Lagging Consumers):
- Bypasses cache to avoid evicting hot data
- Dedicated executor pool (default: half of hot-path pool) with bounded queue
- Optional rate limiting (default 200 req/s) to control S3 costs and CPU
  usage
- Separate storage client for resource isolation
- AbortPolicy for backpressure: queue full → RejectedExecutionException →
  Kafka error handler → consumer backs off via fetch purgatory

Path Selection:
- Age threshold: Data older than cache TTL (default 60s) uses cold path
- Configurable via fetch.lagging.consumer.threshold.ms (-1 = auto)
- Based on batch timestamp (max across partitions in same object)
- Handles future timestamps (clock skew) by treating as recent data

Backpressure Mechanism:
- Bounded queue prevents OOM (1600 tasks ≈ 3.2MB, ~8 seconds buffer)
- AbortPolicy rejects tasks when full (no request handler blocking)
- Only RejectedExecutionException tracked as rejections (capacity/backpressure)
- InterruptedException treated as fetch operation failure, not rejection
- Exceptions propagate to Kafka error handler (no custom empty responses)
- Consumers receive error → fetch purgatory backoff → natural retry

Fixes partial failure scenario: Reader#allOfFileExtents now handles
partial failures gracefully and ensures that successful partitions are
returned.

Robustness Improvements:
- Null safety: Validates laggingObjectFetcher is non-null when executor exists
- Validation: FindBatchesJob validates response count matches request count
- Edge cases: Handles future timestamps (clock skew) and empty batch coordinates
- Resource management: Rate limiter only created when lagging consumer feature enabled

Monitoring Metrics:
- LaggingConsumerRequestRate: cold path request rate
- LaggingConsumerRejectedRate: rejection events (queue full/shutdown)
- LaggingRateLimitWaitTime: rate limiting latency histogram
- RecentDataRequestRate: hot path request rate

Configuration:
- fetch.lagging.consumer.thread.pool.size (default: 16)
  Controls cold path concurrency
- fetch.lagging.consumer.request.rate.limit (default: 200 req/s)
  Controls S3 request rate (set 0 to disable)
- fetch.lagging.consumer.threshold.ms (default: -1 = cache TTL)
  Age threshold for hot/cold path selection
  -1 uses cache TTL automatically (validated to always be >= cache lifespan)

Design Trade-offs:
- Separate storage client doubles connection pools but
  prevents hot path degradation from cold path bursts
- Cache bypass for cold path means no deduplication across lagging
  consumers, acceptable because objects are multi-partition blobs
  (future: splitting objects by partition may make cold-path caching viable)
@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 7c76073 to afd9620 Compare January 2, 2026 23:48
@jeqo jeqo requested a review from Copilot January 2, 2026 23:50
Copilot AI left a comment

Pull request overview

Copilot reviewed 16 out of 17 changed files in this pull request and generated 1 comment.



jeqo added 2 commits January 4, 2026 15:46
Reformat to align style and reorder methods for clarity
Cover a potential bug where multiple file extents are found for the
same object, and the first fails but the second succeeds, potentially
returning incomplete data.

This fixup keeps the additional data needed to build an ordered
response in the FetchCompleter and to properly serve data in order, up
to what is available.
@jeqo jeqo force-pushed the jeqo/rate-limit-lag-consumer branch from 1e3036c to 5b4dfa5 Compare January 4, 2026 14:04