Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public enum CassandraRelevantProperties
DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"),
/** In_JVM dtest property indicating that the test should use "latest" configuration */
DTEST_JVM_DTESTS_USE_LATEST("jvm_dtests.latest"),
DTEST_MUTATION_TRACKING_ENABLED("jvm_dtest.mutation_tracking.enabled", "true"),
ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"),
/**
* Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed.
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ public static class SSTableConfig

public boolean dynamic_data_masking_enabled = false;

public boolean mutation_tracking_enabled = false;
public MutationTrackingSpec mutation_tracking = new MutationTrackingSpec();

/**
* Time in milliseconds after a warning will be emitted to the log and to the client that a UDF runs too long.
Expand Down
19 changes: 12 additions & 7 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m
if (commitLogWriteDiskAccessMode != conf.commitlog_disk_access_mode)
logger.info("commitlog_disk_access_mode resolved to: {}", commitLogWriteDiskAccessMode);

if (conf.mutation_tracking.journal_directory == null)
{
conf.mutation_tracking.journal_directory = storagedirFor("mutation_journal");
}

if (conf.accord.journal_directory == null)
{
conf.accord.journal_directory = storagedirFor("accord_journal");
Expand Down Expand Up @@ -2222,6 +2227,10 @@ public static void createAllDirectories()
throw new ConfigurationException("commitlog_directory must be specified", false);
FileUtils.createDirectory(conf.commitlog_directory);

if (conf.mutation_tracking.journal_directory == null)
throw new ConfigurationException("mutation_tracking.journal_directory must be specified", false);
FileUtils.createDirectory(conf.mutation_tracking.journal_directory);

if (conf.accord.journal_directory == null)
throw new ConfigurationException("accord.journal_directory must be specified", false);
FileUtils.createDirectory(conf.accord.journal_directory);
Expand Down Expand Up @@ -5766,16 +5775,12 @@ public static void setDynamicDataMaskingEnabled(boolean enabled)

public static boolean getMutationTrackingEnabled()
{
return conf.mutation_tracking_enabled;
return conf.mutation_tracking.enabled;
}

public static void setMutationTrackingEnabled(boolean enabled)
public static String getMutationTrackingJournalDirectory()
{
if (enabled != conf.mutation_tracking_enabled)
{
logger.info("Setting mutation_tracking_enabled to {}", enabled);
conf.mutation_tracking_enabled = enabled;
}
return conf.mutation_tracking.journal_directory;
}

public static OptionalDouble getSeverityDuringDecommission()
Expand Down
25 changes: 25 additions & 0 deletions src/java/org/apache/cassandra/config/MutationTrackingSpec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.config;

public class MutationTrackingSpec
{
public boolean enabled = false;
public String journal_directory;
}
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ public ResultMessage executeInternalWithoutCondition(QueryState queryState, Quer
{
String keyspaceName = mutation.getKeyspaceName();
Token token = mutation.key().getToken();
MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token);
mutation = mutation.withMutationId(id);
mutation.apply();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public Keyspaces apply(ClusterMetadata metadata)
if (attrs.getReplicationStrategyClass() != null && attrs.getReplicationStrategyClass().equals(SimpleStrategy.class.getSimpleName()))
Guardrails.simpleStrategyEnabled.ensureEnabled(state);

if (newKeyspace.params.replicationType.isTracked() && !keyspace.params.replicationType.isTracked())
{
if (SchemaConstants.isSystemKeyspace(keyspaceName))
throw ire("Mutation tracking is not supported on system keyspaces");

if (!DatabaseDescriptor.getMutationTrackingEnabled())
throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use.");
}

if (keyspace.params.replication.isMeta() && !keyspace.name.equals(SchemaConstants.METADATA_KEYSPACE_NAME))
throw ire("Can not alter a keyspace to use MetaReplicationStrategy");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.cassandra.auth.FunctionResource;
import org.apache.cassandra.auth.IResource;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.exceptions.AlreadyExistsException;
Expand Down Expand Up @@ -83,6 +84,17 @@ public Keyspaces apply(ClusterMetadata metadata)

KeyspaceMetadata keyspaceMetadata = KeyspaceMetadata.create(keyspaceName, attrs.asNewKeyspaceParams());

if (keyspaceMetadata.params.replicationType.isTracked())
{
if (SchemaConstants.isSystemKeyspace(keyspaceName))
throw ire("Mutation tracking is not supported on system keyspaces");

if (!DatabaseDescriptor.getMutationTrackingEnabled())
{
throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use.");
}
}

if (keyspaceMetadata.params.replication.klass.equals(LocalStrategy.class))
throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use.");

Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public ColumnFamilyStore(Keyspace keyspace,
{
CommitLogPosition commitLogPosition;
if (metadata().replicationType().isTracked())
commitLogPosition = MutationJournal.instance.getCurrentPosition();
commitLogPosition = MutationJournal.instance().getCurrentPosition();
else
commitLogPosition = CommitLog.instance.getCurrentPosition();
initialMemtable = createMemtable(new AtomicReference<>(commitLogPosition));
Expand Down Expand Up @@ -1160,7 +1160,7 @@ public CommitLogPosition call()
commitLogUpperBound = mainMemtable.getFinalCommitLogUpperBound();
TableMetadata metadata = metadata();

MutationJournal.instance.notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
MutationJournal.instance().notifyFlushed(metadata.id, commitLogLowerBound, commitLogUpperBound);
CommitLog.instance.discardCompletedSegments(metadata.id, commitLogLowerBound, commitLogUpperBound);
}

Expand Down Expand Up @@ -1439,7 +1439,7 @@ private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> co
{
CommitLogPosition commitLogPosition;
if (useMutationJournal)
commitLogPosition = MutationJournal.instance.getCurrentPosition();
commitLogPosition = MutationJournal.instance().getCurrentPosition();
else
commitLogPosition = CommitLog.instance.getCurrentPosition();

Expand Down Expand Up @@ -3290,7 +3290,7 @@ void onTableDropped()

// TODO (required): test mutation tracking + table dropping
if (metadata().replicationType().isTracked())
MutationJournal.instance.notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance.getCurrentPosition());
MutationJournal.instance().notifyFlushed(metadata.id, new CommitLogPosition(0, 0), MutationJournal.instance().getCurrentPosition());
else
CommitLog.instance.forceRecycleAllSegments(Collections.singleton(metadata.id));

Expand Down
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,7 @@ else if (isDeferrable)
*/
private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
{
MutationTrackingService.ensureEnabled();
Preconditions.checkState(MigrationRouter.isFullyTracked(mutation) && !mutation.id().isNone());
ClusterMetadata cm = ClusterMetadata.current();

Expand All @@ -623,7 +624,7 @@ private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
boolean started;
try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, true))
{
started = MutationTrackingService.instance.startWriting(mutation);
started = MutationTrackingService.instance().startWriting(mutation);

if (started)
{
Expand Down Expand Up @@ -673,7 +674,7 @@ private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
}

if (started)
MutationTrackingService.instance.finishWriting(mutation);
MutationTrackingService.instance().finishWriting(mutation);


if (future != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ protected PartialTrackedRead createInProgressRead(UnfilteredPartitionIterator it
@Override
protected MutationSummary createMutationSummaryInternal(boolean includePending)
{
return MutationTrackingService.instance.createSummaryForRange(dataRange.keyRange, metadata().id, includePending);
return MutationTrackingService.instance().createSummaryForRange(dataRange.keyRange, metadata().id, includePending);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/SSTableImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private static class TrackedBulkTransfer
{
private static void execute(String keyspace, Set<SSTableReader> sstables)
{
MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL);
MutationTrackingService.instance().executeTransfers(keyspace, sstables, ConsistencyLevel.ALL);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ReadableView view, Co

protected MutationSummary createMutationSummaryInternal(boolean includePending)
{
return MutationTrackingService.instance.createSummaryForKey(partitionKey, metadata().id, includePending);
return MutationTrackingService.instance().createSummaryForKey(partitionKey, metadata().id, includePending);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public static ImmutableCoordinatorLogOffsets getCoordinatorLogOffsets(Set<SSTabl
ImmutableCoordinatorLogOffsets.Builder builder = new ImmutableCoordinatorLogOffsets.Builder();
for (SSTableReader sstable : sstables)
builder.addAll(sstable.getCoordinatorLogOffsets());
builder.purgeTransfers(MutationTrackingService.instance::isDurablyReconciled);
builder.purgeTransfers(id -> MutationTrackingService.instance().isDurablyReconciled(id));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public synchronized void received(IncomingStream stream)
{
Preconditions.checkState(cfs.metadata().replicationType().isTracked());
PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables);
MutationTrackingService.instance.received(transfer);
MutationTrackingService.instance().received(transfer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re
MigrationRouter.validateTrackedMutation(mutation);

Tracing.trace("Appending to mutation journal");
CommitLogPosition pointer = MutationJournal.instance.write(mutation.id(), mutation);
CommitLogPosition pointer = MutationJournal.instance().write(mutation.id(), mutation);

return new CassandraWriteContext(group, pointer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public DataSet data()
{
SimpleDataSet result = new SimpleDataSet(metadata());

for (Segment<ShortMutationId, Mutation> segment : MutationJournal.instance.getAllSegments())
for (Segment<ShortMutationId, Mutation> segment : MutationJournal.instance().getAllSegments())
{
result.row(segment.id())
.column(IS_ACTIVE, segment instanceof ActiveSegment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public DataSet data()
{
SimpleDataSet result = new SimpleDataSet(metadata());

for (Shard shard : MutationTrackingService.instance.getShards())
for (Shard shard : MutationTrackingService.instance().getShards())
{
addShardRows(shard, result);
}
Expand All @@ -96,7 +96,7 @@ public DataSet data(DecoratedKey key)
String keyspaceName = UTF8Type.instance.compose(key.getKey());
SimpleDataSet result = new SimpleDataSet(metadata());

for (Shard shard : MutationTrackingService.instance.getShards())
for (Shard shard : MutationTrackingService.instance().getShards())
{
Shard.DebugInfo debugInfo = shard.getDebugInfo();
if (!debugInfo.keyspace.equals(keyspaceName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ protected Map<MetadataType, MetadataComponent> finalizeMetadata()
if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR));
if (MutationTrackingService.instance.isDurablyReconciled(coordinatorLogOffsets))
if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets))
{
repairedAt = Clock.Global.currentTimeMillis();
logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ private MutationTrackingMetrics()
readSummarySize = Metrics.histogram(factory.createMetricName("ReadSummarySize"), true);
unreconciledMutationCount = Metrics.register(
factory.createMetricName("UnreconciledMutationCount"),
() -> MutationTrackingService.instance.getUnreconciledMutationCount()
() -> MutationTrackingService.instance().getUnreconciledMutationCount()
);
journalDiskSpaceUsed = Metrics.register(
factory.createMetricName("JournalDiskSpaceUsed"),
() -> MutationJournal.instance.getDiskSpaceUsed()
() -> MutationJournal.instance().getDiskSpaceUsed()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,21 @@ public boolean invokeOnFailure()
@Override
public void onResponse(Message<NoPayload> msg)
{
MutationTrackingService.instance.receivedWriteResponse(mutationId, toHost);
MutationTrackingService.instance().receivedWriteResponse(mutationId, toHost);
}

@Override
public void onFailure(InetAddressAndPort from, RequestFailure failureReason)
{
MutationTrackingService.instance.retryFailedWrite(mutationId, toHost, failureReason);
MutationTrackingService.instance().retryFailedWrite(mutationId, toHost, failureReason);
}

void send()
{
RecordPointer pointer = MutationJournal.instance.lookUp(mutationId);
RecordPointer pointer = MutationJournal.instance().lookUp(mutationId);
Preconditions.checkNotNull(pointer, "Mutation %s not found in the journal", mutationId);

MutationJournal.instance.read(pointer, (segment, position, key, buffer, version) -> {
MutationJournal.instance().read(pointer, (segment, position, key, buffer, version) -> {

// don't send mutations to nodes that have migrated to, or are in the process of migrating to untracked replication
try (DataInputBuffer in = new DataInputBuffer(buffer, true))
Expand Down Expand Up @@ -228,7 +228,7 @@ public boolean invokeOnFailure()
public void onResponse(Message<NoPayload> msg)
{
logger.debug("Received activation ack for TransferTask from {}", toHost);
MutationTrackingService.instance.receivedActivationResponse(transfer, toHost);
MutationTrackingService.instance().receivedActivationResponse(transfer, toHost);
}

@Override
Expand All @@ -240,7 +240,7 @@ public void onFailure(InetAddressAndPort from, RequestFailure failureReason)
public void onFailure(Throwable cause)
{
logger.debug("Received activation failure for TransferTask from {} due to", toHost, cause);
MutationTrackingService.instance.retryFailedTransfer(transfer, toHost, cause);
MutationTrackingService.instance().retryFailedTransfer(transfer, toHost, cause);
}

void send()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public String toString()
}

public static final IVerbHandler<BroadcastLogOffsets> verbHandler = message -> {
MutationTrackingService.ensureEnabled();
BroadcastLogOffsets replicatedOffsets = message.payload;
MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace,
MutationTrackingService.instance().updateReplicatedOffsets(replicatedOffsets.keyspace,
replicatedOffsets.range,
replicatedOffsets.replicatedOffsets,
replicatedOffsets.durable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public void onResponse(Message<Void> msg)
logger.debug("{} Activation successfully applied on {}", logPrefix(), msg.from());
CoordinatedTransfer.this.streamResults.computeIfPresent(msg.from(), (peer, result) -> result.committed());

MutationTrackingService.instance.receivedActivationResponse(CoordinatedTransfer.this, msg.from());
MutationTrackingService.instance().receivedActivationResponse(CoordinatedTransfer.this, msg.from());
responses.remove(msg.from());
if (responses.isEmpty())
{
Expand All @@ -465,7 +465,7 @@ public void onResponse(Message<Void> msg)
public void onFailure(InetAddressAndPort from, RequestFailure failure)
{
logger.error("{} Failed activation on {} due to {}", logPrefix(), from, failure);
MutationTrackingService.instance.retryFailedTransfer(CoordinatedTransfer.this, from, failure.failure);
MutationTrackingService.instance().retryFailedTransfer(CoordinatedTransfer.this, from, failure.failure);
// TODO(expected): should only fail if we don't meet requested CL
tryFailure(new RuntimeException("Tracked import failed during COMMIT on " + from + " due to " + failure.reason));
}
Expand Down
Loading