diff --git a/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java
new file mode 100644
index 000000000000..d70746408adf
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/MutationTrackingIncrementalRepairTask.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.service.replication.migration.KeyspaceMigrationInfo;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+/** Incremental repair task for keyspaces using mutation tracking */
+public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
+{
+    private static final long SYNC_TIMEOUT_MINUTES = 30;
+
+    private final TimeUUID parentSession;
+    private final RepairCoordinator.NeighborsAndRanges neighborsAndRanges;
+    private final String[] cfnames;
+
+    protected MutationTrackingIncrementalRepairTask(RepairCoordinator coordinator,
+                                                    TimeUUID parentSession,
+                                                    RepairCoordinator.NeighborsAndRanges neighborsAndRanges,
+                                                    String[] cfnames)
+    {
+        super(coordinator);
+        this.parentSession = parentSession;
+        this.neighborsAndRanges = neighborsAndRanges;
+        this.cfnames = cfnames;
+    }
+
+    @Override
+    public String name()
+    {
+        return "MutationTrackingIncrementalRepair";
+    }
+
+    @Override
+    public Future<CoordinatedRepairResult> performUnsafe(ExecutorPlus executor, Scheduler validationScheduler)
+    {
+        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
+
+        if (allRanges.isEmpty())
+        {
+            logger.info("No common ranges to repair for keyspace {}", keyspace);
+            return new AsyncPromise<CoordinatedRepairResult>().setSuccess(CoordinatedRepairResult.create(List.of(), List.of()));
+        }
+
+        List<MutationTrackingSyncCoordinator> syncCoordinators = new ArrayList<>();
+        List<Collection<Range<Token>>> rangeCollections = new ArrayList<>();
+
+        for (CommonRange commonRange : allRanges)
+        {
+            for (Range<Token> range : commonRange.ranges)
+            {
+                MutationTrackingSyncCoordinator syncCoordinator = new MutationTrackingSyncCoordinator(keyspace, range);
+                syncCoordinator.start();
+                syncCoordinators.add(syncCoordinator);
+                rangeCollections.add(List.of(range));
+
+                logger.info("Started mutation tracking sync for range {}", range);
+            }
+        }
+
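+        // Each coordinator is now either registered with MutationTrackingService for
+        // offset-broadcast notifications, or already complete if no shards overlapped.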
+        coordinator.notifyProgress("Started mutation tracking sync for " + syncCoordinators.size() + " ranges");
+
+        AsyncPromise<CoordinatedRepairResult> resultPromise = new AsyncPromise<>();
+
+        executor.execute(() -> {
+            try
+            {
+                waitForSyncCompletion(syncCoordinators, executor, validationScheduler, allRanges, rangeCollections, resultPromise);
+            }
+            catch (Exception e)
+            {
+                logger.error("Error during mutation tracking repair", e);
+                resultPromise.tryFailure(e);
+            }
+        });
+
+        return resultPromise;
+    }
+
+    private void waitForSyncCompletion(List<MutationTrackingSyncCoordinator> syncCoordinators,
+                                       ExecutorPlus executor,
+                                       Scheduler validationScheduler,
+                                       List<CommonRange> allRanges,
+                                       List<Collection<Range<Token>>> rangeCollections,
+                                       AsyncPromise<CoordinatedRepairResult> resultPromise) throws InterruptedException
+    {
+        boolean allSucceeded = true;
+        for (MutationTrackingSyncCoordinator syncCoordinator : syncCoordinators)
+        {
+            boolean completed = syncCoordinator.awaitCompletion(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+            if (!completed)
+            {
+                logger.warn("Mutation tracking sync timed out for keyspace {} range {}",
+                            keyspace, syncCoordinator.getRange());
+                syncCoordinator.cancel();
+                allSucceeded = false;
+            }
+        }
+
+        if (!allSucceeded)
+        {
+            resultPromise.tryFailure(new RuntimeException("Mutation tracking sync timed out for some ranges"));
+            return;
+        }
+
+        coordinator.notifyProgress("Mutation tracking sync completed for all ranges");
+
+        if (requiresTraditionalRepair(keyspace))
+        {
+            runTraditionalRepairForMigration(executor, validationScheduler, allRanges, resultPromise);
+        }
+        else
+        {
+            // Pure mutation tracking - create a successful result
+            resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, List.of()));
+        }
+    }
+
+    private void runTraditionalRepairForMigration(ExecutorPlus executor,
+                                                  Scheduler validationScheduler,
+                                                  List<CommonRange> allRanges,
+                                                  AsyncPromise<CoordinatedRepairResult> resultPromise)
+    {
+        coordinator.notifyProgress("Running traditional repair for migration");
+
+        // Use the inherited runRepair method from AbstractRepairTask
+        Future<CoordinatedRepairResult> traditionalRepair = runRepair(parentSession, true, executor,
+                                                                      validationScheduler, allRanges,
+                                                                      neighborsAndRanges.shouldExcludeDeadParticipants,
+                                                                      cfnames);
+
+        traditionalRepair.addListener(f -> {
+            try
+            {
+                CoordinatedRepairResult result = (CoordinatedRepairResult) f.get();
+                resultPromise.setSuccess(result);
+            }
+            catch (Exception e)
+            {
+                resultPromise.setFailure(e);
+            }
+        });
+    }
+
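+    // These static predicates route repairs: RepairCoordinator consults
+    // shouldUseMutationTrackingRepair() when choosing a task, and waitForSyncCompletion
+    // consults requiresTraditionalRepair() to decide whether a classic repair must
+    // follow the offset sync.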
+    /**
+     * Determines if this keyspace should use mutation tracking incremental repair.
+     * Returns true if:
+     * - Keyspace uses mutation tracking replication, OR
+     * - Keyspace is currently migrating (either direction)
+     */
+    public static boolean shouldUseMutationTrackingRepair(String keyspace)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        KeyspaceMetadata ksm = metadata.schema.maybeGetKeyspaceMetadata(keyspace).orElse(null);
+        if (ksm == null)
+            return false;
+
+        // Check if keyspace uses mutation tracking
+        if (ksm.useMutationTracking())
+            return true;
+
+        // Check if keyspace is in migration (either direction)
+        KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
+        return migrationInfo != null;
+    }
+
+    /**
+     * Determines if we also need to run traditional repair.
+     * Returns true during migration:
+     * - Migrating TO mutation tracking: need traditional repair to sync pre-migration data
+     * - Migrating FROM mutation tracking: need traditional repair for post-migration consistency
+     */
+    public static boolean requiresTraditionalRepair(String keyspace)
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        KeyspaceMigrationInfo migrationInfo = metadata.mutationTrackingMigrationState.getKeyspaceInfo(keyspace);
+        return migrationInfo != null;
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/RepairCoordinator.java b/src/java/org/apache/cassandra/repair/RepairCoordinator.java
index b511d081c984..55274dd7b996 100644
--- a/src/java/org/apache/cassandra/repair/RepairCoordinator.java
+++ b/src/java/org/apache/cassandra/repair/RepairCoordinator.java
@@ -503,7 +503,15 @@ private Future<Pair<CoordinatedRepairResult, Supplier<String>>> repair(String[]
         }
         else if (state.options.isIncremental())
         {
-            task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+            // For keyspaces using mutation tracking, use the mutation tracking repair task
+            if (MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(state.keyspace))
+            {
+                task = new MutationTrackingIncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+            }
+            else
+            {
+                task = new IncrementalRepairTask(this, state.id, neighborsAndRanges, cfnames);
+            }
         }
         else
         {
diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
index abd23dc3f218..6246a7286859 100644
--- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
+++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -51,6 +52,11 @@ boolean isEmpty()
         return replicatedOffsets.isEmpty();
     }
 
+    public List getOffsets()
+    {
+        return Collections.unmodifiableList(replicatedOffsets);
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 56180c25945a..50ec6c24d8b2 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -283,6 +283,24 @@ Offsets.Immutable collectReconciledOffsets()
         }
     }
 
+    /**
+     * Returns the UNION of all witnessed offsets from all participants.
+     * This represents all offsets that ANY replica has witnessed.
+     */
+    Offsets.Immutable collectUnionOfWitnessedOffsets()
+    {
+        lock.readLock().lock();
+        try
+        {
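+            // witnessedOffsets is per-node state: union() merges offsets across nodes,
+            // whereas collectReconciledOffsets() above intersects them.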
+            Offsets.Mutable union = witnessedOffsets.union();
+            return union.isEmpty() ? null : Offsets.Immutable.copy(union);
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     public long getUnreconciledCount()
     {
         lock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 18394bdf5390..e28fe825f61e 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -131,6 +131,8 @@ public class MutationTrackingService
     private final IncomingMutations incomingMutations = new IncomingMutations();
     private final OutgoingMutations outgoingMutations = new OutgoingMutations();
 
+    private final Map<String, Set<MutationTrackingSyncCoordinator>> syncCoordinatorsByKeyspace = new ConcurrentHashMap<>();
+
     private volatile boolean started = false;
 
     private MutationTrackingService()
@@ -310,6 +312,19 @@ public void updateReplicatedOffsets(String keyspace, Range<Token> range, List
+
+        // Notify any sync coordinators waiting on offsets for this keyspace and range
+        Set<MutationTrackingSyncCoordinator> coordinators = syncCoordinatorsByKeyspace.get(keyspace);
+        if (coordinators != null)
+        {
+            for (MutationTrackingSyncCoordinator coordinator : coordinators)
+            {
+                if (range.intersects(coordinator.getRange()))
+                {
+                    coordinator.onOffsetsReceived();
+                }
+            }
+        }
     }
 
     public void recordFullyReconciledOffsets(ReconciledLogSnapshot reconciledSnapshot)
@@ -367,6 +382,30 @@ public boolean registerMutationCallback(ShortMutationId mutationId, IncomingMuta
         return incomingMutations.subscribe(mutationId, callback);
     }
 
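+    // Sync coordinators register here, keyed by keyspace; updateReplicatedOffsets
+    // notifies only those whose range intersects the incoming offsets' range.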
+    /**
+     * Register a sync coordinator to be notified when offset updates arrive.
+     */
+    public void registerSyncCoordinator(MutationTrackingSyncCoordinator coordinator)
+    {
+        syncCoordinatorsByKeyspace.computeIfAbsent(coordinator.getKeyspace(), k -> ConcurrentHashMap.newKeySet())
+                                  .add(coordinator);
+    }
+
+    /**
+     * Unregister a sync coordinator.
+     */
+    public void unregisterSyncCoordinator(MutationTrackingSyncCoordinator coordinator)
+    {
+        Set<MutationTrackingSyncCoordinator> coordinators = syncCoordinatorsByKeyspace.get(coordinator.getKeyspace());
+        if (coordinators != null)
+        {
+            coordinators.remove(coordinator);
+
+            if (coordinators.isEmpty())
+                syncCoordinatorsByKeyspace.remove(coordinator.getKeyspace(), coordinators);
+        }
+    }
+
     public void executeTransfers(String keyspace, Set sstables, ConsistencyLevel cl)
     {
         shardLock.readLock().lock();
@@ -495,6 +534,21 @@ public Iterable<Shard> getShards()
         return shards;
     }
 
+    public void forEachShardInKeyspace(String keyspace, Consumer<Shard> consumer)
+    {
+        shardLock.readLock().lock();
+        try
+        {
+            KeyspaceShards ksShards = keyspaceShards.get(keyspace);
+            if (ksShards != null)
+                ksShards.forEachShard(consumer);
+        }
+        finally
+        {
+            shardLock.readLock().unlock();
+        }
+    }
+
     public void collectLocallyMissingMutations(MutationSummary remoteSummary, Log2OffsetsMap.Mutable into)
     {
         shardLock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java
new file mode 100644
index 000000000000..4210d4638306
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingSyncCoordinator.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+
+public class MutationTrackingSyncCoordinator
+{
+    private static final Logger logger = LoggerFactory.getLogger(MutationTrackingSyncCoordinator.class);
+
+    private final String keyspace;
+    private final Range<Token> range;
+    private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();
+
+    // Per-shard state: tracks what each node has reported for that shard
+    private final Map<Range<Token>, ShardSyncState> shardStates = new ConcurrentHashMap<>();
+
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean completed = new AtomicBoolean(false);
+
+    public MutationTrackingSyncCoordinator(String keyspace, Range<Token> range)
+    {
+        this.keyspace = keyspace;
+        this.range = range;
+    }
+
+    public void start()
+    {
+        if (!started.compareAndSet(false, true))
+            throw new IllegalStateException("Sync coordinator already started");
+
+        List<Shard> overlappingShards = new ArrayList<>();
+        MutationTrackingService.instance.forEachShardInKeyspace(keyspace, shard -> {
+            if (shard.range.intersects(range))
+                overlappingShards.add(shard);
+        });
+
+        if (overlappingShards.isEmpty())
+        {
+            completionFuture.setSuccess(null);
+            return;
+        }
+
+        // Register to receive offset updates
+        MutationTrackingService.instance.registerSyncCoordinator(this);
+
+        // Initialize state for each shard and capture targets
+        for (Shard shard : overlappingShards)
+        {
+            ShardSyncState state = new ShardSyncState(shard);
+            state.captureTargets();
+            shardStates.put(shard.range, state);
+        }
+
+        if (checkIfComplete())
+        {
+            complete();
+            return;
+        }
+
+        logger.info("Sync coordinator started for keyspace {} range {}, tracking {} shards",
+                    keyspace, range, overlappingShards.size());
+    }
+
+    private void complete()
+    {
+        if (!completed.compareAndSet(false, true))
+            return;
+        MutationTrackingService.instance.unregisterSyncCoordinator(this);
+        completionFuture.setSuccess(null);
+    }
+
+    private boolean checkIfComplete()
+    {
+        for (ShardSyncState state : shardStates.values())
+        {
+            if (!state.isComplete())
+                return false;
+        }
+        return true;
+    }
+
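+    // Called by MutationTrackingService when replicated offsets arrive for an
+    // intersecting range; kept cheap, since it runs on the broadcast-handling path.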
+    public void onOffsetsReceived()
+    {
+        if (completed.get())
+            return;
+
+        // The underlying CoordinatorLog already updates its reconciled offsets.
+        // We just need to re-check if we're now complete.
+        if (checkIfComplete())
+        {
+            complete();
+        }
+    }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    public Range<Token> getRange()
+    {
+        return range;
+    }
+
+    /**
+     * Blocks until sync completes or timeout is reached.
+     *
+     * @param timeout Maximum time to wait
+     * @param unit    Time unit
+     * @return true if completed, false if timed out
+     */
+    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException
+    {
+        try
+        {
+            completionFuture.get(timeout, unit);
+            return true;
+        }
+        catch (java.util.concurrent.TimeoutException e)
+        {
+            return false;
+        }
+        catch (java.util.concurrent.ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
+    }
+
+    public void cancel()
+    {
+        if (completed.compareAndSet(false, true))
+        {
+            MutationTrackingService.instance.unregisterSyncCoordinator(this);
+            completionFuture.setFailure(new RuntimeException("Sync cancelled"));
+        }
+    }
+
+    /**
+     * Tracks sync state for a single shard.
+     */
+    private static class ShardSyncState
+    {
+        private final Shard shard;
+
+        // Target offsets: LogId -> the offsets we're waiting for all nodes to have
+        private final Map<CoordinatorLogId, Offsets.Immutable> targets = new ConcurrentHashMap<>();
+
+        ShardSyncState(Shard shard)
+        {
+            this.shard = shard;
+        }
+
+        void captureTargets()
+        {
+            Map<CoordinatorLogId, Offsets.Immutable> unionOffsets = shard.collectUnionOfWitnessedOffsetsPerLog();
+            targets.putAll(unionOffsets);
+        }
+
+        boolean isComplete()
+        {
+            Map<CoordinatorLogId, Offsets.Immutable> currentReconciled = shard.collectReconciledOffsetsPerLog();
+
+            for (Map.Entry<CoordinatorLogId, Offsets.Immutable> entry : targets.entrySet())
+            {
+                CoordinatorLogId logId = entry.getKey();
+                Offsets.Immutable target = entry.getValue();
+
+                Offsets.Immutable reconciled = currentReconciled.get(logId);
+                if (reconciled == null)
+                    return false;
+
+                // Check if reconciled contains all offsets in target
+                if (!containsAll(reconciled, target))
+                    return false;
+            }
+            return true;
+        }
+
+        private boolean containsAll(Offsets reconciled, Offsets target)
+        {
+            for (ShortMutationId id : target)
+            {
+                if (!reconciled.contains(id.offset()))
+                    return false;
+            }
+            return true;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
index ac6fcc0dafae..8d943feb1050 100644
--- a/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
+++ b/src/java/org/apache/cassandra/replication/Node2OffsetsMap.java
@@ -73,6 +73,19 @@ Offsets.Mutable intersection()
         return intersection;
     }
 
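+    // Unlike intersection() above, union() keeps offsets witnessed by any single node;
+    // MutationTrackingSyncCoordinator captures its sync targets from this view.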
+    Offsets.Mutable union()
+    {
+        Iterator<Offsets.Mutable> iter = offsetsMap.values().iterator();
+        if (offsetsMap.size() == 1)
+            return Offsets.Mutable.copy(iter.next());
+
+        Offsets.Mutable union = Offsets.Mutable.copy(iter.next());
+        while (iter.hasNext())
+            union.addAll(iter.next());
+
+        return union;
+    }
+
     public void add(int node, Offsets offsets)
     {
         Offsets.Mutable current = offsetsMap.get(node);
diff --git a/src/java/org/apache/cassandra/replication/Shard.java b/src/java/org/apache/cassandra/replication/Shard.java
index 5f7f1e7ee641..f164d3d4e678 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -20,7 +20,9 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -406,6 +408,38 @@ void collectShardReconciledOffsetsToBuilder(ReconciledKeyspaceOffsets.Builder ke
         logs.values().forEach(log -> keyspaceBuilder.put(log.logId, log.collectReconciledOffsets(), range));
     }
 
+    /**
+     * Returns the reconciled offsets for each coordinator log in this shard.
+     * Reconciled offsets are the intersection of what all participants have.
+     */
+    public Map<CoordinatorLogId, Offsets.Immutable> collectReconciledOffsetsPerLog()
+    {
+        Map<CoordinatorLogId, Offsets.Immutable> result = new HashMap<>();
+        for (CoordinatorLog log : logs.values())
+        {
+            Offsets.Immutable reconciled = log.collectReconciledOffsets();
+            if (reconciled != null && !reconciled.isEmpty())
+                result.put(log.logId, reconciled);
+        }
+        return result;
+    }
+
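+    // The union view below complements the reconciled view above: per log, the
+    // reconciled (intersection) set is a subset of the witnessed (union) set, so a
+    // sync is complete once reconciled covers the union targets captured at its start.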
+    /**
+     * Returns the UNION of witnessed offsets from all participants for each coordinator log.
+     * Union = all offsets that ANY replica has witnessed.
+     */
+    public Map<CoordinatorLogId, Offsets.Immutable> collectUnionOfWitnessedOffsetsPerLog()
+    {
+        Map<CoordinatorLogId, Offsets.Immutable> result = new HashMap<>();
+        for (CoordinatorLog log : logs.values())
+        {
+            Offsets.Immutable union = log.collectUnionOfWitnessedOffsets();
+            if (union != null && !union.isEmpty())
+                result.put(log.logId, union);
+        }
+        return result;
+    }
+
     public DebugInfo getDebugInfo()
     {
         SortedMap logDebugState = new TreeMap<>(Comparator.comparing(CoordinatorLogId::asLong));
diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java
new file mode 100644
index 000000000000..5383fea0d79e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/repair/MutationTrackingIncrementalRepairTaskTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.repair;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.repair.MutationTrackingIncrementalRepairTask;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for MutationTrackingIncrementalRepairTask.
+ * Tests the decision logic for when to use mutation tracking repair.
+ *
+ * Uses a shared cluster across all tests to minimize overhead.
+ */
+public class MutationTrackingIncrementalRepairTaskTest extends TestBaseImpl
+{
+    private static Cluster CLUSTER;
+    private static final AtomicInteger ksCounter = new AtomicInteger();
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        CLUSTER = Cluster.build()
+                         .withNodes(3)
+                         .withConfig(cfg -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                                               .set("mutation_tracking_enabled", true))
+                         .start();
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    private static String nextKsName()
+    {
+        return "mtirt_ks" + ksCounter.incrementAndGet();
+    }
+
+    @Test
+    public void testShouldUseMutationTrackingRepairForTrackedKeyspace() throws Throwable
+    {
+        String ksName = nextKsName();
+        CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " +
+                             "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                             "AND replication_type='tracked'");
+
+        Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName));
+
+        assertTrue("Tracked keyspace should use mutation tracking repair", shouldUse);
+    }
+
+    @Test
+    public void testShouldNotUseMutationTrackingRepairForUntrackedKeyspace() throws Throwable
+    {
+        String ksName = nextKsName();
+        CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " +
+                             "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                             "AND replication_type='untracked'");
+
+        Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName));
+
+        assertFalse("Untracked keyspace should not use mutation tracking repair", shouldUse);
+    }
+
+    @Test
+    public void testRequiresTraditionalRepairReturnsFalseForNonMigratingKeyspace() throws Throwable
+    {
+        String ksName = nextKsName();
+        CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " +
+                             "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                             "AND replication_type='tracked'");
+
+        Boolean requiresTraditional = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName));
+
+        assertFalse("Non-migrating keyspace should not require traditional repair", requiresTraditional);
+    }
+
+    @Test
+    public void testShouldUseMutationTrackingRepairForNonexistentKeyspace() throws Throwable
+    {
+        Boolean shouldUse = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair("nonexistent_ks_xyz"));
+
+        assertFalse("Nonexistent keyspace should return false", shouldUse);
+    }
+
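+    // The two migration tests below flip replication_type via ALTER KEYSPACE and
+    // verify that both routing predicates change accordingly.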
CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + + // Verify migration state - both methods should now return true + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Migrating keyspace should use mutation tracking repair", shouldUseAfter); + + Boolean requiresAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertTrue("Migrating keyspace should require traditional repair", requiresAfter); + } + + @Test + public void testMigrationFromTrackedToUntracked() throws Throwable + { + String ksName = nextKsName(); + CLUSTER.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='tracked'"); + CLUSTER.schemaChange("CREATE TABLE " + ksName + ".tbl (k int PRIMARY KEY, v int)"); + + // Verify initial state + Boolean shouldUseBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Tracked keyspace should use mutation tracking repair", shouldUseBefore); + + Boolean requiresBefore = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertFalse("Non-migrating tracked keyspace should not require traditional repair", requiresBefore); + + // Migrate back to untracked + CLUSTER.schemaChange("ALTER KEYSPACE " + ksName + " WITH replication = " + + "{'class': 'SimpleStrategy', 'replication_factor': 3} " + + "AND replication_type='untracked'"); + + // During reverse migration, both should still apply + Boolean shouldUseAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.shouldUseMutationTrackingRepair(ksName)); + assertTrue("Keyspace migrating from tracked should still use mutation tracking repair", shouldUseAfter); + + Boolean requiresAfter = CLUSTER.get(1).callOnInstance(() -> MutationTrackingIncrementalRepairTask.requiresTraditionalRepair(ksName)); + assertTrue("Keyspace migrating from tracked should require traditional repair", requiresAfter); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java new file mode 100644 index 000000000000..4ca689666f26 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java
new file mode 100644
index 000000000000..4ca689666f26
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingSyncCoordinatorTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.replication;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.MutationTrackingSyncCoordinator;
+
+import static org.junit.Assert.*;
+
+/**
+ * Distributed tests for MutationTrackingSyncCoordinator.
+ *
+ * Tests that the sync coordinator correctly waits for offset convergence
+ * across all nodes in a cluster.
+ */
+public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl
+{
+    private static final String KS_NAME = "sync_test_ks";
+    private static final String TBL_NAME = "sync_test_tbl";
+
+    private void createTrackedKeyspace(Cluster cluster, String keyspaceSuffix)
+    {
+        String ksName = KS_NAME + keyspaceSuffix;
+        cluster.schemaChange("CREATE KEYSPACE " + ksName + " WITH replication = " +
+                             "{'class': 'SimpleStrategy', 'replication_factor': 3} " +
+                             "AND replication_type='tracked'");
+        cluster.schemaChange("CREATE TABLE " + ksName + '.' + TBL_NAME + " (k int PRIMARY KEY, v int)");
+    }
+
+    private String tableName(String suffix)
+    {
+        return KS_NAME + suffix + '.' + TBL_NAME;
+    }
+
+    private void pauseOffsetBroadcasts(Cluster cluster, boolean pause)
+    {
+        for (int i = 1; i <= cluster.size(); i++)
+            cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.pauseOffsetBroadcast(pause));
+    }
+
+    private static Range<Token> fullTokenRange()
+    {
+        return new Range<>(new Murmur3Partitioner.LongToken(Long.MIN_VALUE),
+                           new Murmur3Partitioner.LongToken(Long.MAX_VALUE));
+    }
+
+    @Test
+    public void testSyncCoordinatorCompletesWhenNoShards() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "");
+
+            // Create a sync coordinator for a range that has no data.
+            // It should complete immediately since there are no offsets to sync.
+            Boolean completed = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME, fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    return coordinator.awaitCompletion(5, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            assertTrue("Sync coordinator should complete when there are no pending offsets", completed);
+        }
+    }
+
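+    // The remaining tests write real data; testSyncCoordinatorWaitsForAllReplicasMutations
+    // and testSyncCoordinatorCancel additionally use the pauseOffsetBroadcast /
+    // broadcastOffsetsForTesting hooks to control when offsets propagate between nodes.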
+    @Test
+    public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(6).start())
+        {
+            createTrackedKeyspace(cluster, "2");
+
+            // Insert some data to create mutations
+            for (int i = 0; i < 10000; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + tableName("2") + " (k, v) VALUES (?, ?)",
+                                               ConsistencyLevel.ALL, i, i);
+            }
+
+            Thread.sleep(500); // Wait for offset broadcasts to propagate
+
+            // Create a sync coordinator - should complete since all data is synced (CL.ALL)
+            Boolean completed = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + '2', fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    // Give it enough time for broadcasts to arrive
+                    return coordinator.awaitCompletion(15, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            assertTrue("Sync coordinator should complete after data is fully replicated", completed);
+        }
+    }
+
+    @Test
+    public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(6).start())
+        {
+            createTrackedKeyspace(cluster, "3");
+
+            // Pause broadcasts so nodes don't share offsets yet
+            pauseOffsetBroadcasts(cluster, true);
+
+            // Write from different nodes with CL.ONE - each node has different mutations.
+            // Different coordinators create mutations that only their local replica group knows about initially.
+            cluster.coordinator(1).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (1, 1)", ConsistencyLevel.ONE);
+            cluster.coordinator(2).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (2, 2)", ConsistencyLevel.ONE);
+            cluster.coordinator(3).execute("INSERT INTO " + tableName("3") + " (k, v) VALUES (3, 3)", ConsistencyLevel.ONE);
+
+            // Resume broadcasts so nodes can share their offsets
+            pauseOffsetBroadcasts(cluster, false);
+
+            // Trigger broadcasts to share offsets between nodes
+            for (int i = 1; i <= 6; i++)
+                cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting());
+
+            Thread.sleep(500); // Wait for broadcasts to propagate
+
+            Boolean completed = cluster.get(4).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + "3", fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    return coordinator.awaitCompletion(30, TimeUnit.SECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+            });
+
+            assertTrue("Sync should complete after all mutations from all nodes are reconciled", completed);
+        }
+    }
+
+    @Test
+    public void testSyncCoordinatorCancel() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(3).start())
+        {
+            createTrackedKeyspace(cluster, "4");
+
+            // Pause offset broadcasts on all nodes to prevent sync from completing
+            pauseOffsetBroadcasts(cluster, true);
+
+            for (int i = 0; i < 100; i++)
+            {
+                cluster.coordinator(1).execute("INSERT INTO " + tableName("4") + " (k, v) VALUES (?, ?)",
+                                               ConsistencyLevel.ONE, i, i);
+            }
+
+            // Start the coordinator - it will be stuck waiting for offsets
+            Boolean wasCancelled = cluster.get(1).callOnInstance(() -> {
+                MutationTrackingSyncCoordinator coordinator = new MutationTrackingSyncCoordinator(KS_NAME + "4", fullTokenRange());
+                coordinator.start();
+
+                try
+                {
+                    Thread.sleep(100);
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+
+                coordinator.cancel();
+
+                // Verify it was cancelled: awaitCompletion should surface the failure
+                try
+                {
+                    coordinator.awaitCompletion(1, TimeUnit.SECONDS);
+                    return false; // Should have thrown
+                }
+                catch (InterruptedException e)
+                {
+                    Thread.currentThread().interrupt();
+                    return false;
+                }
+                catch (RuntimeException e)
+                {
+                    return e.getMessage() != null && e.getMessage().contains("cancelled");
+                }
+            });
+            assertTrue("Sync coordinator should be cancelled", wasCancelled);
+        }
+    }
+}