Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,7 @@ public Refs<SSTableReader> getAndReferenceOverlappingLiveSSTables(Iterable<SSTab
*
* param @ filename - filename just flushed to disk
*/
@VisibleForTesting
public void addSSTable(SSTableReader sstable)
{
assert sstable.getColumnFamilyName().equals(name);
Expand Down
13 changes: 1 addition & 12 deletions src/java/org/apache/cassandra/db/SSTableImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ synchronized List<String> importNewSSTables(Options options)
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);

if (isTracked)
TrackedBulkTransfer.execute(cfs.keyspace.getName(), newSSTables);
MutationTrackingService.instance.executeTransfers(cfs.keyspace.getName(), newSSTables, ConsistencyLevel.ALL);
else
cfs.getTracker().addSSTables(newSSTables);

Expand All @@ -253,17 +253,6 @@ synchronized List<String> importNewSSTables(Options options)
return failedDirectories;
}

/**
* TODO: Support user-defined consistency level for import, for import with replicas down
*/
private static class TrackedBulkTransfer
{
private static void execute(String keyspace, Set<SSTableReader> sstables)
{
MutationTrackingService.instance.executeTransfers(keyspace, sstables, ConsistencyLevel.ALL);
}
}

/**
* Check the state of this node and throws an {@link InterruptedException} if it is currently draining
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, R
Set<SSTableReader> sstables = Sets.newHashSet();
SSTableIntervalTree intervalTree = buildSSTableIntervalTree(ImmutableList.copyOf(view.select(SSTableSet.CANONICAL)));
Predicate<SSTableReader> predicate;
// reconciledKeyspaceOffsets are only included when mutation logs are streamed, since we include logs
// for all unreconciled mutations, and SSTables for all reconciled mutations
if (reconciledKeyspaceOffsets != null)
{
// TODO: relax these restrictions as repair support is add
Preconditions.checkArgument(previewKind == PreviewKind.NONE);
Preconditions.checkArgument(pendingRepair == ActiveRepairService.NO_PENDING_REPAIR);
predicate = getSSTablePredicateForKeyspaceRanges(reconciledKeyspaceOffsets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.replication.PendingLocalTransfer;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.tcm.ClusterMetadata;
Expand Down Expand Up @@ -135,7 +134,7 @@ public synchronized void received(IncomingStream stream)
sstables.addAll(finished);
receivedEntireSSTable = file.isEntireSSTable();

if (session.streamOperation() == StreamOperation.TRACKED_TRANSFER)
if (session.requiresTrackedActivation())
{
Preconditions.checkState(cfs.metadata().replicationType().isTracked());
PendingLocalTransfer transfer = new PendingLocalTransfer(cfs.metadata().id, session.planId(), sstables);
Expand Down Expand Up @@ -267,7 +266,7 @@ public void finished()
logger.debug("[Stream #{}] Received {} sstables from {} ({})", session.planId(), readers.size(), session.peer, readers);

// Don't mark as live until activated by the stream coordinator
if (session.streamOperation() == StreamOperation.TRACKED_TRANSFER)
if (session.requiresTrackedActivation())
return;

cfs.addSSTables(readers);
Expand Down
28 changes: 24 additions & 4 deletions src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@

package org.apache.cassandra.repair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;


import com.google.common.base.Preconditions;

import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.replication.ShortMutationId;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.TimeUUID;

/**
* AsymmetricRemoteSyncTask sends {@link SyncRequest} to target node to repair(stream)
Expand All @@ -38,9 +44,23 @@
*/
public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask
{
public AsymmetricRemoteSyncTask(SharedContext ctx, RepairJobDesc desc, InetAddressAndPort to, InetAddressAndPort from, List<Range<Token>> differences, PreviewKind previewKind)
public AsymmetricRemoteSyncTask(SharedContext ctx, RepairJobDesc desc, InetAddressAndPort to, InetAddressAndPort from, List<Range<Token>> differences, PreviewKind previewKind, ShortMutationId transferId)
{
super(ctx, desc, to, from, differences, previewKind, transferId);
}

@Override
public SyncTask withRanges(Collection<Range<Token>> newRanges)
{
List<Range<Token>> rangeList = newRanges instanceof List ? (List<Range<Token>>) newRanges : new ArrayList<>(newRanges);
return new AsymmetricRemoteSyncTask(ctx, desc, nodePair.coordinator, nodePair.peer, rangeList, previewKind, transferId);
}

@Override
public SyncTask withTransferId(ShortMutationId transferId)
{
super(ctx, desc, to, from, differences, previewKind);
Preconditions.checkState(this.transferId == null);
return new AsymmetricRemoteSyncTask(ctx, desc, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind, transferId);
}

public void startSync()
Expand All @@ -52,11 +72,11 @@ public void startSync()
sendRequest(request, request.src);
}

public void syncComplete(boolean success, List<SessionSummary> summaries)
public void syncComplete(boolean success, List<SessionSummary> summaries, TimeUUID planId)
{
if (success)
{
trySuccess(stat.withSummaries(summaries));
trySuccess(stat.withSummaries(summaries, planId));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import java.util.List;

import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.utils.TimeUUID;

public interface CompletableRemoteSyncTask
{
void syncComplete(boolean success, List<SessionSummary> summaries);
void syncComplete(boolean success, List<SessionSummary> summaries, TimeUUID planId);
}
30 changes: 27 additions & 3 deletions src/java/org/apache/cassandra/repair/LocalSyncTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.cassandra.repair;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -30,6 +32,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.replication.ShortMutationId;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamEvent;
Expand Down Expand Up @@ -66,9 +69,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler

public LocalSyncTask(SharedContext ctx, RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
List<Range<Token>> diff, TimeUUID pendingRepair,
boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
boolean requestRanges, boolean transferRanges, PreviewKind previewKind, ShortMutationId transferId)
{
super(ctx, desc, local, remote, diff, previewKind);
super(ctx, desc, local, remote, diff, previewKind, transferId);
Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job");
Preconditions.checkArgument(local.equals(ctx.broadcastAddressAndPort()));

Expand All @@ -77,6 +80,27 @@ public LocalSyncTask(SharedContext ctx, RepairJobDesc desc, InetAddressAndPort l
this.transferRanges = transferRanges;
}

public LocalSyncTask(SharedContext ctx, RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote,
List<Range<Token>> diff, TimeUUID pendingRepair,
boolean requestRanges, boolean transferRanges, PreviewKind previewKind)
{
this(ctx, desc, local, remote, diff, pendingRepair, requestRanges, transferRanges, previewKind, null);
}

@Override
public SyncTask withRanges(Collection<Range<Token>> newRanges)
{
List<Range<Token>> rangeList = newRanges instanceof List ? (List<Range<Token>>) newRanges : new ArrayList<>(newRanges);
return new LocalSyncTask(ctx, desc, nodePair.coordinator, nodePair.peer, rangeList, pendingRepair, requestRanges, transferRanges, previewKind);
}

@Override
public SyncTask withTransferId(ShortMutationId transferId)
{
Preconditions.checkState(this.transferId == null);
return new LocalSyncTask(ctx, desc, nodePair.coordinator, nodePair.peer, rangesToSync, pendingRepair, requestRanges, transferRanges, previewKind, transferId);
}

@VisibleForTesting
StreamPlan createStreamPlan()
{
Expand Down Expand Up @@ -167,7 +191,7 @@ public void onSuccess(StreamState result)
status, desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily);
logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message);
Tracing.traceRepair(message);
trySuccess(result.hasAbortedSession() ? stat : stat.withSummaries(result.createSummaries()));
trySuccess(result.hasAbortedSession() ? stat : stat.withSummaries(result.createSummaries(), result.planId));
finished();
}
}
Expand Down
25 changes: 25 additions & 0 deletions src/java/org/apache/cassandra/repair/RepairCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.state.CoordinatorState;
import org.apache.cassandra.repair.state.ParticipateState;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.ReplicationType;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.schema.TableMetadata;
Expand Down Expand Up @@ -298,6 +301,28 @@ private static void validate(RepairOption options, List<ColumnFamilyStore> colum
if (options.paxosOnly() && options.accordOnly())
throw new IllegalArgumentException("Cannot specify a repair as both paxos only and accord only");

// Check for tracked keyspaces - repairs are not supported on tracked keyspaces
Set<String> trackedKeyspaces = new HashSet<>();
for (ColumnFamilyStore cfs : columnFamilies)
{
String keyspaceName = cfs.keyspace.getName();
KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(keyspaceName);
if (keyspaceMetadata != null && keyspaceMetadata.params.replicationType == ReplicationType.tracked)
{
trackedKeyspaces.add(keyspaceName);
}
}

/*
if (!trackedKeyspaces.isEmpty())
{
throw new IllegalArgumentException(String.format("Repair is not supported on tracked keyspaces: %s. " +
"Tracked keyspaces use mutation tracking for durability guarantees " +
"and are incompatible with traditional repair mechanisms.",
String.join(", ", trackedKeyspaces)));
}
*/

for (ColumnFamilyStore cfs : columnFamilies)
{
TableMetadata metadata = cfs.metadata();
Expand Down
40 changes: 29 additions & 11 deletions src/java/org/apache/cassandra/repair/RepairJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
import org.apache.cassandra.repair.asymmetric.ReduceHelper;
import org.apache.cassandra.repair.state.JobState;
import org.apache.cassandra.replication.LocalTransfers;
import org.apache.cassandra.schema.SystemDistributedKeyspace;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.IAccordService;
Expand Down Expand Up @@ -120,7 +121,6 @@ public RepairJob(RepairSession session, String columnFamily)

if ((!session.repairData && !session.repairPaxos) && !metadata.requiresAccordSupport())
throw new IllegalArgumentException(String.format("Cannot run accord only repair on %s.%s, which isn't configured for accord operations", cfs.keyspace.getName(), cfs.name));

}

public long getNowInSeconds()
Expand All @@ -136,6 +136,11 @@ public long getNowInSeconds()
}
}

public Collection<SyncTask> getSyncTasks()
{
return syncTasks;
}

@Override
public void run()
{
Expand Down Expand Up @@ -254,6 +259,11 @@ protected void runRepair()
// that there are in memory at once. When all validations complete, submit sync tasks out of the scheduler.
syncResults = session.validationScheduler.schedule(() -> createSyncTasks(accordRepair, allSnapshotTasks, allEndpoints), taskExecutor)
.flatMap(this::executeTasks, taskExecutor);

// For tracked keyspaces, we need to ensure sync'd data is present in the log
boolean isTracked = cfs.metadata().replicationType().isTracked();
if (isTracked)
syncResults = LocalTransfers.instance().onRepairSyncCompletion(this, syncResults, taskExecutor);
}
else
{
Expand Down Expand Up @@ -306,7 +316,7 @@ public void onFailure(Throwable t)
}, taskExecutor);
}

private Future<List<SyncTask>> createSyncTasks(Future<AccordRepairResult> accordRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints)
private Future<SyncTasks> createSyncTasks(Future<AccordRepairResult> accordRepair, Future<?> allSnapshotTasks, List<InetAddressAndPort> allEndpoints)
{
Future<List<TreeResponse>> treeResponses;
if (allSnapshotTasks != null)
Expand All @@ -330,9 +340,14 @@ private Future<List<SyncTask>> createSyncTasks(Future<AccordRepairResult> accord
return a;
});

return treeResponses.map(session.optimiseStreams && !session.pullRepair
? this::createOptimisedSyncingSyncTasks
: this::createStandardSyncTasks, taskExecutor);
return treeResponses.map(trees -> {
List<SyncTask> syncTasks;
if (session.optimiseStreams && !session.pullRepair)
syncTasks = createOptimisedSyncingSyncTasks(trees);
else
syncTasks = createStandardSyncTasks(trees);
return SyncTasks.alignedToShardBoundaries(desc, syncTasks);
}, taskExecutor);
}

public synchronized void abort(@Nullable Throwable reason)
Expand Down Expand Up @@ -413,18 +428,18 @@ static List<SyncTask> createStandardSyncTasks(SharedContext ctx,
continue;

task = new LocalSyncTask(ctx, desc, self.endpoint, remote.endpoint, differences, isIncremental ? desc.parentSessionId : null,
requestRanges, transferRanges, previewKind);
requestRanges, transferRanges, previewKind, null);
}
else if (isTransient.test(r1.endpoint) || isTransient.test(r2.endpoint))
{
// Stream only from transient replica
TreeResponse streamFrom = isTransient.test(r1.endpoint) ? r1 : r2;
TreeResponse streamTo = isTransient.test(r1.endpoint) ? r2 : r1;
task = new AsymmetricRemoteSyncTask(ctx, desc, streamTo.endpoint, streamFrom.endpoint, differences, previewKind);
task = new AsymmetricRemoteSyncTask(ctx, desc, streamTo.endpoint, streamFrom.endpoint, differences, previewKind, null);
}
else
{
task = new SymmetricRemoteSyncTask(ctx, desc, r1.endpoint, r2.endpoint, differences, previewKind);
task = new SymmetricRemoteSyncTask(ctx, desc, r1.endpoint, r2.endpoint, differences, previewKind, null);
}
syncTasks.add(task);
}
Expand All @@ -437,7 +452,7 @@ else if (isTransient.test(r1.endpoint) || isTransient.test(r2.endpoint))
}

@VisibleForTesting
Future<List<SyncStat>> executeTasks(List<SyncTask> tasks)
Future<List<SyncStat>> executeTasks(SyncTasks tasks)
{
try
{
Expand All @@ -447,6 +462,9 @@ Future<List<SyncStat>> executeTasks(List<SyncTask> tasks)
if (!tasks.isEmpty())
state.phase.streamSubmitted();

if (cfs.metadata().replicationType().isTracked())
LocalTransfers.instance().onRepairSyncExecution(this, desc, tasks);

for (SyncTask task : tasks)
{
if (!task.isLocal())
Expand Down Expand Up @@ -535,11 +553,11 @@ static List<SyncTask> createOptimisedSyncingSyncTasks(SharedContext ctx,
if (address.equals(local))
{
task = new LocalSyncTask(ctx, desc, address, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null,
true, false, previewKind);
true, false, previewKind, null);
}
else
{
task = new AsymmetricRemoteSyncTask(ctx, desc, address, fetchFrom, toFetch, previewKind);
task = new AsymmetricRemoteSyncTask(ctx, desc, address, fetchFrom, toFetch, previewKind, null);
}
syncTasks.add(task);

Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/repair/RepairSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public void syncComplete(RepairJobDesc desc, Message<SyncResponse> message)

if (logger.isDebugEnabled())
logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.coordinator, nodes.peer, desc.columnFamily);
task.syncComplete(message.payload.success, message.payload.summaries);
task.syncComplete(message.payload.success, message.payload.summaries, message.payload.planId);
}

@VisibleForTesting
Expand Down
Loading