Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,7 @@ project(':storage:inkless') {
}
implementation libs.metrics
implementation libs.caffeine
implementation libs.bucket4jCore

testImplementation project(':clients').sourceSets.test.output.classesDirs
testImplementation project(':test-common')
Expand Down
26 changes: 25 additions & 1 deletion docs/inkless/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ Under ``inkless.``
* Default: io.aiven.inkless.storage_backend.in_memory.InMemoryStorage
* Importance: high

``fetch.lagging.consumer.request.rate.limit``
Maximum requests per second for lagging consumer data fetches. Set to 0 to disable rate limiting. The upper bound of 10000 req/s is a safety limit to prevent misconfiguration. For high-throughput systems, consider the relationship between this rate limit, thread pool size, and storage backend capacity. At the default rate of 200 req/s with ~50ms per request latency, this allows ~10 concurrent requests.

* Type: int
* Default: 200
* Valid Values: [0,...,10000]
* Importance: medium

``fetch.lagging.consumer.threshold.ms``
The time threshold in milliseconds to distinguish between recent and lagging consumers. Fetch requests for data strictly older than this threshold (dataAge > threshold, based on batch timestamp) will use the lagging consumer path. Set to -1 to use the default heuristic: the cache expiration lifespan. This provides a grace period ensuring data remains in cache before being considered 'lagging', accounting for cache warm-up and typical consumer lag variations. Must be >= cache expiration lifespan (see consume.cache.expiration.lifespan.sec). This is a startup-only configuration (no dynamic reconfiguration support). Both threshold and cache lifespan must be set together at startup to maintain the constraint.

* Type: long
* Default: -1
* Valid Values: [-1,...]
* Importance: medium

``object.key.prefix``
The object storage key prefix. It cannot start of finish with a slash.

Expand Down Expand Up @@ -125,8 +141,16 @@ Under ``inkless.``
* Valid Values: [0,...]
* Importance: low

``fetch.lagging.consumer.thread.pool.size``
Thread pool size for lagging consumer fetch requests (consumers reading old data). Set to 0 to disable the lagging consumer feature (all requests will use the recent data path). The default value of 16 is designed as approximately half of the default fetch.data.thread.pool.size (32), providing sufficient capacity for typical cold storage access patterns while leaving headroom for the hot path. The queue capacity is automatically set to thread.pool.size * 100, providing burst buffering (e.g., 16 threads = 1600 queue capacity ≈ 8 seconds buffer at 200 req/s). Tune based on lagging consumer SLA and expected load patterns.

* Type: int
* Default: 16
* Valid Values: [0,...]
* Importance: low

``fetch.metadata.thread.pool.size``
Thread pool size to concurrently fetch metadata from batch coordinator
Thread pool size to concurrently fetch metadata from batch coordinator. Note: This executor is shared between hot and cold path requests. The hot/cold path separation only applies to data fetching (after metadata is retrieved). A burst of lagging consumer requests can still compete with recent consumer requests at the metadata layer. For workloads with significant lagging consumer traffic, consider increasing this value proportionally to the combined fetch.data.thread.pool.size + fetch.lagging.consumer.thread.pool.size to prevent metadata fetching from becoming a bottleneck in mixed hot/cold workloads.

* Type: int
* Default: 8
Expand Down
4 changes: 3 additions & 1 deletion gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ versions += [
awsSdk: "2.29.6",
azureSdk: "1.2.28",
bcpkix: "1.80",
caffeine: "3.2.0",
bndlib: "7.1.0",
bucket4j: "8.14.0",
caffeine: "3.2.0",
checkstyle: project.hasProperty('checkstyleVersion') ? checkstyleVersion : "10.20.2",
commonsBeanutils: "1.11.0",
commonsValidator: "1.9.0",
Expand Down Expand Up @@ -164,6 +165,7 @@ libs += [
azureSdkBom: "com.azure:azure-sdk-bom:$versions.azureSdk",
bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",
bndlib:"biz.aQute.bnd:biz.aQute.bndlib:$versions.bndlib",
bucket4jCore: "com.bucket4j:bucket4j_jdk11-core:$versions.bucket4j",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsBeanutils: "commons-beanutils:commons-beanutils:$versions.commonsBeanutils",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;

import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -136,9 +137,54 @@ public class InklessConfig extends AbstractConfig {
private static final int FETCH_DATA_THREAD_POOL_SIZE_DEFAULT = 32;

public static final String FETCH_METADATA_THREAD_POOL_SIZE_CONFIG = "fetch.metadata.thread.pool.size";
public static final String FETCH_METADATA_THREAD_POOL_SIZE_DOC = "Thread pool size to concurrently fetch metadata from batch coordinator";
public static final String FETCH_METADATA_THREAD_POOL_SIZE_DOC = "Thread pool size to concurrently fetch metadata from batch coordinator. "
+ "Note: This executor is shared between hot and cold path requests. The hot/cold path separation "
+ "only applies to data fetching (after metadata is retrieved). A burst of lagging consumer requests "
+ "can still compete with recent consumer requests at the metadata layer. For workloads with significant "
+ "lagging consumer traffic, consider increasing this value proportionally to the combined "
+ "fetch.data.thread.pool.size + fetch.lagging.consumer.thread.pool.size to prevent metadata fetching "
+ "from becoming a bottleneck in mixed hot/cold workloads.";
private static final int FETCH_METADATA_THREAD_POOL_SIZE_DEFAULT = 8;

public static final String FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_CONFIG = "fetch.lagging.consumer.thread.pool.size";
public static final String FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DOC = "Thread pool size for lagging consumer fetch requests (consumers reading old data). "
+ "Set to 0 to disable the lagging consumer feature (all requests will use the recent data path). "
+ "The default value of 16 is designed as approximately half of the default fetch.data.thread.pool.size (32), "
+ "providing sufficient capacity for typical cold storage access patterns while leaving headroom for the hot path. "
+ "The queue capacity is automatically set to thread.pool.size * 100, providing burst buffering "
+ "(e.g., 16 threads = 1600 queue capacity ≈ 8 seconds buffer at 200 req/s). "
+ "Tune based on lagging consumer SLA and expected load patterns.";
// Default 16: Designed as half of default fetch.data.thread.pool.size (32), sufficient for typical
// cold storage access patterns while leaving headroom for hot path. Tune based on lagging consumer SLA.
private static final int FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DEFAULT = 16;

public static final String FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG = "fetch.lagging.consumer.threshold.ms";
public static final String FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DOC = "The time threshold in milliseconds to distinguish between recent and lagging consumers. "
+ "Fetch requests for data strictly older than this threshold (dataAge > threshold, based on batch timestamp) will use the lagging consumer path. "
+ "Set to -1 to use the default heuristic: the cache expiration lifespan. "
+ "This provides a grace period ensuring data remains in cache before being considered 'lagging', "
+ "accounting for cache warm-up and typical consumer lag variations. "
+ "Must be >= cache expiration lifespan (see " + CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG + "). "
+ "This is a startup-only configuration (no dynamic reconfiguration support). "
+ "Both threshold and cache lifespan must be set together at startup to maintain the constraint.";
/**
* Default value for {@link #FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG}.
* A value of -1 means "auto-detect from cache TTL" - the {@link #fetchLaggingConsumerThresholdMs()} method
* will automatically use the cache expiration lifespan as the effective threshold.
*/
private static final int FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DEFAULT = -1;

public static final String FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_CONFIG = "fetch.lagging.consumer.request.rate.limit";
public static final String FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DOC = "Maximum requests per second for lagging consumer data fetches. "
+ "Set to 0 to disable rate limiting. "
+ "The upper bound of 10000 req/s is a safety limit to prevent misconfiguration. For high-throughput systems, "
+ "consider the relationship between this rate limit, thread pool size, and storage backend capacity. "
+ "At the default rate of 200 req/s with ~50ms per request latency, this allows ~10 concurrent requests.";
// Default 200 req/s: Conservative limit based on typical object storage GET request costs and latency.
// At ~50ms per request, 200 req/s = ~10 concurrent requests, balancing throughput with cost control.
// Tune based on storage backend capacity and budget constraints.
private static final int FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DEFAULT = 200;

public static final String FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG = "fetch.find.batches.max.per.partition";
public static final String FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_DOC = "The maximum number of batches to find per partition when processing a fetch request. "
+ "A value of 0 means all available batches are fetched. "
Expand Down Expand Up @@ -322,6 +368,32 @@ public static ConfigDef configDef() {
ConfigDef.Importance.LOW,
FETCH_METADATA_THREAD_POOL_SIZE_DOC
);
configDef.define(
FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_CONFIG,
ConfigDef.Type.INT,
FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DEFAULT,
ConfigDef.Range.atLeast(0),
ConfigDef.Importance.LOW,
FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_DOC
);
configDef.define(
FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG,
ConfigDef.Type.LONG,
FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DEFAULT,
ConfigDef.Range.atLeast(-1),
ConfigDef.Importance.MEDIUM,
FETCH_LAGGING_CONSUMER_THRESHOLD_MS_DOC
);
configDef.define(
FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_CONFIG,
ConfigDef.Type.INT,
FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DEFAULT,
ConfigDef.Range.between(0, 10000),
// Safety limit to prevent misconfiguration. For high-throughput systems,
// consider the relationship between this rate limit, thread pool size, and storage backend capacity.
ConfigDef.Importance.MEDIUM,
FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_DOC
);
configDef.define(
FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG,
ConfigDef.Type.INT,
Expand Down Expand Up @@ -362,7 +434,69 @@ public InklessConfig(final AbstractConfig config) {
}

public InklessConfig(final Map<String, ?> props) {
super(configDef(), props);
super(validate(props), props);
}

private static ConfigDef validate(final Map<String, ?> props) {
final ConfigDef configDef = configDef();
// Parse the properties using ConfigDef directly for validation. This avoids creating a
// temporary AbstractConfig instance while still leveraging the same parsing and defaulting
// logic that AbstractConfig would use. Note: We still parse twice (once here, once in super()),
// but this avoids the overhead of creating an AbstractConfig instance. This is necessary to
// avoid 'this-escape' warnings in JDK 23+ and ensure super() is the first statement for JDK 17.
// The performance impact is minimal since config parsing only happens at startup.
final Map<String, Object> parsedProps = configDef.parse(props);

final long thresholdMs =
((Number) parsedProps.get(FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG)).longValue();
final int cacheLifespanSec =
((Number) parsedProps.get(CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG)).intValue();
final long lifespanMs = Duration.ofSeconds(cacheLifespanSec).toMillis();

// Validate threshold is not less than cache lifespan (unless using default heuristic).
// If threshold < cache lifespan, we'd route requests for potentially cached data to the
// cold path, defeating the cache and unnecessarily loading the cold path/storage backend.
//
// Note: This validation occurs at construction time. These configurations are startup-only
// and do not support dynamic reconfiguration. Both threshold and cache lifespan must be set
// together at startup to maintain the constraint that threshold >= cache lifespan.
//
// Explicitly reject threshold=0 with a clear error message: While threshold=0 would always fail
// the cache lifespan validation below (since minimum cache lifespan is 10 seconds = 10000ms),
// we check it explicitly here to provide a more specific error message that explains why 0 is
// invalid. With threshold=0, the runtime check (dataAge > threshold) would route almost all cached
// data (anything with dataAge > 0) to the cold path, defeating the cache. Only data with
// dataAge == 0 would use the hot path, which is negligible.
if (thresholdMs == 0) {
throw new ConfigException(
FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG,
thresholdMs,
"Lagging consumer threshold cannot be 0. Use -1 to auto-detect from cache TTL, or set a value >= cache lifespan ("
+ lifespanMs + "ms). Threshold=0 would route almost all cached data to the cold path, defeating the cache."
);
}
//
// Minimum allowed value: threshold == cache lifespan (>=, not >) is valid because:
// - The runtime check uses dataAge > threshold (strictly greater), so dataAge == threshold uses hot path
// - Data can still be in cache at exactly TTL seconds old (cache expiration runs periodically)
// - With threshold == cache lifespan, when dataAge == cache lifespan, data might still be cached
// and correctly uses hot path. When dataAge > cache lifespan, data is expired and uses cold path.
// This ensures we only route data to cold path after it's guaranteed to be expired from cache.
//
// Special case: thresholdMs == -1 is explicitly excluded from validation (condition checks != -1)
// because fetchLaggingConsumerThresholdMs() will automatically use cache lifespan as the effective
// runtime value, which is always >= cache lifespan by definition. This design allows operators
// to use -1 as a "use cache TTL" heuristic without needing to know the exact cache lifespan value.
if (thresholdMs != -1 && thresholdMs < lifespanMs) {
throw new ConfigException(
FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG,
thresholdMs,
"Lagging consumer threshold (" + thresholdMs + "ms) must be >= cache lifespan ("
+ lifespanMs + "ms) to avoid routing requests for cached data to the lagging path."
);
}

return configDef;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -459,6 +593,34 @@ public int fetchMetadataThreadPoolSize() {
return getInt(FETCH_METADATA_THREAD_POOL_SIZE_CONFIG);
}

public int fetchLaggingConsumerThreadPoolSize() {
return getInt(FETCH_LAGGING_CONSUMER_THREAD_POOL_SIZE_CONFIG);
}

/**
* Returns the effective lagging consumer threshold in milliseconds.
* <p>
* If the configured value is -1 (auto), this method returns the cache expiration lifespan,
* which serves as the default heuristic. This ensures the effective threshold is always >= cache
* lifespan, which is why validation in the constructor skips threshold=-1 (it will automatically
* use cache lifespan at runtime).
* </p>
*
* @return the effective threshold in milliseconds (cache lifespan if configured as -1, otherwise the configured value)
*/
public long fetchLaggingConsumerThresholdMs() {
final long configuredValue = getLong(FETCH_LAGGING_CONSUMER_THRESHOLD_MS_CONFIG);
if (configuredValue == -1) {
// Use heuristic: cache TTL (provides grace period for recent data)
return Duration.ofSeconds(getInt(CONSUME_CACHE_EXPIRATION_LIFESPAN_SEC_CONFIG)).toMillis();
}
return configuredValue;
}

public int fetchLaggingConsumerRequestRateLimit() {
return getInt(FETCH_LAGGING_CONSUMER_REQUEST_RATE_LIMIT_CONFIG);
}

public int maxBatchesPerPartitionToFind() {
return getInt(FETCH_FIND_BATCHES_MAX_BATCHES_PER_PARTITION_CONFIG);
}
Expand Down
Loading