diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index a518497e8f3c..d48471cefe8f 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -231,6 +231,7 @@ public enum CassandraRelevantProperties DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"), /** In_JVM dtest property indicating that the test should use "latest" configuration */ DTEST_JVM_DTESTS_USE_LATEST("jvm_dtests.latest"), + DTEST_MUTATION_TRACKING_ENABLED("jvm_dtest.mutation_tracking.enabled", "true"), ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"), /** * Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed. diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index be47e7c2e2bf..0174301cc014 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -673,7 +673,7 @@ public static class SSTableConfig public boolean dynamic_data_masking_enabled = false; - public boolean mutation_tracking_enabled = false; + public MutationTrackingSpec mutation_tracking = new MutationTrackingSpec(); /** * Time in milliseconds after a warning will be emitted to the log and to the client that a UDF runs too long. diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 86da6de79911..b111ebb6a709 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -747,6 +747,11 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m if (commitLogWriteDiskAccessMode != conf.commitlog_disk_access_mode) logger.info("commitlog_disk_access_mode resolved to: {}", commitLogWriteDiskAccessMode); + if (conf.mutation_tracking.journal_directory == null) + { + conf.mutation_tracking.journal_directory = storagedirFor("mutation_journal"); + } + if (conf.accord.journal_directory == null) { conf.accord.journal_directory = storagedirFor("accord_journal"); @@ -2222,6 +2227,10 @@ public static void createAllDirectories() throw new ConfigurationException("commitlog_directory must be specified", false); FileUtils.createDirectory(conf.commitlog_directory); + if (conf.mutation_tracking.journal_directory == null) + throw new ConfigurationException("mutation_tracking.journal_directory must be specified", false); + FileUtils.createDirectory(conf.mutation_tracking.journal_directory); + if (conf.accord.journal_directory == null) throw new ConfigurationException("accord.journal_directory must be specified", false); FileUtils.createDirectory(conf.accord.journal_directory); @@ -5766,16 +5775,12 @@ public static void setDynamicDataMaskingEnabled(boolean enabled) public static boolean getMutationTrackingEnabled() { - return conf.mutation_tracking_enabled; + return conf.mutation_tracking.enabled; } - public static void setMutationTrackingEnabled(boolean enabled) + public static String getMutationTrackingJournalDirectory() { - if (enabled != conf.mutation_tracking_enabled) - { - logger.info("Setting mutation_tracking_enabled to {}", enabled); - conf.mutation_tracking_enabled = enabled; - } + return conf.mutation_tracking.journal_directory; } public static OptionalDouble getSeverityDuringDecommission() diff --git a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java new file mode 100644 index 000000000000..d9b04a0f407a --- /dev/null +++ b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +public class MutationTrackingSpec +{ + public boolean enabled = false; + public String journal_directory; +} diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 2cd78a1d8a28..e596d4a88753 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -830,7 +830,7 @@ public ResultMessage executeInternalWithoutCondition(QueryState queryState, Quer { String keyspaceName = mutation.getKeyspaceName(); Token token = mutation.key().getToken(); - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token); mutation = mutation.withMutationId(id); mutation.apply(); } diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java index 14bdef0f68bc..4b02e43efcef 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterKeyspaceStatement.java @@ -87,6 +87,15 @@ public Keyspaces apply(ClusterMetadata metadata) if (attrs.getReplicationStrategyClass() != null && attrs.getReplicationStrategyClass().equals(SimpleStrategy.class.getSimpleName())) Guardrails.simpleStrategyEnabled.ensureEnabled(state); + if (newKeyspace.params.replicationType.isTracked() && !keyspace.params.replicationType.isTracked()) + { + if (SchemaConstants.isSystemKeyspace(keyspaceName)) + throw ire("Mutation tracking is not supported on system keyspaces"); + + if (!DatabaseDescriptor.getMutationTrackingEnabled()) + throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use."); + } + if (keyspace.params.replication.isMeta() && !keyspace.name.equals(SchemaConstants.METADATA_KEYSPACE_NAME)) throw ire("Can not alter a keyspace to use MetaReplicationStrategy"); diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java index 882117f21844..3827683d7777 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/CreateKeyspaceStatement.java @@ -27,6 +27,7 @@ import org.apache.cassandra.auth.FunctionResource; import org.apache.cassandra.auth.IResource; import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.exceptions.AlreadyExistsException; @@ -83,6 +84,17 @@ public Keyspaces apply(ClusterMetadata metadata) KeyspaceMetadata keyspaceMetadata = KeyspaceMetadata.create(keyspaceName, attrs.asNewKeyspaceParams()); + if (keyspaceMetadata.params.replicationType.isTracked()) + { + if (SchemaConstants.isSystemKeyspace(keyspaceName)) + throw ire("Mutation tracking is not supported on system keyspaces"); + + if (!DatabaseDescriptor.getMutationTrackingEnabled()) + { + throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use."); + } + } + if (keyspaceMetadata.params.replication.klass.equals(LocalStrategy.class)) throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use."); diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index f4631e689a54..c6cb4460d0c3 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -505,7 +505,7 @@ public ColumnFamilyStore(Keyspace keyspace, { CommitLogPosition commitLogPosition; if (metadata().replicationType().isTracked()) - commitLogPosition = MutationJournal.instance.getCurrentPosition(); + commitLogPosition = MutationJournal.instance().getCurrentPosition(); else commitLogPosition = CommitLog.instance.getCurrentPosition(); initialMemtable = createMemtable(new AtomicReference<>(commitLogPosition)); @@ -1160,7 +1160,7 @@ public CommitLogPosition call() commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound(); TableMetadata metadata = metadata(); - MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound); + MutationJournal.instance().notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound); CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound); } @@ -1439,7 +1439,7 @@ private static void setCommitLogUpperBound(AtomicReference co { CommitLogPosition commitLogPosition; if (useMutationJournal) - commitLogPosition = MutationJournal.instance.getCurrentPosition(); + commitLogPosition = MutationJournal.instance().getCurrentPosition(); else commitLogPosition = CommitLog.instance.getCurrentPosition(); @@ -3290,7 +3290,7 @@ void onTableDropped() // TODO (required): test mutation tracking + table dropping if (metadata().replicationType().isTracked()) - MutationJournal.instance.notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance.getCurrentPosition()); + MutationJournal.instance().notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance().getCurrentPosition()); else CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id)); diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index a527d0e105d9..5e5f412a9b46 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -614,6 +614,7 @@ else if (isDeferrable) */ private Future applyInternalTracked(Mutation mutation, Promise future) { + MutationTrackingService.ensureEnabled(); Preconditions.checkState(MigrationRouter.isFullyTracked(mutation) && !mutation.id().isNone()); ClusterMetadata cm = ClusterMetadata.current(); @@ -623,7 +624,7 @@ private Future applyInternalTracked(Mutation mutation, Promise future) boolean started; try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, true)) { - started = MutationTrackingService.instance.startWriting(mutation); + started = MutationTrackingService.instance().startWriting(mutation); if (started) { @@ -673,7 +674,7 @@ private Future applyInternalTracked(Mutation mutation, Promise future) } if (started) - MutationTrackingService.instance.finishWriting(mutation); + MutationTrackingService.instance().finishWriting(mutation); if (future != null) diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index c86426e87e51..445de683e21a 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -455,7 +455,7 @@ protected PartialTrackedRead createInProgressRead(UnfilteredPartitionIterator it @Override protected MutationSummary createMutationSummaryInternal(boolean includePending) { - return MutationTrackingService.instance.createSummaryForRange(dataRange.keyRange, metadata().id, includePending); + return MutationTrackingService.instance().createSummaryForRange(dataRange.keyRange, metadata().id, includePending); } @Override diff --git a/src/java/org/apache/cassandra/db/SSTableImporter.java b/src/java/org/apache/cassandra/db/SSTableImporter.java index 1725dcbe15e8..64a0e3777629 100644 --- a/src/java/org/apache/cassandra/db/SSTableImporter.java +++ b/src/java/org/apache/cassandra/db/SSTableImporter.java @@ -260,7 +260,7 @@ private static class TrackedBulkTransfer { private static void execute(String keyspace, Set sstables) { - MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL); + MutationTrackingService.instance().executeTransfers(keyspace, sstables, ConsistencyLevel.ALL); } } diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4902e57a6fab..ac9f4bd8d8c0 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -898,7 +898,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ReadableView view, Co protected MutationSummary createMutationSummaryInternal(boolean includePending) { - return MutationTrackingService.instance.createSummaryForKey(partitionKey, metadata().id, includePending); + return MutationTrackingService.instance().createSummaryForKey(partitionKey, metadata().id, includePending); } @Override diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index d45f16e578de..6095be094c65 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -398,7 +398,7 @@ public static ImmutableCoordinatorLogOffsets getCoordinatorLogOffsets(Set MutationTrackingService.instance().isDurablyReconciled(id)); return builder.build(); } diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java index 89f869ee0618..5e835602eae0 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReceiver.java @@ -139,7 +139,7 @@ public synchronized void received(IncomingStream stream) { Preconditions.checkState(cfs.metadata().replicationType().isTracked()); PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables); - MutationTrackingService.instance.received(transfer); + MutationTrackingService.instance().received(transfer); } } diff --git a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java index 50470f4aff0b..6d8ed9014920 100644 --- a/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java +++ b/src/java/org/apache/cassandra/db/tracked/TrackedKeyspaceWriteHandler.java @@ -42,7 +42,7 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re MigrationRouter.validateTrackedMutation(mutation); Tracing.trace("Appending to mutation journal"); - CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation); + CommitLogPosition pointer = MutationJournal.instance().write(mutation.id(), mutation); return new CassandraWriteContext(group, pointer); } diff --git a/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java b/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java index 603e6d521b77..eea1ddcb1054 100644 --- a/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java +++ b/src/java/org/apache/cassandra/db/virtual/MutationJournalTable.java @@ -63,7 +63,7 @@ public DataSet data() { SimpleDataSet result = new SimpleDataSet(metadata()); - for (Segment segment : MutationJournal.instance.getAllSegments()) + for (Segment segment : MutationJournal.instance().getAllSegments()) { result.row(segment.id()) .column(IS_ACTIVE, segment instanceof ActiveSegment) diff --git a/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java b/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java index 724b6a596db9..8014c1c667c1 100644 --- a/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java +++ b/src/java/org/apache/cassandra/db/virtual/MutationTrackingShardsTable.java @@ -82,7 +82,7 @@ public DataSet data() { SimpleDataSet result = new SimpleDataSet(metadata()); - for (Shard shard : MutationTrackingService.instance.getShards()) + for (Shard shard : MutationTrackingService.instance().getShards()) { addShardRows(shard, result); } @@ -96,7 +96,7 @@ public DataSet data(DecoratedKey key) String keyspaceName = UTF8Type.instance.compose(key.getKey()); SimpleDataSet result = new SimpleDataSet(metadata()); - for (Shard shard : MutationTrackingService.instance.getShards()) + for (Shard shard : MutationTrackingService.instance().getShards()) { Shard.DebugInfo debugInfo = shard.getDebugInfo(); if (!debugInfo.keyspace.equals(keyspaceName)) diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 2506d0a9a668..07b479ca4a91 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -341,7 +341,7 @@ protected Map finalizeMetadata() if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) { Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR)); - if (MutationTrackingService.instance.isDurablyReconciled(coordinatorLogOffsets)) + if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets)) { repairedAt = Clock.Global.currentTimeMillis(); logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt); diff --git a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java index 822d7c5ac12d..f7ab706b83e8 100644 --- a/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java +++ b/src/java/org/apache/cassandra/metrics/MutationTrackingMetrics.java @@ -47,11 +47,11 @@ private MutationTrackingMetrics() readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), true); unreconciledMutationCount = Metrics.register( factory.createMetricName("UnreconciledMutationCount"), - () -> MutationTrackingService.instance.getUnreconciledMutationCount() + () -> MutationTrackingService.instance().getUnreconciledMutationCount() ); journalDiskSpaceUsed = Metrics.register( factory.createMetricName("JournalDiskSpaceUsed"), - () -> MutationJournal.instance.getDiskSpaceUsed() + () -> MutationJournal.instance().getDiskSpaceUsed() ); } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java index e7d5504e9070..638d82ff5905 100644 --- a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java +++ b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java @@ -162,21 +162,21 @@ public boolean invokeOnFailure() @Override public void onResponse(Message msg) { - MutationTrackingService.instance.receivedWriteResponse(mutationId, toHost); + MutationTrackingService.instance().receivedWriteResponse(mutationId, toHost); } @Override public void onFailure(InetAddressAndPort from, RequestFailure failureReason) { - MutationTrackingService.instance.retryFailedWrite(mutationId, toHost, failureReason); + MutationTrackingService.instance().retryFailedWrite(mutationId, toHost, failureReason); } void send() { - RecordPointer pointer = MutationJournal.instance.lookUp(mutationId); + RecordPointer pointer = MutationJournal.instance().lookUp(mutationId); Preconditions.checkNotNull(pointer, "Mutation %s not found in the journal", mutationId); - MutationJournal.instance.read(pointer, (segment, position, key, buffer, version) -> { + MutationJournal.instance().read(pointer, (segment, position, key, buffer, version) -> { // don't send mutations to nodes that have migrated to, or are in the process of migrating to untracked replication try (DataInputBuffer in = new DataInputBuffer(buffer, true)) @@ -228,7 +228,7 @@ public boolean invokeOnFailure() public void onResponse(Message msg) { logger.debug("Received activation ack for TransferTask from {}", toHost); - MutationTrackingService.instance.receivedActivationResponse(transfer, toHost); + MutationTrackingService.instance().receivedActivationResponse(transfer, toHost); } @Override @@ -240,7 +240,7 @@ public void onFailure(InetAddressAndPort from, RequestFailure failureReason) public void onFailure(Throwable cause) { logger.debug("Received activation failure for TransferTask from {} due to", toHost, cause); - MutationTrackingService.instance.retryFailedTransfer(transfer, toHost, cause); + MutationTrackingService.instance().retryFailedTransfer(transfer, toHost, cause); } void send() diff --git a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java index abd23dc3f218..a9832f90c65d 100644 --- a/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java +++ b/src/java/org/apache/cassandra/replication/BroadcastLogOffsets.java @@ -67,8 +67,9 @@ public String toString() } public static final IVerbHandler verbHandler = message -> { + MutationTrackingService.ensureEnabled(); BroadcastLogOffsets replicatedOffsets = message.payload; - MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace, + MutationTrackingService.instance().updateReplicatedOffsets(replicatedOffsets.keyspace, replicatedOffsets.range, replicatedOffsets.replicatedOffsets, replicatedOffsets.durable, diff --git a/src/java/org/apache/cassandra/replication/CoordinatedTransfer.java b/src/java/org/apache/cassandra/replication/CoordinatedTransfer.java index d82b27c9b270..37180c84ff4d 100644 --- a/src/java/org/apache/cassandra/replication/CoordinatedTransfer.java +++ b/src/java/org/apache/cassandra/replication/CoordinatedTransfer.java @@ -451,7 +451,7 @@ public void onResponse(Message msg) logger.debug("{} Activation successfully applied on {}", logPrefix(), msg.from()); CoordinatedTransfer.this.streamResults.computeIfPresent(msg.from(), (peer, result) -> result.committed()); - MutationTrackingService.instance.receivedActivationResponse(CoordinatedTransfer.this, msg.from()); + MutationTrackingService.instance().receivedActivationResponse(CoordinatedTransfer.this, msg.from()); responses.remove(msg.from()); if (responses.isEmpty()) { @@ -465,7 +465,7 @@ public void onResponse(Message msg) public void onFailure(InetAddressAndPort from, RequestFailure failure) { logger.error("{} Failed activation on {} due to {}", logPrefix(), from, failure); - MutationTrackingService.instance.retryFailedTransfer(CoordinatedTransfer.this, from, failure.failure); + MutationTrackingService.instance().retryFailedTransfer(CoordinatedTransfer.this, from, failure.failure); // TODO(expected): should only fail if we don't meet requested CL tryFailure(new RuntimeException("Tracked import failed during COMMIT on " + from + " due to " + failure.reason)); } diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java b/src/java/org/apache/cassandra/replication/ForwardedWrite.java index e5e2bf8d8b02..97b5bcf4bd4b 100644 --- a/src/java/org/apache/cassandra/replication/ForwardedWrite.java +++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java @@ -105,7 +105,7 @@ public void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo) String keyspaceName = mutation.getKeyspaceName(); Token token = mutation.key().getToken(); - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token); // Do not wait for handler completion, since the coordinator is already waiting and we don't want to block the stage LeaderCallback handler = new LeaderCallback(id, ackTo); applyLocallyAndForwardToReplicas(mutation.withMutationId(id), recipients, handler, ackTo); @@ -276,6 +276,7 @@ public long serializedSize(MutationRequest request, int version) public static final IVerbHandler verbHandler = incoming -> { + MutationTrackingService.ensureEnabled(); if (approxTime.now() > incoming.expiresAtNanos()) { Tracing.trace("Discarding mutation from {} (timed out)", incoming.from()); @@ -317,7 +318,7 @@ public void onResponse(Message msg) { // Local mutations are witnessed from Keyspace.applyInternalTracked if (msg != null) - MutationTrackingService.instance.receivedWriteResponse(id, msg.from()); + MutationTrackingService.instance().receivedWriteResponse(id, msg.from()); // Local write needs to be ack'd to coordinator if (msg == null && ackTo != null) diff --git a/src/java/org/apache/cassandra/replication/MutationJournal.java b/src/java/org/apache/cassandra/replication/MutationJournal.java index 65684526992e..57a2cc61b835 100644 --- a/src/java/org/apache/cassandra/replication/MutationJournal.java +++ b/src/java/org/apache/cassandra/replication/MutationJournal.java @@ -59,7 +59,7 @@ // TODO (required): handle table truncations public class MutationJournal { - public static final MutationJournal instance = new MutationJournal(); + private static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null; private final Journal journal; private final Map segmentStateTrackers; @@ -97,7 +97,31 @@ public void close() private MutationJournal() { - this(new File(DatabaseDescriptor.getCommitLogLocation()), new JournalParams()); + this(new File(DatabaseDescriptor.getMutationTrackingJournalDirectory()), new JournalParams()); + } + + public static void start() + { + if (instance == null) + return; + + instance.startInternal(); + } + + public static void shutdown() + { + if (instance == null) + return; + + instance.shutdownBlocking(); + } + + public static MutationJournal instance() + { + if (instance == null) + throw new IllegalStateException("Mutation tracking is not enabled"); + + return instance; } @VisibleForTesting @@ -145,12 +169,12 @@ private void maybeCleanupStaticSegment(Segment segmen } } - public void start() + void startInternal() { journal.start(); } - public void shutdownBlocking() + void shutdownBlocking() { journal.shutdown(); } diff --git a/src/java/org/apache/cassandra/replication/MutationTrackingService.java b/src/java/org/apache/cassandra/replication/MutationTrackingService.java index 18394bdf5390..e247a44f2ef3 100644 --- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java +++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java @@ -43,8 +43,9 @@ import org.agrona.collections.IntArrayList; import org.agrona.collections.IntHashSet; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; -import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.PartitionPosition; @@ -88,7 +89,61 @@ // TODO (expected): handle topology changes public class MutationTrackingService { - private static final ScheduledExecutorPlus executor = executorFactory().scheduled("Mutation-Tracking-Service", NORMAL); + + private static final MutationTrackingService instance; + private static final ScheduledExecutorPlus executor; + + static + { + if (DatabaseDescriptor.getMutationTrackingEnabled()) + { + instance = new MutationTrackingService(); + executor = executorFactory().scheduled("Mutation-Tracking-Service", NORMAL); + } + else + { + instance = null; + executor = null; + } + } + + /** + * Callers of this method should have validated that mutation tracking is enabled + * @return + */ + public static MutationTrackingService instance() + { + if (instance == null) + throw new IllegalStateException("Mutation tracking is not enabled"); + return instance; + } + + public static boolean isEnabled() + { + return DatabaseDescriptor.getMutationTrackingEnabled(); + } + + public static void ensureEnabled() + { + if (!DatabaseDescriptor.getMutationTrackingEnabled()) + { + throw new IllegalStateException("Mutation tracking is not enabled"); + } + } + + public static void start(ClusterMetadata metadata) + { + if (!isEnabled()) + return; + instance().startInternal(metadata); + } + + public static void shutdown() throws InterruptedException + { + if (!isEnabled()) + return; + instance().shutdownBlocking(); + } /** * Split ranges into this many shards. @@ -101,7 +156,6 @@ public class MutationTrackingService private static final int SHARD_MULTIPLIER = 1; private static final Logger logger = LoggerFactory.getLogger(MutationTrackingService.class); - public static final MutationTrackingService instance = new MutationTrackingService(); private final TrackedLocalReads localReads = new TrackedLocalReads(); private ConcurrentHashMap keyspaceShards = new ConcurrentHashMap<>(); @@ -145,7 +199,7 @@ public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next, boolean }; } - public synchronized void start(ClusterMetadata metadata) + private synchronized void startInternal(ClusterMetadata metadata) { if (started) return; @@ -205,7 +259,7 @@ public synchronized boolean isStarted() return started; } - public void shutdownBlocking() throws InterruptedException + private void shutdownBlocking() throws InterruptedException { ClusterMetadataService.instance().log().removeListener(tcmListener); activeReconciler.shutdownBlocking(); @@ -767,7 +821,7 @@ private void truncateMutationJournal() { Log2OffsetsMap.Mutable reconciledOffsets = new Log2OffsetsMap.Mutable(); collectDurablyReconciledOffsets(reconciledOffsets); - MutationJournal.instance.dropReconciledSegments(reconciledOffsets); + MutationJournal.instance().dropReconciledSegments(reconciledOffsets); } /** @@ -1122,7 +1176,7 @@ public void pauseOffsetBroadcast(boolean pause) public void run(boolean durable) { - MutationTrackingService.instance.forEachKeyspace(ks -> run(ks, durable)); + MutationTrackingService.instance().forEachKeyspace(ks -> run(ks, durable)); } private void run(KeyspaceShards shards, boolean durable) @@ -1163,9 +1217,9 @@ public void run() private void run(boolean dropSegments) { - MutationTrackingService.instance.forEachKeyspace(this::run); + MutationTrackingService.instance().forEachKeyspace(this::run); if (dropSegments) - MutationTrackingService.instance.truncateMutationJournal(); + MutationTrackingService.instance().truncateMutationJournal(); } private void run(KeyspaceShards shards) diff --git a/src/java/org/apache/cassandra/replication/PullMutationsRequest.java b/src/java/org/apache/cassandra/replication/PullMutationsRequest.java index d8e6f0febb98..fa6d00285fbb 100644 --- a/src/java/org/apache/cassandra/replication/PullMutationsRequest.java +++ b/src/java/org/apache/cassandra/replication/PullMutationsRequest.java @@ -66,10 +66,11 @@ public long serializedSize(PullMutationsRequest pull, int version) @Override public void doVerb(Message message) { + MutationTrackingService.ensureEnabled(); InetAddressAndPort forHost = message.from(); Offsets offsets = message.payload.offsets; logger.trace("Received pull mutations request from {} for {}", forHost, offsets); - MutationTrackingService.instance.requestMissingMutations(offsets, forHost); + MutationTrackingService.instance().requestMissingMutations(offsets, forHost); } }; diff --git a/src/java/org/apache/cassandra/replication/PushMutationRequest.java b/src/java/org/apache/cassandra/replication/PushMutationRequest.java index 84f50ed97e04..c104b7b3060d 100644 --- a/src/java/org/apache/cassandra/replication/PushMutationRequest.java +++ b/src/java/org/apache/cassandra/replication/PushMutationRequest.java @@ -64,7 +64,7 @@ class Referenced implements PushMutationRequest public long serializedSize(int version) { // TODO (expected): handle mismatched (messaging) versions - int size = MutationJournal.instance.sizeOfRecord(pointer); + int size = MutationJournal.instance().sizeOfRecord(pointer); Preconditions.checkState(size > 0, "Couldn't read mutation %s size from the mutation journal", id); return size; } @@ -72,7 +72,7 @@ public long serializedSize(int version) @Override public void serialize(DataOutputPlus out, int version) throws IOException { - boolean read = MutationJournal.instance.read(pointer, (segment, position, key, buffer, userVersion) -> + boolean read = MutationJournal.instance().read(pointer, (segment, position, key, buffer, userVersion) -> { try { @@ -168,6 +168,7 @@ public Materialized deserialize(DataInputPlus in, int version) throws IOExceptio @Override public void doVerb(Message message) { + MutationTrackingService.ensureEnabled(); if (approxTime.now() > message.expiresAtNanos()) { Tracing.trace("Discarding mutation from {} (timed out)", message.from()); diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java index b45eae9f74dc..022e3b16c2ee 100644 --- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java +++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java @@ -101,7 +101,7 @@ public static AbstractWriteResponseHandler perform( if (logger.isTraceEnabled()) logger.trace("Local tracked request {} {}", mutation, plan); writeMetrics.localRequests.mark(); - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token); mutation = mutation.withMutationId(id); if (logger.isTraceEnabled()) @@ -217,7 +217,7 @@ public static void applyLocallyAndSendToReplicas(Mutation mutation, ReplicaPlan. { if (logger.isTraceEnabled()) logger.trace("Sending mutation {} to remote replicas {}", mutation.id(), remoteReplicas); - MutationTrackingService.instance.sentWriteRequest(mutation, remoteReplicas); + MutationTrackingService.instance().sentWriteRequest(mutation, remoteReplicas); } } diff --git a/src/java/org/apache/cassandra/replication/TransferActivation.java b/src/java/org/apache/cassandra/replication/TransferActivation.java index ba0c918286d8..29f0553c127e 100644 --- a/src/java/org/apache/cassandra/replication/TransferActivation.java +++ b/src/java/org/apache/cassandra/replication/TransferActivation.java @@ -111,7 +111,7 @@ public boolean isCommit() public void apply() { - MutationTrackingService.instance.activateLocal(this); + MutationTrackingService.instance().activateLocal(this); } public static final Serializer serializer = new Serializer(); diff --git a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java index 22c15502306b..8c48c3942645 100644 --- a/src/java/org/apache/cassandra/replication/UnreconciledMutations.java +++ b/src/java/org/apache/cassandra/replication/UnreconciledMutations.java @@ -268,7 +268,7 @@ static UnreconciledMutations loadFromJournal(Node2OffsetsMap witnessedOffsets, i for (int offset = iter.start(), end = iter.end(); offset <= end; offset++) { ShortMutationId id = new ShortMutationId(witnessed.logId, offset); - Mutation mutation = MutationJournal.instance.read(id); + Mutation mutation = MutationJournal.instance().read(id); if (mutation != null) { result.addDirectly(mutation); diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 11647410e2f5..5d6da8076af9 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -257,7 +257,7 @@ protected void setup() Keyspace.setInitialized(); CommitLog.instance.start(); - MutationJournal.instance.start(); + MutationJournal.start(); SnapshotManager.instance.start(false); SnapshotManager.instance.clearExpiredSnapshots(); @@ -340,7 +340,7 @@ protected void setup() try { CommitLog.instance.recoverSegmentsOnDisk(); - MutationJournal.instance.replayStaticSegments(); + MutationJournal.instance().replayStaticSegments(); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 1c497544a70b..eea21afe8194 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3921,7 +3921,7 @@ protected synchronized void drain(boolean isFinalShutdown) throws IOException, I SnapshotManager.instance.shutdownAndWait(1L, MINUTES); HintsService.instance.shutdownBlocking(); - MutationTrackingService.instance.shutdownBlocking(); + MutationTrackingService.shutdown(); // Interrupt ongoing compactions and shutdown CM to prevent further compactions. CompactionManager.instance.forceShutdown(); @@ -3931,7 +3931,7 @@ protected synchronized void drain(boolean isFinalShutdown) throws IOException, I CommitLog.instance.forceRecycleAllSegments(); CommitLog.instance.shutdownBlocking(); - MutationJournal.instance.shutdownBlocking(); + MutationJournal.shutdown(); AutoRepair.instance.shutdownBlocking(); diff --git a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java index 38e4480c9d87..fedc0dc43ee3 100644 --- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java @@ -56,7 +56,7 @@ public void onResponse(Message msg) { if (logger.isTraceEnabled()) logger.trace("Received write response for mutation {} from {}", mutationId, msg.from()); - MutationTrackingService.instance.receivedWriteResponse(mutationId, msg.from()); + MutationTrackingService.instance().receivedWriteResponse(mutationId, msg.from()); } wrapped.onResponse(msg); } @@ -66,7 +66,7 @@ public void onFailure(InetAddressAndPort from, RequestFailure failure) { if (logger.isTraceEnabled()) logger.trace("Write failed for mutation {} from {}: {}", mutationId, from, failure); - MutationTrackingService.instance.retryFailedWrite(mutationId, from, failure); + MutationTrackingService.instance().retryFailedWrite(mutationId, from, failure); wrapped.onFailure(from, failure); } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java index cebac789095e..5a8607b3c577 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java @@ -254,7 +254,7 @@ void augment(Log2OffsetsMap augmentingOffsets) */ void augment(ShortMutationId mutationId) { - Mutation mutation = MutationJournal.instance.read(mutationId); + Mutation mutation = MutationJournal.instance().read(mutationId); if (mutation == null) { logger.error("Could not augment read with mutation not present in journal {}", mutationId); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java index bfa716a9e055..6d4688cad317 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconcileAck.java @@ -26,6 +26,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.replication.MutationTrackingService; /** * Notifies the reconcile coordinator (data node) that this node has received @@ -50,6 +51,7 @@ public String toString() public static final IVerbHandler verbHandler = message -> { + MutationTrackingService.ensureEnabled(); ReadReconcileAck notify = message.payload; logger.trace("Received reconcile ack from {}, for {}", message.from(), notify.readId); ReadReconciliations.instance.acceptSyncAck(message.from(), notify.readId); diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java index 33d3c68e2fec..48722ea51221 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/ReadReconciliations.java @@ -222,7 +222,7 @@ boolean acceptLocalSummary(MutationSummary summary) if (logId.hostId() != LOCAL_NODE) continue; - MutationTrackingService.instance.collectRemotelyMissingMutations(ourOffsets, remoteNodes, missingOffsets); + MutationTrackingService.instance().collectRemotelyMissingMutations(ourOffsets, remoteNodes, missingOffsets); // we don't listen to or block on delivery of these mutations, intentionally missingOffsets.forEach(ReadReconciliations::push); missingOffsets.clear(); @@ -239,7 +239,7 @@ boolean acceptLocalSummary(MutationSummary summary) boolean acceptRemoteSummary(MutationSummary summary, int remoteNode) { Log2OffsetsMap.Mutable missingMutations = new Log2OffsetsMap.Mutable(); - MutationTrackingService.instance.collectLocallyMissingMutations(summary, missingMutations); + MutationTrackingService.instance().collectLocallyMissingMutations(summary, missingMutations); // don't request what's already been requested for other remote summaries // TODO (consider): rely entirely on IncomingMutations deduplication instead @@ -363,7 +363,7 @@ private boolean updateRemainingAndMaybeComplete(int mutationsDelta, int summarie private boolean complete() { if (isDataNode()) - MutationTrackingService.instance.localReads().acknowledgeReconcile(id, augmentingOffsets()); + MutationTrackingService.instance().localReads().acknowledgeReconcile(id, augmentingOffsets()); else MessagingService.instance().send(Message.out(Verb.READ_RECONCILE_ACK, new ReadReconcileAck(id)), host(dataNode)); @@ -392,7 +392,7 @@ private static void pull(int node, Offsets offsets, IncomingMutations.Callback c Offsets.Mutable toPull = new Offsets.Mutable(offsets.logId()); for (ShortMutationId id : offsets) - if (MutationTrackingService.instance.registerMutationCallback(id, callback)) + if (MutationTrackingService.instance().registerMutationCallback(id, callback)) toPull.add(id.offset()); if (!toPull.isEmpty()) diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java index 6a10357cf858..7abe7ab46c5a 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java @@ -283,7 +283,7 @@ private void start(Dispatcher.RequestTime requestTime, Consumer { AsyncPromise promise = - MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime, completer); + MutationTrackingService.instance().localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryNodes, requestTime, completer); promise.addCallback((response, error) -> { if (error != null) { @@ -452,7 +452,7 @@ public void executeLocally(Message message, ClusterMetadata m { Dispatcher.RequestTime requestTime = new Dispatcher.RequestTime(message.createdAtNanos()); AsyncPromise promise = - MutationTrackingService.instance + MutationTrackingService.instance() .localReads() .beginRead(readId, metadata, command, consistencyLevel, summaryNodes, requestTime, TrackedLocalReads.Completer.DEFAULT); promise.addCallback((response, error) -> { @@ -561,6 +561,7 @@ public long serializedSize(SummaryRequest request, int version) @Override protected void performRead(Message message, ClusterMetadata metadata) { + MutationTrackingService.ensureEnabled(); message.payload.executeLocally(message, metadata); } diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java index ac6a4ddbeced..7512f5a745e3 100644 --- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java +++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedSummaryResponse.java @@ -23,6 +23,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.replication.MutationSummary; +import org.apache.cassandra.replication.MutationTrackingService; import java.io.IOException; @@ -48,6 +49,7 @@ public TrackedSummaryResponse(TrackedRead.Id readId, MutationSummary summary, in public static final IVerbHandler verbHandler = message -> { + MutationTrackingService.ensureEnabled(); TrackedSummaryResponse response = message.payload; if (logger.isTraceEnabled()) logger.trace("Received summary {} from {}, for {}", response.summary, message.from(), response.readId); diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index e93dec699f2b..24f9c67d8dc3 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -510,12 +510,12 @@ synchronized void addTransferRanges(String keyspace, RangesAtEndpoint replicas, private LogTransferTask createLogTransferTask() { - ReconciledLogSnapshot reconciled = MutationTrackingService.instance.snapshotReconciledLogs(); + ReconciledLogSnapshot reconciled = MutationTrackingService.instance().snapshotReconciledLogs(); // TODO: consider tradeoffs of eagerly reading the index of each segment and filtering out the ones that // only contain fully reconciled ids vs just filtering out fully reconciled when reading out of the // snapshot for streaming - MutationJournal.Snapshot snapshot = MutationJournal.instance.snapshot(); + MutationJournal.Snapshot snapshot = MutationJournal.instance().snapshot(); try { // TODO: grab references to all current segments and the relevant reconciled sets diff --git a/src/java/org/apache/cassandra/streaming/messages/IncomingMutationLogStreamMessage.java b/src/java/org/apache/cassandra/streaming/messages/IncomingMutationLogStreamMessage.java index fea433419e89..5e79bbfac9a4 100644 --- a/src/java/org/apache/cassandra/streaming/messages/IncomingMutationLogStreamMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/IncomingMutationLogStreamMessage.java @@ -87,7 +87,7 @@ public IncomingMutationLogStreamMessage deserialize(DataInputPlus in, int versio mutation.apply(); } - MutationTrackingService.instance.recordFullyReconciledOffsets(header.reconciled); + MutationTrackingService.instance().recordFullyReconciledOffsets(header.reconciled); return new IncomingMutationLogStreamMessage(header, session); } diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index f92134f19f5f..d65f823653f8 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -156,14 +156,14 @@ public static void initializeAsNonCmsNode(Function wrapPro .withStorage(LogStorage.SystemKeyspace) .afterReplay(Startup::scrubDataDirectories, (metadata) -> StorageService.instance.registerMBeans(), - MutationTrackingService.instance::start) + MutationTrackingService::start) .withDefaultListeners(); ClusterMetadataService.setInstance(new ClusterMetadataService(new UniformRangePlacement(), wrapProcessor, ClusterMetadataService::state, logSpec)); ClusterMetadataService.instance().log().ready(); - MutationTrackingService.instance.registerTCMListener(); + MutationTrackingService.instance().registerTCMListener(); NodeId nodeId = ClusterMetadata.current().myNodeId(); UUID currentHostId = SystemKeyspace.getLocalHostId(); @@ -271,7 +271,7 @@ public static void initializeFromGossip(Function wrapProce .withInitialState(emptyFromSystemTables) .afterReplay(Startup::scrubDataDirectories, (metadata) -> StorageService.instance.registerMBeans(), - MutationTrackingService.instance::start) + MutationTrackingService::start) .withStorage(LogStorage.SystemKeyspace) .withDefaultListeners(); @@ -281,12 +281,12 @@ public static void initializeFromGossip(Function wrapProce logSpec)); ClusterMetadataService.instance().log().ready(); - MutationTrackingService.instance.registerTCMListener(); + MutationTrackingService.instance().registerTCMListener(); initMessaging.run(); try { CommitLog.instance.recoverSegmentsOnDisk(); - MutationJournal.instance.replayStaticSegments(); + MutationJournal.instance().replayStaticSegments(); } catch (IOException e) { @@ -381,7 +381,7 @@ public static void reinitializeWithClusterMetadata(String fileName, Function StorageService.instance.registerMBeans(), - MutationTrackingService.instance::start) + MutationTrackingService::start) .withPreviousState(prev) .withInitialState(metadata) .withStorage(LogStorage.SystemKeyspace) @@ -394,7 +394,7 @@ public static void reinitializeWithClusterMetadata(String fileName, Function cluster) throws IOException, NoSuchFie CassandraDaemon.getInstanceForTesting().migrateSystemDataIfNeeded(); CommitLog.instance.start(); - MutationJournal.instance.start(); + MutationJournal.start(); SnapshotManager.instance.start(false); SnapshotManager.instance.clearExpiredSnapshots(); @@ -825,7 +825,7 @@ protected void partialStartup(ICluster cluster) throws IOException, NoSuchFie try { CommitLog.instance.recoverSegmentsOnDisk(); - MutationJournal.instance.replayStaticSegments(); + MutationJournal.instance().replayStaticSegments(); } catch (IOException e) { @@ -1039,8 +1039,8 @@ public Future shutdown(boolean runOnExitThreads, boolean shutdownMessaging // CommitLog must shut down after Stage, or threads from the latter may attempt to use the former. // (ex. A Mutation stage thread may attempt to add a mutation to the CommitLog.) error = parallelRun(error, executor, CommitLog.instance::shutdownBlocking); - error = parallelRun(error, executor, MutationJournal.instance::shutdownBlocking); - error = parallelRun(error, executor, MutationTrackingService.instance::shutdownBlocking); + error = parallelRun(error, executor, MutationJournal::shutdown); + error = parallelRun(error, executor, MutationTrackingService::shutdown); error = parallelRun(error, executor, () -> shutdownAndWait(Collections.singletonList(JMXBroadcastExecutor.executor)) ); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index 45ef68cb81f5..ed21555a673b 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -32,6 +32,7 @@ import com.vdurmont.semver4j.Semver; import org.apache.cassandra.config.AccordSpec; import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.MutationTrackingSpec; import org.apache.cassandra.config.OptionaldPositiveInt; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; @@ -42,6 +43,7 @@ import org.apache.cassandra.locator.SimpleSeedProvider; import static org.apache.cassandra.config.CassandraRelevantProperties.DTEST_ACCORD_ENABLED; +import static org.apache.cassandra.config.CassandraRelevantProperties.DTEST_MUTATION_TRACKING_ENABLED; public class InstanceConfig implements IInstanceConfig { @@ -77,6 +79,7 @@ private InstanceConfig(int num, String hints_directory, String cdc_raw_directory, AccordSpec accord, + MutationTrackingSpec mutationTracking, Collection initial_token, int storage_port, int native_transport_port, @@ -103,6 +106,8 @@ private InstanceConfig(int num, .set("accord.command_store_shard_count", accord.command_store_shard_count.toString()) .set("accord.expire_txn", accord.expire_txn) .set("accord.enable_virtual_debug_only_keyspace", "true") + .set("mutation_tracking.enabled", mutationTracking.enabled) + .set("mutation_tracking.journal_directory", mutationTracking.journal_directory) .set("partitioner", "org.apache.cassandra.dht.Murmur3Partitioner") .set("start_native_transport", true) .set("concurrent_writes", 2) @@ -331,6 +336,9 @@ public static InstanceConfig generate(int nodeNum, accordSpec.journal_directory = String.format("%s/node%d/accord_journal", root, nodeNum); accordSpec.queue_shard_count = new OptionaldPositiveInt(2); accordSpec.command_store_shard_count = new OptionaldPositiveInt(4); + MutationTrackingSpec mutationTrackingSpec = new MutationTrackingSpec(); + mutationTrackingSpec.enabled = DTEST_MUTATION_TRACKING_ENABLED.getBoolean(); + mutationTrackingSpec.journal_directory = String.format("%s/node%d/mutation_journal", root, nodeNum); return new InstanceConfig(nodeNum, networkTopology, provisionStrategy.ipAddress(nodeNum), @@ -345,6 +353,7 @@ public static InstanceConfig generate(int nodeNum, String.format("%s/node%d/hints", root, nodeNum), String.format("%s/node%d/cdc", root, nodeNum), accordSpec, + mutationTrackingSpec, tokens, provisionStrategy.storagePort(nodeNum), provisionStrategy.nativeTransportPort(nodeNum), diff --git a/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java index e4b352101a8f..51344e893d57 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MutationTrackingMigrationTest.java @@ -112,7 +112,7 @@ private static int countJournalEntries() { int[] count = new int[1]; - MutationJournal.instance.snapshot().readAll((segment, position, key, buffer, version) -> { + MutationJournal.instance().snapshot().readAll((segment, position, key, buffer, version) -> { count[0]++; }); diff --git a/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java b/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java index 9b683cf7e2de..2fd1aff4a7a1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/TrackedBatchTest.java @@ -44,8 +44,7 @@ public void testMultipleTrackedMutations() throws Throwable { try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) - .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true")) + .with(Feature.GOSSIP)) .start()) { // Create tracked keyspace @@ -89,8 +88,7 @@ public void testMixedTrackedUntracked() throws Throwable { try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) - .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true")) + .with(Feature.GOSSIP)) .start()) { // Create tracked keyspace diff --git a/test/distributed/org/apache/cassandra/distributed/test/WitnessAlwaysReadsFullReplicaTest.java b/test/distributed/org/apache/cassandra/distributed/test/WitnessAlwaysReadsFullReplicaTest.java index fea9b29544b7..342beea4bd6c 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/WitnessAlwaysReadsFullReplicaTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/WitnessAlwaysReadsFullReplicaTest.java @@ -82,7 +82,6 @@ public static void setUpClass() throws Exception setupCluster(6, builder -> builder.withRacks(2, 1, 3) // 2 DCs with 3 nodes each .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") .set("transient_replication_enabled", "true") .set("dynamic_snitch", false) // Disable dynamic snitch .set("node_proximity", TransientFirstProximity.class.getName()))); // Use our custom proximity diff --git a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java index b730901ca9d3..3dfe75e24a72 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkWithMutationTrackingTest.java @@ -69,7 +69,6 @@ protected List supportedIndexers() protected void clusterConfig(IInstanceConfig c) { super.clusterConfig(c); - c.set("mutation_tracking_enabled", true); IGNORED_ISSUES.remove(AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/TrackedHostReplacementTest.java b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/TrackedHostReplacementTest.java index 589738a0097f..df8315272d3c 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/TrackedHostReplacementTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/hostreplacement/TrackedHostReplacementTest.java @@ -64,7 +64,7 @@ private static void pauseLogBroadcasts(Cluster cluster, boolean pause) cluster.stream() .filter(node -> !node.isShutdown()) .forEach(node -> node.runOnInstance(() -> { - MutationTrackingService.instance.pauseOffsetBroadcast(pause); + MutationTrackingService.instance().pauseOffsetBroadcast(pause); })); } @@ -334,7 +334,7 @@ private void advanceMutationLogSegment(Cluster cluster) { cluster.stream().filter(node -> !node.isShutdown()).forEach( node -> { node.runOnInstance(() -> { - MutationJournal.instance.advanceSegment(); + MutationJournal.instance().advanceSegment(); }); }); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingServiceTest.java b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingServiceTest.java index 4f75a00ea885..f305a635d426 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingServiceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/replication/MutationTrackingServiceTest.java @@ -46,7 +46,7 @@ public void testIrrelevantTopologyChange() throws Throwable // Test that shards remain the same object (no reconfiguration) Boolean shardsUnchanged = cluster.get(1).callOnInstance(() -> { - var service = MutationTrackingService.instance; + var service = MutationTrackingService.instance(); Object initialShards = MutationTrackingService.TestAccess.getKeyspaceShards(service, KS_NAME); Object newShards = MutationTrackingService.TestAccess.getKeyspaceShards(service, KS_NAME); return initialShards == newShards; // Same object reference diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/BulkTransfersTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/BulkTransfersTest.java index 2c85dd81b961..633e00a4e408 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/BulkTransfersTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/BulkTransfersTest.java @@ -729,7 +729,6 @@ private static Cluster cluster(IInstanceInitializer initializer) throws IOExcept return Cluster.build(NODES) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") .set("write_request_timeout", "1000ms") .set("autocompaction_on_startup_enabled", false) .set("repair_request_timeout", "2s") @@ -838,7 +837,7 @@ private static void assertSummary(Iterable validate, IIsolat MutationSummary summary; try { - summary = MutationTrackingService.instance.createSummaryForKey(key, tableId, false); + summary = MutationTrackingService.instance().createSummaryForKey(key, tableId, false); } catch (UnknownShardException e) { @@ -869,8 +868,8 @@ private static void assertCompaction(Cluster cluster, Iterable { i.runOnInstance(() -> { - MutationTrackingService.instance.persistLogStateForTesting(); - MutationTrackingService.instance.broadcastOffsetsForTesting(); + MutationTrackingService.instance().persistLogStateForTesting(); + MutationTrackingService.instance().broadcastOffsetsForTesting(); }); }); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java index 3e0a1ad260dc..935cd3db6563 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounceTest.java @@ -74,7 +74,7 @@ public void bounceTest() throws Throwable history.insert(pk); if (++counter % 10 == 0) - cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty()); + cluster.get(1).runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty()); } ClusterUtils.stopUnchecked(cluster.get(1)); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java index 0afb402b1dfb..88a16e41af87 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingBounce_ValidateRunnable.java @@ -44,7 +44,7 @@ public MutationTrackingBounce_ValidateRunnable(int expectedMutations) public void run() { AtomicInteger counter = new AtomicInteger(); - MutationJournal.instance.replay(new DeserializedRecordConsumer<>(MutationJournal.MutationSerializer.INSTANCE) + MutationJournal.instance().replay(new DeserializedRecordConsumer<>(MutationJournal.MutationSerializer.INSTANCE) { Set seen = new HashSet<>(); @Override @@ -55,7 +55,7 @@ protected void accept(long segment, int position, ShortMutationId key, Mutation for (PartitionUpdate partitionUpdate : mutation.getPartitionUpdates()) { - if (!MutationTrackingService.instance.createSummaryForKey(partitionUpdate.partitionKey(), partitionUpdate.metadata().id, false) + if (!MutationTrackingService.instance().createSummaryForKey(partitionUpdate.partitionKey(), partitionUpdate.metadata().id, false) .contains(key)) { throw new AssertionError(String.format("Mutation %s should have been witnessed (%s)", mutation, key)); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java index 5d0785454e89..9d8349a6398a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingLogPersisterTest.java @@ -46,8 +46,7 @@ public void testLogPersisterClearsStaticSegments() throws Throwable { try (Cluster cluster = builder().withNodes(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) - .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true")) + .with(Feature.GOSSIP)) .start()) { int tables = 3; @@ -81,16 +80,16 @@ public void testLogPersisterClearsStaticSegments() throws Throwable history.insert(pk); if (++counter % 10 == 0) - cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty()); + cluster.get(1).runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty()); } cluster.forEach(i -> i.nodetoolResult("flush", KEYSPACE).asserts().success()); - cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance.persistLogStateForTesting())); - cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting())); - cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance.persistLogStateForTesting())); + cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance().persistLogStateForTesting())); + cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting())); + cluster.forEach(i -> i.runOnInstance(() -> MutationTrackingService.instance().persistLogStateForTesting())); cluster.forEach(i -> i.runOnInstance(() -> { - int staticSegments = MutationJournal.instance.countStaticSegmentsForTesting(); + int staticSegments = MutationJournal.instance().countStaticSegmentsForTesting(); Assert.assertEquals("Expected no static segments after log persister runs", 0, staticSegments); })); }); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java index 54d3c6b97348..2f07e0e12223 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingMetricsTest.java @@ -146,7 +146,7 @@ public void testBroadcastOffsetsDiscoveredMetric() throws Throwable // Broadcast offsets from node 1 to other nodes // This tells node 3 about mutations it's missing - cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting()); // Wait for broadcasts to propagate to node 3 long[] previousCount = {0}; @@ -165,7 +165,7 @@ public void testBroadcastOffsetsDiscoveredMetric() throws Throwable long afterFirstBroadcast = cluster.get(3).callOnInstance(() -> MutationTrackingMetrics.instance.broadcastOffsetsDiscovered.getCount()); // Broadcast the same offsets again (duplicate) - should NOT increment metric - cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting()); // Wait for duplicate broadcast to propagate, then verify metric stayed the same // We poll to ensure the broadcast had time to arrive, then check it didn't increment @@ -343,7 +343,7 @@ public void testJournalDiskSpaceUsedMetric() throws Throwable // Close segment every 20 writes to create multiple segments if (i % 20 == 0 && i > 0) - cluster.get(1).runOnInstance(() -> MutationJournal.instance.closeCurrentSegmentForTestingIfNonEmpty()); + cluster.get(1).runOnInstance(() -> MutationJournal.instance().closeCurrentSegmentForTestingIfNonEmpty()); } // Verify disk space increased diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPartitionReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPartitionReadTest.java index 119ec6152364..3c1d16bfb0a3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPartitionReadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPartitionReadTest.java @@ -49,7 +49,6 @@ public static void setup() throws IOException cluster = Cluster.build() .withNodes(REPLICAS) .withConfig(cfg -> cfg.with(Feature.NETWORK, Feature.GOSSIP) - .set("mutation_tracking_enabled", true) .set("hinted_handoff_enabled", false)) .start(); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java index ae49c0038312..5c88a2e7f123 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java @@ -88,7 +88,7 @@ public void testPendingWriteInclusion() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -121,7 +121,7 @@ public void testPendingWriteInclusion() throws Throwable Assert.assertEquals(1, secondIds.offsetCount()); // create a mutation - MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, dk.getToken()); + MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, dk.getToken()); SimpleBuilders.MutationBuilder builder = new SimpleBuilders.MutationBuilder(id, keyspaceName, dk); PartitionUpdate.SimpleBuilder tableBuilder = builder.update(metadata); tableBuilder.row(bytes(1)).add("v", 1); @@ -144,7 +144,7 @@ public void testPendingWriteInclusion() throws Throwable initialSummary = command.createMutationSummary(false); ReadExecutionController controller = command.executionController(false); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); read = command.beginTrackedRead(controller); // Create another summary once initial data has been read fully. We do this to catch @@ -204,7 +204,7 @@ public void testPendingReadInclusion() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -229,12 +229,12 @@ public void testPendingReadInclusion() throws Throwable int nowInSeconds = (int) FBUtilities.nowInSeconds(); SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(metadata, nowInSeconds, dk); -// try (ListeningPendingRead pendingRead = (ListeningPendingRead) MutationTrackingService.instance.startReading(command)) +// try (ListeningPendingRead pendingRead = (ListeningPendingRead) MutationTrackingService.instance().startReading(command)) // { // Assert.assertTrue(pendingRead.mutationIds().isEmpty()); // // // create and apply a mutation -// MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, dk.getToken()); +// MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, dk.getToken()); // SimpleBuilders.MutationBuilder builder = new SimpleBuilders.MutationBuilder(id, keyspaceName, dk); // PartitionUpdate.SimpleBuilder tableBuilder = builder.update(metadata); // tableBuilder.row(bytes(1)).add("v", 1); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java index 43b8804f7c63..dc198e719186 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingRangeReadTest.java @@ -44,9 +44,7 @@ public static void setup() throws IOException { cluster = Cluster.build() .withNodes(REPLICAS) - .withConfig(cfg -> cfg.with(Feature.NETWORK, Feature.GOSSIP) - .set("mutation_tracking_enabled", true) - .set("hinted_handoff_enabled", false)) + .withConfig(cfg -> cfg.with(Feature.NETWORK, Feature.GOSSIP).set("hinted_handoff_enabled", false)) .start(); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java index 37f6c6a46f37..6690b7bb9d82 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java @@ -114,7 +114,7 @@ public void testBasicReadReconciliation() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -192,7 +192,7 @@ public void testReadReconciliationApplyMutations() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -269,7 +269,7 @@ public void testBasicRangeReadReconciliation() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -345,7 +345,7 @@ public void testBasicRangeReadReconciliationApplyMutations() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java index 94b1b489aa96..1aa200eb27fb 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java @@ -73,7 +73,7 @@ public void testBasicWritePath() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true")) + .set("mutation_tracking.enabled", "true")) .start()) { @@ -95,7 +95,7 @@ public void testBasicWritePath() throws Throwable cluster.get(1).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Offsets summaryIds = summaryIdSpace(summary.get(logId)); assertEquals(1, summaryIds.offsetCount()); @@ -122,7 +122,6 @@ private void testWitnessPaxosReads(String paxosVariant) throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") .set("transient_replication_enabled", "true") .set("paxos_variant", paxosVariant)) .start()) @@ -145,13 +144,13 @@ private void testWitnessPaxosReads(String paxosVariant) throws Throwable // Two nodes should know about the mutation for (int i = 1; i <= 2; i++) cluster.get(i).runOnInstance(() -> { - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(Util.dk(1), ColumnFamilyStore.getIfExists(keyspaceName, "tbl").metadata.id, true); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(Util.dk(1), ColumnFamilyStore.getIfExists(keyspaceName, "tbl").metadata.id, true); assertEquals(1, summary.size()); }); // Filter should stop the witness from getting the mutation so we can test pushing the mutation summary to the witness cluster.get(3).runOnInstance(() -> { - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(Util.dk(1), ColumnFamilyStore.getIfExists(keyspaceName, "tbl").metadata.id, true); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(Util.dk(1), ColumnFamilyStore.getIfExists(keyspaceName, "tbl").metadata.id, true); assertEquals(0, summary.size()); }); @@ -183,7 +182,7 @@ private void testWitnessPaxosReads(String paxosVariant) throws Throwable // The read at SERIAL should propagate the mutation to the witness cluster.get(3).runOnInstance(() -> { - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(Util.dk(1), ColumnFamilyStore.getIfExists(keyspaceName, "tbl").metadata.id, true); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(Util.dk(1), ColumnFamilyStore.getIfExists(keyspaceName, "tbl").metadata.id, true); assertEquals(1, summary.size()); }); } @@ -208,7 +207,6 @@ private void testWitnessBatchWrites(boolean logged) throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") .set("transient_replication_enabled", "true")) .start()) { @@ -242,7 +240,7 @@ private void testWitnessBatchWrites(boolean logged) throws Throwable cluster.get(1).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Offsets summaryIds = summaryIdSpace(summary.get(logId)); @@ -299,7 +297,6 @@ private void testWitnessWrites(String insertCql, ConsistencyLevel cl, String pax try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") .set("transient_replication_enabled", "true") .set("paxos_variant", paxosVariantFinal)) .start()) @@ -333,7 +330,7 @@ private void testWitnessWrites(String insertCql, ConsistencyLevel cl, String pax cluster.get(1).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Offsets summaryIds = summaryIdSpace(summary.get(logId)); @@ -364,7 +361,6 @@ public void testWitnessWriteSkippedPath() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") .set("transient_replication_enabled", "true")) .start()) { @@ -398,7 +394,7 @@ public void testWitnessWriteSkippedPath() throws Throwable cluster.get(1).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Offsets summaryIds = summaryIdSpace(summary.get(logId)); @@ -429,7 +425,7 @@ public void testHintsNotWrittenOnFailedWrite() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -465,7 +461,7 @@ public void testFailedMutationRedelivery() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .start()) { @@ -481,7 +477,7 @@ public void testFailedMutationRedelivery() throws Throwable cluster.filters().verbs(Verb.MUTATION_REQ.id).to(3).drop(); // pause reconciler temporarily - cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.pauseActiveReconciler()); + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().pauseActiveReconciler()); // issue a write - should fail on node 3 cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"), ConsistencyLevel.QUORUM); @@ -492,21 +488,21 @@ public void testFailedMutationRedelivery() throws Throwable { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Assert.assertEquals(1, summary.get(logId).unreconciled.offsetCount()); Assert.assertEquals(0, summary.get(logId).reconciled.offsetCount()); }); // resume the reconciler - cluster.get(1).runOnInstance(() -> MutationTrackingService.instance.resumeActiveReconciler()); + cluster.get(1).runOnInstance(() -> MutationTrackingService.instance().resumeActiveReconciler()); Thread.sleep(1000); // wait for reconiciler to do its job cluster.get(1).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); CoordinatorLogId logId = getOnlyLogId(summary); Assert.assertEquals(0, summary.get(logId).unreconciled.offsetCount()); Assert.assertEquals(1, summary.get(logId).reconciled.offsetCount()); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java index e1168dcf17b9..7b6883dbbe7f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingUtils.java @@ -118,7 +118,7 @@ public static MutationSummary decodeSummary(byte[] bytes) public static MutationSummary summaryForKey(String keyspaceName, String tableName, DecoratedKey dk) { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, tableName); - return MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + return MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); } public static MutationSummary summaryForTable(String keyspaceName, String tableName) @@ -155,7 +155,7 @@ public static MutationSummary summaryForKey(String keyspaceName, String tableNam public static MutationSummary summaryForRange(String keyspaceName, String tableName, Range range) { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, tableName); - return MutationTrackingService.instance.createSummaryForRange(range, table.id, false); + return MutationTrackingService.instance().createSummaryForRange(range, table.id, false); } public static Offsets summaryIdSpace(CoordinatorSummary summary) @@ -290,7 +290,7 @@ public static CoordinatorLogId getOnlyLogId(MutationSummary summary) public static Mutation createMutation(TableMetadata tableMetadata, int k, int v) { DecoratedKey key = tableMetadata.partitioner.decorateKey(ByteBufferUtil.bytes(k)); - MutationId mutationId = MutationTrackingService.instance.nextMutationId(tableMetadata.keyspace, key.getToken()); + MutationId mutationId = MutationTrackingService.instance().nextMutationId(tableMetadata.keyspace, key.getToken()); SimpleBuilders.MutationBuilder builder = new SimpleBuilders.MutationBuilder(mutationId, tableMetadata.keyspace, key); PartitionUpdate.SimpleBuilder partition = builder.update(tableMetadata); partition.row().add("v", v); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java index 6209c1c1b637..506a3e078769 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java @@ -65,7 +65,7 @@ public void testBasicWriteForwarding() throws Throwable try (Cluster cluster = Cluster.build(NODES) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true") + .set("mutation_tracking.enabled", "true") .set("write_request_timeout", "1000ms")) .withNodeIdTopology(topology) .start()) @@ -97,7 +97,7 @@ public void testBasicWriteForwarding() throws Throwable Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); Range fullRange = new Range<>(token, token); TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; - MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); + MutationSummary summary = MutationTrackingService.instance().createSummaryForRange(fullRange, tableId, true); return summary.reconciledIds(); }); @@ -105,7 +105,7 @@ public void testBasicWriteForwarding() throws Throwable Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); Range fullRange = new Range<>(token, token); TableId tableId = Schema.instance.getTableMetadata(keyspaceName, tableName).id; - MutationSummary summary = MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, true); + MutationSummary summary = MutationTrackingService.instance().createSummaryForRange(fullRange, tableId, true); return summary.unreconciledIds(); }); diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java index 5cd9c8cb2ccf..0668a2e3dd4e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java @@ -46,7 +46,7 @@ public void testBroadcastOffsets() throws Throwable try (Cluster cluster = Cluster.build(3) .withConfig(cfg -> cfg.with(Feature.NETWORK) .with(Feature.GOSSIP) - .set("mutation_tracking_enabled", "true")) + .set("mutation_tracking.enabled", "true")) .start()) { @@ -61,14 +61,14 @@ public void testBroadcastOffsets() throws Throwable cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"), ConsistencyLevel.QUORUM); for (int i = 1; i <= cluster.size(); ++i) - cluster.get(i).runOnInstance(() -> MutationTrackingService.instance.broadcastOffsetsForTesting()); + cluster.get(i).runOnInstance(() -> MutationTrackingService.instance().broadcastOffsetsForTesting()); for (int i = 1; i <= cluster.size(); ++i) { cluster.get(i).runOnInstance(() -> { TableMetadata table = Schema.instance.getTableMetadata(keyspaceName, "tbl"); DecoratedKey dk = Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1)); - MutationSummary summary = MutationTrackingService.instance.createSummaryForKey(dk, table.id, false); + MutationSummary summary = MutationTrackingService.instance().createSummaryForKey(dk, table.id, false); MutationSummary.CoordinatorSummary coordinatorSummary = summary.get(getOnlyLogId(summary)); Assert.assertEquals(1, coordinatorSummary.reconciled.offsetCount()); Assert.assertEquals(0, coordinatorSummary.unreconciled.offsetCount()); diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index dc8adb72c9b4..2de315458c91 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -119,7 +119,7 @@ static public void initialize() throws IOException } SchemaLoader.loadSchema(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour CommitLog.instance.stopUnsafe(true); diff --git a/test/unit/org/apache/cassandra/ServerTestUtils.java b/test/unit/org/apache/cassandra/ServerTestUtils.java index 649b4cd101bc..9e9e59182c52 100644 --- a/test/unit/org/apache/cassandra/ServerTestUtils.java +++ b/test/unit/org/apache/cassandra/ServerTestUtils.java @@ -155,7 +155,7 @@ public static void prepareServerNoRegister() daemonInitialization(); // Need to happen after daemonInitialization for config to be set, but before CFS initialization - MutationJournal.instance.start(); + MutationJournal.start(); if (isServerPrepared) return; diff --git a/test/unit/org/apache/cassandra/db/CleanupTransientTest.java b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java index bf4f65f2538d..e5603c65455b 100644 --- a/test/unit/org/apache/cassandra/db/CleanupTransientTest.java +++ b/test/unit/org/apache/cassandra/db/CleanupTransientTest.java @@ -74,7 +74,6 @@ public class CleanupTransientTest extends CassandraTestBase @BeforeClass public static void setup() throws Exception { - DatabaseDescriptor.setMutationTrackingEnabled(true); DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simpleWitness("2/1"), @@ -159,7 +158,7 @@ protected void fillCF(ColumnFamilyStore cfs, String colName, int rowsPerSSTable) .clustering(COLUMN) .add(colName, VALUE) .build(); - mutation.withMutationId(MutationTrackingService.instance.nextMutationId(cfs.metadata().keyspace, mutation.key().getToken())) + mutation.withMutationId(MutationTrackingService.instance().nextMutationId(cfs.metadata().keyspace, mutation.key().getToken())) .applyUnsafe(); } diff --git a/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java b/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java index b19cc1190eaf..cdb1f433e5bc 100644 --- a/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java +++ b/test/unit/org/apache/cassandra/db/CoordinatorLogOffsetsLifecycleTest.java @@ -96,7 +96,7 @@ public static Collection memtableConfigs() private static Mutation createMutation(TableMetadata tableMetadata, int k, int v) { DecoratedKey key = tableMetadata.partitioner.decorateKey(ByteBufferUtil.bytes(k)); - MutationId mutationId = MutationTrackingService.instance.nextMutationId(tableMetadata.keyspace, key.getToken()); + MutationId mutationId = MutationTrackingService.instance().nextMutationId(tableMetadata.keyspace, key.getToken()); SimpleBuilders.MutationBuilder builder = new SimpleBuilders.MutationBuilder(mutationId, tableMetadata.keyspace, key); PartitionUpdate.SimpleBuilder partition = builder.update(tableMetadata); partition.row().add("v", v); @@ -185,7 +185,7 @@ public void mutationIdLifecycleTest() // flush 1 { - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().persistLogStateForTesting(); cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); View view = cfs.getTracker().getView(); @@ -219,7 +219,7 @@ public void mutationIdLifecycleTest() // flush 2 { - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().persistLogStateForTesting(); cfs.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); View view = cfs.getTracker().getView(); diff --git a/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java b/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java index 025e0e9031d6..47d9e81fb49d 100644 --- a/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java +++ b/test/unit/org/apache/cassandra/db/MutationTrackingMutationVerbHandlerMigrationTest.java @@ -75,7 +75,7 @@ public class MutationTrackingMutationVerbHandlerMigrationTest public static void init() throws Exception { ServerTestUtils.prepareServerNoRegister(); - MutationJournal.instance.start(); + MutationJournal.start(); TableMetadata trackedTable = TableMetadata.builder(TRACKED_KEYSPACE, TABLE) .addPartitionKeyColumn("pk", Int32Type.instance) @@ -103,7 +103,6 @@ public static void init() throws Exception ServerTestUtils.markCMS(); StorageService.instance.unsafeSetInitialized(); - DatabaseDescriptor.setMutationTrackingEnabled(true); } @Before diff --git a/test/unit/org/apache/cassandra/db/MutationVerbHandlerOutOfRangeTest.java b/test/unit/org/apache/cassandra/db/MutationVerbHandlerOutOfRangeTest.java index b4b5cabe9bc5..9e0dd5d99ab3 100644 --- a/test/unit/org/apache/cassandra/db/MutationVerbHandlerOutOfRangeTest.java +++ b/test/unit/org/apache/cassandra/db/MutationVerbHandlerOutOfRangeTest.java @@ -77,11 +77,10 @@ public class MutationVerbHandlerOutOfRangeTest public static void init() throws Exception { ServerTestUtils.prepareServerNoRegister(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); ServerTestUtils.markCMS(); StorageService.instance.unsafeSetInitialized(); - org.apache.cassandra.config.DatabaseDescriptor.setMutationTrackingEnabled(true); } @Before diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java index 2429a2fa9f40..a1f11fc7cb07 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java @@ -76,7 +76,7 @@ public class ReadCommandVerbHandlerOutOfRangeTest public static void init() throws Throwable { ServerTestUtils.prepareServerNoRegister(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); ServerTestUtils.markCMS(); metadata_nonreplicated = Schema.instance.getTableMetadata(KEYSPACE_NONREPLICATED, TABLE); diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java index d2f3e52957f4..4b6c9a2605d7 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java @@ -73,7 +73,7 @@ public class ReadCommandVerbHandlerTest public static void init() throws Throwable { ServerTestUtils.prepareServerNoRegister(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE); metadata_with_transient = Schema.instance.getTableMetadata(KEYSPACE_WITH_TRANSIENT, TABLE); diff --git a/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java b/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java index 42f40fe7e743..cbf7aeaed700 100644 --- a/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/MutationJournalTableTest.java @@ -57,7 +57,7 @@ public void setUp() public void testSelectAll() throws Throwable { // Start the mutation journal - MutationJournal.instance.start(); + MutationJournal.start(); // Write data to trigger journal writes for (int i = 0; i < 100; i++) diff --git a/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java index 221daa235371..2d20a1f1d5e9 100644 --- a/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/MutationTrackingShardsTableTest.java @@ -56,8 +56,8 @@ public static void setUpClass() public void setUp() { // Start required services for mutation tracking - MutationJournal.instance.start(); - MutationTrackingService.instance.start(ClusterMetadata.current()); + MutationJournal.start(); + MutationTrackingService.start(ClusterMetadata.current()); // Create a tracked keyspace schemaChange("CREATE KEYSPACE IF NOT EXISTS tracked_ks WITH replication = " + @@ -73,7 +73,7 @@ public void setUp() public void tearDown() throws InterruptedException { // Shutdown the service to prevent test hanging - MutationTrackingService.instance.shutdownBlocking(); + MutationTrackingService.shutdown(); } @Test diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 1eb89254d14f..001aa61ac799 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -93,7 +93,7 @@ public static void setup() throws ConfigurationException oldPartitioner = StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance); ServerTestUtils.prepareServerNoRegister(); SchemaLoader.startGossiper(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition("BootStrapperTest"); RangeStreamer.ALIVE_PREDICATE = Predicates.alwaysTrue(); ServerTestUtils.markCMS(); diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java index f855d6243e21..3301578b7768 100644 --- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java @@ -82,7 +82,6 @@ public class SimpleStrategyTest extends CassandraTestBase @Before public void defineSchema() { - DatabaseDescriptor.setMutationTrackingEnabled(true); DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); recreateCMS(); SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1)); diff --git a/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java b/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java index 86a02aae968c..2d8041a171ef 100644 --- a/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairMessageVerbHandlerOutOfRangeTest.java @@ -85,7 +85,7 @@ public static void init() throws Exception SchemaLoader.loadSchema(); DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); ServerTestUtils.recreateCMS(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); ClusterMetadataTestHelper.register(broadcastAddress); ServerTestUtils.markCMS(); diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java index f2d6387bfb12..026d5394fdbc 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogOffsetsTest.java @@ -246,7 +246,7 @@ public Args(List ids, int contentions) public void reconciledBounds() throws InterruptedException, ExecutionException { DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); - MutationJournal.instance.start(); + MutationJournal.start(); String ks = "ks"; String tbl = "tbl"; @@ -274,7 +274,7 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { ClusterMetadataTestHelper.commit(new AlterSchema(SchemaTransformations.addTable(tableMetadata, false))); CommitLog.instance.start(); - MutationTrackingService.instance.start(metadata); + MutationTrackingService.start(metadata); // Eventually, will also run perturbations before checking isReconciled (like log truncation, durability, etc.) // to ensure that we don't prune data required to check what's been reconciled @@ -282,45 +282,45 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { // Applied at all replicas { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 1, 1); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); - MutationTrackingService.instance.finishWriting(mutation); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3); - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().finishWriting(mutation); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr2); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr3); + MutationTrackingService.instance().persistLogStateForTesting(); ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder() .add(mutation.id()) .build(); Range range = getShardRange(mutation); List offsets = Collections.singletonList(logOffsets.mutations().offsets(mutation.id().logId())); - MutationTrackingService.instance.updateReplicatedOffsets(ks, range, offsets, true, addr2); - MutationTrackingService.instance.updateReplicatedOffsets(ks, range, offsets, true, addr3); + MutationTrackingService.instance().updateReplicatedOffsets(ks, range, offsets, true, addr2); + MutationTrackingService.instance().updateReplicatedOffsets(ks, range, offsets, true, addr3); - Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isTrue(); + Assertions.assertThat(MutationTrackingService.instance().isDurablyReconciled(logOffsets)).isTrue(); } // Applied locally but not on remote replicas { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 2, 2); - MutationTrackingService.instance.startWriting(mutation); - MutationTrackingService.instance.finishWriting(mutation); - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().startWriting(mutation); + MutationTrackingService.instance().finishWriting(mutation); + MutationTrackingService.instance().persistLogStateForTesting(); ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder() .add(mutation.id()) .build(); - Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse(); + Assertions.assertThat(MutationTrackingService.instance().isDurablyReconciled(logOffsets)).isFalse(); } // Applied on remote replicas but not locally { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 3, 3); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3); - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr2); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr3); + MutationTrackingService.instance().persistLogStateForTesting(); ImmutableCoordinatorLogOffsets logOffsets = new ImmutableCoordinatorLogOffsets.Builder() .add(mutation.id()) @@ -328,21 +328,21 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { Range range = getShardRange(mutation); List offsets = Collections.singletonList(logOffsets.mutations().offsets(mutation.id().logId())); - MutationTrackingService.instance.updateReplicatedOffsets(ks, range, offsets, true, addr2); - MutationTrackingService.instance.updateReplicatedOffsets(ks, range, offsets, true, addr3); + MutationTrackingService.instance().updateReplicatedOffsets(ks, range, offsets, true, addr2); + MutationTrackingService.instance().updateReplicatedOffsets(ks, range, offsets, true, addr3); - Assertions.assertThat(MutationTrackingService.instance.isDurablyReconciled(logOffsets)).isFalse(); + Assertions.assertThat(MutationTrackingService.instance().isDurablyReconciled(logOffsets)).isFalse(); } // If no replicas are aware of a log, it should be considered unreconciled out of caution { Mutation mutation = MutationTrackingUtils.createMutation(tableMetadata, 4, 4); - MutationTrackingService.instance.startWriting(mutation); + MutationTrackingService.instance().startWriting(mutation); - MutationTrackingService.instance.finishWriting(mutation); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr2); - MutationTrackingService.instance.receivedWriteResponse(mutation.id(), addr3); - MutationTrackingService.instance.persistLogStateForTesting(); + MutationTrackingService.instance().finishWriting(mutation); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr2); + MutationTrackingService.instance().receivedWriteResponse(mutation.id(), addr3); + MutationTrackingService.instance().persistLogStateForTesting(); MutationId fakeMutationId = new MutationId(CoordinatorLogId.asLong(111, 222), MutationId.sequenceId(333, 444)); Assertions.assertThat(metadata.directory.version(new NodeId(fakeMutationId.hostId()))).isNull(); @@ -352,19 +352,19 @@ public void reconciledBounds() throws InterruptedException, ExecutionException { ImmutableCoordinatorLogOffsets.Builder logOffsetsBuilder = new ImmutableCoordinatorLogOffsets.Builder(); logOffsetsBuilder.add(fakeMutationId); - Assertions.assertThatThrownBy(() -> MutationTrackingService.instance.isDurablyReconciled(logOffsetsBuilder.build())) + Assertions.assertThatThrownBy(() -> MutationTrackingService.instance().isDurablyReconciled(logOffsetsBuilder.build())) .hasSameClassAs(new IllegalStateException()) .hasMessageMatching("Could not find shard for logId \\d+"); } - MutationTrackingService.instance.shutdownBlocking(); + MutationTrackingService.shutdown(); CommitLog.instance.stopUnsafe(true); } private Range getShardRange(Mutation mutation) { Map> ksRanges = new HashMap<>(); - MutationTrackingService.instance.forEachKeyspace(shards -> { + MutationTrackingService.instance().forEachKeyspace(shards -> { Shard shard = shards.lookUp(mutation); ksRanges.put(shard.keyspace, shard.range); }); diff --git a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java index 7da22a6c1fa2..d58faa88316b 100644 --- a/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java +++ b/test/unit/org/apache/cassandra/replication/CoordinatorLogTest.java @@ -68,7 +68,7 @@ public static void setUp() throws IOException .addClusteringColumn("ck", UTF8Type.instance) .addRegularColumn("value", UTF8Type.instance) .build()); - MutationJournal.instance.start(); + MutationJournal.start(); } private static Token tk(String key) @@ -176,8 +176,8 @@ private void testPersistAndLoadRoundtrip(CoordinatorLogId logId) Mutation mutation2 = createMutation(new MutationId(logId.asLong(), MutationId.sequenceId(2, 0))); unreconciled.addDirectly(mutation1); unreconciled.addDirectly(mutation2); - MutationJournal.instance.write(mutation1.id(), mutation1); - MutationJournal.instance.write(mutation2.id(), mutation2); + MutationJournal.instance().write(mutation1.id(), mutation1); + MutationJournal.instance().write(mutation2.id(), mutation2); CoordinatorLog log = CoordinatorLog.recreate(KEYSPACE, range, LOCAL_HOST_ID, logId, PARTICIPANTS, witnessed, witnessed, unreconciled); diff --git a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java index 72017bc8a1e0..60c4e39f539d 100644 --- a/test/unit/org/apache/cassandra/replication/MutationJournalTest.java +++ b/test/unit/org/apache/cassandra/replication/MutationJournalTest.java @@ -74,7 +74,7 @@ public static void setUp() throws IOException directory.deleteRecursiveOnExit(); journal = new MutationJournal(directory, TestParams.MUTATION_JOURNAL); - journal.start(); + journal.startInternal(); } @AfterClass diff --git a/test/unit/org/apache/cassandra/replication/ShardTest.java b/test/unit/org/apache/cassandra/replication/ShardTest.java index d8c62554f178..37b1ea819399 100644 --- a/test/unit/org/apache/cassandra/replication/ShardTest.java +++ b/test/unit/org/apache/cassandra/replication/ShardTest.java @@ -56,7 +56,7 @@ public static void setUp() throws IOException .addClusteringColumn("ck", UTF8Type.instance) .addRegularColumn("value", UTF8Type.instance) .build()); - MutationJournal.instance.start(); + MutationJournal.start(); } private static Token tk(String key) diff --git a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java index 281c8da07944..d283eadede01 100644 --- a/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java +++ b/test/unit/org/apache/cassandra/replication/UnreconciledMutationsTest.java @@ -422,7 +422,7 @@ public void testSingleTokenCollectionVsFullRange() @Test public void testLoadFromJournal() { - MutationJournal.instance.start(); + MutationJournal.start(); CoordinatorLogId logId = new CoordinatorLogId(1, 1); @@ -438,8 +438,8 @@ public void testLoadFromJournal() Mutation mutation6 = createMutation(6, 6, 6); Mutation mutation7 = createMutation(7, 7, 7); - MutationJournal.instance.write(mutation6.id(), mutation6); - MutationJournal.instance.write(mutation7.id(), mutation7); + MutationJournal.instance().write(mutation6.id(), mutation6); + MutationJournal.instance().write(mutation7.id(), mutation7); Offsets.Mutable loadedOffsets = new Offsets.Mutable(logId); UnreconciledMutations unreconciled = UnreconciledMutations.loadFromJournal(witnessed, 1); diff --git a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java index 46fd20c69f6f..ec37b5c40c12 100644 --- a/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java +++ b/test/unit/org/apache/cassandra/service/BootstrapTransientTest.java @@ -95,7 +95,6 @@ public static void setUpClass() throws Exception { DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.setPartitionerUnsafe(OrderPreservingPartitioner.instance); - DatabaseDescriptor.setMutationTrackingEnabled(true); DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); address02 = InetAddressAndPort.getByName("127.0.0.2"); address03 = InetAddressAndPort.getByName("127.0.0.3"); diff --git a/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java index 55b5e8dce858..92643d3a6871 100644 --- a/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java +++ b/test/unit/org/apache/cassandra/service/JoinTokenRingTest.java @@ -46,7 +46,7 @@ public static void setup() throws ConfigurationException DatabaseDescriptor.daemonInitialization(); ServerTestUtils.prepareServerNoRegister(); SchemaLoader.startGossiper(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition("JoinTokenRingTest"); } diff --git a/test/unit/org/apache/cassandra/service/PaxosStateTest.java b/test/unit/org/apache/cassandra/service/PaxosStateTest.java index f9c089c43764..d2407e925526 100644 --- a/test/unit/org/apache/cassandra/service/PaxosStateTest.java +++ b/test/unit/org/apache/cassandra/service/PaxosStateTest.java @@ -64,7 +64,7 @@ public class PaxosStateTest public static void setUpClass() throws Throwable { SchemaLoader.loadSchema(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition("PaxosStateTest"); } diff --git a/test/unit/org/apache/cassandra/service/StorageServiceTest.java b/test/unit/org/apache/cassandra/service/StorageServiceTest.java index 4616df8b5b10..deb31d729c65 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceTest.java @@ -75,7 +75,7 @@ public static void setUpClass() throws Exception SimpleLocationProvider.LOCATION, NodeVersion.CURRENT)); CommitLog.instance.start(); - MutationJournal.instance.start(); + MutationJournal.start(); } @Before diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java index 93d369762f87..1fd1925b2eef 100644 --- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java +++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java @@ -89,8 +89,7 @@ public class WriteResponseHandlerTransientTest public static void setupClass() throws Throwable { SchemaLoader.loadSchema(); - MutationJournal.instance.start(); - DatabaseDescriptor.setMutationTrackingEnabled(true); + MutationJournal.start(); DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); diff --git a/test/unit/org/apache/cassandra/service/paxos/PaxosStateTest.java b/test/unit/org/apache/cassandra/service/paxos/PaxosStateTest.java index a7da95fd2f00..cc5e4dc790a0 100644 --- a/test/unit/org/apache/cassandra/service/paxos/PaxosStateTest.java +++ b/test/unit/org/apache/cassandra/service/paxos/PaxosStateTest.java @@ -57,7 +57,7 @@ public class PaxosStateTest public static void setUpClass() throws Throwable { SchemaLoader.loadSchema(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition("PaxosStateTest"); metadata = Keyspace.open("PaxosStateTestKeyspace1").getColumnFamilyStore("Standard1").metadata.get(); metadata.withSwapped(metadata.params.unbuild().gcGraceSeconds(3600).build()); diff --git a/test/unit/org/apache/cassandra/service/paxos/PaxosVerbHandlerOutOfRangeTest.java b/test/unit/org/apache/cassandra/service/paxos/PaxosVerbHandlerOutOfRangeTest.java index a11f20a74387..ea3640f257fe 100644 --- a/test/unit/org/apache/cassandra/service/paxos/PaxosVerbHandlerOutOfRangeTest.java +++ b/test/unit/org/apache/cassandra/service/paxos/PaxosVerbHandlerOutOfRangeTest.java @@ -76,7 +76,7 @@ public class PaxosVerbHandlerOutOfRangeTest // PaxosV1 out of range tests - V2 i public static void init() throws Exception { ServerTestUtils.prepareServerNoRegister(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); ServerTestUtils.markCMS(); StorageService.instance.unsafeSetInitialized(); diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java index 4e04b3e659e0..d091c80aa8dc 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java @@ -62,7 +62,7 @@ public class RepairedDataVerifierTest public static void init() { SchemaLoader.loadSchema(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); DatabaseDescriptor.reportUnconfirmedRepairedDataMismatches(true); } diff --git a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java index 9fa30bef3abe..275c1de300ea 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamReaderTest.java @@ -96,7 +96,7 @@ public static void setupClass() throws Exception DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); ServerTestUtils.prepareServerNoRegister(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); ServerTestUtils.markCMS(); } diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionOwnedRangesTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionOwnedRangesTest.java index 57ff0276b62d..c9f0559bec0f 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamSessionOwnedRangesTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamSessionOwnedRangesTest.java @@ -74,7 +74,7 @@ public static void setupClass() throws Exception SchemaLoader.loadSchema(); DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance); ServerTestUtils.recreateCMS(); - MutationJournal.instance.start(); + MutationJournal.start(); SchemaLoader.schemaDefinition(TEST_NAME); ClusterMetadataTestHelper.register(broadcastAddress); ServerTestUtils.markCMS(); diff --git a/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java b/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java index 12bd501a866c..a82b516d7714 100644 --- a/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java +++ b/test/unit/org/apache/cassandra/tcm/transformations/AlterSchemaMutationTrackingTest.java @@ -59,7 +59,7 @@ public static void setUpClass() throws Exception CassandraRelevantProperties.PARTITIONER.setString(Murmur3Partitioner.class.getName()); ServerTestUtils.daemonInitialization(); ServerTestUtils.prepareServer(); - MutationJournal.instance.start(); + MutationJournal.start(); partitioner = (Murmur3Partitioner) DatabaseDescriptor.getPartitioner(); }