diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java
index aaf2c4f4cb8b..263930ce3576 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/AsyncBenchmark.java
@@ -45,7 +45,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-abstract class AsyncBenchmark {
+abstract class AsyncBenchmark implements Benchmark {
private static final ImplementationBridgeHelpers.CosmosClientBuilderHelper.CosmosClientBuilderAccessor clientBuilderAccessor
= ImplementationBridgeHelpers.CosmosClientBuilderHelper.getCosmosClientBuilderAccessor();
@@ -343,7 +343,7 @@ uuid, new PartitionKey(partitionKey), PojoizedJson.class)
protected void init() {
}
- void shutdown() {
+ public void shutdown() {
if (workloadConfig.isSuppressCleanup()) {
logger.info("Skipping cleanup of database/container (suppressCleanup=true)");
} else if (this.databaseCreated) {
@@ -420,7 +420,7 @@ private boolean latencyAwareOperations(Operation operation) {
}
}
- void run() throws Exception {
+ public void run() throws Exception {
initializeMeter();
if (workloadConfig.getSkipWarmUpOperations() > 0) {
logger.info("Starting warm up phase. Executing {} operations to warm up ...", workloadConfig.getSkipWarmUpOperations());
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Benchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Benchmark.java
new file mode 100644
index 000000000000..8cb53a609bbc
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Benchmark.java
@@ -0,0 +1,14 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.benchmark;
+
+/**
+ * Common contract for all benchmark workloads.
+ * Implementations are created by {@link BenchmarkOrchestrator} and participate
+ * in its lifecycle loop (create → run → shutdown → settle × N cycles).
+ */
+public interface Benchmark {
+ void run() throws Exception;
+ void shutdown();
+}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkConfig.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkConfig.java
index 4f2206988d30..73da0914e2cd 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkConfig.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkConfig.java
@@ -18,7 +18,7 @@
* Contains lifecycle params, reporting config, and fully-resolved tenant workloads.
*
* <p>
* Each {@link TenantWorkloadConfig} carries its complete effective config
- * (account info + workload params), so no separate globalDefaults map is needed.
+ * (account info + workload params), so no separate tenantDefaults map is needed.
*
* <p>
* When {@code cycles > 1}, sensible defaults are applied automatically
* unless explicitly overridden (settleTimeMs=90s, suppressCleanup=true).
@@ -81,44 +81,20 @@ public static BenchmarkConfig fromConfiguration(Configuration cfg) throws IOExce
}
config.gcBetweenCycles = cfg.isGcBetweenCycles();
- config.enableJvmStats = cfg.isEnableJvmStats();
- config.enableNettyHttpMetrics = cfg.isEnableNettyHttpMetrics();
-
- // Reporting
- config.reportingDirectory = cfg.getReportingDirectory() != null
- ? cfg.getReportingDirectory().getPath() : null;
- config.printingInterval = cfg.getPrintingInterval();
- config.resultUploadEndpoint = cfg.getServiceEndpointForRunResultsUploadAccount();
- config.resultUploadKey = cfg.getMasterKeyForRunResultsUploadAccount();
- config.resultUploadDatabase = cfg.getResultUploadDatabase();
- config.resultUploadContainer = cfg.getResultUploadContainer();
-
- // Run metadata
- config.testVariationName = cfg.getTestVariationName();
- config.branchName = cfg.getBranchName();
- config.commitId = cfg.getCommitId();
-
- // Tenants
- String tenantsFile = cfg.getTenantsFile();
- if (tenantsFile != null && new File(tenantsFile).exists()) {
- // tenants.json takes priority over CLI workload args (operation, concurrency, etc.)
- logger.info("Loading tenant configs from {}. " +
- "Workload parameters from tenants.json will take priority over CLI args.", tenantsFile);
- config.tenantWorkloads = TenantWorkloadConfig.parseTenantsFile(new File(tenantsFile));
-
- // Extract JVM-global system properties from globalDefaults
- config.loadGlobalSystemPropertiesFromTenantsFile(new File(tenantsFile));
- } else {
- // Single tenant from CLI args - use fromConfiguration() to copy ALL fields
- config.tenantWorkloads = Collections.singletonList(
- TenantWorkloadConfig.fromConfiguration(cfg));
-
- // JVM-global system properties from CLI
- config.isPartitionLevelCircuitBreakerEnabled = cfg.isPartitionLevelCircuitBreakerEnabled();
- config.isPerPartitionAutomaticFailoverRequired = cfg.isPerPartitionAutomaticFailoverRequired();
- config.minConnectionPoolSizePerEndpoint = cfg.getMinConnectionPoolSizePerEndpoint();
+
+ // Workload config - ALWAYS from config file
+ String workloadConfigPath = cfg.getWorkloadConfig();
+ if (workloadConfigPath == null || !new File(workloadConfigPath).exists()) {
+ throw new IllegalArgumentException(
+ "A workload configuration file is required. Use -workloadConfig to specify the path."
+ + (workloadConfigPath != null ? " File not found: " + workloadConfigPath : ""));
}
+ logger.info("Loading workload configs from {}.", workloadConfigPath);
+ File workloadFile = new File(workloadConfigPath);
+ config.tenantWorkloads = TenantWorkloadConfig.parseWorkloadConfig(workloadFile);
+ config.loadWorkloadConfigSections(workloadFile);
+
return config;
}
@@ -161,28 +137,106 @@ public String toString() {
}
/**
- * Reads JVM-global system properties from the globalDefaults section of a tenants.json file.
- * These properties are JVM-wide and cannot vary per tenant.
+ * Loads all non-tenant sections from the workload config file:
+ * JVM system properties, metrics config, result upload, and run metadata.
*/
- private void loadGlobalSystemPropertiesFromTenantsFile(File tenantsFile) throws IOException {
+ private void loadWorkloadConfigSections(File workloadConfigFile) throws IOException {
ObjectMapper mapper = new ObjectMapper();
- JsonNode root = mapper.readTree(tenantsFile);
- JsonNode defaults = root.get("globalDefaults");
- if (defaults == null || !defaults.isObject()) {
- return;
+ JsonNode root = mapper.readTree(workloadConfigFile);
+
+ loadJvmSystemProperties(root);
+ loadMetricsConfig(root);
+ loadResultUploadConfig(root);
+ loadRunMetadata(root);
+ }
+
+ /**
+ * JVM-global system properties from the tenantDefaults section.
+ * These are JVM-wide and cannot vary per tenant.
+ */
+ private void loadJvmSystemProperties(JsonNode root) {
+ JsonNode defaults = root.get("tenantDefaults");
+ if (defaults != null && defaults.isObject()) {
+ if (defaults.has("isPartitionLevelCircuitBreakerEnabled")) {
+ isPartitionLevelCircuitBreakerEnabled =
+ Boolean.parseBoolean(defaults.get("isPartitionLevelCircuitBreakerEnabled").asText());
+ }
+ if (defaults.has("isPerPartitionAutomaticFailoverRequired")) {
+ isPerPartitionAutomaticFailoverRequired =
+ Boolean.parseBoolean(defaults.get("isPerPartitionAutomaticFailoverRequired").asText());
+ }
+ if (defaults.has("minConnectionPoolSizePerEndpoint")) {
+ minConnectionPoolSizePerEndpoint =
+ Integer.parseInt(defaults.get("minConnectionPoolSizePerEndpoint").asText());
+ }
}
+ }
- if (defaults.has("isPartitionLevelCircuitBreakerEnabled")) {
- isPartitionLevelCircuitBreakerEnabled =
- Boolean.parseBoolean(defaults.get("isPartitionLevelCircuitBreakerEnabled").asText());
+ /**
+ * Metrics and reporting settings from the top-level "metrics" section.
+ */
+ private void loadMetricsConfig(JsonNode root) {
+ JsonNode metrics = root.get("metrics");
+ if (metrics != null && metrics.isObject()) {
+ if (metrics.has("enableJvmStats")) {
+ enableJvmStats = Boolean.parseBoolean(metrics.get("enableJvmStats").asText());
+ }
+ if (metrics.has("enableNettyHttpMetrics")) {
+ enableNettyHttpMetrics = Boolean.parseBoolean(metrics.get("enableNettyHttpMetrics").asText());
+ }
+ if (metrics.has("printingInterval")) {
+ printingInterval = Integer.parseInt(metrics.get("printingInterval").asText());
+ }
+ if (metrics.has("reportingDirectory")) {
+ reportingDirectory = metrics.get("reportingDirectory").asText();
+ }
}
- if (defaults.has("isPerPartitionAutomaticFailoverRequired")) {
- isPerPartitionAutomaticFailoverRequired =
- Boolean.parseBoolean(defaults.get("isPerPartitionAutomaticFailoverRequired").asText());
+ }
+
+ /**
+ * Result upload configuration from "metrics.resultUpload".
+ */
+ private void loadResultUploadConfig(JsonNode root) {
+ JsonNode metrics = root.get("metrics");
+ if (metrics == null || !metrics.isObject()) {
+ return;
+ }
+ JsonNode resultUpload = metrics.get("resultUpload");
+ if (resultUpload != null && resultUpload.isObject()) {
+ if (resultUpload.has("serviceEndpoint")) {
+ resultUploadEndpoint = resultUpload.get("serviceEndpoint").asText();
+ }
+ if (resultUpload.has("masterKey")) {
+ resultUploadKey = resultUpload.get("masterKey").asText();
+ }
+ if (resultUpload.has("database")) {
+ resultUploadDatabase = resultUpload.get("database").asText();
+ }
+ if (resultUpload.has("container")) {
+ resultUploadContainer = resultUpload.get("container").asText();
+ }
+ }
+ }
+
+ /**
+ * Run metadata from "metrics.runMetadata" (tagged on uploaded results).
+ */
+ private void loadRunMetadata(JsonNode root) {
+ JsonNode metrics = root.get("metrics");
+ if (metrics == null || !metrics.isObject()) {
+ return;
}
- if (defaults.has("minConnectionPoolSizePerEndpoint")) {
- minConnectionPoolSizePerEndpoint =
- Integer.parseInt(defaults.get("minConnectionPoolSizePerEndpoint").asText());
+ JsonNode runMetadata = metrics.get("runMetadata");
+ if (runMetadata != null && runMetadata.isObject()) {
+ if (runMetadata.has("testVariationName")) {
+ testVariationName = runMetadata.get("testVariationName").asText();
+ }
+ if (runMetadata.has("branchName")) {
+ branchName = runMetadata.get("branchName").asText();
+ }
+ if (runMetadata.has("commitId")) {
+ commitId = runMetadata.get("commitId").asText();
+ }
}
}
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java
index b4061b22f708..2bd8b5bd2b8a 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkHelper.java
@@ -20,10 +20,9 @@ public static PojoizedJson generateDocument(String idString, String dataFieldVal
return instance;
}
- public static boolean shouldContinue(long startTimeMillis, long iterationCount, Configuration configuration) {
-
- Duration maxDurationTime = configuration.getMaxRunningTimeDuration();
- int maxNumberOfOperations = configuration.getNumberOfOperations();
+ public static boolean shouldContinue(long startTimeMillis, long iterationCount, TenantWorkloadConfig workloadConfig) {
+ Duration maxDurationTime = workloadConfig.getMaxRunningTimeDuration();
+ int maxNumberOfOperations = workloadConfig.getNumberOfOperations();
if (maxDurationTime == null) {
return iterationCount < maxNumberOfOperations;
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java
index 2f243629d336..dad9937b166d 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/BenchmarkOrchestrator.java
@@ -8,6 +8,12 @@
import com.codahale.metrics.ScheduledReporter;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
+import com.azure.cosmos.benchmark.ctl.AsyncCtlWorkload;
+import com.azure.cosmos.benchmark.encryption.AsyncEncryptionQueryBenchmark;
+import com.azure.cosmos.benchmark.encryption.AsyncEncryptionQuerySinglePartitionMultiple;
+import com.azure.cosmos.benchmark.encryption.AsyncEncryptionReadBenchmark;
+import com.azure.cosmos.benchmark.encryption.AsyncEncryptionWriteBenchmark;
+import com.azure.cosmos.benchmark.linkedin.LICtlWorkload;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
@@ -191,7 +197,7 @@ private void runLifecycleLoop(BenchmarkConfig config, MetricRegistry registry,
logger.info("[LIFECYCLE] CYCLE_START cycle={} timestamp={}", cycle, Instant.now());
// 1. Create clients
- List<AsyncBenchmark<?>> benchmarks = createBenchmarks(config, registry);
+ List<Benchmark> benchmarks = createBenchmarks(config, registry);
reporter.report();
logger.info("[LIFECYCLE] POST_CREATE cycle={} clients={} timestamp={}",
cycle, benchmarks.size(), Instant.now());
@@ -244,18 +250,18 @@ private void runLifecycleLoop(BenchmarkConfig config, MetricRegistry registry,
totalCycles, durationSec, Instant.now());
}
- private List<AsyncBenchmark<?>> createBenchmarks(BenchmarkConfig config, MetricRegistry registry) {
- List<AsyncBenchmark<?>> benchmarks = new ArrayList<>();
+ private List<Benchmark> createBenchmarks(BenchmarkConfig config, MetricRegistry registry) throws Exception {
+ List<Benchmark> benchmarks = new ArrayList<>();
for (TenantWorkloadConfig tenant : config.getTenantWorkloads()) {
benchmarks.add(createBenchmarkForOperation(tenant, registry));
}
return benchmarks;
}
- private void runWorkload(List<AsyncBenchmark<?>> benchmarks, int cycle, ExecutorService executor) throws Exception {
+ private void runWorkload(List<Benchmark> benchmarks, int cycle, ExecutorService executor) throws Exception {
List<Future<?>> futures = new ArrayList<>();
final int currentCycle = cycle;
- for (AsyncBenchmark<?> benchmark : benchmarks) {
+ for (Benchmark benchmark : benchmarks) {
futures.add(executor.submit(() -> {
try {
benchmark.run();
@@ -269,8 +275,8 @@ private void runWorkload(List<AsyncBenchmark<?>> benchmarks, int cycle, Executor
}
}
- private void shutdownBenchmarks(List<AsyncBenchmark<?>> benchmarks, int cycle) {
- for (AsyncBenchmark<?> benchmark : benchmarks) {
+ private void shutdownBenchmarks(List<Benchmark> benchmarks, int cycle) {
+ for (Benchmark benchmark : benchmarks) {
try {
benchmark.shutdown();
} catch (Exception e) {
@@ -311,7 +317,55 @@ private void prepareTenants(BenchmarkConfig config) {
// ======== Benchmark factory ========
- private AsyncBenchmark<?> createBenchmarkForOperation(TenantWorkloadConfig cfg, MetricRegistry registry) {
+ private Benchmark createBenchmarkForOperation(TenantWorkloadConfig cfg, MetricRegistry registry) throws Exception {
+ // Sync benchmarks
+ if (cfg.isSync()) {
+ switch (cfg.getOperationType()) {
+ case ReadThroughput:
+ case ReadLatency:
+ return new SyncReadBenchmark(cfg, registry);
+ case WriteThroughput:
+ case WriteLatency:
+ return new SyncWriteBenchmark(cfg, registry);
+ default:
+ throw new IllegalArgumentException(
+ "Sync mode is not supported for operation: " + cfg.getOperationType());
+ }
+ }
+
+ // CTL workloads
+ if (cfg.getOperationType() == Operation.CtlWorkload) {
+ return new AsyncCtlWorkload(cfg, registry);
+ }
+ if (cfg.getOperationType() == Operation.LinkedInCtlWorkload) {
+ return new LICtlWorkload(cfg, registry);
+ }
+
+ // Encryption benchmarks
+ if (cfg.isEncryptionEnabled()) {
+ switch (cfg.getOperationType()) {
+ case WriteThroughput:
+ case WriteLatency:
+ return new AsyncEncryptionWriteBenchmark(cfg, registry);
+ case ReadThroughput:
+ case ReadLatency:
+ return new AsyncEncryptionReadBenchmark(cfg, registry);
+ case QueryCross:
+ case QuerySingle:
+ case QueryParallel:
+ case QueryOrderby:
+ case QueryTopOrderby:
+ case QueryInClauseParallel:
+ return new AsyncEncryptionQueryBenchmark(cfg, registry);
+ case QuerySingleMany:
+ return new AsyncEncryptionQuerySinglePartitionMultiple(cfg, registry);
+ default:
+ throw new IllegalArgumentException(
+ "Encryption is not supported for operation: " + cfg.getOperationType());
+ }
+ }
+
+ // Default: async benchmarks
switch (cfg.getOperationType()) {
case ReadThroughput:
case ReadLatency:
@@ -354,12 +408,55 @@ private MeterRegistry buildCosmosMicrometerRegistry() {
StringUtils.defaultString(
com.google.common.base.Strings.emptyToNull(
System.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING")), null));
- if (instrumentationKey != null || appInsightsConnStr != null) {
- Configuration tempCfg = new Configuration();
- return tempCfg.getAzureMonitorMeterRegistry();
+ if (instrumentationKey == null && appInsightsConnStr == null) {
+ return null;
+ }
+
+ java.time.Duration step = java.time.Duration.ofSeconds(
+ Integer.getInteger("azure.cosmos.monitoring.azureMonitor.step", 10));
+ String testCategoryTag = System.getProperty("azure.cosmos.monitoring.azureMonitor.testCategory");
+ boolean enabled = !Boolean.getBoolean("azure.cosmos.monitoring.azureMonitor.disabled");
+
+ final String connStr = appInsightsConnStr;
+ final String instrKey = instrumentationKey;
+ final io.micrometer.azuremonitor.AzureMonitorConfig amConfig = new io.micrometer.azuremonitor.AzureMonitorConfig() {
+ @Override
+ public String get(String key) { return null; }
+
+ @Override
+ public String instrumentationKey() {
+ return connStr != null ? null : instrKey;
+ }
+
+ @Override
+ public String connectionString() { return connStr; }
+
+ @Override
+ public java.time.Duration step() { return step; }
+
+ @Override
+ public boolean enabled() { return enabled; }
+ };
+
+ String roleName = System.getenv("APPLICATIONINSIGHTS_ROLE_NAME");
+ if (roleName != null) {
+ com.microsoft.applicationinsights.TelemetryConfiguration.getActive().setRoleName(roleName);
+ }
+
+ MeterRegistry registry = new io.micrometer.azuremonitor.AzureMonitorMeterRegistry(
+ amConfig, io.micrometer.core.instrument.Clock.SYSTEM);
+ java.util.List<io.micrometer.core.instrument.Tag> globalTags = new java.util.ArrayList<>();
+ if (!com.google.common.base.Strings.isNullOrEmpty(testCategoryTag)) {
+ globalTags.add(io.micrometer.core.instrument.Tag.of("TestCategory", testCategoryTag));
+ }
+
+ String roleInstance = System.getenv("APPLICATIONINSIGHTS_ROLE_INSTANCE");
+ if (roleInstance != null) {
+ globalTags.add(io.micrometer.core.instrument.Tag.of("cloud_RoleInstance", roleInstance));
}
- return null;
+ registry.config().commonTags(globalTags);
+ return registry;
}
// ======== Global system properties ========
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java
index 940ca2e07b81..ef32a0d4d2f6 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Configuration.java
@@ -3,118 +3,19 @@
package com.azure.cosmos.benchmark;
-import com.azure.cosmos.ConnectionMode;
-import com.azure.cosmos.ConsistencyLevel;
-import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.google.common.base.Strings;
-import com.google.common.net.HostAndPort;
-import com.google.common.net.PercentEscaper;
-import com.microsoft.applicationinsights.TelemetryConfiguration;
-import io.micrometer.azuremonitor.AzureMonitorConfig;
-import io.micrometer.azuremonitor.AzureMonitorMeterRegistry;
-import io.micrometer.core.instrument.Clock;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Tag;
-import io.micrometer.core.instrument.config.NamingConvention;
-import io.micrometer.core.lang.Nullable;
-import io.micrometer.graphite.GraphiteConfig;
-import io.micrometer.graphite.GraphiteMeterRegistry;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
-import java.io.File;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
import java.util.function.Function;
public class Configuration {
- public static final String SUCCESS_COUNTER_METER_NAME = "#Successful Operations";
- public static final String FAILURE_COUNTER_METER_NAME = "#Unsuccessful Operations";
- public static final String LATENCY_METER_NAME = "Latency";
- public final static String DEFAULT_PARTITION_KEY_PATH = "/pk";
- private final static int DEFAULT_GRAPHITE_SERVER_PORT = 2003;
- private MeterRegistry azureMonitorMeterRegistry;
- private MeterRegistry graphiteMeterRegistry;
-
- @Parameter(names = "-serviceEndpoint", description = "Service Endpoint")
- private String serviceEndpoint;
-
- @Parameter(names = "-masterKey", description = "Master Key")
- private String masterKey;
-
- @Parameter(names = "-serviceEndpointForResultsUploadAccount", description = "Service Endpoint for run results upload account")
- private String serviceEndpointForRunResultsUploadAccount;
-
- @Parameter(names = "-masterKeyForResultsUploadAccount", description = "Master Key for run results upload account")
- private String masterKeyForRunResultsUploadAccount;
-
- @Parameter(names = "-databaseId", description = "Database ID")
- private String databaseId;
-
- @Parameter(names = "-collectionId", description = "Collection ID")
- private String collectionId;
-
- @Parameter(names = "-useNameLink", description = "Use name Link")
- private boolean useNameLink = false;
-
- @Parameter(names = "-documentDataFieldSize", description = "Length of a document data field in characters (16-bit)")
- private int documentDataFieldSize = 20;
-
- @Parameter(names = "-documentDataFieldCount", description = "Number of data fields in document")
- private int documentDataFieldCount = 5;
-
- @Parameter(names = "-maxConnectionPoolSize", description = "Max Connection Pool Size")
- private Integer maxConnectionPoolSize = 1000;
-
- @Parameter(names = "-connectionSharingAcrossClientsEnabled", description = "Enable connection sharing across CosmosClient instances (Gateway mode). Reduces connection count for multi-tenant scenarios.")
- private boolean connectionSharingAcrossClientsEnabled = false;
-
- @Parameter(names = "-diagnosticsThresholdDuration", description = "Latency threshold for printing diagnostics", converter = DurationConverter.class)
- private Duration diagnosticsThresholdDuration = Duration.ofSeconds(60);
-
- @Parameter(names = "-disablePassingPartitionKeyAsOptionOnWrite", description = "Disables passing partition in request options for write operation;" +
- " in this case, json will be parsed and partition key will be extracted (this requires more computational overhead).")
- private boolean disablePassingPartitionKeyAsOptionOnWrite = false;
-
- @Parameter(names = "-consistencyLevel", description = "Consistency Level", converter = ConsistencyLevelConverter.class)
- private ConsistencyLevel consistencyLevel = ConsistencyLevel.SESSION;
-
- @Parameter(names = "-connectionMode", description = "Connection Mode")
- private ConnectionMode connectionMode = ConnectionMode.DIRECT;
-
- @Parameter(names = "-graphiteEndpoint", description = "Graphite endpoint")
- private String graphiteEndpoint;
-
- @Parameter(names = "-enableJvmStats", description = "Enables JVM Stats")
- private boolean enableJvmStats;
-
- @Parameter(names = "-enableNettyHttpMetrics", description = "Enables Reactor Netty HTTP client metrics (connection pool gauges via COSMOS.NETTY_HTTP_CLIENT_METRICS_ENABLED)")
- private boolean enableNettyHttpMetrics;
-
- @Parameter(names = "-throughput", description = "provisioned throughput for test container")
- private int throughput = 100000;
-
- @Parameter(names = "-numberOfCollectionForCtl", description = "Number of collections for ctl load")
- private int numberOfCollectionForCtl = 4;
-
- @Parameter(names = "-readWriteQueryReadManyPct", description = "Comma separated read write query readMany workload percent")
- private String readWriteQueryReadManyPct = "90,8,1,1";
-
- @Parameter(names = "-manageDatabase", description = "Control switch for creating/deleting underlying database resource")
- private boolean manageDatabase = false;
@Parameter(names = "-suppressCleanup", description = "Skip deleting database/container on shutdown (for multi-cycle CHURN)")
private boolean suppressCleanup = false;
- @Parameter(names = "-tenantsFile", description = "Path to tenants.json for multi-tenant benchmarks")
- private String tenantsFile;
+ @Parameter(names = "-workloadConfig", description = "Path to workload configuration JSON file")
+ private String workloadConfig;
@Parameter(names = "-cycles", description = "Number of create/destroy cycles (1 = single run)")
private int cycles = 1;
@@ -125,509 +26,19 @@ public class Configuration {
@Parameter(names = "-gcBetweenCycles", description = "Force GC during settle period between cycles")
private boolean gcBetweenCycles = true;
-
- @Parameter(names = "-preferredRegionsList", description = "Comma separated preferred regions list")
- private String preferredRegionsList;
-
- @Parameter(names = "-encryptedStringFieldCount", description = "Number of string field that need to be encrypted")
- private int encryptedStringFieldCount = 1;
-
- @Parameter(names = "-encryptedLongFieldCount", description = "Number of long field that need to be encrypted")
- private int encryptedLongFieldCount = 0;
-
- @Parameter(names = "-encryptedDoubleFieldCount", description = "Number of double field that need to be encrypted")
- private int encryptedDoubleFieldCount = 0;
-
- @Parameter(names = "-encryptionEnabled", description = "Control switch to enable the encryption operation")
- private boolean encryptionEnabled = false;
-
- @Parameter(names = "-defaultLog4jLoggerEnabled", description = "Control switch to enable the default log4j logger in 4.42 and above")
- private String defaultLog4jLoggerEnabled = String.valueOf(false);
-
-
- @Parameter(names = "-tupleSize", description = "Number of cosmos identity tuples to be queried using readMany")
- private int tupleSize = 1;
-
- @Parameter(names = "-isProactiveConnectionManagementEnabled", description = "Mode which denotes whether connections are proactively established during warm up.")
- private String isProactiveConnectionManagementEnabled = String.valueOf(false);
-
- @Parameter(names = "-isUseUnWarmedUpContainer", description = "Mode which denotes whether to use a container with no warmed up connections. NOTE: " +
- "To be used when isProactiveConnectionManagementEnabled is set to false and isUseUnWarmedUpContainer is set to true")
- private String isUseUnWarmedUpContainer = String.valueOf(false);
-
- @Parameter(names = "-proactiveConnectionRegionsCount", description = "Number of regions where endpoints are to be proactively connected to.")
- private int proactiveConnectionRegionsCount = 1;
-
- @Parameter(names = "-minConnectionPoolSizePerEndpoint", description = "Minimum number of connections to establish per endpoint for proactive connection management")
- private int minConnectionPoolSizePerEndpoint = 0;
-
- @Parameter(names = "-aggressiveWarmupDuration", description = "The duration for which proactive connections are aggressively established", converter = DurationConverter.class)
- private Duration aggressiveWarmupDuration = Duration.ZERO;
-
- @Parameter(names = "-isRegionScopedSessionContainerEnabled", description = "A flag to denote whether region scoped session container is enabled")
- private String isRegionScopedSessionContainerEnabled = String.valueOf(false);
-
- @Parameter(names = "isPartitionLevelCircuitBreakerEnabled", description = "A flag to denote whether partition level circuit breaker is enabled.")
- private String isPartitionLevelCircuitBreakerEnabled = String.valueOf(true);
-
- @Parameter(names = "-aadLoginEndpoint", description = "AAD login endpoint for this configuration instance. Overrides COSMOS.AAD_LOGIN_ENDPOINT / COSMOS_AAD_LOGIN_ENDPOINT.")
- private String aadLoginEndpoint;
-
- @Parameter(names = "-aadTenantId", description = "AAD tenant ID for this configuration instance. Overrides COSMOS.AAD_TENANT_ID / COSMOS_AAD_TENANT_ID.")
- private String aadTenantId;
-
- @Parameter(names = "-aadManagedIdentityClientId", description = "AAD managed identity client ID for this configuration instance. Overrides COSMOS.AAD_MANAGED_IDENTITY_ID / COSMOS_AAD_MANAGED_IDENTITY_ID.")
- private String aadManagedIdentityClientId;
-
- @Parameter(names = "-isManagedIdentityRequired", description = "A flag to denote whether benchmark-specific CosmosClient instance should use Managed Identity to authenticate.")
- private String isManagedIdentityRequired = String.valueOf(false);
-
- // ── Multi-tenancy orchestrator flags (not CLI — set programmatically) ──
-
-
- @Parameter(names = "-isPerPartitionAutomaticFailoverRequired", description = "A flag to denote whether per-partition automatic failover is required.")
- private String isPerPartitionAutomaticFailoverRequired = String.valueOf(true);
-
- @Parameter(names = "-operation", description = "Type of Workload:\n"
- + "\tReadThroughput- run a READ workload that prints only throughput *\n"
- + "\tWriteThroughput - run a Write workload that prints only throughput\n"
- + "\tReadLatency - run a READ workload that prints both throughput and latency *\n"
- + "\tWriteLatency - run a Write workload that prints both throughput and latency\n"
- + "\tQueryInClauseParallel - run a 'Select * from c where c.pk in (....)' workload that prints latency\n"
- + "\tQueryCross - run a 'Select * from c where c._rid = SOME_RID' workload that prints throughput\n"
- + "\tQuerySingle - run a 'Select * from c where c.pk = SOME_PK' workload that prints throughput\n"
- + "\tQuerySingleMany - run a 'Select * from c where c.pk = \"pk\"' workload that prints throughput\n"
- + "\tQueryParallel - run a 'Select * from c' workload that prints throughput\n"
- + "\tQueryOrderby - run a 'Select * from c order by c._ts' workload that prints throughput\n"
- + "\tQueryAggregate - run a 'Select value max(c._ts) from c' workload that prints throughput\n"
- + "\tQueryAggregateTopOrderby - run a 'Select top 1 value count(c) from c order by c._ts' workload that prints throughput\n"
- + "\tQueryTopOrderby - run a 'Select top 1000 * from c order by c._ts' workload that prints throughput\n"
- + "\tMixed - runa workload of 90 reads, 9 writes and 1 QueryTopOrderby per 100 operations *\n"
- + "\tReadMyWrites - run a workflow of writes followed by reads and queries attempting to read the write.*\n"
- + "\tCtlWorkload - run a ctl workflow.*\n"
- + "\tReadAllItemsOfLogicalPartition - run a workload that uses readAllItems for a logical partition and prints throughput\n"
- + "\n\t* writes 10k documents initially, which are used in the reads"
- + "\tLinkedInCtlWorkload - ctl for LinkedIn workload.*\n"
- + "\tReadManyLatency - run a workload for readMany for a finite number of cosmos identity tuples that prints both throughput and latency*\n"
- + "\tReadManyThroughput - run a workload for readMany for a finite no of cosmos identity tuples that prints throughput*\n",
- converter = Operation.OperationTypeConverter.class)
- private Operation operation = Operation.WriteThroughput;
-
- @Parameter(names = "-concurrency", description = "Degree of Concurrency in Inserting Documents."
- + " If this value is not specified, the max connection pool size will be used as the concurrency level.")
- private Integer concurrency;
-
- @Parameter(names = "-numberOfOperations", description = "Total NUMBER Of Documents To Insert")
- private int numberOfOperations = 100000;
-
- public Boolean isManagedIdentityRequired() {
- return Boolean.parseBoolean(this.isManagedIdentityRequired);
- }
-
- public Boolean isPerPartitionAutomaticFailoverRequired() {
- return Boolean.parseBoolean(this.isPerPartitionAutomaticFailoverRequired);
- }
-
- static class DurationConverter implements IStringConverter {
- @Override
- public Duration convert(String value) {
- if (value == null) {
- return null;
- }
-
- return Duration.parse(value);
- }
- }
-
- @Parameter(names = "-maxRunningTimeDuration", description = "Max Running Time Duration", converter = DurationConverter.class)
- private Duration maxRunningTimeDuration;
-
- @Parameter(names = "-printingInterval", description = "Interval of time after which Metrics should be printed (seconds)")
- private int printingInterval = 10;
-
- @Parameter(names = "-reportingDirectory", description = "Location of a directory to which metrics should be printed as comma-separated values")
- private String reportingDirectory = null;
-
- @Parameter(names = "-numberOfPreCreatedDocuments", description = "Total NUMBER Of Documents To pre create for a read workload to use")
- private int numberOfPreCreatedDocuments = 1000;
-
- @Parameter(names = "-sparsityWaitTime", description = "Sleep time before making each request. Default is no sleep time."
- + " NOTE: For now only ReadLatency and ReadThroughput support this."
- + " Format: A string representation of this duration using ISO-8601 seconds based representation, such as "
- + "PT20.345S (20.345 seconds), PT15M (15 minutes)", converter = DurationConverter.class)
- private Duration sparsityWaitTime = null;
-
- @Parameter(names = "-skipWarmUpOperations", description = "the number of operations to be skipped before starting perf numbers.")
- private int skipWarmUpOperations = 0;
-
- @Parameter(names = "-useSync", description = "Uses Sync API")
- private boolean useSync = false;
-
- @Parameter(names = "-contentResponseOnWriteEnabled", description = "if set to false, does not returns content response on document write operations")
- private String contentResponseOnWriteEnabled = String.valueOf(true);
-
- @Parameter(names = "-bulkloadBatchSize", description = "Control the number of documents uploaded in each BulkExecutor load iteration (Only supported for the LinkedInCtlWorkload)")
- private int bulkloadBatchSize = 200000;
-
- @Parameter(names = "-testScenario", description = "The test scenario (GET, QUERY) for the LinkedInCtlWorkload")
- private String testScenario = "GET";
-
- @Parameter(names = "-applicationName", description = "The application name suffix in the user agent header")
- private String applicationName = "";
-
- @Parameter(names = "-accountNameInGraphiteReporter", description = "if set, account name with be appended in graphite reporter")
- private boolean accountNameInGraphiteReporter = false;
-
- @Parameter(names = "-pointLatencyThresholdMs", description = "Latency threshold for point operations")
- private int pointLatencyThresholdMs = -1;
-
- @Parameter(names = "-nonPointLatencyThresholdMs", description = "Latency threshold for non-point operations")
- private int nonPointLatencyThresholdMs = -1;
-
- @Parameter(names = "-testVariationName", description = "An identifier for the test variation")
- private String testVariationName = "";
-
- @Parameter(names = "-branchName", description = "The branch name form where the source code being tested was built")
- private String branchName = "";
-
- @Parameter(names = "-commitId", description = "A commit identifier showing the version of the source code being tested")
- private String commitId = "";
-
- @Parameter(names = "-resultUploadDatabase", description = "The name of the database into which to upload the results")
- private String resultUploadDatabase = "";
-
- @Parameter(names = "-resultUploadContainer", description = "AThe name of the container inot which to upload the results")
- private String resultUploadContainer = "";
-
- public enum Environment {
- Daily, // This is the CTL environment where we run the workload for a fixed number of hours
- Staging; // This is the CTL environment where the worload runs as a long running job
-
- static class EnvironmentConverter implements IStringConverter {
- @Override
- public Environment convert(String value) {
- if (value == null) {
- return Environment.Daily;
- }
-
- return Environment.valueOf(value);
- }
- }
- }
-
- @Parameter(names = "-environment", description = "The CTL Environment we are validating the workload",
- converter = Environment.EnvironmentConverter.class)
- private Environment environment = Environment.Daily;
-
@Parameter(names = {"-h", "-help", "--help"}, description = "Help", help = true)
private boolean help = false;
- // Operation enum extracted to standalone Operation.java
-
- private static ConsistencyLevel fromString(String code) {
- for (ConsistencyLevel output : ConsistencyLevel.values()) {
- if (output.toString().equalsIgnoreCase(code)) {
- return output;
- }
- }
- return null;
- }
-
- static class ConsistencyLevelConverter implements IStringConverter {
-
- /*
- * (non-Javadoc)
- *
- * @see com.beust.jcommander.IStringConverter#convert(java.lang.STRING)
- */
- @Override
- public ConsistencyLevel convert(String value) {
- ConsistencyLevel ret = fromString(value);
- if (ret == null) {
- throw new ParameterException("Value " + value + " can not be converted to ClientType. "
- + "Available values are: " + Arrays.toString(Operation.values()));
- }
- return ret;
- }
- }
-
- public int getSkipWarmUpOperations() {
- return skipWarmUpOperations;
- }
-
- public Duration getSparsityWaitTime() {
- return sparsityWaitTime;
- }
-
- public boolean isDisablePassingPartitionKeyAsOptionOnWrite() {
- return disablePassingPartitionKeyAsOptionOnWrite;
- }
-
- public boolean isSync() {
- return useSync;
- }
-
- public boolean isAccountNameInGraphiteReporter() {
- return accountNameInGraphiteReporter;
- }
-
- public Duration getMaxRunningTimeDuration() {
- return maxRunningTimeDuration;
- }
-
- public Operation getOperationType() {
- return operation;
- }
-
- public int getNumberOfOperations() {
- return numberOfOperations;
- }
-
- public int getThroughput() {
- return throughput;
- }
-
- public String getServiceEndpoint() {
- return serviceEndpoint;
- }
-
- public String getMasterKey() {
- return masterKey;
- }
-
- public String getServiceEndpointForRunResultsUploadAccount() {
- return serviceEndpointForRunResultsUploadAccount;
- }
-
- public String getMasterKeyForRunResultsUploadAccount() {
- return masterKeyForRunResultsUploadAccount;
- }
-
- public String getApplicationName() {
- return applicationName;
- }
-
- public void setApplicationName(String applicationName) {
- this.applicationName = applicationName;
- }
-
- public void setServiceEndpoint(String serviceEndpoint) {
- this.serviceEndpoint = serviceEndpoint;
- }
-
- public void setMasterKey(String masterKey) {
- this.masterKey = masterKey;
- }
-
- public void setDatabaseId(String databaseId) {
- this.databaseId = databaseId;
- }
-
- public void setCollectionId(String collectionId) {
- this.collectionId = collectionId;
- }
-
- public void setOperation(Operation operation) {
- this.operation = operation;
- }
-
- public void setOperationFromString(String operationName) {
- Operation op = Operation.fromString(operationName);
- if (op != null) {
- this.operation = op;
- }
- }
-
- public void setConcurrency(int concurrency) {
- this.concurrency = concurrency;
- }
-
- public void setConnectionMode(ConnectionMode connectionMode) {
- this.connectionMode = connectionMode;
- }
-
- public void setMaxConnectionPoolSize(int maxConnectionPoolSize) {
- this.maxConnectionPoolSize = maxConnectionPoolSize;
- }
-
- public void setNumberOfOperations(int numberOfOperations) {
- this.numberOfOperations = numberOfOperations;
- }
-
- public void setNumberOfPreCreatedDocuments(int numberOfPreCreatedDocuments) {
- this.numberOfPreCreatedDocuments = numberOfPreCreatedDocuments;
- }
-
- public void setConsistencyLevel(ConsistencyLevel consistencyLevel) {
- this.consistencyLevel = consistencyLevel;
- }
-
- public void setThroughput(int throughput) {
- this.throughput = throughput;
- }
-
- public void setManageDatabase(boolean manageDatabase) {
- this.manageDatabase = manageDatabase;
- }
-
- public void setPreferredRegionsList(String preferredRegionsList) {
- this.preferredRegionsList = preferredRegionsList;
- }
-
- public void setSkipWarmUpOperations(int skipWarmUpOperations) {
- this.skipWarmUpOperations = skipWarmUpOperations;
- }
-
public boolean isHelp() {
return help;
}
- public int getDocumentDataFieldSize() {
- return documentDataFieldSize;
- }
-
- public int getDocumentDataFieldCount() {
- return documentDataFieldCount;
- }
-
- public Integer getMaxConnectionPoolSize() {
- return maxConnectionPoolSize;
- }
-
- public boolean isConnectionSharingAcrossClientsEnabled() {
- return connectionSharingAcrossClientsEnabled;
- }
-
- public ConnectionMode getConnectionMode() {
- return connectionMode;
- }
-
- public ConsistencyLevel getConsistencyLevel() {
- return consistencyLevel;
- }
-
- public boolean isContentResponseOnWriteEnabled() {
- return Boolean.parseBoolean(contentResponseOnWriteEnabled);
- }
-
- public String getDatabaseId() {
- return databaseId;
- }
-
- public String getCollectionId() {
- return collectionId;
- }
-
- public int getNumberOfPreCreatedDocuments() {
- return numberOfPreCreatedDocuments;
- }
-
- public int getPrintingInterval() {
- return printingInterval;
- }
-
- public Duration getDiagnosticsThresholdDuration() {
- return diagnosticsThresholdDuration;
- }
-
- public File getReportingDirectory() {
- return reportingDirectory == null ? null : new File(reportingDirectory);
- }
-
- public int getConcurrency() {
- if (this.concurrency != null) {
- return concurrency;
- } else {
- return this.maxConnectionPoolSize;
- }
- }
-
- public boolean isUseNameLink() {
- return useNameLink;
- }
-
- public boolean isEnableJvmStats() {
- return enableJvmStats;
- }
-
- public boolean isEnableNettyHttpMetrics() {
- return enableNettyHttpMetrics;
- }
-
- public MeterRegistry getAzureMonitorMeterRegistry() {
- String instrumentationKey = System.getProperty("azure.cosmos.monitoring.azureMonitor.instrumentationKey",
- StringUtils.defaultString(Strings.emptyToNull(
- System.getenv().get("AZURE_INSTRUMENTATION_KEY")), null));
- String connectionString = System.getProperty("applicationinsights.connection.string",
- StringUtils.defaultString(Strings.emptyToNull(
- System.getenv().get("APPLICATIONINSIGHTS_CONNECTION_STRING")), null));
- return instrumentationKey == null && connectionString == null
- ? null
- : this.azureMonitorMeterRegistry(connectionString, instrumentationKey);
- }
-
- public MeterRegistry getGraphiteMeterRegistry() {
- String serviceAddress = System.getProperty("azure.cosmos.monitoring.graphite.serviceAddress",
- StringUtils.defaultString(Strings.emptyToNull(
- System.getenv().get("GRAPHITE_SERVICE_ADDRESS")), null));
- return serviceAddress == null ? null : this.graphiteMeterRegistry(serviceAddress);
- }
-
- public String getGraphiteEndpoint() {
- if (graphiteEndpoint == null) {
- return null;
- }
-
- return StringUtils.substringBeforeLast(graphiteEndpoint, ":");
- }
-
- public int getGraphiteEndpointPort() {
- if (graphiteEndpoint == null) {
- return -1;
- }
-
- String portAsString = Strings.emptyToNull(StringUtils.substringAfterLast(graphiteEndpoint, ":"));
- if (portAsString == null) {
- return DEFAULT_GRAPHITE_SERVER_PORT;
- } else {
- return Integer.parseInt(portAsString);
- }
- }
-
- public String getTestVariationName() {
- return this.testVariationName;
- }
-
- public String getBranchName() {
- return this.branchName;
- }
-
- public String getCommitId() {
- return this.commitId;
- }
-
- public int getNumberOfCollectionForCtl(){
- return this.numberOfCollectionForCtl;
- }
-
- public String getReadWriteQueryReadManyPct() {
- return this.readWriteQueryReadManyPct;
- }
-
- public boolean shouldManageDatabase() {
- return this.manageDatabase;
- }
-
- public boolean isSuppressCleanup() {
- return this.suppressCleanup;
- }
-
- public void setSuppressCleanup(boolean suppressCleanup) {
- this.suppressCleanup = suppressCleanup;
+ public boolean isGcBetweenCycles() {
+ return gcBetweenCycles;
}
- public String getTenantsFile() {
- return tenantsFile;
+ public String getWorkloadConfig() {
+ return workloadConfig;
}
public int getCycles() {
@@ -638,321 +49,18 @@ public long getSettleTimeMs() {
return settleTimeMs;
}
- public boolean isGcBetweenCycles() {
- return gcBetweenCycles;
- }
-
-
- public int getBulkloadBatchSize() {
- return this.bulkloadBatchSize;
- }
-
- public String getTestScenario() {
- return this.testScenario;
+ public boolean isSuppressCleanup() {
+ return this.suppressCleanup;
}
- public Environment getEnvironment() {
- return this.environment;
+ public void setSuppressCleanup(boolean suppressCleanup) {
+ this.suppressCleanup = suppressCleanup;
}
public String toString() {
return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
}
- public List getPreferredRegionsList() {
- List preferredRegions = null;
- if (StringUtils.isNotEmpty(preferredRegionsList)) {
- String[] preferredArray = preferredRegionsList.split(",");
- if (preferredArray != null && preferredArray.length > 0) {
- preferredRegions = new ArrayList<>(Arrays.asList(preferredArray));
- }
- }
- return preferredRegions;
- }
-
- public int getEncryptedStringFieldCount() {
- return encryptedStringFieldCount;
- }
-
- public int getEncryptedLongFieldCount() {
- return encryptedLongFieldCount;
- }
-
- public int getEncryptedDoubleFieldCount() {
- return encryptedDoubleFieldCount;
- }
-
- public boolean isEncryptionEnabled() {
- return encryptionEnabled;
- }
-
- public boolean isDefaultLog4jLoggerEnabled() {
- return Boolean.parseBoolean(defaultLog4jLoggerEnabled);
- }
-
- public Integer getTupleSize() {
- return tupleSize;
- }
-
- public Duration getPointOperationThreshold() {
- if (this.pointLatencyThresholdMs < 0) {
- return Duration.ofDays(300);
- }
-
- return Duration.ofMillis(this.pointLatencyThresholdMs);
- }
-
- public Duration getNonPointOperationThreshold() {
- if (this.nonPointLatencyThresholdMs < 0) {
- return Duration.ofDays(300);
- }
-
- return Duration.ofMillis(this.nonPointLatencyThresholdMs);
- }
-
- public boolean isProactiveConnectionManagementEnabled() {
- return Boolean.parseBoolean(isProactiveConnectionManagementEnabled);
- }
-
- public boolean isUseUnWarmedUpContainer() {
- return Boolean.parseBoolean(isUseUnWarmedUpContainer);
- }
-
- public Integer getProactiveConnectionRegionsCount() {
- return proactiveConnectionRegionsCount;
- }
-
- public Duration getAggressiveWarmupDuration() {
- return aggressiveWarmupDuration;
- }
-
- public Integer getMinConnectionPoolSizePerEndpoint() {
- return minConnectionPoolSizePerEndpoint;
- }
-
- public String getResultUploadDatabase() {
- return Strings.emptyToNull(resultUploadDatabase);
- }
-
- public String getResultUploadContainer() {
- return Strings.emptyToNull(resultUploadContainer);
- }
-
- public boolean isRegionScopedSessionContainerEnabled() {
- return Boolean.parseBoolean(isRegionScopedSessionContainerEnabled);
- }
-
- public boolean isPartitionLevelCircuitBreakerEnabled() {
- return Boolean.parseBoolean(isPartitionLevelCircuitBreakerEnabled);
- }
-
- public void tryGetValuesFromSystem() {
- serviceEndpoint = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("SERVICE_END_POINT")),
- serviceEndpoint);
-
- masterKey = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("MASTER_KEY")), masterKey);
-
- databaseId = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("DATABASE_ID")), databaseId);
-
- collectionId = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("COLLECTION_ID")),
- collectionId);
-
- documentDataFieldSize = Integer.parseInt(
- StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("DOCUMENT_DATA_FIELD_SIZE")),
- Integer.toString(documentDataFieldSize)));
-
- maxConnectionPoolSize = Integer.parseInt(
- StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("MAX_CONNECTION_POOL_SIZE")),
- Integer.toString(maxConnectionPoolSize)));
-
- ConsistencyLevelConverter consistencyLevelConverter = new ConsistencyLevelConverter();
- consistencyLevel = consistencyLevelConverter.convert(StringUtils
- .defaultString(Strings.emptyToNull(System.getenv().get("CONSISTENCY_LEVEL")), consistencyLevel.name()));
-
- Operation.OperationTypeConverter operationTypeConverter = new Operation.OperationTypeConverter();
- operation = operationTypeConverter.convert(
- StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("OPERATION")), operation.name()));
-
- String concurrencyValue = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("CONCURRENCY")),
- concurrency == null ? null : Integer.toString(concurrency));
- concurrency = concurrencyValue == null ? null : Integer.parseInt(concurrencyValue);
-
- String numberOfOperationsValue = StringUtils.defaultString(
- Strings.emptyToNull(System.getenv().get("NUMBER_OF_OPERATIONS")), Integer.toString(numberOfOperations));
- numberOfOperations = Integer.parseInt(numberOfOperationsValue);
-
- String throughputValue = StringUtils.defaultString(
- Strings.emptyToNull(System.getenv().get("THROUGHPUT")), Integer.toString(throughput));
- throughput = Integer.parseInt(throughputValue);
-
- preferredRegionsList = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
- "PREFERRED_REGIONS_LIST")), preferredRegionsList);
-
- encryptedStringFieldCount = Integer.parseInt(
- StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("ENCRYPTED_STRING_FIELD_COUNT")),
- Integer.toString(encryptedStringFieldCount)));
-
- encryptedLongFieldCount = Integer.parseInt(
- StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("ENCRYPTED_LONG_FIELD_COUNT")),
- Integer.toString(encryptedLongFieldCount)));
-
- encryptedDoubleFieldCount = Integer.parseInt(
- StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("ENCRYPTED_DOUBLE_FIELD_COUNT")),
- Integer.toString(encryptedDoubleFieldCount)));
-
- encryptionEnabled = Boolean.parseBoolean(StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
- "ENCRYPTED_ENABLED")),
- Boolean.toString(encryptionEnabled)));
-
- tupleSize = Integer.parseInt(
- StringUtils.defaultString(Strings.emptyToNull(System.getenv().get("COSMOS_IDENTITY_TUPLE_SIZE")),
- Integer.toString(tupleSize)));
-
- testVariationName = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
- "COSMOS_TEST_VARIATION_NAME")), testVariationName);
-
- branchName = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
- "COSMOS_BRANCH_NAME")), branchName);
-
- commitId = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
- "COSMOS_COMMIT_ID")), commitId);
-
- resultUploadDatabase = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
- "COSMOS_RESULT_UPLOAD_DATABASE")), resultUploadDatabase);
-
- resultUploadContainer = StringUtils.defaultString(Strings.emptyToNull(System.getenv().get(
- "COSMOS_RESULT_UPLOAD_CONTAINER")), resultUploadContainer);
- }
-
- private synchronized MeterRegistry azureMonitorMeterRegistry(String connectionString, String instrumentationKey) {
-
- if (this.azureMonitorMeterRegistry == null) {
-
- Duration step = Duration.ofSeconds(Integer.getInteger("azure.cosmos.monitoring.azureMonitor.step", this.printingInterval));
- String testCategoryTag = System.getProperty("azure.cosmos.monitoring.azureMonitor.testCategory");
- boolean enabled = !Boolean.getBoolean("azure.cosmos.monitoring.azureMonitor.disabled");
-
- final AzureMonitorConfig config = new AzureMonitorConfig() {
-
- @Override
- @Nullable
- public String get(@Nullable String key) {
- return null;
- }
-
- @Override
- @Nullable
- public String instrumentationKey() {
- return connectionString != null ? null : instrumentationKey;
- }
-
- @Override
- public String connectionString() { return connectionString; }
-
-
- @Override
- public Duration step() {
- return step;
- }
-
- @Override
- public boolean enabled() {
- return enabled;
- }
- };
-
- String roleName = System.getenv("APPLICATIONINSIGHTS_ROLE_NAME");
- if (roleName != null) {
- TelemetryConfiguration.getActive().setRoleName(roleName);
- }
-
- this.azureMonitorMeterRegistry = new AzureMonitorMeterRegistry(config, Clock.SYSTEM);
- List globalTags = new ArrayList<>();
- if (!Strings.isNullOrEmpty(testCategoryTag)) {
- globalTags.add(Tag.of("TestCategory", testCategoryTag));
- }
-
- String roleInstance = System.getenv("APPLICATIONINSIGHTS_ROLE_INSTANCE");
- if (roleName != null) {
- globalTags.add(Tag.of("cloud_RoleInstance", roleInstance));
- }
-
- this.azureMonitorMeterRegistry.config().commonTags(globalTags);
- }
-
- return this.azureMonitorMeterRegistry;
- }
-
- @SuppressWarnings("UnstableApiUsage")
- private synchronized MeterRegistry graphiteMeterRegistry(String serviceAddress) {
-
- if (this.graphiteMeterRegistry == null) {
-
- HostAndPort address = HostAndPort.fromString(serviceAddress);
-
- String host = address.getHost();
- int port = address.getPortOrDefault(DEFAULT_GRAPHITE_SERVER_PORT);
- boolean enabled = !Boolean.getBoolean("azure.cosmos.monitoring.graphite.disabled");
- Duration step = Duration.ofSeconds(Integer.getInteger("azure.cosmos.monitoring.graphite.step", this.printingInterval));
-
- final GraphiteConfig config = new GraphiteConfig() {
-
- private String[] tagNames = { "source" };
-
- @Override
- @Nullable
- public String get(@Nullable String key) {
- return null;
- }
-
- @Override
- public boolean enabled() {
- return enabled;
- }
-
- @Override
- @Nullable
- public String host() {
- return host;
- }
-
- @Override
- @Nullable
- public int port() {
- return port;
- }
-
- @Override
- @Nullable
- public Duration step() {
- return step;
- }
-
- @Override
- @Nullable
- public String[] tagsAsPrefix() {
- return this.tagNames;
- }
- };
-
- this.graphiteMeterRegistry = new GraphiteMeterRegistry(config, Clock.SYSTEM);
- String source;
-
- try {
- PercentEscaper escaper = new PercentEscaper("_-", false);
- source = escaper.escape(InetAddress.getLocalHost().getHostName());
- } catch (UnknownHostException error) {
- source = "unknown-host";
- }
-
- this.graphiteMeterRegistry.config()
- .namingConvention(NamingConvention.dot)
- .commonTags("source", source);
- }
-
- return this.graphiteMeterRegistry;
- }
-
public static String getAadLoginUri() {
return getOptionalConfigProperty(
"AAD_LOGIN_ENDPOINT",
@@ -968,57 +76,6 @@ public static String getAadTenantId() {
return getOptionalConfigProperty("AAD_TENANT_ID", null, v -> v);
}
- /**
- * Returns the AAD login endpoint for this configuration instance.
- * Falls back to the static/system-property value if not set per-instance.
- */
- public String getInstanceAadLoginEndpoint() {
- return aadLoginEndpoint != null ? aadLoginEndpoint : getAadLoginUri();
- }
-
- /**
- * Returns the AAD managed identity client ID for this configuration instance.
- * Falls back to the static/system-property value if not set per-instance.
- */
- public String getInstanceAadManagedIdentityClientId() {
- return aadManagedIdentityClientId != null ? aadManagedIdentityClientId : getAadManagedIdentityId();
- }
-
- /**
- * Returns the AAD tenant ID for this configuration instance.
- * Falls back to the static/system-property value if not set per-instance.
- */
- public String getInstanceAadTenantId() {
- return aadTenantId != null ? aadTenantId : getAadTenantId();
- }
-
- /**
- * Builds a {@link com.azure.core.credential.TokenCredential} based on this configuration instance's
- * AAD settings. Each call returns a new credential, allowing per-tenant identity in multi-tenant benchmarks.
- *
- * @return a new TokenCredential configured with this instance's AAD login endpoint, tenant ID,
- * and managed identity client ID.
- */
- public com.azure.core.credential.TokenCredential buildTokenCredential() {
- return new com.azure.identity.DefaultAzureCredentialBuilder()
- .managedIdentityClientId(getInstanceAadManagedIdentityClientId())
- .authorityHost(getInstanceAadLoginEndpoint())
- .tenantId(getInstanceAadTenantId())
- .build();
- }
-
- public void setAadLoginEndpoint(String aadLoginEndpoint) {
- this.aadLoginEndpoint = aadLoginEndpoint;
- }
-
- public void setAadTenantId(String aadTenantId) {
- this.aadTenantId = aadTenantId;
- }
-
- public void setAadManagedIdentityClientId(String aadManagedIdentityClientId) {
- this.aadManagedIdentityClientId = aadManagedIdentityClientId;
- }
-
private static T getOptionalConfigProperty(String name, T defaultValue, Function conversion) {
String textValue = getConfigPropertyOrNull(name);
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java
index 5748cf6c9c75..fafd97fbc296 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/Main.java
@@ -3,23 +3,11 @@
package com.azure.cosmos.benchmark;
-import com.azure.cosmos.benchmark.ctl.AsyncCtlWorkload;
-import com.azure.cosmos.benchmark.encryption.AsyncEncryptionBenchmark;
-import com.azure.cosmos.benchmark.encryption.AsyncEncryptionQueryBenchmark;
-import com.azure.cosmos.benchmark.encryption.AsyncEncryptionQuerySinglePartitionMultiple;
-import com.azure.cosmos.benchmark.encryption.AsyncEncryptionReadBenchmark;
-import com.azure.cosmos.benchmark.encryption.AsyncEncryptionWriteBenchmark;
-import com.azure.cosmos.benchmark.linkedin.LICtlWorkload;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.ParameterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Optional;
-
-import static com.azure.cosmos.benchmark.Operation.CtlWorkload;
-import static com.azure.cosmos.benchmark.Operation.LinkedInCtlWorkload;
-
public class Main {
private final static Logger LOGGER = LoggerFactory.getLogger(Main.class);
@@ -28,7 +16,6 @@ public static void main(String[] args) throws Exception {
try {
LOGGER.debug("Parsing the arguments ...");
Configuration cfg = new Configuration();
- cfg.tryGetValuesFromSystem();
JCommander jcommander = new JCommander(cfg, args);
if (cfg.isHelp()) {
@@ -36,21 +23,22 @@ public static void main(String[] args) throws Exception {
return;
}
- validateConfiguration(cfg);
+ // Configuration holds only CLI lifecycle params (cycles, settleTimeMs, etc.).
+ // BenchmarkConfig consumes them and loads all workload config from the JSON file.
+ // BenchmarkOrchestrator handles dispatch for all benchmark types (async, sync,
+ // CTL, encryption, LinkedIn) based on operationType and flags in TenantWorkloadConfig.
+ BenchmarkConfig benchConfig = BenchmarkConfig.fromConfiguration(cfg);
- if (cfg.isSync()) {
- syncBenchmark(cfg);
- } else {
- if (cfg.getOperationType().equals(CtlWorkload)) {
- asyncCtlWorkload(cfg);
- } else if (cfg.getOperationType().equals(LinkedInCtlWorkload)) {
- linkedInCtlWorkload(cfg);
- } else if (cfg.isEncryptionEnabled()) {
- asyncEncryptionBenchmark(cfg);
- } else {
- asyncBenchmark(cfg);
- }
+ if (benchConfig.getTenantWorkloads().isEmpty()) {
+ throw new IllegalArgumentException(
+ "No tenants defined in workload config. The 'tenants' array must contain at least one entry.");
}
+
+ for (TenantWorkloadConfig tenant : benchConfig.getTenantWorkloads()) {
+ validateConfiguration(tenant);
+ }
+
+ new BenchmarkOrchestrator().run(benchConfig);
} catch (ParameterException e) {
System.err.println("INVALID Usage: " + e.getMessage());
System.err.println("Try '-help' for more information.");
@@ -58,142 +46,26 @@ public static void main(String[] args) throws Exception {
}
}
- private static void validateConfiguration(Configuration cfg) {
- switch (cfg.getOperationType()) {
+ private static void validateConfiguration(TenantWorkloadConfig workloadCfg) {
+ switch (workloadCfg.getOperationType()) {
case WriteLatency:
case WriteThroughput:
break;
default:
- if (!cfg.isContentResponseOnWriteEnabled()) {
+ if (!workloadCfg.isContentResponseOnWriteEnabled()) {
throw new IllegalArgumentException("contentResponseOnWriteEnabled parameter can only be set to false " +
"for write latency and write throughput operations");
}
}
- switch (cfg.getOperationType()) {
+ switch (workloadCfg.getOperationType()) {
case ReadLatency:
case ReadThroughput:
break;
default:
- if (cfg.getSparsityWaitTime() != null) {
- throw new IllegalArgumentException("sparsityWaitTime is not supported for " + cfg.getOperationType());
+ if (workloadCfg.getSparsityWaitTime() != null) {
+ throw new IllegalArgumentException("sparsityWaitTime is not supported for " + workloadCfg.getOperationType());
}
}
}
-
- private static void syncBenchmark(Configuration cfg) throws Exception {
- LOGGER.info("Sync benchmark ...");
- SyncBenchmark> benchmark = null;
- try {
- switch (cfg.getOperationType()) {
- case ReadThroughput:
- case ReadLatency:
- benchmark = new SyncReadBenchmark(cfg);
- break;
-
- case WriteLatency:
- case WriteThroughput:
- benchmark = new SyncWriteBenchmark(cfg);
- break;
-
- default:
- throw new RuntimeException(cfg.getOperationType() + " is not supported");
- }
-
- LOGGER.info("Starting {}", cfg.getOperationType());
- benchmark.run();
- } finally {
- if (benchmark != null) {
- benchmark.shutdown();
- }
- }
- }
-
- /**
- * Async benchmark path: builds BenchmarkConfig from CLI args and delegates to BenchmarkOrchestrator.
- * Handles both single-tenant (CLI args) and multi-tenant (tenants.json) modes.
- */
- private static void asyncBenchmark(Configuration cfg) throws Exception {
- BenchmarkConfig benchConfig = BenchmarkConfig.fromConfiguration(cfg);
- LOGGER.info("Async benchmark via BenchmarkOrchestrator ({} tenants, {} cycles)...",
- benchConfig.getTenantWorkloads().size(), benchConfig.getCycles());
- new BenchmarkOrchestrator().run(benchConfig);
- }
-
- private static void asyncEncryptionBenchmark(Configuration cfg) throws Exception {
- LOGGER.info("Async encryption benchmark ...");
- AsyncEncryptionBenchmark> benchmark = null;
- try {
- switch (cfg.getOperationType()) {
- case WriteThroughput:
- case WriteLatency:
- benchmark = new AsyncEncryptionWriteBenchmark(cfg);
- break;
-
- case ReadThroughput:
- case ReadLatency:
- benchmark = new AsyncEncryptionReadBenchmark(cfg);
- break;
-
- case QueryCross:
- case QuerySingle:
- case QueryParallel:
- case QueryOrderby:
- case QueryTopOrderby:
- case QueryInClauseParallel:
- benchmark = new AsyncEncryptionQueryBenchmark(cfg);
- break;
-
- case QuerySingleMany:
- benchmark = new AsyncEncryptionQuerySinglePartitionMultiple(cfg);
- break;
-
- default:
- throw new RuntimeException(cfg.getOperationType() + " is not supported");
- }
-
- LOGGER.info("Starting {}", cfg.getOperationType());
- benchmark.run();
- } finally {
- if (benchmark != null) {
- benchmark.shutdown();
- }
- }
- }
-
- private static void asyncCtlWorkload(Configuration cfg) throws Exception {
- LOGGER.info("Async ctl workload");
- AsyncCtlWorkload benchmark = null;
- try {
- benchmark = new AsyncCtlWorkload(cfg);
- LOGGER.info("Starting {}", cfg.getOperationType());
- benchmark.run();
- } finally {
- if (benchmark != null) {
- benchmark.shutdown();
- }
- }
- }
-
- private static void linkedInCtlWorkload(Configuration cfg) {
- LOGGER.info("Executing the LinkedIn ctl workload");
- LICtlWorkload workload = null;
- try {
- workload = new LICtlWorkload(cfg);
-
- LOGGER.info("Setting up the LinkedIn ctl workload");
- workload.setup();
-
- LOGGER.info("Starting the LinkedIn ctl workload");
- workload.run();
- } catch (Exception e) {
- LOGGER.error("Exception received while executing the LinkedIn ctl workload", e);
- throw e;
- }
- finally {
- Optional.ofNullable(workload)
- .ifPresent(LICtlWorkload::shutdown);
- }
- LOGGER.info("Completed LinkedIn ctl workload execution");
- }
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java
index 49feffb637b2..8a67caa1c6df 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ReadMyWriteWorkflow.java
@@ -501,7 +501,7 @@ protected String getDocumentLink(Document doc) {
}
@Override
- void shutdown() {
+ public void shutdown() {
if (this.client != null) {
this.client.close();
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ScheduledReporterFactory.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ScheduledReporterFactory.java
index 132709c615b7..09b9a8864145 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ScheduledReporterFactory.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ScheduledReporterFactory.java
@@ -5,14 +5,9 @@
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.CsvReporter;
-import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.ScheduledReporter;
-import com.codahale.metrics.graphite.Graphite;
-import com.codahale.metrics.graphite.GraphiteReporter;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.io.File;
import java.util.concurrent.TimeUnit;
@@ -22,38 +17,17 @@ private ScheduledReporterFactory() {
}
/**
- * @param configuration CTL workload parameters
+ * @param benchConfig benchmark-level configuration (reporting directory)
* @param metricsRegistry MetricRegistry instance for tracking various execution metrics
* @return ScheduledReporter for reporting the captured metrics
*/
- public static ScheduledReporter create(final Configuration configuration,
+ public static ScheduledReporter create(final BenchmarkConfig benchConfig,
final MetricRegistry metricsRegistry) {
- if (configuration.getGraphiteEndpoint() != null) {
- final Graphite graphite = new Graphite(new InetSocketAddress(
- configuration.getGraphiteEndpoint(),
- configuration.getGraphiteEndpointPort()));
-
- String graphiteReporterPrefix = configuration.getOperationType().name();
- if (configuration.isAccountNameInGraphiteReporter()) {
- try {
- URI uri = new URI(configuration.getServiceEndpoint());
- graphiteReporterPrefix = graphiteReporterPrefix + "-" + uri.getHost().substring(0, uri.getHost().indexOf("."));
- } catch (URISyntaxException e) {
- // do nothing, graphiteReporterPrefix will be configuration.getOperationType().name()
- }
- }
-
- return GraphiteReporter.forRegistry(metricsRegistry)
- .prefixedWith(graphiteReporterPrefix)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .convertRatesTo(TimeUnit.SECONDS)
- .filter(MetricFilter.ALL)
- .build(graphite);
- } else if (configuration.getReportingDirectory() != null) {
+ if (benchConfig.getReportingDirectory() != null) {
return CsvReporter.forRegistry(metricsRegistry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
- .build(configuration.getReportingDirectory());
+ .build(new File(benchConfig.getReportingDirectory()));
} else {
return ConsoleReporter.forRegistry(metricsRegistry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java
index 422125b52226..c7f6eb7d13aa 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncBenchmark.java
@@ -4,7 +4,6 @@
package com.azure.cosmos.benchmark;
import com.azure.core.credential.TokenCredential;
-import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
@@ -20,27 +19,15 @@
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.ThroughputProperties;
-import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
-import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Timer;
-import com.codahale.metrics.graphite.Graphite;
-import com.codahale.metrics.graphite.GraphiteReporter;
-import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import io.micrometer.core.instrument.MeterRegistry;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.StringUtils;
import org.mpierce.metrics.reservoir.hdrhistogram.HdrHistogramResetOnSnapshotReservoir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -53,15 +40,12 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
-abstract class SyncBenchmark {
+abstract class SyncBenchmark implements Benchmark {
private static final ImplementationBridgeHelpers.CosmosClientBuilderHelper.CosmosClientBuilderAccessor clientBuilderAccessor
= ImplementationBridgeHelpers.CosmosClientBuilderHelper.getCosmosClientBuilderAccessor();
- private final MetricRegistry metricsRegistry = new MetricRegistry();
- private final ScheduledReporter reporter;
-
- private final ScheduledReporter resultReporter;
+ private final MetricRegistry metricsRegistry;
private final ExecutorService executorService;
private Meter successMeter;
@@ -71,26 +55,15 @@ abstract class SyncBenchmark {
final Logger logger;
final CosmosClient benchmarkWorkloadClient;
- final CosmosClient resultUploaderClient;
CosmosContainer cosmosContainer;
CosmosDatabase cosmosDatabase;
final String partitionKey;
- final Configuration configuration;
+ final TenantWorkloadConfig workloadConfig;
final List docsToRead;
final Semaphore concurrencyControlSemaphore;
Timer latency;
- private static final List CONFIGURED_HIGH_AVAILABILITY_SYSTEM_PROPERTIES = Arrays.asList(
- "COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED",
- "COSMOS.IS_SESSION_TOKEN_FALSE_PROGRESS_MERGE_ENABLED",
- "COSMOS.E2E_TIMEOUT_ERROR_HIT_THRESHOLD_FOR_PPAF",
- "COSMOS.E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF",
- "COSMOS.STALE_PARTITION_UNAVAILABILITY_REFRESH_INTERVAL_IN_SECONDS",
- "COSMOS.ALLOWED_PARTITION_UNAVAILABILITY_DURATION_IN_SECONDS",
- "COSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_CONFIG" // Implicitly set when COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED is set to true
- );
-
static abstract class ResultHandler implements BiFunction {
ResultHandler() {
}
@@ -123,84 +96,58 @@ public T apply(T o, Throwable throwable) {
}
}
- SyncBenchmark(Configuration cfg) throws Exception {
- executorService = Executors.newFixedThreadPool(cfg.getConcurrency());
- configuration = cfg;
+ SyncBenchmark(TenantWorkloadConfig workloadCfg, MetricRegistry sharedRegistry) throws Exception {
+ executorService = Executors.newFixedThreadPool(workloadCfg.getConcurrency());
+ workloadConfig = workloadCfg;
+ metricsRegistry = sharedRegistry;
logger = LoggerFactory.getLogger(this.getClass());
- if (configuration.isPartitionLevelCircuitBreakerEnabled()) {
- System.setProperty(
- "COSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_CONFIG",
- "{\"isPartitionLevelCircuitBreakerEnabled\": true, "
- + "\"circuitBreakerType\": \"CONSECUTIVE_EXCEPTION_COUNT_BASED\","
- + "\"consecutiveExceptionCountToleratedForReads\": 10,"
- + "\"consecutiveExceptionCountToleratedForWrites\": 5,"
- + "}");
-
- System.setProperty("COSMOS.STALE_PARTITION_UNAVAILABILITY_REFRESH_INTERVAL_IN_SECONDS", "60");
- System.setProperty("COSMOS.ALLOWED_PARTITION_UNAVAILABILITY_DURATION_IN_SECONDS", "30");
- }
-
- if (configuration.isPerPartitionAutomaticFailoverRequired()) {
- System.setProperty(
- "COSMOS.IS_PER_PARTITION_AUTOMATIC_FAILOVER_ENABLED", "true");
- System.setProperty("COSMOS.IS_SESSION_TOKEN_FALSE_PROGRESS_MERGE_ENABLED", "true");
- System.setProperty("COSMOS.E2E_TIMEOUT_ERROR_HIT_THRESHOLD_FOR_PPAF", "5");
- System.setProperty("COSMOS.E2E_TIMEOUT_ERROR_HIT_TIME_WINDOW_IN_SECONDS_FOR_PPAF", "120");
- }
-
- boolean isManagedIdentityRequired = configuration.isManagedIdentityRequired();
+ boolean isManagedIdentityRequired = workloadCfg.isManagedIdentityRequired();
final TokenCredential credential = isManagedIdentityRequired
- ? cfg.buildTokenCredential()
+ ? workloadCfg.buildTokenCredential()
: null;
CosmosClientBuilder benchmarkSpecificClientBuilder = isManagedIdentityRequired ?
new CosmosClientBuilder()
.credential(credential) :
new CosmosClientBuilder()
- .key(cfg.getMasterKey());
-
- CosmosClientBuilder resultUploadClientBuilder = new CosmosClientBuilder();
+ .key(workloadCfg.getMasterKey());
- benchmarkSpecificClientBuilder.preferredRegions(cfg.getPreferredRegionsList())
- .endpoint(cfg.getServiceEndpoint())
- .userAgentSuffix(configuration.getApplicationName())
- .consistencyLevel(cfg.getConsistencyLevel())
- .contentResponseOnWriteEnabled(cfg.isContentResponseOnWriteEnabled());
+ benchmarkSpecificClientBuilder.preferredRegions(workloadCfg.getPreferredRegionsList())
+ .endpoint(workloadCfg.getServiceEndpoint())
+ .userAgentSuffix(workloadCfg.getApplicationName())
+ .consistencyLevel(workloadCfg.getConsistencyLevel())
+ .contentResponseOnWriteEnabled(workloadCfg.isContentResponseOnWriteEnabled());
clientBuilderAccessor
- .setRegionScopedSessionCapturingEnabled(benchmarkSpecificClientBuilder, cfg.isRegionScopedSessionContainerEnabled());
+ .setRegionScopedSessionCapturingEnabled(benchmarkSpecificClientBuilder, workloadCfg.isRegionScopedSessionContainerEnabled());
- if (cfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
+ if (workloadCfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
benchmarkSpecificClientBuilder = benchmarkSpecificClientBuilder.directMode(DirectConnectionConfig.getDefaultConfig());
} else {
GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig();
- gatewayConnectionConfig.setMaxConnectionPoolSize(cfg.getMaxConnectionPoolSize());
+ gatewayConnectionConfig.setMaxConnectionPoolSize(workloadCfg.getMaxConnectionPoolSize());
benchmarkSpecificClientBuilder = benchmarkSpecificClientBuilder.gatewayMode(gatewayConnectionConfig);
}
CosmosClientTelemetryConfig telemetryConfig = new CosmosClientTelemetryConfig()
.diagnosticsThresholds(
new CosmosDiagnosticsThresholds()
- .setPointOperationLatencyThreshold(cfg.getPointOperationThreshold())
- .setNonPointOperationLatencyThreshold(cfg.getNonPointOperationThreshold())
+ .setPointOperationLatencyThreshold(workloadCfg.getPointOperationThreshold())
+ .setNonPointOperationLatencyThreshold(workloadCfg.getNonPointOperationThreshold())
);
- if (configuration.isDefaultLog4jLoggerEnabled()) {
+ if (workloadCfg.isDefaultLog4jLoggerEnabled()) {
telemetryConfig.diagnosticsHandler(CosmosDiagnosticsHandler.DEFAULT_LOGGING_HANDLER);
}
benchmarkWorkloadClient = benchmarkSpecificClientBuilder.buildClient();
- this.resultUploaderClient = resultUploadClientBuilder
- .endpoint(StringUtils.isNotEmpty(configuration.getServiceEndpointForRunResultsUploadAccount()) ? configuration.getServiceEndpointForRunResultsUploadAccount() : configuration.getServiceEndpoint())
- .key(StringUtils.isNotEmpty(configuration.getMasterKeyForRunResultsUploadAccount()) ? configuration.getMasterKeyForRunResultsUploadAccount() : configuration.getMasterKey())
- .buildClient();
try {
- cosmosDatabase = benchmarkWorkloadClient.getDatabase(this.configuration.getDatabaseId());
+ cosmosDatabase = benchmarkWorkloadClient.getDatabase(workloadCfg.getDatabaseId());
cosmosDatabase.read();
- logger.info("Database {} is created for this test", this.configuration.getDatabaseId());
+ logger.info("Database {} is created for this test", workloadCfg.getDatabaseId());
} catch (CosmosException e) {
if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
@@ -209,8 +156,8 @@ public T apply(T o, Throwable throwable) {
"either pre-create a database and a container or use the management SDK.");
}
- benchmarkWorkloadClient.createDatabase(cfg.getDatabaseId());
- cosmosDatabase = benchmarkWorkloadClient.getDatabase(cfg.getDatabaseId());
+ benchmarkWorkloadClient.createDatabase(workloadCfg.getDatabaseId());
+ cosmosDatabase = benchmarkWorkloadClient.getDatabase(workloadCfg.getDatabaseId());
databaseCreated = true;
} else {
throw e;
@@ -218,7 +165,7 @@ public T apply(T o, Throwable throwable) {
}
try {
- cosmosContainer = cosmosDatabase.getContainer(this.configuration.getCollectionId());
+ cosmosContainer = cosmosDatabase.getContainer(workloadCfg.getContainerId());
cosmosContainer.read();
} catch (CosmosException e) {
if (e.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) {
@@ -228,11 +175,11 @@ public T apply(T o, Throwable throwable) {
"either pre-create a database and a container or use the management SDK.");
}
- cosmosDatabase.createContainer(this.configuration.getCollectionId(),
- Configuration.DEFAULT_PARTITION_KEY_PATH,
- ThroughputProperties.createManualThroughput(this.configuration.getThroughput()));
- cosmosContainer = cosmosDatabase.getContainer(this.configuration.getCollectionId());
- logger.info("Collection {} is created for this test", this.configuration.getCollectionId());
+ cosmosDatabase.createContainer(workloadCfg.getContainerId(),
+ TenantWorkloadConfig.DEFAULT_PARTITION_KEY_PATH,
+ ThroughputProperties.createManualThroughput(workloadCfg.getThroughput()));
+ cosmosContainer = cosmosDatabase.getContainer(workloadCfg.getContainerId());
+ logger.info("Collection {} is created for this test", workloadCfg.getContainerId());
// add some delay to allow container to be created across multiple regions
// container creation across regions is an async operation
@@ -252,20 +199,20 @@ public T apply(T o, Throwable throwable) {
partitionKey = cosmosContainer.read().getProperties().getPartitionKeyDefinition()
.getPaths().iterator().next().split("/")[1];
- concurrencyControlSemaphore = new Semaphore(cfg.getConcurrency());
+ concurrencyControlSemaphore = new Semaphore(workloadCfg.getConcurrency());
ArrayList> createDocumentFutureList = new ArrayList<>();
- if (configuration.getOperationType() != Operation.WriteLatency
- && configuration.getOperationType() != Operation.WriteThroughput
- && configuration.getOperationType() != Operation.ReadMyWrites) {
- String dataFieldValue = RandomStringUtils.randomAlphabetic(cfg.getDocumentDataFieldSize());
- for (int i = 0; i < cfg.getNumberOfPreCreatedDocuments(); i++) {
+ if (workloadCfg.getOperationType() != Operation.WriteLatency
+ && workloadCfg.getOperationType() != Operation.WriteThroughput
+ && workloadCfg.getOperationType() != Operation.ReadMyWrites) {
+ String dataFieldValue = RandomStringUtils.randomAlphabetic(workloadCfg.getDocumentDataFieldSize());
+ for (int i = 0; i < workloadCfg.getNumberOfPreCreatedDocuments(); i++) {
String uuid = UUID.randomUUID().toString();
PojoizedJson newDoc = BenchmarkHelper.generateDocument(uuid,
dataFieldValue,
partitionKey,
- configuration.getDocumentDataFieldCount());
+ workloadCfg.getDocumentDataFieldCount());
CompletableFuture futureResult = CompletableFuture.supplyAsync(() -> {
try {
@@ -284,77 +231,22 @@ public T apply(T o, Throwable throwable) {
docsToRead = createDocumentFutureList.stream().map(future -> getOrThrow(future)).collect(Collectors.toList());
init();
-
- if (configuration.isEnableJvmStats()) {
- metricsRegistry.register("gc", new GarbageCollectorMetricSet());
- metricsRegistry.register("threads", new CachedThreadStatesGaugeSet(10, TimeUnit.SECONDS));
- metricsRegistry.register("memory", new MemoryUsageGaugeSet());
- }
-
- if (configuration.getGraphiteEndpoint() != null) {
- final Graphite graphite = new Graphite(new InetSocketAddress(configuration.getGraphiteEndpoint(), configuration.getGraphiteEndpointPort()));
- reporter = GraphiteReporter.forRegistry(metricsRegistry)
- .prefixedWith(configuration.getOperationType().name())
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS)
- .filter(MetricFilter.ALL)
- .build(graphite);
- } else {
- reporter = ConsoleReporter.forRegistry(metricsRegistry).convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS).build();
- }
-
- if (configuration.getResultUploadDatabase() != null && configuration.getResultUploadContainer() != null) {
- String op = configuration.isSync()
- ? "SYNC_" + configuration.getOperationType().name()
- : configuration.getOperationType().name();
- resultReporter = CosmosTotalResultReporter
- .forRegistry(
- metricsRegistry,
- this.resultUploaderClient.getDatabase(configuration.getResultUploadDatabase()).getContainer(configuration.getResultUploadContainer()),
- op,
- configuration.getTestVariationName(),
- configuration.getBranchName(),
- configuration.getCommitId(),
- configuration.getConcurrency())
- .convertRatesTo(TimeUnit.SECONDS)
- .convertDurationsTo(TimeUnit.MILLISECONDS).build();
- } else {
- resultReporter = null;
- }
-
-
- MeterRegistry registry = configuration.getAzureMonitorMeterRegistry();
-
- if (registry != null) {
- BridgeInternal.monitorTelemetry(registry);
- }
-
- registry = configuration.getGraphiteMeterRegistry();
-
- if (registry != null) {
- BridgeInternal.monitorTelemetry(registry);
- }
}
protected void init() {
}
- void shutdown() {
-
- for (String key : CONFIGURED_HIGH_AVAILABILITY_SYSTEM_PROPERTIES) {
- System.clearProperty(key);
- }
-
- if (this.databaseCreated) {
+ public void shutdown() {
+ if (workloadConfig.isSuppressCleanup()) {
+ logger.info("Skipping cleanup of database/container (suppressCleanup=true)");
+ } else if (this.databaseCreated) {
cosmosDatabase.delete();
- logger.info("Deleted temporary database {} created for this test", this.configuration.getDatabaseId());
+ logger.info("Deleted temporary database {} created for this test", workloadConfig.getDatabaseId());
} else if (this.collectionCreated) {
cosmosContainer.delete();
- logger.info("Deleted temporary collection {} created for this test", this.configuration.getCollectionId());
+ logger.info("Deleted temporary collection {} created for this test", workloadConfig.getContainerId());
}
- resultUploaderClient.close();
benchmarkWorkloadClient.close();
executorService.shutdown();
}
@@ -367,12 +259,12 @@ protected void onError(Throwable throwable) {
protected abstract T performWorkload(long i) throws Exception;
- void run() throws Exception {
+ public void run() throws Exception {
- successMeter = metricsRegistry.meter(Configuration.SUCCESS_COUNTER_METER_NAME);
- failureMeter = metricsRegistry.meter(Configuration.FAILURE_COUNTER_METER_NAME);
+ successMeter = metricsRegistry.meter(TenantWorkloadConfig.SUCCESS_COUNTER_METER_NAME);
+ failureMeter = metricsRegistry.meter(TenantWorkloadConfig.FAILURE_COUNTER_METER_NAME);
- switch (configuration.getOperationType()) {
+ switch (workloadConfig.getOperationType()) {
case ReadLatency:
case WriteLatency:
// TODO: support for other operationTypes will be added later
@@ -386,22 +278,18 @@ void run() throws Exception {
// case QueryAggregateTopOrderby:
// case QueryTopOrderby:
case Mixed:
- latency = metricsRegistry.register(Configuration.LATENCY_METER_NAME, new Timer(new HdrHistogramResetOnSnapshotReservoir()));
+ latency = metricsRegistry.register(TenantWorkloadConfig.LATENCY_METER_NAME, new Timer(new HdrHistogramResetOnSnapshotReservoir()));
break;
default:
break;
}
- reporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS);
- if (resultReporter != null) {
- resultReporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS);
- }
long startTime = System.currentTimeMillis();
AtomicLong count = new AtomicLong(0);
long i;
- for ( i = 0; BenchmarkHelper.shouldContinue(startTime, i, configuration); i++) {
+ for ( i = 0; BenchmarkHelper.shouldContinue(startTime, i, workloadConfig); i++) {
ResultHandler resultHandler = new ResultHandler() {
@Override
@@ -437,7 +325,7 @@ public T apply(T t, Throwable throwable) {
concurrencyControlSemaphore.acquire();
final long cnt = i;
- switch (configuration.getOperationType()) {
+ switch (workloadConfig.getOperationType()) {
case ReadLatency:
case WriteLatency:
// TODO: support for other operation types will be added later
@@ -482,15 +370,7 @@ public T apply(T t, Throwable throwable) {
long endTime = System.currentTimeMillis();
logger.info("[{}] operations performed in [{}] seconds.",
- configuration.getNumberOfOperations(), (int) ((endTime - startTime) / 1000));
-
- reporter.report();
- reporter.close();
-
- if (resultReporter != null) {
- resultReporter.report();
- resultReporter.close();
- }
+ workloadConfig.getNumberOfOperations(), (int) ((endTime - startTime) / 1000));
}
RuntimeException propagate(Exception e) {
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncReadBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncReadBenchmark.java
index 21ee8c8259d9..52a3a6819a2d 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncReadBenchmark.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncReadBenchmark.java
@@ -8,10 +8,12 @@
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.PartitionKey;
+import com.codahale.metrics.MetricRegistry;
+
class SyncReadBenchmark extends SyncBenchmark {
- SyncReadBenchmark(Configuration cfg) throws Exception {
- super(cfg);
+ SyncReadBenchmark(TenantWorkloadConfig workloadCfg, MetricRegistry sharedRegistry) throws Exception {
+ super(workloadCfg, sharedRegistry);
}
@Override
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncWriteBenchmark.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncWriteBenchmark.java
index 700a00505bd6..d90a40200eed 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncWriteBenchmark.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/SyncWriteBenchmark.java
@@ -7,6 +7,8 @@
import com.azure.cosmos.models.PartitionKey;
import org.apache.commons.lang3.RandomStringUtils;
+import com.codahale.metrics.MetricRegistry;
+
import java.util.UUID;
class SyncWriteBenchmark extends SyncBenchmark {
@@ -14,31 +16,31 @@ class SyncWriteBenchmark extends SyncBenchmark {
private final String dataFieldValue;
private final String uuid;
- SyncWriteBenchmark(Configuration cfg) throws Exception {
- super(cfg);
+ SyncWriteBenchmark(TenantWorkloadConfig workloadCfg, MetricRegistry sharedRegistry) throws Exception {
+ super(workloadCfg, sharedRegistry);
uuid = UUID.randomUUID().toString();
dataFieldValue =
- RandomStringUtils.randomAlphabetic(configuration.getDocumentDataFieldSize());
+ RandomStringUtils.randomAlphabetic(workloadConfig.getDocumentDataFieldSize());
}
@Override
protected CosmosItemResponse performWorkload(long i) throws Exception {
String id = uuid + i;
CosmosItemResponse response;
- if (configuration.isDisablePassingPartitionKeyAsOptionOnWrite()) {
+ if (workloadConfig.isDisablePassingPartitionKeyAsOptionOnWrite()) {
// require parsing partition key from the doc
return cosmosContainer.createItem(BenchmarkHelper.generateDocument(id,
dataFieldValue,
partitionKey,
- configuration.getDocumentDataFieldCount()));
+ workloadConfig.getDocumentDataFieldCount()));
}
// more optimized for write as partition key is already passed as config
return cosmosContainer.createItem(BenchmarkHelper.generateDocument(id,
dataFieldValue,
partitionKey,
- configuration.getDocumentDataFieldCount()),
+ workloadConfig.getDocumentDataFieldCount()),
new PartitionKey(id),
null);
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java
index 9df4ce80d658..6140df01d463 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/TenantWorkloadConfig.java
@@ -29,7 +29,7 @@
* Fully-resolved configuration for a single tenant workload.
* Contains account connection info, AAD auth, workload params, and connection
* settings. Each instance is the effective config after merging
- * globalDefaults with per-tenant overrides at parse time.
+ * tenantDefaults with per-tenant overrides at parse time.
*
*
This is the single config object passed to {@link AsyncBenchmark} --
* no intermediate Configuration conversion needed.
@@ -37,6 +37,14 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class TenantWorkloadConfig {
+ /**
+ * Benchmark execution environment.
+ */
+ public enum Environment {
+ Daily, // CTL environment where we run the workload for a fixed number of hours
+ Staging // CTL environment where the workload runs as a long running job
+ }
+
private static final Logger logger = LoggerFactory.getLogger(TenantWorkloadConfig.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -137,6 +145,9 @@ public class TenantWorkloadConfig {
@JsonProperty("maxRunningTimeDuration")
private String maxRunningTimeDuration;
+ @JsonProperty("diagnosticsThresholdDuration")
+ private String diagnosticsThresholdDuration;
+
@JsonProperty("sparsityWaitTime")
private String sparsityWaitTime;
@@ -146,6 +157,36 @@ public class TenantWorkloadConfig {
@JsonProperty("isUseUnWarmedUpContainer")
private Boolean isUseUnWarmedUpContainer;
+ @JsonProperty("numberOfCollectionForCtl")
+ private Integer numberOfCollectionForCtl;
+
+ @JsonProperty("readWriteQueryReadManyPct")
+ private String readWriteQueryReadManyPct;
+
+ @JsonProperty("encryptedStringFieldCount")
+ private Integer encryptedStringFieldCount;
+
+ @JsonProperty("encryptedLongFieldCount")
+ private Integer encryptedLongFieldCount;
+
+ @JsonProperty("encryptedDoubleFieldCount")
+ private Integer encryptedDoubleFieldCount;
+
+ @JsonProperty("encryptionEnabled")
+ private Boolean encryptionEnabled;
+
+ @JsonProperty("bulkloadBatchSize")
+ private Integer bulkloadBatchSize;
+
+ @JsonProperty("testScenario")
+ private String testScenario;
+
+ @JsonProperty("environment")
+ private String environment;
+
+ @JsonProperty("useSync")
+ private Boolean useSync;
+
@JsonProperty("proactiveConnectionRegionsCount")
private Integer proactiveConnectionRegionsCount;
@@ -241,6 +282,11 @@ public boolean isDefaultLog4jLoggerEnabled() {
return isDefaultLog4jLoggerEnabled != null && isDefaultLog4jLoggerEnabled;
}
+ public Duration getDiagnosticsThresholdDuration() {
+ if (diagnosticsThresholdDuration == null) return Duration.ofSeconds(60);
+ return Duration.parse(diagnosticsThresholdDuration);
+ }
+
public Duration getMaxRunningTimeDuration() {
if (maxRunningTimeDuration == null) return null;
return Duration.parse(maxRunningTimeDuration);
@@ -268,6 +314,20 @@ public Duration getAggressiveWarmupDuration() {
return Duration.parse(aggressiveWarmupDuration);
}
+ public int getNumberOfCollectionForCtl() { return numberOfCollectionForCtl != null ? numberOfCollectionForCtl : 4; }
+ public String getReadWriteQueryReadManyPct() { return readWriteQueryReadManyPct != null ? readWriteQueryReadManyPct : "90,8,1,1"; }
+ public int getEncryptedStringFieldCount() { return encryptedStringFieldCount != null ? encryptedStringFieldCount : 1; }
+ public int getEncryptedLongFieldCount() { return encryptedLongFieldCount != null ? encryptedLongFieldCount : 0; }
+ public int getEncryptedDoubleFieldCount() { return encryptedDoubleFieldCount != null ? encryptedDoubleFieldCount : 0; }
+ public boolean isEncryptionEnabled() { return encryptionEnabled != null && encryptionEnabled; }
+ public int getBulkloadBatchSize() { return bulkloadBatchSize != null ? bulkloadBatchSize : 200000; }
+ public String getTestScenario() { return testScenario != null ? testScenario : "GET"; }
+ public Environment getEnvironment() {
+ if (environment == null) return Environment.Daily;
+ return Environment.valueOf(environment);
+ }
+ public boolean isSync() { return useSync != null && useSync; }
+
public ConnectionMode getConnectionMode() {
if (connectionMode == null) return ConnectionMode.DIRECT;
return ConnectionMode.valueOf(connectionMode.toUpperCase());
@@ -412,6 +472,8 @@ private void applyField(String key, String value, boolean overwrite) {
if (overwrite || isDefaultLog4jLoggerEnabled == null) isDefaultLog4jLoggerEnabled = Boolean.parseBoolean(value); break;
case "maxRunningTimeDuration":
if (overwrite || maxRunningTimeDuration == null) maxRunningTimeDuration = value; break;
+ case "diagnosticsThresholdDuration":
+ if (overwrite || diagnosticsThresholdDuration == null) diagnosticsThresholdDuration = value; break;
case "sparsityWaitTime":
if (overwrite || sparsityWaitTime == null) sparsityWaitTime = value; break;
case "isProactiveConnectionManagementEnabled":
@@ -434,6 +496,26 @@ private void applyField(String key, String value, boolean overwrite) {
if (overwrite || preferredRegionsList == null) preferredRegionsList = value; break;
case "manageDatabase":
if (overwrite || manageDatabase == null) manageDatabase = Boolean.parseBoolean(value); break;
+ case "numberOfCollectionForCtl":
+ if (overwrite || numberOfCollectionForCtl == null) numberOfCollectionForCtl = Integer.parseInt(value); break;
+ case "readWriteQueryReadManyPct":
+ if (overwrite || readWriteQueryReadManyPct == null) readWriteQueryReadManyPct = value; break;
+ case "encryptedStringFieldCount":
+ if (overwrite || encryptedStringFieldCount == null) encryptedStringFieldCount = Integer.parseInt(value); break;
+ case "encryptedLongFieldCount":
+ if (overwrite || encryptedLongFieldCount == null) encryptedLongFieldCount = Integer.parseInt(value); break;
+ case "encryptedDoubleFieldCount":
+ if (overwrite || encryptedDoubleFieldCount == null) encryptedDoubleFieldCount = Integer.parseInt(value); break;
+ case "encryptionEnabled":
+ if (overwrite || encryptionEnabled == null) encryptionEnabled = Boolean.parseBoolean(value); break;
+ case "bulkloadBatchSize":
+ if (overwrite || bulkloadBatchSize == null) bulkloadBatchSize = Integer.parseInt(value); break;
+ case "testScenario":
+ if (overwrite || testScenario == null) testScenario = value; break;
+ case "environment":
+ if (overwrite || environment == null) environment = value; break;
+ case "useSync":
+ if (overwrite || useSync == null) useSync = Boolean.parseBoolean(value); break;
// JVM-global properties (minConnectionPoolSizePerEndpoint, isPartitionLevelCircuitBreakerEnabled,
// isPerPartitionAutomaticFailoverRequired) are handled in BenchmarkConfig, not per-tenant.
case "minConnectionPoolSizePerEndpoint":
@@ -449,95 +531,24 @@ private void applyField(String key, String value, boolean overwrite) {
}
}
- // ======== Factory from Configuration (for tests and legacy paths) ========
-
- /**
- * Build a TenantWorkloadConfig from a legacy Configuration object.
- * Used by tests and CLI paths that still parse via JCommander.
- */
- public static TenantWorkloadConfig fromConfiguration(Configuration cfg) {
- TenantWorkloadConfig t = new TenantWorkloadConfig();
- t.id = "cli-tenant";
- t.serviceEndpoint = cfg.getServiceEndpoint();
- t.masterKey = cfg.getMasterKey();
- t.databaseId = cfg.getDatabaseId();
- t.containerId = cfg.getCollectionId();
- t.operation = cfg.getOperationType().name();
- t.concurrency = cfg.getConcurrency();
- t.numberOfOperations = cfg.getNumberOfOperations();
- t.numberOfPreCreatedDocuments = cfg.getNumberOfPreCreatedDocuments();
- t.throughput = cfg.getThroughput();
- t.skipWarmUpOperations = cfg.getSkipWarmUpOperations();
- t.documentDataFieldSize = cfg.getDocumentDataFieldSize();
- t.documentDataFieldCount = cfg.getDocumentDataFieldCount();
- t.contentResponseOnWriteEnabled = cfg.isContentResponseOnWriteEnabled();
- t.disablePassingPartitionKeyAsOptionOnWrite = cfg.isDisablePassingPartitionKeyAsOptionOnWrite();
- t.useNameLink = cfg.isUseNameLink();
- t.connectionMode = cfg.getConnectionMode().name();
- t.consistencyLevel = cfg.getConsistencyLevel().name();
- t.maxConnectionPoolSize = cfg.getMaxConnectionPoolSize();
- t.connectionSharingAcrossClientsEnabled = cfg.isConnectionSharingAcrossClientsEnabled();
- t.manageDatabase = cfg.shouldManageDatabase();
- t.applicationName = cfg.getApplicationName();
- t.isManagedIdentityRequired = cfg.isManagedIdentityRequired();
-
- // AAD auth
- t.aadLoginEndpoint = cfg.getInstanceAadLoginEndpoint();
- t.aadTenantId = cfg.getInstanceAadTenantId();
- t.aadManagedIdentityClientId = cfg.getInstanceAadManagedIdentityClientId();
-
- // Workload details
- t.tupleSize = cfg.getTupleSize();
- if (cfg.getMaxRunningTimeDuration() != null) {
- t.maxRunningTimeDuration = cfg.getMaxRunningTimeDuration().toString();
- }
- if (cfg.getSparsityWaitTime() != null) {
- t.sparsityWaitTime = cfg.getSparsityWaitTime().toString();
- }
-
- // Diagnostics thresholds
- t.pointOperationLatencyThresholdMs = cfg.getPointOperationThreshold().toMillis() < Duration.ofDays(100).toMillis()
- ? (int) cfg.getPointOperationThreshold().toMillis() : null;
- t.nonPointOperationLatencyThresholdMs = cfg.getNonPointOperationThreshold().toMillis() < Duration.ofDays(100).toMillis()
- ? (int) cfg.getNonPointOperationThreshold().toMillis() : null;
-
- // Feature flags
- t.isRegionScopedSessionContainerEnabled = cfg.isRegionScopedSessionContainerEnabled();
- t.isDefaultLog4jLoggerEnabled = cfg.isDefaultLog4jLoggerEnabled();
-
- // Proactive connection management
- t.isProactiveConnectionManagementEnabled = cfg.isProactiveConnectionManagementEnabled();
- t.isUseUnWarmedUpContainer = cfg.isUseUnWarmedUpContainer();
- t.proactiveConnectionRegionsCount = cfg.getProactiveConnectionRegionsCount();
- if (cfg.getAggressiveWarmupDuration() != null) {
- t.aggressiveWarmupDuration = cfg.getAggressiveWarmupDuration().toString();
- }
-
- // Connection
- t.preferredRegionsList = cfg.getPreferredRegionsList() != null
- ? String.join(",", cfg.getPreferredRegionsList()) : null;
-
- return t;
- }
-
// ======== Static parsing ========
- public static List<TenantWorkloadConfig> parseTenantsFile(File tenantsFile) throws IOException {
- JsonNode root = OBJECT_MAPPER.readTree(tenantsFile);
+ public static List<TenantWorkloadConfig> parseWorkloadConfig(File workloadConfigFile) throws IOException {
+ JsonNode root = OBJECT_MAPPER.readTree(workloadConfigFile);
- Map<String, String> globalDefaults = new HashMap<>();
- JsonNode defaultsNode = root.get("globalDefaults");
+ Map<String, String> tenantDefaults = new HashMap<>();
+ JsonNode defaultsNode = root.get("tenantDefaults");
if (defaultsNode != null && defaultsNode.isObject()) {
Iterator<Map.Entry<String, JsonNode>> fields = defaultsNode.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> entry = fields.next();
- globalDefaults.put(entry.getKey(), entry.getValue().asText());
+ tenantDefaults.put(entry.getKey(), entry.getValue().asText());
}
}
- if (!globalDefaults.isEmpty()) {
- logger.info("tenants.json globalDefaults applied to all tenants (per-tenant values take priority): {}",
- globalDefaults.keySet());
+ if (!tenantDefaults.isEmpty()) {
+ logger.info("tenantDefaults applied to all tenants (per-tenant values take priority): {}",
+ tenantDefaults.keySet());
}
List<TenantWorkloadConfig> tenants = new ArrayList<>();
@@ -546,13 +557,13 @@ public static List<TenantWorkloadConfig> parseTenantsFile(File tenantsFile) thro
if (tenantsNode != null && tenantsNode.isArray()) {
for (JsonNode tenantNode : tenantsNode) {
TenantWorkloadConfig tenant = OBJECT_MAPPER.treeToValue(tenantNode, TenantWorkloadConfig.class);
- tenant.applyMap(globalDefaults, false);
+ tenant.applyMap(tenantDefaults, false);
validateTenantConfig(tenant);
tenants.add(tenant);
}
}
- logger.info("Parsed {} tenants from {}", tenants.size(), tenantsFile.getName());
+ logger.info("Parsed {} tenants from {}", tenants.size(), workloadConfigFile.getName());
return tenants;
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java
index 70ab569cc1e1..6959ff042dec 100644
--- a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java
+++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/benchmark/ctl/AsyncCtlWorkload.java
@@ -4,7 +4,6 @@
package com.azure.cosmos.benchmark.ctl;
import com.azure.core.credential.TokenCredential;
-import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ConnectionMode;
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
@@ -13,11 +12,11 @@
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.DirectConnectionConfig;
import com.azure.cosmos.GatewayConnectionConfig;
+import com.azure.cosmos.benchmark.Benchmark;
import com.azure.cosmos.benchmark.BenchmarkHelper;
import com.azure.cosmos.benchmark.BenchmarkRequestSubscriber;
-import com.azure.cosmos.benchmark.Configuration;
import com.azure.cosmos.benchmark.PojoizedJson;
-import com.azure.cosmos.benchmark.ScheduledReporterFactory;
+import com.azure.cosmos.benchmark.TenantWorkloadConfig;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.RequestOptions;
@@ -27,12 +26,7 @@
import com.azure.cosmos.models.ThroughputProperties;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Timer;
-import com.codahale.metrics.jvm.CachedThreadStatesGaugeSet;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import io.micrometer.core.instrument.MeterRegistry;
import org.apache.commons.lang3.RandomStringUtils;
import org.mpierce.metrics.reservoir.hdrhistogram.HdrHistogramResetOnSnapshotReservoir;
import org.slf4j.Logger;
@@ -49,19 +43,18 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-public class AsyncCtlWorkload {
+public class AsyncCtlWorkload implements Benchmark {
private final String PERCENT_PARSING_ERROR = "Unable to parse user provided readWriteQueryReadManyPct ";
private final String prefixUuidForCreate;
private final String dataFieldValue;
private final String partitionKey;
- private final MetricRegistry metricsRegistry = new MetricRegistry();
+ private final MetricRegistry metricsRegistry;
private final Logger logger;
private final CosmosAsyncClient cosmosClient;
- private final Configuration configuration;
+ private final TenantWorkloadConfig workloadConfig;
private final Map> docsToRead = new HashMap<>();
private final Map> itemIdentityMap = new HashMap<>();
private final Semaphore concurrencyControlSemaphore;
@@ -71,7 +64,6 @@ public class AsyncCtlWorkload {
private Timer writeLatency;
private Timer queryLatency;
private Timer readManyLatency;
- private ScheduledReporter reporter;
private Meter readSuccessMeter;
private Meter readFailureMeter;
@@ -91,73 +83,57 @@ public class AsyncCtlWorkload {
private int queryPct;
private int readManyPct;
- public AsyncCtlWorkload(Configuration cfg) {
- final TokenCredential credential = cfg.isManagedIdentityRequired()
- ? cfg.buildTokenCredential()
+ public AsyncCtlWorkload(TenantWorkloadConfig workloadCfg, MetricRegistry sharedRegistry) {
+ final TokenCredential credential = workloadCfg.isManagedIdentityRequired()
+ ? workloadCfg.buildTokenCredential()
: null;
- CosmosClientBuilder cosmosClientBuilder = cfg.isManagedIdentityRequired() ?
+ CosmosClientBuilder cosmosClientBuilder = workloadCfg.isManagedIdentityRequired() ?
new CosmosClientBuilder().credential(credential) :
- new CosmosClientBuilder().key(cfg.getMasterKey());
+ new CosmosClientBuilder().key(workloadCfg.getMasterKey());
cosmosClientBuilder
- .preferredRegions(cfg.getPreferredRegionsList())
- .endpoint(cfg.getServiceEndpoint())
- .consistencyLevel(cfg.getConsistencyLevel())
- .contentResponseOnWriteEnabled(cfg.isContentResponseOnWriteEnabled());
+ .preferredRegions(workloadCfg.getPreferredRegionsList())
+ .endpoint(workloadCfg.getServiceEndpoint())
+ .consistencyLevel(workloadCfg.getConsistencyLevel())
+ .contentResponseOnWriteEnabled(workloadCfg.isContentResponseOnWriteEnabled());
- if (cfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
+ if (workloadCfg.getConnectionMode().equals(ConnectionMode.DIRECT)) {
cosmosClientBuilder = cosmosClientBuilder.directMode(DirectConnectionConfig.getDefaultConfig());
} else {
GatewayConnectionConfig gatewayConnectionConfig = new GatewayConnectionConfig();
- gatewayConnectionConfig.setMaxConnectionPoolSize(cfg.getMaxConnectionPoolSize());
+ gatewayConnectionConfig.setMaxConnectionPoolSize(workloadCfg.getMaxConnectionPoolSize());
cosmosClientBuilder = cosmosClientBuilder.gatewayMode(gatewayConnectionConfig);
}
cosmosClient = cosmosClientBuilder.buildAsyncClient();
- configuration = cfg;
+ workloadConfig = workloadCfg;
+ metricsRegistry = sharedRegistry;
logger = LoggerFactory.getLogger(this.getClass());
- parsedReadWriteQueryReadManyPct(configuration.getReadWriteQueryReadManyPct());
+ parsedReadWriteQueryReadManyPct(workloadConfig.getReadWriteQueryReadManyPct());
- createDatabaseAndContainers(configuration);
+ createDatabaseAndContainers(workloadCfg);
partitionKey = containers.get(0).read().block().getProperties().getPartitionKeyDefinition()
.getPaths().iterator().next().split("/")[1];
- concurrencyControlSemaphore = new Semaphore(cfg.getConcurrency());
+ concurrencyControlSemaphore = new Semaphore(workloadCfg.getConcurrency());
- logger.info("PRE-populating {} documents ....", cfg.getNumberOfPreCreatedDocuments());
- dataFieldValue = RandomStringUtils.randomAlphabetic(configuration.getDocumentDataFieldSize());
- createPrePopulatedDocs(configuration.getNumberOfPreCreatedDocuments());
+ logger.info("PRE-populating {} documents ....", workloadCfg.getNumberOfPreCreatedDocuments());
+ dataFieldValue = RandomStringUtils.randomAlphabetic(workloadConfig.getDocumentDataFieldSize());
+ createPrePopulatedDocs(workloadConfig.getNumberOfPreCreatedDocuments());
createItemIdentityMap(docsToRead);
- if (configuration.isEnableJvmStats()) {
- metricsRegistry.register("gc", new GarbageCollectorMetricSet());
- metricsRegistry.register("threads", new CachedThreadStatesGaugeSet(10, TimeUnit.SECONDS));
- metricsRegistry.register("memory", new MemoryUsageGaugeSet());
- }
-
- reporter = ScheduledReporterFactory.create(cfg, metricsRegistry);
-
- MeterRegistry registry = configuration.getAzureMonitorMeterRegistry();
-
- if (registry != null) {
- BridgeInternal.monitorTelemetry(registry);
- }
-
- registry = configuration.getGraphiteMeterRegistry();
-
- if (registry != null) {
- BridgeInternal.monitorTelemetry(registry);
- }
prefixUuidForCreate = UUID.randomUUID().toString();
random = new Random();
}
public void shutdown() {
- if (this.databaseCreated) {
+ if (workloadConfig.isSuppressCleanup()) {
+ logger.info("Skipping cleanup of database/container (suppressCleanup=true)");
+ } else if (this.databaseCreated) {
cosmosAsyncDatabase.delete().block();
- logger.info("Deleted temporary database {} created for this test", this.configuration.getDatabaseId());
+ logger.info("Deleted temporary database {} created for this test", this.workloadConfig.getDatabaseId());
} else if (containerToClearAfterTest.size() > 0) {
for (String id : containerToClearAfterTest) {
cosmosAsyncDatabase.getContainer(id).delete().block();
@@ -174,7 +150,7 @@ private void performWorkload(BaseSubscriber