From 637012768040f09694a9aa802c2c2f2940aa5a99 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 25 Sep 2025 12:06:18 -0700 Subject: [PATCH 1/5] initial closing off of MutationTrackingService if not enabled --- .../org/apache/cassandra/config/Config.java | 2 +- .../cassandra/config/DatabaseDescriptor.java | 15 ++--- .../config/MutationTrackingSpec.java | 25 ++++++++ .../statements/ModificationStatement.java | 2 +- .../org/apache/cassandra/db/Keyspace.java | 7 ++- .../db/PartitionRangeReadCommand.java | 2 +- .../db/SinglePartitionReadCommand.java | 2 +- .../io/sstable/format/SSTableWriter.java | 2 +- .../replication/ActiveLogReconciler.java | 4 +- .../replication/BroadcastLogOffsets.java | 3 +- .../cassandra/replication/ForwardedWrite.java | 5 +- .../replication/MutationJournal.java | 4 +- .../replication/MutationTrackingService.java | 62 +++++++++++++++++-- .../replication/PullMutationsRequest.java | 3 +- .../replication/PushMutationRequest.java | 1 + .../replication/TrackedWriteRequest.java | 4 +- .../cassandra/service/StorageService.java | 2 +- .../service/TrackedWriteResponseHandler.java | 4 +- .../reads/tracked/ReadReconcileAck.java | 2 + .../reads/tracked/ReadReconciliations.java | 8 +-- .../service/reads/tracked/TrackedRead.java | 5 +- .../reads/tracked/TrackedSummaryResponse.java | 2 + .../org/apache/cassandra/tcm/Startup.java | 6 +- .../cassandra/distributed/impl/Instance.java | 2 +- ...NodeTableWalkWithMutationTrackingTest.java | 2 +- .../MutationTrackingPendingReadTest.java | 8 +-- .../MutationTrackingRangeReadTest.java | 2 +- ...utationTrackingReadReconciliationTest.java | 8 +-- .../test/tracking/MutationTrackingTest.java | 16 ++--- .../test/tracking/MutationTrackingUtils.java | 6 +- .../MutationTrackingWriteForwardingTest.java | 6 +- .../test/tracking/OffsetBroadcastTest.java | 6 +- .../CoordinatorLogOffsetsLifecycleTest.java | 6 +- .../CoordinatorLogOffsetsTest.java | 38 ++++++------ 34 files changed, 181 insertions(+), 91 deletions(-) create mode 100644 src/java/org/apache/cassandra/config/MutationTrackingSpec.java diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index be47e7c2e2bf..0174301cc014 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -673,7 +673,7 @@ public static class SSTableConfig public boolean dynamic_data_masking_enabled = false; - public boolean mutation_tracking_enabled = false; + public MutationTrackingSpec mutation_tracking = new MutationTrackingSpec(); /** * Time in milliseconds after a warning will be emitted to the log and to the client that a UDF runs too long. diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 86da6de79911..5a908bb4ae15 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -747,6 +747,11 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m if (commitLogWriteDiskAccessMode != conf.commitlog_disk_access_mode) logger.info("commitlog_disk_access_mode resolved to: {}", commitLogWriteDiskAccessMode); + if (conf.mutation_tracking.journal_directory == null) + { + conf.mutation_tracking.journal_directory = storagedirFor("mutation_journal"); + } + if (conf.accord.journal_directory == null) { conf.accord.journal_directory = storagedirFor("accord_journal"); @@ -5766,16 +5771,12 @@ public static void setDynamicDataMaskingEnabled(boolean enabled) public static boolean getMutationTrackingEnabled() { - return conf.mutation_tracking_enabled; + return conf.mutation_tracking.enabled; } - public static void setMutationTrackingEnabled(boolean enabled) + public static String getMutationTrackingJournalDirectory() { - if (enabled != conf.mutation_tracking_enabled) - { - logger.info("Setting mutation_tracking_enabled to {}", enabled); - conf.mutation_tracking_enabled = enabled; - } + return conf.mutation_tracking.journal_directory; } public static OptionalDouble getSeverityDuringDecommission() diff --git a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java new file mode 100644 index 000000000000..d9b04a0f407a --- /dev/null +++ b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +public class MutationTrackingSpec +{ + public boolean enabled = false; + public String journal_directory; +} diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index b2d916a2de78..fd3221e8f509 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -829,7 +829,7 @@ public ResultMessage executeInternalWithoutCondition(QueryState queryState, Quer String keyspaceName = mutation.getKeyspaceName(); Token token = mutation.key().getToken(); - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token); mutation = mutation.withMutationId(id); mutation.apply(); } diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 5805de313dad..67ba8f827230 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -399,7 +399,7 @@ public void initCf(TableMetadata metadata, boolean loadSSTables) public Future applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { - return getMetadata().useMutationTracking() + return getMetadata().useMutationTracking() && MutationTrackingService.isEnabled() ? applyInternalTracked(mutation, new AsyncPromise<>()) : applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new AsyncPromise<>()); } @@ -608,6 +608,7 @@ else if (isDeferrable) */ private Future applyInternalTracked(Mutation mutation, Promise future) { + MutationTrackingService.ensureEnabled(); Preconditions.checkState(getMetadata().useMutationTracking() && !mutation.id().isNone()); if (TEST_FAIL_WRITES && getMetadata().name.equals(TEST_FAIL_WRITES_KS)) @@ -616,7 +617,7 @@ private Future applyInternalTracked(Mutation mutation, Promise future) boolean started; try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, true)) { - started = MutationTrackingService.instance.startWriting(mutation); + started = MutationTrackingService.instance().startWriting(mutation); if (started) { @@ -635,7 +636,7 @@ private Future applyInternalTracked(Mutation mutation, Promise future) } if (started) - MutationTrackingService.instance.finishWriting(mutation); + MutationTrackingService.instance().finishWriting(mutation); if (future != null) diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 0c5386c3adba..4e7d69985406 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -486,7 +486,7 @@ protected PartialTrackedRead createInProgressRead(UnfilteredPartitionIterator it @Override protected MutationSummary createMutationSummaryInternal(boolean includePending) { - return MutationTrackingService.instance.createSummaryForRange(dataRange.keyRange, metadata().id, includePending); + return MutationTrackingService.instance().createSummaryForRange(dataRange.keyRange, metadata().id, includePending); } @Override diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index ecd7cc7253ec..515497b67e8e 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -920,7 +920,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs protected MutationSummary createMutationSummaryInternal(boolean includePending) { - return MutationTrackingService.instance.createSummaryForKey(partitionKey, metadata().id, includePending); + return MutationTrackingService.instance().createSummaryForKey(partitionKey, metadata().id, includePending); } @Override diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index bc183a56faa9..fa3fc27c41a8 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -343,7 +343,7 @@ protected Map finalizeMetadata() if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) { Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR)); - if (MutationTrackingService.instance.isDurablyReconciled(coordinatorLogOffsets)) + if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets)) { repairedAt = Clock.Global.currentTimeMillis(); logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt); diff --git a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java index 54ded3766c75..5d685bbb124e 100644 --- a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java +++ b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java @@ -134,13 +134,13 @@ public boolean invokeOnFailure() @Override public void onResponse(Message msg) { - MutationTrackingService.instance.receivedWriteResponse(mutationId, toHost); + MutationTrackingService.instance().receivedWriteResponse(mutationId, toHost); } @Override public void onFailure(InetAddressAndPort from, RequestFailure failureReason) { - MutationTrackingService.instance.retryFailedWrite(mutationId, toHost, failureReason); + MutationTrackingService.instance().retryFailedWrite(mutationId, toHost, failureReason); } void send() diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java index ce8fe512c9d6..97c3c8aa111a 100644 --- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java +++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java @@ -63,9 +63,10 @@ public String toString() } public static final IVerbHandler verbHandler = message -> { + MutationTrackingService.ensureEnabled(); BroadcastLogOffsets replicatedOffsets = message.payload; logger.trace("Received replicated offsets {} from {}", replicatedOffsets, message.from()); - MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace, + MutationTrackingService.instance().updateReplicatedOffsets(replicatedOffsets.keyspace, replicatedOffsets.range, replicatedOffsets.replicatedOffsets, replicatedOffsets.durable, diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java b/src/java/org/apache/cassandra/replication/ForwardedWrite.java index c6014257e800..478431f16a78 100644 --- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java +++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java @@ -105,7 +105,7 @@ public void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo) String keyspaceName = mutation.getKeyspaceName(); Token token = mutation.key().getToken(); - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token); // Do not wait for handler completion, since the coordinator is already waiting and we don't want to block the stage LeaderCallback handler = new LeaderCallback(id, ackTo); applyLocallyAndForwardToReplicas(mutation.withMutationId(id), recipients, handler, ackTo); @@ -276,6 +276,7 @@ public long serializedSize(MutationRequest request, int version) public static final IVerbHandler verbHandler = incoming -> { + MutationTrackingService.ensureEnabled(); if (approxTime.now() > incoming.expiresAtNanos()) { Tracing.trace("Discarding mutation from {} (timed out)", incoming.from()); @@ -317,7 +318,7 @@ public void onResponse(Message msg) { // Local mutations are witnessed from Keyspace.applyInternalTracked if (msg != null) - MutationTrackingService.instance.receivedWriteResponse(id, msg.from()); + MutationTrackingService.instance().receivedWriteResponse(id, msg.from()); // Local write needs to be ack'd to coordinator if (msg == null && ackTo != null) diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java index aeccc3f052a7..27b8ee157b43 100644 --- a/src/java/org/apache/cassandra/replication/MutationJournal.java +++ b/src/java/org/apache/cassandra/replication/MutationJournal.java @@ -46,13 +46,13 @@ public class MutationJournal { - public static final MutationJournal instance = new MutationJournal(); + public static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null; private final Journal journal; private MutationJournal() { - this(new File(DatabaseDescriptor.getCommitLogLocation()), new JournalParams()); + this(new File(DatabaseDescriptor.getMutationTrackingJournalDirectory()), new JournalParams()); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 6dc1bcc963ce..da302d62f8a3 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -40,6 +40,7 @@ import org.agrona.collections.IntArrayList; import org.agrona.collections.IntHashSet; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; @@ -79,7 +80,61 @@ // TODO (expected): handle topology changes public class MutationTrackingService { - private static final ScheduledExecutorPlus executor = executorFactory().scheduled("Mutation-Tracking-Service", NORMAL); + + private static final MutationTrackingService instance; + private static final ScheduledExecutorPlus executor; + + static + { + if (DatabaseDescriptor.getMutationTrackingEnabled()) + { + instance = new MutationTrackingService(); + executor = executorFactory().scheduled("Mutation-Tracking-Service", NORMAL); + } + else + { + instance = null; + executor = null; + } + } + + /** + * Callers of this method should have validated that mutation tracking is enabled + * @return + */ + public static MutationTrackingService instance() + { + if (instance == null) + throw new IllegalStateException("Mutation tracking is not enabled"); + return instance; + } + + public static boolean isEnabled() + { + return DatabaseDescriptor.getMutationTrackingEnabled(); + } + + public static void ensureEnabled() + { + if (!DatabaseDescriptor.getMutationTrackingEnabled()) + { + throw new IllegalStateException("Mutation tracking is not enabled"); + } + } + + public static void start(ClusterMetadata metadata) + { + if (!isEnabled()) + return; + instance().start(metadata); + } + + public static void shutdown() throws InterruptedException + { + if (!isEnabled()) + return; + instance().shutdownBlocking(); + } /** * Split ranges into this many shards. @@ -89,7 +144,6 @@ public class MutationTrackingService private static final int SHARD_MULTIPLIER = 8; private static final Logger logger = LoggerFactory.getLogger(MutationTrackingService.class); - public static final MutationTrackingService instance = new MutationTrackingService(); private final TrackedLocalReads localReads = new TrackedLocalReads(); private final ConcurrentHashMap keyspaceShards = new ConcurrentHashMap<>(); @@ -107,7 +161,7 @@ public class MutationTrackingService private MutationTrackingService() {} // TODO (expected): implement a TCM ChangeListener - public synchronized void start(ClusterMetadata metadata) + private synchronized void startInternal(ClusterMetadata metadata) { if (started) return; @@ -133,7 +187,7 @@ public synchronized boolean isStarted() return started; } - public void shutdownBlocking() throws InterruptedException + private void shutdownBlocking() throws InterruptedException { // TODO: FIXME activeReconciler.shutdownBlocking(); diff --git a/src/java/org/apache/cassandra/replication/PullMutationsRequest.java b/src/java/org/apache/cassandra/replication/PullMutationsRequest.java index ec5632ab5ba0..bc9c8ef5c58f 100644 --- a/src/java/org/apache/cassandra/replication/PullMutationsRequest.java +++ b/src/java/org/apache/cassandra/replication/PullMutationsRequest.java @@ -66,10 +66,11 @@ public long serializedSize(PullMutationsRequest pull, int version) @Override public void doVerb(Message message) { + MutationTrackingService.ensureEnabled(); InetAddressAndPort forHost = message.from(); Offsets offsets = message.payload.offsets; logger.trace("Received pull mutations request from {} for {}", forHost, offsets); - MutationTrackingService.instance.requestMissingMutations(offsets, forHost); + MutationTrackingService.instance().requestMissingMutations(offsets, forHost); } }; } diff --git a/src/java/org/apache/cassandra/replication/PushMutationRequest.java b/src/java/org/apache/cassandra/replication/PushMutationRequest.java index 8e673816b5f3..eac84dd0fdb4 100644 --- a/src/java/org/apache/cassandra/replication/PushMutationRequest.java +++ b/src/java/org/apache/cassandra/replication/PushMutationRequest.java @@ -141,6 +141,7 @@ public Materialized deserialize(DataInputPlus in, int version) throws IOExceptio @Override public void doVerb(Message message) { + MutationTrackingService.ensureEnabled(); if (approxTime.now() > message.expiresAtNanos()) { Tracing.trace("Discarding mutation from {} (timed out)", message.from()); diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java index e9ee3ce7cf04..1c3dd7a2007e 100644 --- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java +++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java @@ -101,7 +101,7 @@ public static AbstractWriteResponseHandler perform( if (logger.isTraceEnabled()) logger.trace("Local tracked request {} {}", mutation, plan); writeMetrics.localRequests.mark(); - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token); mutation = mutation.withMutationId(id); TrackedWriteResponseHandler handler = TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime), id); @@ -203,7 +203,7 @@ public static void applyLocallyAndSendToReplicas(Mutation mutation, ReplicaPlan. } if (remoteReplicas != null) - MutationTrackingService.instance.sentWriteRequest(mutation, remoteReplicas); + MutationTrackingService.instance().sentWriteRequest(mutation, remoteReplicas); } static void applyMutationLocally(Mutation mutation, RequestCallback handler) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5625fe1c8107..70577a798782 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3923,7 +3923,7 @@ protected synchronized void drain(boolean isFinalShutdown) throws IOException, I SnapshotManager.instance.shutdownAndWait(1L, MINUTES); HintsService.instance.shutdownBlocking(); - MutationTrackingService.instance.shutdownBlocking(); + MutationTrackingService.shutdown(); // Interrupt ongoing compactions and shutdown CM to prevent further compactions. CompactionManager.instance.forceShutdown(); diff --git a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java index 213cc90976c9..8e40688ea8ce 100644 --- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java @@ -49,14 +49,14 @@ public void onResponse(Message msg) { // Local mutations are witnessed from Keyspace.applyInternalTracked if (msg != null) - MutationTrackingService.instance.receivedWriteResponse(mutationId, msg.from()); + MutationTrackingService.instance().receivedWriteResponse(mutationId, msg.from()); wrapped.onResponse(msg); } @Override public void onFailure(InetAddressAndPort from, RequestFailure failure) { - MutationTrackingService.instance.retryFailedWrite(mutationId, from, failure); + MutationTrackingService.instance().retryFailedWrite(mutationId, from, failure); wrapped.onFailure(from, failure); } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java index bfa716a9e055..6d4688cad317 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java @@ -26,6 +26,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.replication.MutationTrackingService; /** * Notifies the reconcile coordinator (data node) that this node has received @@ -50,6 +51,7 @@ public String toString() public static final IVerbHandler verbHandler = message -> { + MutationTrackingService.ensureEnabled(); ReadReconcileAck notify = message.payload; logger.trace("Received reconcile ack from {}, for {}", message.from(), notify.readId); ReadReconciliations.instance.acceptSyncAck(message.from(), notify.readId); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java index 1c57781c00cd..f416cd67a789 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java @@ -216,7 +216,7 @@ boolean acceptLocalSummary(MutationSummary summary) if (logId.hostId() != LOCAL_NODE) continue; - MutationTrackingService.instance.collectRemotelyMissingMutations(ourOffsets, remoteNodes, missingOffsets); + MutationTrackingService.instance().collectRemotelyMissingMutations(ourOffsets, remoteNodes, missingOffsets); // we don't listen to or block on delivery of these mutations, intentionally missingOffsets.forEach(ReadReconciliations::push); missingOffsets.clear(); @@ -233,7 +233,7 @@ boolean acceptLocalSummary(MutationSummary summary) boolean acceptRemoteSummary(MutationSummary summary, int remoteNode) { Log2OffsetsMap.Mutable missingMutations = new Log2OffsetsMap.Mutable(); - MutationTrackingService.instance.collectLocallyMissingMutations(summary, missingMutations); + MutationTrackingService.instance().collectLocallyMissingMutations(summary, missingMutations); // don't request what's already been requested for other remote summaries // TODO (consider): rely entirely on IncomingMutations deduplication instead @@ -356,7 +356,7 @@ private boolean updateRemainingAndMaybeComplete(int mutationsDelta, int summarie private boolean complete() { if (isDataNode()) - MutationTrackingService.instance.localReads().acknowledgeReconcile(id, augmentingOffsets()); + MutationTrackingService.instance().localReads().acknowledgeReconcile(id, augmentingOffsets()); else MessagingService.instance().send(Message.out(Verb.READ_RECONCILE_ACK, new ReadReconcileAck(id)), host(dataNode)); @@ -383,7 +383,7 @@ private static void pull(int node, Offsets offsets, IncomingMutations.Callback c Offsets.Mutable toPull = new Offsets.Mutable(offsets.logId()); for (ShortMutationId id : offsets) - if (MutationTrackingService.instance.registerMutationCallback(id, callback)) + if (MutationTrackingService.instance().registerMutationCallback(id, callback)) toPull.add(id.offset()); if (!toPull.isEmpty()) diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java index 267670a63ebb..2ed79513c7bc 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java @@ -280,7 +280,7 @@ private void start(Dispatcher.RequestTime requestTime, Consumer { AsyncPromise promise = - MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime, partialReadConsumer); + MutationTrackingService.instance().localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime, partialReadConsumer); promise.addCallback((response, error) -> { if (error != null) { @@ -449,7 +449,7 @@ public void executeLocally(Message message, ClusterMetadata m { Dispatcher.RequestTime requestTime = new Dispatcher.RequestTime(message.createdAtNanos()); AsyncPromise promise = - MutationTrackingService.instance + MutationTrackingService.instance() .localReads() .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime, null); promise.addCallback((response, error) -> { @@ -563,6 +563,7 @@ public long serializedSize(SummaryRequest request, int version) @Override protected void performRead(Message message, ClusterMetadata metadata) { + MutationTrackingService.ensureEnabled(); message.payload.executeLocally(message, metadata); } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java index ac6a4ddbeced..7512f5a745e3 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java @@ -23,6 +23,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; import java.io.IOException; @@ -48,6 +49,7 @@ public TrackedSummaryResponse(TrackedRead.Id readId, MutationSummary summary, in public static final IVerbHandler verbHandler = message -> { + MutationTrackingService.ensureEnabled(); TrackedSummaryResponse response = message.payload; if (logger.isTraceEnabled()) logger.trace("Received summary {} from {}, for {}", response.summary, message.from(), response.readId); diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 7fc336809c53..95a754ce7493 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -155,7 +155,7 @@ public static void initializeAsNonCmsNode(Function wrapPro .withStorage(LogStorage.SystemKeyspace) .afterReplay(Startup::scrubDataDirectories, (metadata) -> StorageService.instance.registerMBeans(), - MutationTrackingService.instance::start) + MutationTrackingService::start) .withDefaultListeners(); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, @@ -269,7 +269,7 @@ public static void initializeFromGossip(Function wrapProce .withInitialState(emptyFromSystemTables) .afterReplay(Startup::scrubDataDirectories, (metadata) -> StorageService.instance.registerMBeans(), - MutationTrackingService.instance::start) + MutationTrackingService::start) .withStorage(LogStorage.SystemKeyspace) .withDefaultListeners(); @@ -377,7 +377,7 @@ public static void reinitializeWithClusterMetadata(String fileName, Function StorageService.instance.registerMBeans(), - MutationTrackingService.instance::start) + MutationTrackingService::start) .withPreviousState(prev) .withInitialState(metadata) .withStorage(LogStorage.SystemKeyspace) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index bd9f69451ab6..2f497dbdf8ce 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -1039,7 +1039,7 @@ public Future shutdown(boolean runOnExitThreads, boolean shutdownMessaging // (ex. A Mutation stage thread may attempt to add a mutation to the CommitLog.) error = parallelRun(error, executor, CommitLog.instance::shutdownBlocking); error = parallelRun(error, executor, MutationJournal.instance::shutdownBlocking); - error = parallelRun(error, executor, MutationTrackingService.instance::shutdownBlocking); + error = parallelRun(error, executor, MutationTrackingService::shutdown); error = parallelRun(error, executor, () -> shutdownAndWait(Collections.singletonList(JMXBroadcastExecutor.executor)) ); diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java index 5cec4be22403..c46cad9169dc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java @@ -75,7 +75,7 @@ protected List supportedIndexers() protected void clusterConfig(IInstanceConfig c) { super.clusterConfig(c); - c.set("mutation_tracking_enabled", "true"); + c.set("mutation_tracking.enabled", "true"); } @Test diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java index ae49c0038312..ab6d1dfd9295 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java @@ -88,7 +88,7 @@ public void testPendingWriteInclusion() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -121,7 +121,7 @@ public void testPendingWriteInclusion() throws Throwable Assert.assertEquals(1, secondIds.offsetCount()); // create a mutation - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, dk.getToken()); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, dk.getToken()); SimpleBuilders.MutationBuilder builder = new SimpleBuilders.MutationBuilder(id, keyspaceName, dk); PartitionUpdate.SimpleBuilder tableBuilder = builder.update(metadata); tableBuilder.row(bytes(1)).add("v", 1); @@ -144,7 +144,7 @@ public void testPendingWriteInclusion() throws Throwable initialSummary = command.createMutationSummary(false); ReadExecutionController controller = command.executionController(false); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); read = command.beginTrackedRead(controller); // Create another summary once initial data has been read fully. We do this to catch @@ -204,7 +204,7 @@ public void testPendingReadInclusion() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java index 7fb4fcb960a8..3da50bb4dd2c 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java @@ -44,7 +44,7 @@ public static void setup() throws IOException { cluster = Cluster.build() .withNodes(REPLICAS) - .withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP).set("mutation_tracking_enabled", "true")) + .withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP).set("mutation_tracking.enabled", "true")) .start(); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java index 37f6c6a46f37..6690b7bb9d82 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java @@ -114,7 +114,7 @@ public void testBasicReadReconciliation() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -192,7 +192,7 @@ public void testReadReconciliationApplyMutations() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -269,7 +269,7 @@ public void testBasicRangeReadReconciliation() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -345,7 +345,7 @@ public void testBasicRangeReadReconciliationApplyMutations() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java index 8e6a8e108fa7..ce6b090dd264 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java @@ -58,7 +58,7 @@ public void testBasicWritePath() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true")) + .set("mutation_tracking.enabled", "true")) .start()) { @@ -80,7 +80,7 @@ public void testBasicWritePath() throws Throwable cluster.get(1).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Offsets summaryIds = summaryIdSpace(summary.get(logId)); Assert.assertEquals(1, summaryIds.offsetCount()); @@ -94,7 +94,7 @@ public void testHintsNotWrittenOnFailedWrite() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -130,7 +130,7 @@ public void testFailedMutationRedelivery() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -146,7 +146,7 @@ public void testFailedMutationRedelivery() throws Throwable cluster.filters().verbs(Verb.MUTATION_REQ.id).to(3).drop(); // pause reconciler temporarily - cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.pauseActiveReconciler()); + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseActiveReconciler()); // issue a write - should fail on node 3 cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"), ConsistencyLevel.QUORUM); @@ -157,21 +157,21 @@ public void testFailedMutationRedelivery() throws Throwable { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Assert.assertEquals(1, summary.get(logId).unreconciled.offsetCount()); Assert.assertEquals(0, summary.get(logId).reconciled.offsetCount()); }); // resume the reconciler - cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.resumeActiveReconciler()); + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().resumeActiveReconciler()); Thread.sleep(1000); // wait for reconiciler to do its job cluster.get(1).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Assert.assertEquals(0, summary.get(logId).unreconciled.offsetCount()); Assert.assertEquals(1, summary.get(logId).reconciled.offsetCount()); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java index 091bda1338f9..fa16c4b1c4cc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java @@ -118,7 +118,7 @@ public static MutationSummary decodeSummary(byte[] bytes) public static MutationSummary summaryForKey(String keyspaceName, String tableName, DecoratedKey dk) { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, tableName); - return MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + return MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); } public static MutationSummary summaryForTable(String keyspaceName, String tableName) @@ -155,7 +155,7 @@ public static MutationSummary summaryForKey(String keyspaceName, String tableNam public static MutationSummary summaryForRange(String keyspaceName, String tableName, Range range) { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, tableName); - return MutationTrackingService.instance.createSummaryForRange(range, table.id, false); + return MutationTrackingService.instance().createSummaryForRange(range, table.id, false); } public static Offsets summaryIdSpace(CoordinatorSummary summary) @@ -308,7 +308,7 @@ public static CoordinatorLogId getOnlyLogId(MutationSummary summary) public static Mutation createMutation(TableMetadata tableMetadata, int k, int v) { DecoratedKey key = tableMetadata.partitioner.decorateKey(ByteBufferUtil.bytes(k)); - MutationId mutationId = MutationTrackingService.instance.nextMutationId(tableMetadata.keyspace, key.getToken()); + MutationId mutationId = MutationTrackingService.instance().nextMutationId(tableMetadata.keyspace, key.getToken()); SimpleBuilders.MutationBuilder builder = new SimpleBuilders.MutationBuilder(mutationId, tableMetadata.keyspace, key); PartitionUpdate.SimpleBuilder partition = builder.update(tableMetadata); partition.row().add("v", v); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java index 6209c1c1b637..506a3e078769 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java @@ -65,7 +65,7 @@ public void testBasicWriteForwarding() throws Throwable try (Cluster cluster = Cluster.build(NODES) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .withNodeIdTopology(topology) .start()) @@ -97,7 +97,7 @@ public void testBasicWriteForwarding() throws Throwable Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); Range fullRange = new Range<>(token, token); TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; - MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); + MutationSummary summary = MutationTrackingService.instance().createSummaryForRange(fullRange, tableId, true); return summary.reconciledIds(); }); @@ -105,7 +105,7 @@ public void testBasicWriteForwarding() throws Throwable Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); Range fullRange = new Range<>(token, token); TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; - MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); + MutationSummary summary = MutationTrackingService.instance().createSummaryForRange(fullRange, tableId, true); return summary.unreconciledIds(); }); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java index 5cd9c8cb2ccf..0668a2e3dd4e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java @@ -46,7 +46,7 @@ public void testBroadcastOffsets() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true")) + .set("mutation_tracking.enabled", "true")) .start()) { @@ -61,14 +61,14 @@ public void testBroadcastOffsets() throws Throwable cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"), ConsistencyLevel.QUORUM); for (int i = 1; i <= cluster.size(); ++i) - cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting()); for (int i = 1; i <= cluster.size(); ++i) { cluster.get(i).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); MutationSummary.CoordinatorSummary coordinatorSummary = summary.get(getOnlyLogId(summary)); Assert.assertEquals(1, coordinatorSummary.reconciled.offsetCount()); Assert.assertEquals(0, coordinatorSummary.unreconciled.offsetCount()); diff --git a/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java b/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java index 20d098fcb10b..4ac2abd0b6e5 100644 --- a/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java +++ b/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java @@ -97,7 +97,7 @@ public static Collection memtableConfigs() private static Mutation createMutation(TableMetadata tableMetadata, int k, int v) { DecoratedKey key = tableMetadata.partitioner.decorateKey(ByteBufferUtil.bytes(k)); - MutationId mutationId = MutationTrackingService.instance.nextMutationId(tableMetadata.keyspace, key.getToken()); + MutationId mutationId = MutationTrackingService.instance().nextMutationId(tableMetadata.keyspace, key.getToken()); SimpleBuilders.MutationBuilder builder = new SimpleBuilders.MutationBuilder(mutationId, tableMetadata.keyspace, key); PartitionUpdate.SimpleBuilder partition = builder.update(tableMetadata); partition.row().add("v", v); @@ -186,7 +186,7 @@ public void mutationIdLifecycleTest() // flush 1 { - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().persistLogStateForTesting(); cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); View view = cfs.getTracker().getView(); @@ -220,7 +220,7 @@ public void mutationIdLifecycleTest() // flush 2 { - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().persistLogStateForTesting(); cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); View view = cfs.getTracker().getView(); diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java index 07f4bc5eafcc..034cf9292d36 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java @@ -263,7 +263,7 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { ClusterMetadataTestHelper.createKeyspace(ks, KeyspaceParams.simple(3, ReplicationType.tracked)); ClusterMetadataTestHelper.commit(new AlterSchema(SchemaTransformations.addTable(tableMetadata, false))); - MutationTrackingService.instance.start(metadata); + MutationTrackingService.instance().start(metadata); // Eventually, will also run perturbations before checking isReconciled (like log truncation, durability, etc.) // to ensure that we don't prune data required to check what's been reconciled @@ -271,52 +271,52 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { // Applied at all replicas { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 1, 1); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); - MutationTrackingService.instance.finishWriting(mutation); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3); + MutationTrackingService.instance().finishWriting(mutation); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr2); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr3); ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder() .add(mutation.id()) .build(); - Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isTrue(); + Assertions.assertThat(MutationTrackingService.instance().isDurablyReconciled(logOffsets)).isTrue(); } // Applied locally but not on remote replicas { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 2, 2); - MutationTrackingService.instance.startWriting(mutation); - MutationTrackingService.instance.finishWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); + MutationTrackingService.instance().finishWriting(mutation); ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder() .add(mutation.id()) .build(); - Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse(); + Assertions.assertThat(MutationTrackingService.instance().isDurablyReconciled(logOffsets)).isFalse(); } // Applied on remote replicas but not locally { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 3, 3); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr2); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr3); ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder() .add(mutation.id()) .build(); - Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse(); + Assertions.assertThat(MutationTrackingService.instance().isDurablyReconciled(logOffsets)).isFalse(); } // If no replicas are aware of a log, it should be considered unreconciled out of caution { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 4, 4); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); - MutationTrackingService.instance.finishWriting(mutation); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3); + MutationTrackingService.instance().finishWriting(mutation); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr2); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr3); MutationId fakeMutationId = new MutationId(CoordinatorLogId.asLong(111, 222), MutationId.sequenceId(333, 444)); Assertions.assertThat(metadata.directory.version(new NodeId(fakeMutationId.hostId()))).isNull(); @@ -326,9 +326,9 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { ImmutableCoordinatorLogOffsets.Builder logOffsetsBuilder = new ImmutableCoordinatorLogOffsets.Builder(); logOffsetsBuilder.add(fakeMutationId); - Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build())).isFalse(); + Assertions.assertThat(MutationTrackingService.instance().isDurablyReconciled(logOffsetsBuilder.build())).isFalse(); } - MutationTrackingService.instance.shutdownBlocking(); + MutationTrackingService.shutdown(); } } \ No newline at end of file From 3c5ffd89a69dce5feb5a2e1a1023b09774212308 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 25 Sep 2025 13:15:59 -0700 Subject: [PATCH 2/5] add DML checks --- .../statements/schema/AlterKeyspaceStatement.java | 9 +++++++++ .../statements/schema/CreateKeyspaceStatement.java | 12 ++++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java index 14bdef0f68bc..4b02e43efcef 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java @@ -87,6 +87,15 @@ public Keyspaces apply(ClusterMetadata metadata) if (attrs.getReplicationStrategyClass() != null && attrs.getReplicationStrategyClass().equals(SimpleStrategy.class.getSimpleName())) Guardrails.simpleStrategyEnabled.ensureEnabled(state); + if (newKeyspace.params.replicationType.isTracked() && !keyspace.params.replicationType.isTracked()) + { + if (SchemaConstants.isSystemKeyspace(keyspaceName)) + throw ire("Mutation tracking is not supported on system keyspaces"); + + if (!DatabaseDescriptor.getMutationTrackingEnabled()) + throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use."); + } + if (keyspace.params.replication.isMeta() && !keyspace.name.equals(SchemaConstants.METADATA_KEYSPACE_NAME)) throw ire("Can not alter a keyspace to use MetaReplicationStrategy"); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java index 882117f21844..3827683d7777 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java @@ -27,6 +27,7 @@ import org.apache.cassandra.auth.FunctionResource; import org.apache.cassandra.auth.IResource; import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.exceptions.AlreadyExistsException; @@ -83,6 +84,17 @@ public Keyspaces apply(ClusterMetadata metadata) KeyspaceMetadata keyspaceMetadata = KeyspaceMetadata.create(keyspaceName, attrs.asNewKeyspaceParams()); + if (keyspaceMetadata.params.replicationType.isTracked()) + { + if (SchemaConstants.isSystemKeyspace(keyspaceName)) + throw ire("Mutation tracking is not supported on system keyspaces"); + + if (!DatabaseDescriptor.getMutationTrackingEnabled()) + { + throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use."); + } + } + if (keyspaceMetadata.params.replication.klass.equals(LocalStrategy.class)) throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use."); From 090f2ac9ed228ab5ac7e48bcd61ffb375b546eb3 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 25 Sep 2025 14:16:37 -0700 Subject: [PATCH 3/5] additional journal fixes --- .../tracked/TrackedKeyspaceWriteHandler.java | 2 +- .../replication/ActiveLogReconciler.java | 2 +- .../replication/MutationJournal.java | 30 +++++++++++++++++-- .../replication/PushMutationRequest.java | 4 +-- .../replication/UnreconciledMutations.java | 2 +- .../cassandra/service/CassandraDaemon.java | 2 +- .../cassandra/service/StorageService.java | 2 +- .../reads/tracked/PartialTrackedRead.java | 2 +- test/conf/cassandra.yaml | 4 +++ .../cassandra/distributed/impl/Instance.java | 4 +-- .../CoordinatorLogOffsetsLifecycleTest.java | 2 +- .../CoordinatorLogOffsetsTest.java | 2 +- .../replication/CoordinatorLogTest.java | 6 ++-- .../replication/MutationJournalTest.java | 2 +- .../cassandra/replication/ShardTest.java | 2 +- .../UnreconciledMutationsTest.java | 6 ++-- .../cassandra/service/StorageServiceTest.java | 2 +- 17 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java index bfd6d593c6d7..ff7599ba2564 100644 --- a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java @@ -40,7 +40,7 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re group = Keyspace.writeOrder.start(); Tracing.trace("Appending to mutation journal"); - RecordPointer pointer = MutationJournal.instance.write(mutation.id(), mutation); + RecordPointer pointer = MutationJournal.instance().write(mutation.id(), mutation); // TODO (preferred): update journal to return CommitLogPosition or otherwise remove requirement to allocate second object here return new CassandraWriteContext(group, new CommitLogPosition(pointer.segment, pointer.position)); diff --git a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java index 5d685bbb124e..bd95c1dce0c7 100644 --- a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java +++ b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java @@ -145,7 +145,7 @@ public void onFailure(InetAddressAndPort from, RequestFailure failureReason) void send() { - RecordPointer pointer = MutationJournal.instance.lookUp(mutationId); + RecordPointer pointer = MutationJournal.instance().lookUp(mutationId); Preconditions.checkNotNull(pointer, "Mutation %s not found in the journal", mutationId); Message message = diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java index 27b8ee157b43..ed7336807ddd 100644 --- a/src/java/org/apache/cassandra/replication/MutationJournal.java +++ b/src/java/org/apache/cassandra/replication/MutationJournal.java @@ -46,7 +46,7 @@ public class MutationJournal { - public static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null; + private static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null; private final Journal journal; @@ -55,18 +55,42 @@ private MutationJournal() this(new File(DatabaseDescriptor.getMutationTrackingJournalDirectory()), new JournalParams()); } + public static void start() + { + if (instance == null) + return; + + instance.startInternal(); + } + + public static void shutdown() + { + if (instance == null) + return; + + instance.shutdownBlocking(); + } + + public static MutationJournal instance() + { + if (instance == null) + throw new IllegalStateException("Mutation tracking is not enabled"); + + return instance; + } + @VisibleForTesting MutationJournal(File directory, Params params) { journal = new Journal<>("MutationJournal", directory, params, new MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop()); } - public void start() + void startInternal() { journal.start(); } - public void shutdownBlocking() + void shutdownBlocking() { journal.shutdown(); } diff --git a/src/java/org/apache/cassandra/replication/PushMutationRequest.java b/src/java/org/apache/cassandra/replication/PushMutationRequest.java index eac84dd0fdb4..6f1a7a7f5123 100644 --- a/src/java/org/apache/cassandra/replication/PushMutationRequest.java +++ b/src/java/org/apache/cassandra/replication/PushMutationRequest.java @@ -63,7 +63,7 @@ class Referenced implements PushMutationRequest public long serializedSize(int version) { // TODO (expected): handle mismatched (messaging) versions - int size = MutationJournal.instance.sizeOfRecord(pointer); + int size = MutationJournal.instance().sizeOfRecord(pointer); Preconditions.checkState(size > 0, "Couldn't read mutation %s size from the mutation journal", id); return size; } @@ -71,7 +71,7 @@ public long serializedSize(int version) @Override public void serialize(DataOutputPlus out, int version) throws IOException { - boolean read = MutationJournal.instance.read(pointer, (segment, position, key, buffer, userVersion) -> + boolean read = MutationJournal.instance().read(pointer, (segment, position, key, buffer, userVersion) -> { try { diff --git a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java index 63dff77349f4..153972a150b2 100644 --- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java +++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java @@ -229,7 +229,7 @@ static UnreconciledMutations loadFromJournal(Node2OffsetsMap witnessedOffsets, i for (int offset = iter.start(), end = iter.end(); offset <= end; offset++) { ShortMutationId id = new ShortMutationId(witnessed.logId, offset); - result.addDirectly(MutationJournal.instance.read(id)); + result.addDirectly(MutationJournal.instance().read(id)); } } diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index b66611d8f93d..5a438ac1e1d6 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -257,7 +257,7 @@ protected void setup() Keyspace.setInitialized(); CommitLog.instance.start(); - MutationJournal.instance.start(); + MutationJournal.start(); SnapshotManager.instance.start(false); SnapshotManager.instance.clearExpiredSnapshots(); diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 70577a798782..ca7b663db48b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3933,7 +3933,7 @@ protected synchronized void drain(boolean isFinalShutdown) throws IOException, I CommitLog.instance.forceRecycleAllSegments(); CommitLog.instance.shutdownBlocking(); - MutationJournal.instance.shutdownBlocking(); + MutationJournal.shutdown(); AutoRepair.instance.shutdownBlocking(); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java index c4db7ce259d9..6287f3a2524b 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java @@ -97,7 +97,7 @@ default void augment(Log2OffsetsMap augmentingOffsets) default void augment(ShortMutationId mutationId) { - Mutation mutation = MutationJournal.instance.read(mutationId); + Mutation mutation = MutationJournal.instance().read(mutationId); Preconditions.checkNotNull(mutation); if (!command().selectsKey(mutation.key())) { diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index 20c1d2fd70dd..ad354b7ab6aa 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -68,6 +68,10 @@ local_read_size_fail_threshold: 8192KiB row_index_read_size_warn_threshold: 4096KiB row_index_read_size_fail_threshold: 8192KiB +mutation_tracking: + enabled: true + journal_directory: build/test/cassandra/mutation_journal + accord: enabled: true journal_directory: build/test/cassandra/accord_journal diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 2f497dbdf8ce..90242d0683cc 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -777,7 +777,7 @@ protected void partialStartup(ICluster cluster) throws IOException, NoSuchFie CassandraDaemon.getInstanceForTesting().migrateSystemDataIfNeeded(); CommitLog.instance.start(); - MutationJournal.instance.start(); + MutationJournal.start(); SnapshotManager.instance.start(false); SnapshotManager.instance.clearExpiredSnapshots(); @@ -1038,7 +1038,7 @@ public Future shutdown(boolean runOnExitThreads, boolean shutdownMessaging // CommitLog must shut down after Stage, or threads from the latter may attempt to use the former. // (ex. A Mutation stage thread may attempt to add a mutation to the CommitLog.) error = parallelRun(error, executor, CommitLog.instance::shutdownBlocking); - error = parallelRun(error, executor, MutationJournal.instance::shutdownBlocking); + error = parallelRun(error, executor, MutationJournal::shutdown); error = parallelRun(error, executor, MutationTrackingService::shutdown); error = parallelRun(error, executor, () -> shutdownAndWait(Collections.singletonList(JMXBroadcastExecutor.executor)) diff --git a/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java b/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java index 4ac2abd0b6e5..856518945d8f 100644 --- a/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java +++ b/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java @@ -61,7 +61,7 @@ public class CoordinatorLogOffsetsLifecycleTest static { DatabaseDescriptor.daemonInitialization(); - MutationJournal.instance.start(); + MutationJournal.start(); } @BeforeClass diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java index 034cf9292d36..f16c4eac6b8e 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java @@ -236,7 +236,7 @@ public Args(List ids, int contentions) public void reconciledBounds() throws InterruptedException, ExecutionException { DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); - MutationJournal.instance.start(); + MutationJournal.start(); String ks = "ks"; String tbl = "tbl"; diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java index 5122683b41ab..a06848500b82 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java @@ -67,7 +67,7 @@ public static void setUp() throws IOException .addClusteringColumn("ck", UTF8Type.instance) .addRegularColumn("value", UTF8Type.instance) .build()); - MutationJournal.instance.start(); + MutationJournal.start(); } private static Token tk(String key) @@ -175,8 +175,8 @@ private void testPersistAndLoadRoundtrip(CoordinatorLogId logId) Mutation mutation2 = createMutation(new MutationId(logId.asLong(), MutationId.sequenceId(2, 0))); unreconciled.addDirectly(mutation1); unreconciled.addDirectly(mutation2); - MutationJournal.instance.write(mutation1.id(), mutation1); - MutationJournal.instance.write(mutation2.id(), mutation2); + MutationJournal.instance().write(mutation1.id(), mutation1); + MutationJournal.instance().write(mutation2.id(), mutation2); CoordinatorLog log = CoordinatorLog.recreate(KEYSPACE, range, LOCAL_HOST_ID, logId, PARTICIPANTS, witnessed, witnessed, unreconciled); diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java index a0c778a3e482..02a190498c90 100644 --- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java +++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java @@ -67,7 +67,7 @@ public static void setUp() throws IOException directory.deleteRecursiveOnExit(); journal = new MutationJournal(directory, TestParams.MUTATION_JOURNAL); - journal.start(); + journal.startInternal(); } @AfterClass diff --git a/test/unit/org/apache/cassandra/replication/ShardTest.java b/test/unit/org/apache/cassandra/replication/ShardTest.java index d8c62554f178..37b1ea819399 100644 --- a/test/unit/org/apache/cassandra/replication/ShardTest.java +++ b/test/unit/org/apache/cassandra/replication/ShardTest.java @@ -56,7 +56,7 @@ public static void setUp() throws IOException .addClusteringColumn("ck", UTF8Type.instance) .addRegularColumn("value", UTF8Type.instance) .build()); - MutationJournal.instance.start(); + MutationJournal.start(); } private static Token tk(String key) diff --git a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java index 281c8da07944..d283eadede01 100644 --- a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java +++ b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java @@ -422,7 +422,7 @@ public void testSingleTokenCollectionVsFullRange() @Test public void testLoadFromJournal() { - MutationJournal.instance.start(); + MutationJournal.start(); CoordinatorLogId logId = new CoordinatorLogId(1, 1); @@ -438,8 +438,8 @@ public void testLoadFromJournal() Mutation mutation6 = createMutation(6, 6, 6); Mutation mutation7 = createMutation(7, 7, 7); - MutationJournal.instance.write(mutation6.id(), mutation6); - MutationJournal.instance.write(mutation7.id(), mutation7); + MutationJournal.instance().write(mutation6.id(), mutation6); + MutationJournal.instance().write(mutation7.id(), mutation7); Offsets.Mutable loadedOffsets = new Offsets.Mutable(logId); UnreconciledMutations unreconciled = UnreconciledMutations.loadFromJournal(witnessed, 1); diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java index 4616df8b5b10..deb31d729c65 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java @@ -75,7 +75,7 @@ public static void setUpClass() throws Exception SimpleLocationProvider.LOCATION, NodeVersion.CURRENT)); CommitLog.instance.start(); - MutationJournal.instance.start(); + MutationJournal.start(); } @Before From 397319dd51da5bc2f4e7680271bfd51198ab47ca Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 25 Sep 2025 15:20:31 -0700 Subject: [PATCH 4/5] more directory stuff --- src/java/org/apache/cassandra/config/DatabaseDescriptor.java | 4 ++++ .../apache/cassandra/replication/MutationTrackingService.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 5a908bb4ae15..b111ebb6a709 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2227,6 +2227,10 @@ public static void createAllDirectories() throw new ConfigurationException("commitlog_directory must be specified", false); FileUtils.createDirectory(conf.commitlog_directory); + if (conf.mutation_tracking.journal_directory == null) + throw new ConfigurationException("mutation_tracking.journal_directory must be specified", false); + FileUtils.createDirectory(conf.mutation_tracking.journal_directory); + if (conf.accord.journal_directory == null) throw new ConfigurationException("accord.journal_directory must be specified", false); FileUtils.createDirectory(conf.accord.journal_directory); diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index da302d62f8a3..22150636271a 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -126,7 +126,7 @@ public static void start(ClusterMetadata metadata) { if (!isEnabled()) return; - instance().start(metadata); + instance().startInternal(metadata); } public static void shutdown() throws InterruptedException From eb1d0125fd62b885cdeec711b3a1e7076597f476 Mon Sep 17 00:00:00 2001 From: Blake Eggleston Date: Thu, 25 Sep 2025 20:47:35 -0700 Subject: [PATCH 5/5] more tests fixes stuff --- .../cassandra/config/CassandraRelevantProperties.java | 1 + .../cassandra/distributed/impl/InstanceConfig.java | 9 +++++++++ .../cassandra/replication/CoordinatorLogOffsetsTest.java | 2 +- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index a518497e8f3c..d48471cefe8f 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -231,6 +231,7 @@ public enum CassandraRelevantProperties DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"), /** In_JVM dtest property indicating that the test should use "latest" configuration */ DTEST_JVM_DTESTS_USE_LATEST("jvm_dtests.latest"), + DTEST_MUTATION_TRACKING_ENABLED("jvm_dtest.mutation_tracking.enabled", "true"), ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"), /** * Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed. diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index 45ef68cb81f5..ed21555a673b 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -32,6 +32,7 @@ import com.vdurmont.semver4j.Semver; import org.apache.cassandra.config.AccordSpec; import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.MutationTrackingSpec; import org.apache.cassandra.config.OptionaldPositiveInt; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -42,6 +43,7 @@ import org.apache.cassandra.locator.SimpleSeedProvider; import static org.apache.cassandra.config.CassandraRelevantProperties.DTEST_ACCORD_ENABLED; +import static org.apache.cassandra.config.CassandraRelevantProperties.DTEST_MUTATION_TRACKING_ENABLED; public class InstanceConfig implements IInstanceConfig { @@ -77,6 +79,7 @@ private InstanceConfig(int num, String hints_directory, String cdc_raw_directory, AccordSpec accord, + MutationTrackingSpec mutationTracking, Collection initial_token, int storage_port, int native_transport_port, @@ -103,6 +106,8 @@ private InstanceConfig(int num, .set("accord.command_store_shard_count", accord.command_store_shard_count.toString()) .set("accord.expire_txn", accord.expire_txn) .set("accord.enable_virtual_debug_only_keyspace", "true") + .set("mutation_tracking.enabled", mutationTracking.enabled) + .set("mutation_tracking.journal_directory", mutationTracking.journal_directory) .set("partitioner", "org.apache.cassandra.dht.Murmur3Partitioner") .set("start_native_transport", true) .set("concurrent_writes", 2) @@ -331,6 +336,9 @@ public static InstanceConfig generate(int nodeNum, accordSpec.journal_directory = String.format("%s/node%d/accord_journal", root, nodeNum); accordSpec.queue_shard_count = new OptionaldPositiveInt(2); accordSpec.command_store_shard_count = new OptionaldPositiveInt(4); + MutationTrackingSpec mutationTrackingSpec = new MutationTrackingSpec(); + mutationTrackingSpec.enabled = DTEST_MUTATION_TRACKING_ENABLED.getBoolean(); + mutationTrackingSpec.journal_directory = String.format("%s/node%d/mutation_journal", root, nodeNum); return new InstanceConfig(nodeNum, networkTopology, provisionStrategy.ipAddress(nodeNum), @@ -345,6 +353,7 @@ public static InstanceConfig generate(int nodeNum, String.format("%s/node%d/hints", root, nodeNum), String.format("%s/node%d/cdc", root, nodeNum), accordSpec, + mutationTrackingSpec, tokens, provisionStrategy.storagePort(nodeNum), provisionStrategy.nativeTransportPort(nodeNum), diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java index f16c4eac6b8e..de71a83a9bf5 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java @@ -263,7 +263,7 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { ClusterMetadataTestHelper.createKeyspace(ks, KeyspaceParams.simple(3, ReplicationType.tracked)); ClusterMetadataTestHelper.commit(new AlterSchema(SchemaTransformations.addTable(tableMetadata, false))); - MutationTrackingService.instance().start(metadata); + MutationTrackingService.start(metadata); // Eventually, will also run perturbations before checking isReconciled (like log truncation, durability, etc.) // to ensure that we don't prune data required to check what's been reconciled