From 96eed4077736ec7e6b4f858255cf9a2a3d1a9a65 Mon Sep 17 00:00:00 2001 From: minal-kyada Date: Fri, 9 Jan 2026 11:26:25 -0800 Subject: [PATCH] compression-experiment-64KB --- .../io/compress/LZ4CompressionTest.java | 207 +++++ .../io/compress/SimpleRecompressTest.java | 496 ++++++++++++ .../io/compress/ZstdCompressionTest.java | 313 ++++++++ .../ZstdDictionaryCompressionTest.java | 718 ++++++++++++++++++ 4 files changed, 1734 insertions(+) create mode 100644 test/long/org/apache/cassandra/io/compress/LZ4CompressionTest.java create mode 100644 test/long/org/apache/cassandra/io/compress/SimpleRecompressTest.java create mode 100644 test/long/org/apache/cassandra/io/compress/ZstdCompressionTest.java create mode 100644 test/long/org/apache/cassandra/io/compress/ZstdDictionaryCompressionTest.java diff --git a/test/long/org/apache/cassandra/io/compress/LZ4CompressionTest.java b/test/long/org/apache/cassandra/io/compress/LZ4CompressionTest.java new file mode 100644 index 000000000000..a6dde8df8c3b --- /dev/null +++ b/test/long/org/apache/cassandra/io/compress/LZ4CompressionTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.compress; + +import java.io.File; +import java.util.Collections; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; + +/** + * Test to measure LZ4 High Level 17 compression metrics (baseline) + */ +public class LZ4CompressionTest +{ + private static final Logger logger = LoggerFactory.getLogger(LZ4CompressionTest.class); +// private static final String KEYSPACE = "p01_clouddb"; + private static final String KEYSPACE = "p115_clouddb"; + private static final String TABLE = "custom_zone_records"; +// private static final String SSTABLE_PATH = "/Users/minalkyada/Desktop/p01_clouddb/custom_zone_records-5fb29180edd8300084c8712786214e10"; + private static final String SSTABLE_PATH = "/Users/minalkyada/Desktop/github/forked/cassandra/test/long/org/apache/cassandra/io/compress/level4SST/p115_clouddb/custom_zone_records-00000000000000000000000003986882"; + + private static ColumnFamilyStore store; + + @BeforeClass + public static void setup() + { + try + { + // Initialize Cassandra + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + ServerTestUtils.prepareServerNoRegister(); + + Keyspace.setInitialized(); + StorageService.instance.initServer(); + createTable(); + importSSTables(); + } + catch (Exception e) + { + logger.error("FATAL ERROR during setup", e); + throw new RuntimeException("Setup failed", e); + } + } + + private static void createTable() + { + logger.info("Creating keyspace {} and table {}", KEYSPACE, TABLE); + + TableMetadata customZone = CreateTableStatement.parse("CREATE TABLE " + KEYSPACE + ".custom_zone_records (\n" + + " container text,\n" + + " owner_dsid bigint,\n" + + " virtual_owner_dsid bigint,\n" + + " zone text,\n" + + " column_type ascii,\n" + + " ref text,\n" + + " rev bigint,\n" + + " idx_name text,\n" + + " idx_val blob,\n" + + " asset blob,\n" + + " val blob,\n" + + " PRIMARY KEY ((container, owner_dsid, virtual_owner_dsid, zone), column_type, ref, rev, idx_name, idx_val)\n" + + ") WITH CLUSTERING ORDER BY (column_type ASC, ref ASC, rev ASC, idx_name ASC, idx_val ASC)\n" + + " AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n" + + " AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor', 'lz4_compressor_type': 'high', 'lz4_high_compressor_level': '17'};" + , KEYSPACE) + .build(); + + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), customZone); + logger.info("Keyspace and table created successfully"); + } + + private static void importSSTables() + { + logger.info("Import path: {}", SSTABLE_PATH); + + // Check if directory exists + File sstableDir = new File(SSTABLE_PATH); + if (!sstableDir.exists()) + { + logger.error("ERROR: SSTable directory does not exist: {}", SSTABLE_PATH); + throw new RuntimeException("SSTable directory not found: " 
+ SSTABLE_PATH); + } + + File[] files = sstableDir.listFiles(); + if (files == null || files.length == 0) + { + logger.error("ERROR: SSTable directory is empty: {}", SSTABLE_PATH); + throw new RuntimeException("SSTable directory is empty: " + SSTABLE_PATH); + } + + logger.info("Found {} files in SSTable directory:", files.length); + for (File file : files) + { + logger.info(" - {}", file.getName()); + } + + try + { + store = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); + logger.info("Opened ColumnFamilyStore for {}.{}", KEYSPACE, TABLE); + logger.info("Starting SSTable import..."); + store.importNewSSTables( + Collections.singleton(SSTABLE_PATH), + true, // resetLevel + true, // clearRepaired + false, // verifySSTables + false, // verifyTokens + true, // invalidateCaches + false, // extendedVerify + true // copyData + ); + + int sstableCount = store.getLiveSSTables().size(); + logger.info("Import completed. Live SSTables count: {}", sstableCount); + } + catch (Exception e) + { + logger.error("ERROR during SSTable import", e); + throw new RuntimeException("SSTable import failed", e); + } + } + + @Test + public void testLZ4HighLevel17Metrics() throws Exception + { + logger.info("================================================================================"); + logger.info(" LZ4 HIGH LEVEL 17 COMPRESSION METRICS (BASELINE)"); + logger.info("================================================================================"); + logger.info(""); + + if (store.getLiveSSTables().isEmpty()) + { + logger.error("ABORTING: No SSTables available for testing"); + throw new RuntimeException("No SSTables to measure"); + } + + // Capture baseline metrics + long compressedSize = 0; + long uncompressedSize = 0; + int sstableCount = 0; + + for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables()) + { + long sstableCompressed = sstable.onDiskLength(); + long sstableUncompressed = sstable.uncompressedLength(); + + compressedSize += sstableCompressed; + uncompressedSize += sstableUncompressed; + sstableCount++; + + logger.info("SSTable #{}: compressed={} bytes, uncompressed={} bytes", + sstableCount, sstableCompressed, sstableUncompressed); + } + + double compressionRatio = (double) compressedSize / uncompressedSize; + long compressedMB = compressedSize / 1024 / 1024; + long uncompressedMB = uncompressedSize / 1024 / 1024; + long savedBytes = uncompressedSize - compressedSize; + long savedMB = savedBytes / 1024 / 1024; + double spaceReductionPercent = ((double) savedBytes / uncompressedSize) * 100; + + logger.info(""); + logger.info("================================================================================"); + logger.info(" RESULTS SUMMARY"); + logger.info("================================================================================"); + logger.info("Compression Type: LZ4 High Level 17"); + logger.info("SSTable count: {}", sstableCount); + logger.info("Compressed size: {} bytes ({} MB)", compressedSize, compressedMB); + logger.info("Uncompressed size: {} bytes ({} MB)", uncompressedSize, uncompressedMB); + logger.info("Space saved: {} bytes ({} MB = {}%)", savedBytes, savedMB, String.format("%.2f", spaceReductionPercent)); + logger.info("Compression ratio: {}", String.format("%.4f", compressionRatio)); + logger.info(""); + logger.info("NOTE: These metrics represent the baseline for comparison with Zstd."); + logger.info("================================================================================"); + } +} \ No newline at end of file diff --git 
a/test/long/org/apache/cassandra/io/compress/SimpleRecompressTest.java b/test/long/org/apache/cassandra/io/compress/SimpleRecompressTest.java new file mode 100644 index 000000000000..fd51f511a83e --- /dev/null +++ b/test/long/org/apache/cassandra/io/compress/SimpleRecompressTest.java @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.File; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.sun.management.OperatingSystemMXBean; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; + +public class SimpleRecompressTest +{ + private static final Logger logger = LoggerFactory.getLogger(SimpleRecompressTest.class); + private static final String KEYSPACE = "p01_clouddb"; + private static final String TABLE = "custom_zone_records"; + private static final String SSTABLE_PATH = "/Users/minalkyada/Desktop/p01_clouddb/custom_zone_records-5fb29180edd8300084c8712786214e10"; + + private static ColumnFamilyStore store; + + @BeforeClass + public static void setup() + { + try + { + // Initialize Cassandra + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + ServerTestUtils.prepareServerNoRegister(); + + Keyspace.setInitialized(); + StorageService.instance.initServer(); + createTable(); + importSSTables(); + } + catch (Exception e) + { + logger.error("FATAL ERROR during setup", e); + throw new RuntimeException("Setup failed", e); + } + } + + private static void createTable() + { + logger.info("Creating keyspace {} and table {}", KEYSPACE, TABLE); + + TableMetadata customZone = CreateTableStatement.parse("CREATE TABLE " + KEYSPACE + ".custom_zone_records (\n" + + " container text,\n" + + " owner_dsid bigint,\n" + + " virtual_owner_dsid bigint,\n" + + " zone text,\n" + + " column_type ascii,\n" + + " ref text,\n" + + " rev bigint,\n" + + " idx_name text,\n" + + " idx_val blob,\n" + + " asset blob,\n" + + " val blob,\n" + + " PRIMARY KEY ((container, owner_dsid, virtual_owner_dsid, zone), column_type, ref, rev, idx_name, idx_val)\n" + + ") WITH 
CLUSTERING ORDER BY (column_type ASC, ref ASC, rev ASC, idx_name ASC, idx_val ASC)\n" + + " AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n" + + " AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor', 'lz4_compressor_type': 'high', 'lz4_high_compressor_level': '17'};" + , KEYSPACE) + .build(); + + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), customZone); + logger.info("Keyspace and table created successfully"); + } + + private static void importSSTables() + { + logger.info("Import path: {}", SSTABLE_PATH); + + // Check if directory exists + File sstableDir = new File(SSTABLE_PATH); + if (!sstableDir.exists()) + { + logger.error("ERROR: SSTable directory does not exist: {}", SSTABLE_PATH); + throw new RuntimeException("SSTable directory not found: " + SSTABLE_PATH); + } + + File[] files = sstableDir.listFiles(); + if (files == null || files.length == 0) + { + logger.error("ERROR: SSTable directory is empty: {}", SSTABLE_PATH); + throw new RuntimeException("SSTable directory is empty: " + SSTABLE_PATH); + } + + logger.info("Found {} files in SSTable directory:", files.length); + for (File file : files) + { + logger.info(" - {}", file.getName()); + } + + try + { + store = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); + logger.info("Opened ColumnFamilyStore for {}.{}", KEYSPACE, TABLE); + logger.info("Starting SSTable import..."); + store.importNewSSTables( + Collections.singleton(SSTABLE_PATH), + true, // resetLevel + true, // clearRepaired + false, // verifySSTables + false, // verifyTokens + true, // invalidateCaches + false, // extendedVerify + true // copyData + ); + + int sstableCount = store.getLiveSSTables().size(); + logger.info("Import completed. 
Live SSTables count: {}", sstableCount); + } + catch (Exception e) + { + logger.error("ERROR during SSTable import", e); + throw new RuntimeException("SSTable import failed", e); + } + } + + /** + * Helper class to store compression test results + */ + private static class CompressionResult + { + final String type; // "LZ4", "Zstd", or "Zstd+Dict" + final int level; + final long compressedSizeBytes; // On-disk size of compressed SSTable + final long uncompressedSizeBytes; // Original uncompressed data size + final double compressionRatio; + final long durationMs; + final long cpuTimeMs; + + CompressionResult(String type, int level, long compressedSizeBytes, long uncompressedSizeBytes, + long durationMs, long cpuTimeMs) + { + this.type = type; + this.level = level; + this.compressedSizeBytes = compressedSizeBytes; + this.uncompressedSizeBytes = uncompressedSizeBytes; + this.compressionRatio = (double) compressedSizeBytes / uncompressedSizeBytes; + this.durationMs = durationMs; + this.cpuTimeMs = cpuTimeMs; + } + + @Override + public String toString() + { + return String.format("%s Level %2d: Ratio=%.4f, Compressed=%,d MB, Uncompressed=%,d MB, Duration=%,d ms, CPU=%,d ms", + type, + level, + compressionRatio, + compressedSizeBytes / 1024 / 1024, + uncompressedSizeBytes / 1024 / 1024, + durationMs, + cpuTimeMs); + } + } + + /** + * Capture baseline LZ4 compression metrics + */ + private CompressionResult captureBaselineMetrics() throws Exception + { + logger.info("========================================"); + logger.info("Capturing LZ4 Baseline Metrics"); + logger.info("========================================"); + + long compressedSize = 0; + long uncompressedSize = 0; + + for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables()) + { + compressedSize += sstable.onDiskLength(); + uncompressedSize += sstable.uncompressedLength(); + } + + CompressionResult baseline = new CompressionResult("LZ4", 17, compressedSize, uncompressedSize, 0, 0); + + logger.info("Baseline (LZ4 High Level 17):"); + logger.info(" - Compressed size: {} bytes ({} MB)", compressedSize, compressedSize / 1024 / 1024); + logger.info(" - Uncompressed size: {} bytes ({} MB)", uncompressedSize, uncompressedSize / 1024 / 1024); + logger.info(" - Compression ratio: {}", String.format("%.4f", baseline.compressionRatio)); + logger.info(""); + + return baseline; + } + + /** + * Test Zstd compression (without dictionary) at a specific level + */ + private CompressionResult testZstdCompression(int compressionLevel) throws Exception + { + logger.info("========================================"); + logger.info("Testing Zstd Compression Level {}", compressionLevel); + logger.info("========================================"); + + // Set Zstd compression parameters (without dictionary) + logger.info("Setting compression to Zstd level {} (no dictionary)", compressionLevel); + store.setCompressionParametersJson( + "{\"chunk_length_in_kb\": \"64\", " + + "\"class\": \"org.apache.cassandra.io.compress.ZstdCompressor\", " + + "\"compression_level\": \"" + compressionLevel + "\"}" + ); + + return performRecompression("Zstd", compressionLevel); + } + + /** + * Test Zstd compression with dictionary at a specific level + */ + private CompressionResult testZstdDictionaryCompression(int compressionLevel) throws Exception + { + logger.info("========================================"); + logger.info("Testing Zstd+Dictionary Compression Level {}", compressionLevel); + 
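// Note: setCompressionParametersJson changes the table's compression parameters on this + // node only; SSTables already on disk keep their existing compression until they are + // rewritten (performRecompression below triggers that via upgradeSSTables). A roughly + // equivalent cluster-wide change via CQL (illustrative only, not executed by this test): + // ALTER TABLE p01_clouddb.custom_zone_records WITH compression = + // {'class': 'ZstdDictionaryCompressor', 'chunk_length_in_kb': '64', 'compression_level': '<level>'}; +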
logger.info("========================================"); + + // Set Zstd dictionary compression parameters + logger.info("Setting compression to Zstd level {} WITH dictionary", compressionLevel); + store.setCompressionParametersJson( + "{\"chunk_length_in_kb\": \"64\", " + + "\"class\": \"org.apache.cassandra.io.compress.ZstdDictionaryCompressor\", " + + "\"compression_level\": \"" + compressionLevel + "\"}" + ); + + return performRecompression("Zstd+Dict", compressionLevel); + } + + /** + * Common method to perform recompression and collect metrics + */ + private CompressionResult performRecompression(String compressionType, int compressionLevel) throws Exception + { + // Get CPU time tracker + OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + long cpuTimeBefore = osBean.getProcessCpuTime(); + + // Perform recompression + logger.info("Starting SSTable rewrite with {} level {}...", compressionType, compressionLevel); + long startTime = System.nanoTime(); + + StorageService.instance.upgradeSSTables(KEYSPACE, false, TABLE); + + long duration = System.nanoTime() - startTime; + long durationMs = duration / 1_000_000; + + // Capture CPU time + long cpuTimeAfter = osBean.getProcessCpuTime(); + long cpuTimeMs = (cpuTimeAfter - cpuTimeBefore) / 1_000_000; + + // Get SSTable sizes (THESE ARE DETERMINISTIC - same input + same compression level = same output) + long compressedSizeBytes = 0; + long uncompressedSizeBytes = 0; + int sstableCount = 0; + + for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables()) + { + long sstableCompressed = sstable.onDiskLength(); + long sstableUncompressed = sstable.uncompressedLength(); + + compressedSizeBytes += sstableCompressed; + uncompressedSizeBytes += sstableUncompressed; + sstableCount++; + + logger.info(" SSTable #{}: compressed={} bytes, uncompressed={} bytes", + sstableCount, sstableCompressed, sstableUncompressed); + } + + double compressionRatio = (double) compressedSizeBytes / uncompressedSizeBytes; + long compressedMB = compressedSizeBytes / 1024 / 1024; + long uncompressedMB = uncompressedSizeBytes / 1024 / 1024; + long savedBytes = uncompressedSizeBytes - compressedSizeBytes; + long savedMB = savedBytes / 1024 / 1024; + double spaceReductionPercent = ((double) savedBytes / uncompressedSizeBytes) * 100; + + logger.info("Recompression completed in {} ms", durationMs); + logger.info("Results:"); + logger.info(" - SSTable count: {}", sstableCount); + logger.info(" - Compressed size: {} bytes ({} MB)", compressedSizeBytes, compressedMB); + logger.info(" - Uncompressed size: {} bytes ({} MB)", uncompressedSizeBytes, uncompressedMB); + logger.info(" - Space saved: {} bytes ({} MB = {}%)", savedBytes, savedMB, String.format("%.2f", spaceReductionPercent)); + logger.info(" - Compression ratio: {}", String.format("%.4f", compressionRatio)); + logger.info(" - CPU time used: {} ms", cpuTimeMs); + logger.info(" - Wall clock duration: {} ms", durationMs); + + return new CompressionResult(compressionType, compressionLevel, compressedSizeBytes, uncompressedSizeBytes, + durationMs, cpuTimeMs); + } + + /** + * Print a summary table of all compression results + */ + private void printResultsSummary(List zstdResults, + List zstdDictResults, + CompressionResult baseline) + { + logger.info(""); + logger.info("================================================================================"); + logger.info(" COMPRESSION RESULTS SUMMARY"); + 
logger.info("================================================================================"); + logger.info(""); + logger.info("Baseline (LZ4 High Level 17):"); + logger.info(" - Compressed size: {} MB", baseline.compressedSizeBytes / 1024 / 1024); + logger.info(" - Uncompressed size: {} MB", baseline.uncompressedSizeBytes / 1024 / 1024); + logger.info(" - Compression ratio: {}", String.format("%.4f", baseline.compressionRatio)); + logger.info(""); + logger.info("--------------------------------------------------------------------------------"); + logger.info(String.format("%-12s | %-6s | %-10s | %-16s | %-12s | %-12s | %-16s", + "Type", "Level", "Ratio", "Compressed (MB)", "Duration (ms)", "CPU Time (ms)", "vs Baseline")); + logger.info("--------------------------------------------------------------------------------"); + + // Print Zstd results + for (CompressionResult result : zstdResults) + { + printResultRow(result, baseline); + } + + // Print Zstd+Dict results + for (CompressionResult result : zstdDictResults) + { + printResultRow(result, baseline); + } + + logger.info("================================================================================"); + logger.info(""); + + // Print analysis + logger.info("ANALYSIS:"); + logger.info(""); + + // Combine all results for analysis + List allResults = new ArrayList<>(); + allResults.addAll(zstdResults); + allResults.addAll(zstdDictResults); + + // Find best compression ratio + CompressionResult bestRatio = allResults.stream() + .min((r1, r2) -> Double.compare(r1.compressionRatio, r2.compressionRatio)) + .orElse(null); + if (bestRatio != null) + { + long savedBytes = baseline.compressedSizeBytes - bestRatio.compressedSizeBytes; + logger.info(" Best compression ratio: {} Level {} with {} ({} MB compressed, saved {} MB vs baseline)", + bestRatio.type, bestRatio.level, String.format("%.4f", bestRatio.compressionRatio), + bestRatio.compressedSizeBytes / 1024 / 1024, + savedBytes / 1024 / 1024); + } + + // Find fastest compression + CompressionResult fastest = allResults.stream() + .min((r1, r2) -> Long.compare(r1.durationMs, r2.durationMs)) + .orElse(null); + if (fastest != null) + { + logger.info(" Fastest compression: {} Level {} in {} ms", + fastest.type, fastest.level, fastest.durationMs); + } + + // Find lowest CPU time + CompressionResult lowestCpu = allResults.stream() + .min((r1, r2) -> Long.compare(r1.cpuTimeMs, r2.cpuTimeMs)) + .orElse(null); + if (lowestCpu != null) + { + logger.info(" Lowest CPU usage: {} Level {} with {} ms CPU time", + lowestCpu.level, lowestCpu.type, lowestCpu.cpuTimeMs); + } + + logger.info(""); + logger.info("NOTE: Compressed/uncompressed sizes are DETERMINISTIC (same every run)."); + logger.info(" Duration and CPU metrics may vary between runs."); + logger.info(""); + logger.info("================================================================================"); + } + + /** + * Helper method to print a single result row + */ + private void printResultRow(CompressionResult result, CompressionResult baseline) + { + double sizeDiffPercent = ((double) (result.compressedSizeBytes - baseline.compressedSizeBytes) / baseline.compressedSizeBytes) * 100; + long savedMB = (baseline.compressedSizeBytes - result.compressedSizeBytes) / 1024 / 1024; + + logger.info(String.format("%-12s | %-6d | %-10.4f | %,16d | %,12d | %,12d | %+6d MB (%+.2f%%)", + result.type, + result.level, + result.compressionRatio, + result.compressedSizeBytes / 1024 / 1024, + result.durationMs, + result.cpuTimeMs, + savedMB, + 
-sizeDiffPercent)); + } + + @Test + public void testZstdCompressionLevels8To15() throws Exception + { + logger.info("================================================================================"); + logger.info(" COMPRESSION COMPARISON TEST: LZ4 vs Zstd vs Zstd+Dictionary (Levels 8-15)"); + logger.info("================================================================================"); + logger.info(""); + + if (store.getLiveSSTables().isEmpty()) + { + logger.error("ABORTING: No SSTables available for testing"); + throw new RuntimeException("No SSTables to compress"); + } + + // 1. Capture baseline LZ4 metrics + CompressionResult baseline = captureBaselineMetrics(); + + // 2. Test Zstd compression (without dictionary) for levels 8-15 + logger.info("================================================================================"); + logger.info("PHASE 1: Testing Zstd Compression (NO Dictionary)"); + logger.info("================================================================================"); + logger.info(""); + + List<CompressionResult> zstdResults = new ArrayList<>(); + for (int level = 8; level <= 15; level++) + { + try + { + CompressionResult result = testZstdCompression(level); + zstdResults.add(result); + + // Brief pause between tests to allow system to stabilize + Thread.sleep(2000); + } + catch (Exception e) + { + logger.error("ERROR testing Zstd compression level {}", level, e); + throw e; + } + } + + // 3. Test Zstd+Dictionary compression for levels 8-15 + logger.info(""); + logger.info("================================================================================"); + logger.info("PHASE 2: Testing Zstd Compression WITH Dictionary"); + logger.info("================================================================================"); + logger.info(""); + + List<CompressionResult> zstdDictResults = new ArrayList<>(); + for (int level = 8; level <= 15; level++) + { + try + { + CompressionResult result = testZstdDictionaryCompression(level); + zstdDictResults.add(result); + + // Brief pause between tests to allow system to stabilize + Thread.sleep(2000); + } + catch (Exception e) + { + logger.error("ERROR testing Zstd+Dict compression level {}", level, e); + throw e; + } + } + + // 4. Print comprehensive summary comparing all three approaches + printResultsSummary(zstdResults, zstdDictResults, baseline); + } +} \ No newline at end of file diff --git a/test/long/org/apache/cassandra/io/compress/ZstdCompressionTest.java b/test/long/org/apache/cassandra/io/compress/ZstdCompressionTest.java new file mode 100644 index 000000000000..1cb5a7791ed2 --- /dev/null +++ b/test/long/org/apache/cassandra/io/compress/ZstdCompressionTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.io.compress; + +import java.io.File; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import com.sun.management.OperatingSystemMXBean; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; + +/** + * Test Zstd compression levels 9-19 compared against level 8 baseline + * CloudKit currently uses Zstd level 8 for L1+ compactions + */ +public class ZstdCompressionTest +{ + private static final Logger logger = LoggerFactory.getLogger(ZstdCompressionTest.class); +// private static final String KEYSPACE = "p01_clouddb"; + private static final String KEYSPACE = "p115_clouddb"; + private static final String TABLE = "custom_zone_records"; +// private static final String SSTABLE_PATH = "/Users/minalkyada/Desktop/p01_clouddb/custom_zone_records-5fb29180edd8300084c8712786214e10"; + private static final String SSTABLE_PATH = "/Users/minalkyada/Desktop/github/forked/cassandra/test/long/org/apache/cassandra/io/compress/level4SST/p115_clouddb/custom_zone_records-00000000000000000000000003986882"; + + + private static ColumnFamilyStore store; + + /** + * Helper class to store compression test results + */ + private static class CompressionResult + { + final int level; + final long compressedSizeBytes; + final long uncompressedSizeBytes; + final double compressionRatio; + final long durationMs; + final long cpuTimeMs; + final int iterations; + + CompressionResult(int level, long compressedSizeBytes, long uncompressedSizeBytes, + long durationMs, long cpuTimeMs, int iterations) + { + this.level = level; + this.compressedSizeBytes = compressedSizeBytes; + this.uncompressedSizeBytes = uncompressedSizeBytes; + this.compressionRatio = (double) compressedSizeBytes / uncompressedSizeBytes; + this.durationMs = durationMs; + this.cpuTimeMs = cpuTimeMs; + this.iterations = iterations; + } + } + + @BeforeClass + public static void setup() + { + try + { + // Initialize Cassandra + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + ServerTestUtils.prepareServerNoRegister(); + + Keyspace.setInitialized(); + StorageService.instance.initServer(); + createTable(); + importSSTables(); + } + catch (Exception e) + { + logger.error("FATAL ERROR during setup", e); + throw new RuntimeException("Setup failed", e); + } + } + + private static void createTable() + { + logger.info("Creating keyspace {} and table {}", KEYSPACE, TABLE); + + TableMetadata customZone = CreateTableStatement.parse("CREATE TABLE " + KEYSPACE + ".custom_zone_records (\n" + + " container text,\n" + + " owner_dsid bigint,\n" + + " virtual_owner_dsid bigint,\n" + + " zone text,\n" + + " column_type ascii,\n" + + " ref text,\n" + + " rev bigint,\n" + + " idx_name text,\n" + + " idx_val blob,\n" + + " asset blob,\n" + + " val blob,\n" + + " PRIMARY KEY ((container, owner_dsid, virtual_owner_dsid, zone), 
column_type, ref, rev, idx_name, idx_val)\n" + + ") WITH CLUSTERING ORDER BY (column_type ASC, ref ASC, rev ASC, idx_name ASC, idx_val ASC)\n" + + " AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n" + + " AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor', 'lz4_compressor_type': 'high', 'lz4_high_compressor_level': '17'};" + , KEYSPACE) + .build(); + + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), customZone); + logger.info("Keyspace and table created successfully"); + } + + private static void importSSTables() + { + logger.info("Import path: {}", SSTABLE_PATH); + + File sstableDir = new File(SSTABLE_PATH); + if (!sstableDir.exists()) + { + logger.error("ERROR: SSTable directory does not exist: {}", SSTABLE_PATH); + throw new RuntimeException("SSTable directory not found: " + SSTABLE_PATH); + } + + File[] files = sstableDir.listFiles(); + if (files == null || files.length == 0) + { + logger.error("ERROR: SSTable directory is empty: {}", SSTABLE_PATH); + throw new RuntimeException("SSTable directory is empty: " + SSTABLE_PATH); + } + + logger.info("Found {} files in SSTable directory:", files.length); + for (File file : files) + { + logger.info(" - {}", file.getName()); + } + + try + { + store = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); + logger.info("Opened ColumnFamilyStore for {}.{}", KEYSPACE, TABLE); + logger.info("Starting SSTable import..."); + store.importNewSSTables( + Collections.singleton(SSTABLE_PATH), + true, // resetLevel + true, // clearRepaired + false, // verifySSTables + false, // verifyTokens + true, // invalidateCaches + false, // extendedVerify + true // copyData + ); + + int sstableCount = store.getLiveSSTables().size(); + logger.info("Import completed. 
Live SSTables count: {}", sstableCount); + } + catch (Exception e) + { + logger.error("ERROR during SSTable import", e); + throw new RuntimeException("SSTable import failed", e); + } + } + + /** + * Test a compression level multiple times and return averaged results + */ + private CompressionResult testZstdCompressionWithIterations(int compressionLevel, int iterations) throws Exception + { + long totalCompressedSize = 0; + long totalUncompressedSize = 0; + long totalDuration = 0; + long totalCpuTime = 0; + + for (int i = 1; i <= iterations; i++) + { + // Set Zstd compression parameters (without dictionary) + store.setCompressionParametersJson( + "{\"chunk_length_in_kb\": \"64\", " + + "\"class\": \"org.apache.cassandra.io.compress.ZstdCompressor\", " + + "\"compression_level\": \"" + compressionLevel + "\"}" + ); + + // Get CPU time tracker + OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + long cpuTimeBefore = osBean.getProcessCpuTime(); + + // Perform recompression + long startTime = System.nanoTime(); + StorageService.instance.upgradeSSTables(KEYSPACE, false, TABLE); + long duration = System.nanoTime() - startTime; + long durationMs = duration / 1_000_000; + + // Capture CPU time + long cpuTimeAfter = osBean.getProcessCpuTime(); + long cpuTimeMs = (cpuTimeAfter - cpuTimeBefore) / 1_000_000; + + // Get SSTable sizes + long compressedSizeBytes = 0; + long uncompressedSizeBytes = 0; + + for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables()) + { + compressedSizeBytes += sstable.onDiskLength(); + uncompressedSizeBytes += sstable.uncompressedLength(); + } + + // Accumulate totals + totalCompressedSize += compressedSizeBytes; + totalUncompressedSize += uncompressedSizeBytes; + totalDuration += durationMs; + totalCpuTime += cpuTimeMs; + + // Brief pause between iterations + if (i < iterations) + { + Thread.sleep(1000); + } + } + + // Calculate averages (uncompressed size is accumulated once per iteration, so it must be averaged too) + long avgCompressedSize = totalCompressedSize / iterations; + long avgUncompressedSize = totalUncompressedSize / iterations; + long avgDuration = totalDuration / iterations; + long avgCpuTime = totalCpuTime / iterations; + + return new CompressionResult(compressionLevel, avgCompressedSize, avgUncompressedSize, + avgDuration, avgCpuTime, iterations); + } + + @Test + public void testZstdCompressionLevels9To19() throws Exception + { + final int ITERATIONS = 5; + + if (store.getLiveSSTables().isEmpty()) + { + throw new RuntimeException("No SSTables to compress"); + } + + // Capture baseline with Zstd level 8 + CompressionResult baseline = testZstdCompressionWithIterations(8, ITERATIONS); + + // Test compression levels 9 through 19 + List<CompressionResult> results = new ArrayList<>(); + for (int level = 9; level <= 19; level++) + { + CompressionResult result = testZstdCompressionWithIterations(level, ITERATIONS); + results.add(result); + Thread.sleep(2000); + } + + // Print summary + printResultsSummary(results, baseline); + } + + private void printResultsSummary(List<CompressionResult> results, CompressionResult baseline) + { + logger.info(""); + logger.info("================================================================================"); + logger.info(" ZSTD COMPRESSION RESULTS (AVERAGED OVER {} ITERATIONS)", baseline.iterations); + logger.info("================================================================================"); + logger.info(""); + logger.info("BASELINE (Zstd Level 8 - CloudKit Current):"); + logger.info(" Compressed: {} MB | Uncompressed: {} MB | Ratio: {} | CPU: {} ms | Duration: {} ms", + baseline.compressedSizeBytes / 1024 / 1024, +
baseline.uncompressedSizeBytes / 1024 / 1024, + String.format("%.4f", baseline.compressionRatio), + baseline.cpuTimeMs, + baseline.durationMs); + logger.info(""); + logger.info("--------------------------------------------------------------------------------"); + logger.info(String.format("%-6s | %-10s | %-16s | %-12s | %-12s | %-20s", + "Level", "Ratio", "Compressed (MB)", "Duration (ms)", "CPU Time (ms)", "vs Baseline")); + logger.info("--------------------------------------------------------------------------------"); + + for (CompressionResult result : results) + { + double sizeDiffPercent = ((double) (result.compressedSizeBytes - baseline.compressedSizeBytes) / baseline.compressedSizeBytes) * 100; + long savedMB = (baseline.compressedSizeBytes - result.compressedSizeBytes) / 1024 / 1024; + + logger.info(String.format("%-6d | %-10.4f | %,16d | %,12d | %,12d | %+6d MB (%+.2f%%)", + result.level, + result.compressionRatio, + result.compressedSizeBytes / 1024 / 1024, + result.durationMs, + result.cpuTimeMs, + savedMB, + -sizeDiffPercent)); + } + + logger.info("================================================================================"); + } +} \ No newline at end of file diff --git a/test/long/org/apache/cassandra/io/compress/ZstdDictionaryCompressionTest.java b/test/long/org/apache/cassandra/io/compress/ZstdDictionaryCompressionTest.java new file mode 100644 index 000000000000..0e6e6f1ffebf --- /dev/null +++ b/test/long/org/apache/cassandra/io/compress/ZstdDictionaryCompressionTest.java @@ -0,0 +1,718 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.compress; + +import com.sun.management.OperatingSystemMXBean; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.compression.*; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.db.compaction.OperationType; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.lang.management.ManagementFactory; +import java.util.*; + +public class ZstdDictionaryCompressionTest { + private static final Logger logger = LoggerFactory.getLogger(ZstdDictionaryCompressionTest.class); + private static final String KEYSPACE = "p115_clouddb"; + private static final String TABLE = "custom_zone_records"; + private static final String SSTABLE_PATH = "/Users/minalkyada/Desktop/github/forked/cassandra/test/long/org/apache/cassandra/io/compress/level4SST/p115_clouddb/custom_zone_records-00000000000000000000000003986882"; + + private static final int DICT_SIZE = 64 * 1024; // 64KB dictionary + private static final int MAX_SAMPLES = 1000; // Maximum number of samples to collect + private static final int SAMPLE_SIZE = 16 * 1024; // 16KB per sample + + private static ColumnFamilyStore store; + private static CompressionDictionary dictionary; + + /** + * Helper class to store compression test results + */ + private static class CompressionResult + { + final int level; + final long compressedSizeBytes; + final long uncompressedSizeBytes; + final double compressionRatio; + final long durationMs; + final long cpuTimeMs; + final int iterations; + + CompressionResult(int level, long compressedSizeBytes, long uncompressedSizeBytes, + long durationMs, long cpuTimeMs, int iterations) + { + this.level = level; + this.compressedSizeBytes = compressedSizeBytes; + this.uncompressedSizeBytes = uncompressedSizeBytes; + this.compressionRatio = (double) compressedSizeBytes / uncompressedSizeBytes; + this.durationMs = durationMs; + this.cpuTimeMs = cpuTimeMs; + this.iterations = iterations; + } + } + + @BeforeClass + public static void setup() + { + try + { + // Initialize Cassandra + DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); + ServerTestUtils.prepareServerNoRegister(); + + Keyspace.setInitialized(); + StorageService.instance.initServer(); + createTable(); + importSSTables(); + } + catch (Exception e) + { + logger.error("FATAL ERROR during setup", e); + throw new RuntimeException("Setup failed", e); + } + } + + private static void createTable() + { + // Create table with ZstdDictionaryCompressor enabled + TableMetadata customZone = CreateTableStatement.parse("CREATE TABLE " + KEYSPACE + ".custom_zone_records (\n" + + " container text,\n" + + " owner_dsid bigint,\n" + + " virtual_owner_dsid bigint,\n" + + " zone text,\n" + + " column_type ascii,\n" + + " ref text,\n" + + " rev bigint,\n" + + " idx_name text,\n" + + " idx_val blob,\n" + + " asset blob,\n" + + " val blob,\n" + + " PRIMARY KEY ((container, owner_dsid, virtual_owner_dsid, zone), column_type, ref, rev, idx_name, idx_val)\n" + + ") WITH 
CLUSTERING ORDER BY (column_type ASC, ref ASC, rev ASC, idx_name ASC, idx_val ASC)\n" + + " AND compaction = {'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n" + + " AND compression = {" + + " 'chunk_length_in_kb': '64', " + + " 'class': 'org.apache.cassandra.io.compress.ZstdDictionaryCompressor', " + + " 'compression_level': '3', " + + " 'enabled': 'true'" + + " };" + , KEYSPACE) + .build(); + + SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), customZone); + } + + private static void importSSTables() + { + File sstableDir = new File(SSTABLE_PATH); + if (!sstableDir.exists()) + { + throw new RuntimeException("SSTable directory not found: " + SSTABLE_PATH); + } + + File[] files = sstableDir.listFiles(); + if (files == null || files.length == 0) + { + throw new RuntimeException("SSTable directory is empty: " + SSTABLE_PATH); + } + + logger.info("Found {} files in SSTable directory: {}", files.length, SSTABLE_PATH); + for (File file : files) + { + logger.info(" - {}", file.getName()); + } + + try + { + store = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); + logger.info("Importing SSTables from: {}", SSTABLE_PATH); + + store.importNewSSTables( + Collections.singleton(SSTABLE_PATH), + true, // resetLevel + true, // clearRepaired + false, // verifySSTables + false, // verifyTokens + true, // invalidateCaches + false, // extendedVerify + true // copyData + ); + + int sstableCount = store.getLiveSSTables().size(); + logger.info("Import completed. Live SSTables count: {}", sstableCount); + + if (sstableCount == 0) + { + throw new RuntimeException("No SSTables imported! Check that SSTable files are valid and match the schema."); + } + + // Train dictionary from imported SSTables + trainDictionary(); + } + catch (Exception e) + { + logger.error("ERROR during SSTable import", e); + throw new RuntimeException("SSTable import failed", e); + } + } + + /** + * Train a compression dictionary from the imported SSTable data and register it with Cassandra + */ + private static void trainDictionary() + { + logger.info("================================================================================"); + logger.info("TRAINING ZSTD COMPRESSION DICTIONARY FROM SSTABLE DATA"); + logger.info("================================================================================"); + + try + { + CompressionParams compressionParams = store.metadata().params.compression; + CompressionDictionaryTrainingConfig config = CompressionDictionaryTrainingConfig.builder() + .maxDictionarySize(DICT_SIZE) + .maxTotalSampleSize(MAX_SAMPLES * SAMPLE_SIZE) + .samplingRate(1.0f) + .chunkSize(SAMPLE_SIZE) + .build(); + + ICompressionDictionaryTrainer trainer = ICompressionDictionaryTrainer.create( + KEYSPACE, + TABLE, + compressionParams, + config + ); + + if (!trainer.start(true)) + { + throw new RuntimeException("Failed to start dictionary trainer"); + } + + SSTableChunkSampler.sampleFromSSTables( + store.getLiveSSTables(), + trainer, + config + ); + + dictionary = trainer.trainDictionary(false); + + if (dictionary == null) + { + throw new RuntimeException("Dictionary training failed - returned null"); + } + + logger.info("Dictionary trained successfully:"); + logger.info(" - Dictionary size: {} bytes ({} KB)", + dictionary.rawDictionary().length, + dictionary.rawDictionary().length / 1024); + logger.info(" - Dictionary ID: {}", dictionary.dictId()); + logger.info(" - Dictionary Kind: {}", dictionary.kind()); + + 
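// Persist the trained dictionary (SystemDistributedKeyspace writes it to the + // system_distributed tables) and re-open the ColumnFamilyStore so its + // CompressionDictionaryManager can load it; newly written SSTables only get the + // dictionary attached once it has been loaded (see verifyDictionaryAttachment below). + // The 10-second sleep is a pragmatic wait for that asynchronous load, not a strict sync. +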
SystemDistributedKeyspace.storeCompressionDictionary(KEYSPACE, TABLE, dictionary); + store = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); + + CompressionDictionaryManager dictionaryManager = store.compressionDictionaryManager(); + if (dictionaryManager != null && dictionaryManager.isEnabled()) + { + logger.info("Waiting 10 seconds for dictionary to load into cache..."); + Thread.sleep(10000); + } + else + { + logger.warn("Dictionary manager is not enabled or not available"); + } + + logger.info("================================================================================"); + + trainer.close(); + } + catch (Exception e) + { + logger.error("FATAL ERROR during dictionary training", e); + throw new RuntimeException("Dictionary training failed", e); + } + } + + @AfterClass + public static void tearDown() + { + if (dictionary != null) + { + dictionary.close(); + } + } + + /** + * Update compression parameters for the table with specified level + */ + private void updateCompressionLevel(int compressionLevel) throws Exception + { + Map<String, String> compressionOptions = new HashMap<>(); + compressionOptions.put("class", "org.apache.cassandra.io.compress.ZstdDictionaryCompressor"); + compressionOptions.put("chunk_length_in_kb", "64"); + compressionOptions.put("compression_level", String.valueOf(compressionLevel)); + compressionOptions.put("enabled", "true"); + + CompressionParams newCompressionParams = CompressionParams.fromMap(compressionOptions); + + TableMetadata currentMetadata = store.metadata(); + TableMetadata.Builder metadataBuilder = currentMetadata.unbuild(); + metadataBuilder.compression(newCompressionParams); + TableMetadata newMetadata = metadataBuilder.build(); + + KeyspaceMetadata keyspaceMetadata = Keyspace.open(KEYSPACE).getMetadata(); + Tables updatedTables = keyspaceMetadata.tables.withSwapped(newMetadata); + KeyspaceMetadata updatedKeyspace = keyspaceMetadata.withSwapped(updatedTables); + SchemaTestUtil.submit(metadata -> metadata.schema.getKeyspaces().withAddedOrUpdated(updatedKeyspace)); + } + + /** + * Perform recompression and collect metrics for a single iteration + */ + private SingleIterationMetrics performSingleCompression() throws Exception + { + OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + long cpuTimeBefore = osBean.getProcessCpuTime(); + + long startTime = System.nanoTime(); + StorageService.instance.upgradeSSTables(KEYSPACE, false, TABLE); + long duration = System.nanoTime() - startTime; + long durationMs = duration / 1_000_000; + + long cpuTimeAfter = osBean.getProcessCpuTime(); + long cpuTimeMs = (cpuTimeAfter - cpuTimeBefore) / 1_000_000; + + long compressedSizeBytes = 0; + long uncompressedSizeBytes = 0; + + for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables()) + { + compressedSizeBytes += sstable.onDiskLength(); + uncompressedSizeBytes += sstable.uncompressedLength(); + } + + return new SingleIterationMetrics(compressedSizeBytes, uncompressedSizeBytes, durationMs, cpuTimeMs); + } + + /** + * Verify dictionary attachment to compressor + */ + private void verifyDictionaryAttachment(int compressionLevel) + { + for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables()) + { + CompressionMetadata sstableCompression = sstable.getCompressionMetadata(); + if (sstableCompression != null) + { + ICompressor compressor = sstableCompression.compressor(); + if (compressor instanceof ZstdDictionaryCompressor) + { + ZstdDictionaryCompressor
dictCompressor = (ZstdDictionaryCompressor) compressor; + boolean hasDictionary = dictCompressor.dictionary() != null; + String dictId = hasDictionary ? String.valueOf(dictCompressor.dictionary().dictId()) : "NONE"; + + logger.info("Level {}: compressor={}, level={}, hasDictionary={}, dictId={}", + compressionLevel, + compressor.getClass().getSimpleName(), + dictCompressor.compressionLevel(), + hasDictionary, + dictId); + + if (!hasDictionary) + { + logger.warn("WARNING [Level {}]: Dictionary NOT attached to compressor!", compressionLevel); + } + } + } + } + } + + /** + * Test compression at specified level with multiple iterations and average the results + */ + private CompressionResult testZstdDictionaryCompressionWithIterations(int compressionLevel, int iterations) throws Exception + { + logger.info("Testing level {} with {} iterations", compressionLevel, iterations); + + updateCompressionLevel(compressionLevel); + + long totalCompressedSize = 0; + long totalDuration = 0; + long totalCpuTime = 0; + long uncompressedSizeBytes = 0; // Uncompressed size is the same for all iterations + + for (int i = 1; i <= iterations; i++) + { + logger.info(" Iteration {}/{} for level {}", i, iterations, compressionLevel); + + SingleIterationMetrics metrics = performSingleCompression(); + + totalCompressedSize += metrics.compressedSizeBytes; + totalDuration += metrics.durationMs; + totalCpuTime += metrics.cpuTimeMs; + + // Uncompressed size is deterministic and same for all iterations, so just capture it once + if (i == 1) + { + uncompressedSizeBytes = metrics.uncompressedSizeBytes; + } + + if (i < iterations) + { + ZstdDictionaryCompressor.invalidateCache(); + Thread.sleep(1000); + } + } + + // Verify dictionary attachment (check once at the end) + verifyDictionaryAttachment(compressionLevel); + + // Calculate averages + long avgCompressedSize = totalCompressedSize / iterations; + long avgDuration = totalDuration / iterations; + long avgCpuTime = totalCpuTime / iterations; + + return new CompressionResult(compressionLevel, avgCompressedSize, uncompressedSizeBytes, + avgDuration, avgCpuTime, iterations); + } + + /** + * Helper class to store metrics from a single compression iteration + */ + private static class SingleIterationMetrics + { + final long compressedSizeBytes; + final long uncompressedSizeBytes; + final long durationMs; + final long cpuTimeMs; + + SingleIterationMetrics(long compressedSizeBytes, long uncompressedSizeBytes, long durationMs, long cpuTimeMs) + { + this.compressedSizeBytes = compressedSizeBytes; + this.uncompressedSizeBytes = uncompressedSizeBytes; + this.durationMs = durationMs; + this.cpuTimeMs = cpuTimeMs; + } + } + + @Test + public void testZstdDictionaryCompressionLevels8To19() throws Exception + { + final int ITERATIONS = 5; + + logger.info("================================================================================"); + logger.info(" ZSTD+DICTIONARY COMPRESSION TEST - LEVELS 8-19 ({} iterations per level)", ITERATIONS); + logger.info("================================================================================"); + logger.info(""); + + if (store.getLiveSSTables().isEmpty()) + { + throw new RuntimeException("No SSTables to compress"); + } + + // Capture baseline with level 8 (averaged over 5 iterations) + CompressionResult baseline = testZstdDictionaryCompressionWithIterations(8, ITERATIONS); + ZstdDictionaryCompressor.invalidateCache(); + Thread.sleep(2000); + + // Test compression levels 9 through 19 (each averaged over 5 iterations) + List<CompressionResult> results = new ArrayList<>(); + for (int level = 9; level <= 19; level++) + { + try + { + CompressionResult result = testZstdDictionaryCompressionWithIterations(level, ITERATIONS); + results.add(result); + + ZstdDictionaryCompressor.invalidateCache(); + Thread.sleep(2000); + } + catch (Exception e) + { + logger.error("ERROR testing Zstd+Dictionary compression level {}", level, e); + throw e; + } + } + + printResultsSummary(results, baseline); + } + + private void printResultsSummary(List<CompressionResult> results, CompressionResult baseline) + { + logger.info(""); + logger.info("================================================================================"); + logger.info(" ZSTD+DICTIONARY COMPRESSION RESULTS (AVERAGED OVER {} ITERATIONS)", baseline.iterations); + logger.info("================================================================================"); + logger.info(""); + logger.info("BASELINE (Zstd+Dictionary Level 8):"); + logger.info(" Compressed: {} MB | Uncompressed: {} MB | Ratio: {} | CPU: {} ms | Duration: {} ms", + baseline.compressedSizeBytes / 1024 / 1024, + baseline.uncompressedSizeBytes / 1024 / 1024, + String.format("%.4f", baseline.compressionRatio), + baseline.cpuTimeMs, + baseline.durationMs); + logger.info(""); + logger.info("--------------------------------------------------------------------------------"); + logger.info(String.format("%-6s | %-10s | %-16s | %-12s | %-12s | %-20s", + "Level", "Ratio", "Compressed (MB)", "Duration (ms)", "CPU Time (ms)", "vs Baseline")); + logger.info("--------------------------------------------------------------------------------"); + + for (CompressionResult result : results) + { + double sizeDiffPercent = ((double) (result.compressedSizeBytes - baseline.compressedSizeBytes) / baseline.compressedSizeBytes) * 100; + long savedMB = (baseline.compressedSizeBytes - result.compressedSizeBytes) / 1024 / 1024; + + logger.info(String.format("%-6d | %-10.4f | %,16d | %,12d | %,12d | %+6d MB (%+.2f%%)", + result.level, + result.compressionRatio, + result.compressedSizeBytes / 1024 / 1024, + result.durationMs, + result.cpuTimeMs, + savedMB, + -sizeDiffPercent)); + } + + logger.info("================================================================================"); + logger.info(""); + logger.info("NOTE: All metrics (Compressed size, Duration, CPU time) are averaged over {} iterations.", baseline.iterations); + logger.info(" Compressed size is deterministic; Duration and CPU time may vary between runs."); + logger.info("================================================================================"); + } + + @Test + public void testCrossSSTableZstdDictionaryCompression() throws Exception + { + final int ITERATIONS = 5; + final String TRAINING_SSTABLE_PATH = "/Users/minalkyada/Desktop/github/forked/cassandra/test/long/org/apache/cassandra/io/compress/level4SST/p115_clouddb/custom_zone_records-00000000000000000000000003986882"; + final String COMPRESSION_SSTABLE_PATH = "/Users/minalkyada/Desktop/github/forked/cassandra/test/long/org/apache/cassandra/io/compress/level4SST/p115_clouddb/custom_zone_records-00000000000000000000000005640577"; + + logger.info("================================================================================"); + logger.info(" CROSS-SSTABLE DICTIONARY COMPRESSION TEST - LEVELS 8-19"); + logger.info(" Training Dictionary: custom_zone_records-3986882"); + logger.info(" Compressing SSTable: custom_zone_records-5640577"); + logger.info("================================================================================"); + logger.info(""); + + try
+        try
+        {
+            // Step 1: Import training SSTable and train dictionary
+            importAndTrainDictionary(TRAINING_SSTABLE_PATH);
+
+            // Step 2: Clear SSTables and import the target SSTable for compression
+            clearAndImportTargetSSTable(COMPRESSION_SSTABLE_PATH);
+
+            // Step 3: Run compression tests on all levels (8-19)
+            logger.info("Starting compression tests on target SSTable with trained dictionary");
+            logger.info("");
+
+            // Capture baseline with level 8
+            CompressionResult baseline = testZstdDictionaryCompressionWithIterations(8, ITERATIONS);
+            ZstdDictionaryCompressor.invalidateCache();
+            Thread.sleep(2000);
+
+            // Test compression levels 9 through 19
+            List<CompressionResult> results = new ArrayList<>();
+            for (int level = 9; level <= 19; level++)
+            {
+                try
+                {
+                    CompressionResult result = testZstdDictionaryCompressionWithIterations(level, ITERATIONS);
+                    results.add(result);
+
+                    ZstdDictionaryCompressor.invalidateCache();
+                    Thread.sleep(2000);
+                }
+                catch (Exception e)
+                {
+                    logger.error("ERROR testing Zstd+Dictionary compression level {}", level, e);
+                    throw e;
+                }
+            }
+
+            printCrossSSTableResultsSummary(results, baseline);
+        }
+        catch (Exception e)
+        {
+            logger.error("FATAL ERROR in cross-SSTable dictionary compression test", e);
+            throw e;
+        }
+    }
+
+    /**
+     * Import the training SSTable and train a dictionary from it.
+     */
+    private void importAndTrainDictionary(String trainingSSTablePath) throws Exception
+    {
+        logger.info("================================================================================");
+        logger.info("STEP 1: IMPORTING TRAINING SSTABLE AND TRAINING DICTIONARY");
+        logger.info("Training SSTable: {}", trainingSSTablePath);
+        logger.info("================================================================================");
+
+        // Clear all existing SSTables first (from initial setup)
+        logger.info("Clearing existing SSTables from setup...");
+        for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables())
+        {
+            store.markObsolete(Collections.singleton(sstable), OperationType.UNKNOWN);
+        }
+        logger.info("Existing SSTables cleared");
+
+        File sstableDir = new File(trainingSSTablePath);
+        if (!sstableDir.exists())
+        {
+            throw new RuntimeException("Training SSTable directory not found: " + trainingSSTablePath);
+        }
+
+        // Import training SSTable
+        store.importNewSSTables(
+            Collections.singleton(trainingSSTablePath),
+            true,  // resetLevel
+            true,  // clearRepaired
+            false, // verifySSTables
+            false, // verifyTokens
+            true,  // invalidateCaches
+            false, // extendedVerify
+            true   // copyData
+        );
+
+        int sstableCount = store.getLiveSSTables().size();
+        logger.info("Imported {} SSTable(s) for training", sstableCount);
+
+        if (sstableCount == 0)
+        {
+            throw new RuntimeException("No training SSTables imported!");
+        }
+
+        if (sstableCount != 1)
+        {
+            logger.warn("Expected 1 SSTable for training, but found {}. Proceeding with training...", sstableCount);
+        }
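+
+        // Note: trainDictionary() is defined earlier in this class; it is assumed to
+        // sample chunks from the imported SSTable and build the shared zstd dictionary
+        // that ZstdDictionaryCompressor caches until invalidateCache() is called.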
+
+        // Train dictionary from the imported SSTable
+        trainDictionary();
+    }
+
+    /**
+     * Clear existing SSTables and import the target SSTable for compression.
+     */
+    private void clearAndImportTargetSSTable(String targetSSTablePath) throws Exception
+    {
+        logger.info("================================================================================");
+        logger.info("STEP 2: IMPORTING TARGET SSTABLE FOR COMPRESSION");
+        logger.info("Target SSTable: {}", targetSSTablePath);
+        logger.info("================================================================================");
+
+        // Clear all existing SSTables (training SSTable)
+        logger.info("Clearing training SSTables...");
+        for (org.apache.cassandra.io.sstable.format.SSTableReader sstable : store.getLiveSSTables())
+        {
+            store.markObsolete(Collections.singleton(sstable), OperationType.UNKNOWN);
+        }
+        logger.info("Training SSTables cleared");
+
+        // Import target SSTable for compression
+        File targetDir = new File(targetSSTablePath);
+        if (!targetDir.exists())
+        {
+            throw new RuntimeException("Target SSTable directory not found: " + targetSSTablePath);
+        }
+
+        store.importNewSSTables(
+            Collections.singleton(targetSSTablePath),
+            true,  // resetLevel
+            true,  // clearRepaired
+            false, // verifySSTables
+            false, // verifyTokens
+            true,  // invalidateCaches
+            false, // extendedVerify
+            true   // copyData
+        );
+
+        int sstableCount = store.getLiveSSTables().size();
+        logger.info("Imported {} SSTable(s) for compression", sstableCount);
+
+        if (sstableCount == 0)
+        {
+            throw new RuntimeException("No target SSTables imported!");
+        }
+
+        logger.info("================================================================================");
+        logger.info("");
+    }
+
+    /**
+     * Print results summary for the cross-SSTable compression test.
+     */
+    private void printCrossSSTableResultsSummary(List<CompressionResult> results, CompressionResult baseline)
+    {
+        logger.info("");
+        logger.info("================================================================================");
+        logger.info(" CROSS-SSTABLE DICTIONARY COMPRESSION RESULTS (AVERAGED OVER {} ITERATIONS)", baseline.iterations);
+        logger.info("================================================================================");
+        logger.info("");
+        logger.info("Dictionary trained on: custom_zone_records-3986882");
+        logger.info("Compression applied to: custom_zone_records-5640577");
+        logger.info("");
+        logger.info("BASELINE (Zstd+Dictionary Level 8):");
+        logger.info(" Compressed: {} MB | Uncompressed: {} MB | Ratio: {} | CPU: {} ms | Duration: {} ms",
+                    baseline.compressedSizeBytes / 1024 / 1024,
+                    baseline.uncompressedSizeBytes / 1024 / 1024,
+                    String.format("%.4f", baseline.compressionRatio),
+                    baseline.cpuTimeMs,
+                    baseline.durationMs);
+        logger.info("");
+        logger.info("--------------------------------------------------------------------------------");
+        logger.info(String.format("%-6s | %-10s | %-16s | %-12s | %-12s | %-20s",
+                                  "Level", "Ratio", "Compressed (MB)", "Duration (ms)", "CPU Time (ms)", "vs Baseline"));
+        logger.info("--------------------------------------------------------------------------------");
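+        // "vs Baseline" reports savings relative to level 8: a positive MB value (and
+        // a positive percentage, hence the negation of sizeDiffPercent below) means
+        // the level produced smaller output than the baseline.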
+
+        for (CompressionResult result : results)
+        {
+            double sizeDiffPercent = ((double) (result.compressedSizeBytes - baseline.compressedSizeBytes) / baseline.compressedSizeBytes) * 100;
+            long savedMB = (baseline.compressedSizeBytes - result.compressedSizeBytes) / 1024 / 1024;
+
+            logger.info(String.format("%-6d | %-10.4f | %,16d | %,12d | %,12d | %+6d MB (%+.2f%%)",
+                                      result.level,
+                                      result.compressionRatio,
+                                      result.compressedSizeBytes / 1024 / 1024,
+                                      result.durationMs,
+                                      result.cpuTimeMs,
+                                      savedMB,
+                                      -sizeDiffPercent));
+        }
+
+        logger.info("================================================================================");
+        logger.info("");
+        logger.info("NOTE: Dictionary trained on one SSTable, applied to compress a different SSTable.");
+        logger.info(" All metrics averaged over {} iterations.", baseline.iterations);
+        logger.info(" This tests dictionary generalization across different SSTables.");
+        logger.info("================================================================================");
+    }
+}