Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public enum CassandraRelevantProperties
DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"),
/** In_JVM dtest property indicating that the test should use "latest" configuration */
DTEST_JVM_DTESTS_USE_LATEST("jvm_dtests.latest"),
DTEST_MUTATION_TRACKING_ENABLED("jvm_dtest.mutation_tracking.enabled", "true"),
ENABLE_DC_LOCAL_COMMIT("cassandra.enable_dc_local_commit", "true"),
/**
* Whether {@link org.apache.cassandra.db.ConsistencyLevel#NODE_LOCAL} should be allowed.
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ public static class SSTableConfig

public boolean dynamic_data_masking_enabled = false;

public boolean mutation_tracking_enabled = false;
public MutationTrackingSpec mutation_tracking = new MutationTrackingSpec();

/**
* Time in milliseconds after a warning will be emitted to the log and to the client that a UDF runs too long.
Expand Down
19 changes: 12 additions & 7 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ else if (conf.repair_session_space.toMebibytes() > (int) (Runtime.getRuntime().m
if (commitLogWriteDiskAccessMode != conf.commitlog_disk_access_mode)
logger.info("commitlog_disk_access_mode resolved to: {}", commitLogWriteDiskAccessMode);

if (conf.mutation_tracking.journal_directory == null)
{
conf.mutation_tracking.journal_directory = storagedirFor("mutation_journal");
}

if (conf.accord.journal_directory == null)
{
conf.accord.journal_directory = storagedirFor("accord_journal");
Expand Down Expand Up @@ -2222,6 +2227,10 @@ public static void createAllDirectories()
throw new ConfigurationException("commitlog_directory must be specified", false);
FileUtils.createDirectory(conf.commitlog_directory);

if (conf.mutation_tracking.journal_directory == null)
throw new ConfigurationException("mutation_tracking.journal_directory must be specified", false);
FileUtils.createDirectory(conf.mutation_tracking.journal_directory);

if (conf.accord.journal_directory == null)
throw new ConfigurationException("accord.journal_directory must be specified", false);
FileUtils.createDirectory(conf.accord.journal_directory);
Expand Down Expand Up @@ -5766,16 +5775,12 @@ public static void setDynamicDataMaskingEnabled(boolean enabled)

public static boolean getMutationTrackingEnabled()
{
return conf.mutation_tracking_enabled;
return conf.mutation_tracking.enabled;
}

public static void setMutationTrackingEnabled(boolean enabled)
public static String getMutationTrackingJournalDirectory()
{
if (enabled != conf.mutation_tracking_enabled)
{
logger.info("Setting mutation_tracking_enabled to {}", enabled);
conf.mutation_tracking_enabled = enabled;
}
return conf.mutation_tracking.journal_directory;
}

public static OptionalDouble getSeverityDuringDecommission()
Expand Down
25 changes: 25 additions & 0 deletions src/java/org/apache/cassandra/config/MutationTrackingSpec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.config;

public class MutationTrackingSpec
{
public boolean enabled = false;
public String journal_directory;
}
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ public ResultMessage executeInternalWithoutCondition(QueryState queryState, Quer

String keyspaceName = mutation.getKeyspaceName();
Token token = mutation.key().getToken();
MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token);
mutation = mutation.withMutationId(id);
mutation.apply();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public Keyspaces apply(ClusterMetadata metadata)
if (attrs.getReplicationStrategyClass() != null && attrs.getReplicationStrategyClass().equals(SimpleStrategy.class.getSimpleName()))
Guardrails.simpleStrategyEnabled.ensureEnabled(state);

if (newKeyspace.params.replicationType.isTracked() && !keyspace.params.replicationType.isTracked())
{
if (SchemaConstants.isSystemKeyspace(keyspaceName))
throw ire("Mutation tracking is not supported on system keyspaces");

if (!DatabaseDescriptor.getMutationTrackingEnabled())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to move this, and probably some of the other pre-existing validation, out of apply and into validate.
DDL statements are encoded as entries in the metadata log and once committed must be able to be applied by all nodes, so divergent config across instances would create an issue here.
Also, log entries may be replayed after a bounce and changes to config could interfere with that & prevent the node from coming back up (c.f. CASSANDRA-20452)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@beobal Thanks for the note. I'm going to pick this up soon...

throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use.");
}

if (keyspace.params.replication.isMeta() && !keyspace.name.equals(SchemaConstants.METADATA_KEYSPACE_NAME))
throw ire("Can not alter a keyspace to use MetaReplicationStrategy");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.cassandra.auth.FunctionResource;
import org.apache.cassandra.auth.IResource;
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLStatement;
import org.apache.cassandra.db.guardrails.Guardrails;
import org.apache.cassandra.exceptions.AlreadyExistsException;
Expand Down Expand Up @@ -83,6 +84,17 @@ public Keyspaces apply(ClusterMetadata metadata)

KeyspaceMetadata keyspaceMetadata = KeyspaceMetadata.create(keyspaceName, attrs.asNewKeyspaceParams());

if (keyspaceMetadata.params.replicationType.isTracked())
{
if (SchemaConstants.isSystemKeyspace(keyspaceName))
throw ire("Mutation tracking is not supported on system keyspaces");

if (!DatabaseDescriptor.getMutationTrackingEnabled())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same point as on AlterKeyspaceStatement applies here

{
throw ire("Mutation tracking is disabled. Enable in cassandra.yaml to use.");
}
}

if (keyspaceMetadata.params.replication.klass.equals(LocalStrategy.class))
throw ire("Unable to use given strategy class: LocalStrategy is reserved for internal use.");

Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/db/Keyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public void initCf(TableMetadata metadata, boolean loadSSTables)

public Future<?> applyFuture(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
return getMetadata().useMutationTracking()
return getMetadata().useMutationTracking() && MutationTrackingService.isEnabled()
? applyInternalTracked(mutation, new AsyncPromise<>())
: applyInternal(mutation, writeCommitLog, updateIndexes, true, true, new AsyncPromise<>());
}
Expand Down Expand Up @@ -608,6 +608,7 @@ else if (isDeferrable)
*/
private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
{
MutationTrackingService.ensureEnabled();
Preconditions.checkState(getMetadata().useMutationTracking() && !mutation.id().isNone());

if (TEST_FAIL_WRITES && getMetadata().name.equals(TEST_FAIL_WRITES_KS))
Expand All @@ -616,7 +617,7 @@ private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
boolean started;
try (WriteContext ctx = trackedWriteHandler.beginWrite(mutation, true))
{
started = MutationTrackingService.instance.startWriting(mutation);
started = MutationTrackingService.instance().startWriting(mutation);

if (started)
{
Expand All @@ -635,7 +636,7 @@ private Future<?> applyInternalTracked(Mutation mutation, Promise<?> future)
}

if (started)
MutationTrackingService.instance.finishWriting(mutation);
MutationTrackingService.instance().finishWriting(mutation);


if (future != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ protected PartialTrackedRead createInProgressRead(UnfilteredPartitionIterator it
@Override
protected MutationSummary createMutationSummaryInternal(boolean includePending)
{
return MutationTrackingService.instance.createSummaryForRange(dataRange.keyRange, metadata().id, includePending);
return MutationTrackingService.instance().createSummaryForRange(dataRange.keyRange, metadata().id, includePending);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs

protected MutationSummary createMutationSummaryInternal(boolean includePending)
{
return MutationTrackingService.instance.createSummaryForKey(partitionKey, metadata().id, includePending);
return MutationTrackingService.instance().createSummaryForKey(partitionKey, metadata().id, includePending);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public WriteContext beginWrite(Mutation mutation, boolean makeDurable) throws Re
group = Keyspace.writeOrder.start();

Tracing.trace("Appending to mutation journal");
RecordPointer pointer = MutationJournal.instance.write(mutation.id(), mutation);
RecordPointer pointer = MutationJournal.instance().write(mutation.id(), mutation);

// TODO (preferred): update journal to return CommitLogPosition or otherwise remove requirement to allocate second object here
return new CassandraWriteContext(group, new CommitLogPosition(pointer.segment, pointer.position));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ protected Map<MetadataType, MetadataComponent> finalizeMetadata()
if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
{
Preconditions.checkState(Objects.equals(pendingRepair, ActiveRepairService.NO_PENDING_REPAIR));
if (MutationTrackingService.instance.isDurablyReconciled(coordinatorLogOffsets))
if (MutationTrackingService.instance().isDurablyReconciled(coordinatorLogOffsets))
{
repairedAt = Clock.Global.currentTimeMillis();
logger.debug("Marking SSTable {} as reconciled with repairedAt {}", descriptor, repairedAt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,18 @@ public boolean invokeOnFailure()
@Override
public void onResponse(Message<NoPayload> msg)
{
MutationTrackingService.instance.receivedWriteResponse(mutationId, toHost);
MutationTrackingService.instance().receivedWriteResponse(mutationId, toHost);
}

@Override
public void onFailure(InetAddressAndPort from, RequestFailure failureReason)
{
MutationTrackingService.instance.retryFailedWrite(mutationId, toHost, failureReason);
MutationTrackingService.instance().retryFailedWrite(mutationId, toHost, failureReason);
}

void send()
{
RecordPointer pointer = MutationJournal.instance.lookUp(mutationId);
RecordPointer pointer = MutationJournal.instance().lookUp(mutationId);
Preconditions.checkNotNull(pointer, "Mutation %s not found in the journal", mutationId);

Message<PushMutationRequest> message =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ public String toString()
}

public static final IVerbHandler<BroadcastLogOffsets> verbHandler = message -> {
MutationTrackingService.ensureEnabled();
BroadcastLogOffsets replicatedOffsets = message.payload;
logger.trace("Received replicated offsets {} from {}", replicatedOffsets, message.from());
MutationTrackingService.instance.updateReplicatedOffsets(replicatedOffsets.keyspace,
MutationTrackingService.instance().updateReplicatedOffsets(replicatedOffsets.keyspace,
replicatedOffsets.range,
replicatedOffsets.replicatedOffsets,
replicatedOffsets.durable,
Expand Down
5 changes: 3 additions & 2 deletions src/java/org/apache/cassandra/replication/ForwardedWrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo)
String keyspaceName = mutation.getKeyspaceName();
Token token = mutation.key().getToken();

MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
MutationId id = MutationTrackingService.instance().nextMutationId(keyspaceName, token);
// Do not wait for handler completion, since the coordinator is already waiting and we don't want to block the stage
LeaderCallback handler = new LeaderCallback(id, ackTo);
applyLocallyAndForwardToReplicas(mutation.withMutationId(id), recipients, handler, ackTo);
Expand Down Expand Up @@ -276,6 +276,7 @@ public long serializedSize(MutationRequest request, int version)

public static final IVerbHandler<MutationRequest> verbHandler = incoming ->
{
MutationTrackingService.ensureEnabled();
if (approxTime.now() > incoming.expiresAtNanos())
{
Tracing.trace("Discarding mutation from {} (timed out)", incoming.from());
Expand Down Expand Up @@ -317,7 +318,7 @@ public void onResponse(Message<NoPayload> msg)
{
// Local mutations are witnessed from Keyspace.applyInternalTracked
if (msg != null)
MutationTrackingService.instance.receivedWriteResponse(id, msg.from());
MutationTrackingService.instance().receivedWriteResponse(id, msg.from());

// Local write needs to be ack'd to coordinator
if (msg == null && ackTo != null)
Expand Down
32 changes: 28 additions & 4 deletions src/java/org/apache/cassandra/replication/MutationJournal.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,37 @@

public class MutationJournal
{
public static final MutationJournal instance = new MutationJournal();
private static final MutationJournal instance = DatabaseDescriptor.getMutationTrackingEnabled() ? new MutationJournal() : null;

private final Journal<ShortMutationId, Mutation> journal;

private MutationJournal()
{
this(new File(DatabaseDescriptor.getCommitLogLocation()), new JournalParams());
this(new File(DatabaseDescriptor.getMutationTrackingJournalDirectory()), new JournalParams());
}

public static void start()
{
if (instance == null)
return;

instance.startInternal();
}

public static void shutdown()
{
if (instance == null)
return;

instance.shutdownBlocking();
}

public static MutationJournal instance()
{
if (instance == null)
throw new IllegalStateException("Mutation tracking is not enabled");

return instance;
}

@VisibleForTesting
Expand All @@ -61,12 +85,12 @@ private MutationJournal()
journal = new Journal<>("MutationJournal", directory, params, new MutationIdSupport(), new MutationSerializer(), SegmentCompactor.noop());
}

public void start()
void startInternal()
{
journal.start();
}

public void shutdownBlocking()
void shutdownBlocking()
{
journal.shutdown();
}
Expand Down
Loading