
Conversation

@aparna0522 (Contributor):

patch by Aparna Naik; reviewed by [Reviewers] for CASSANDRA-21098

@aparna0522 aparna0522 marked this pull request as ready for review January 17, 2026 00:35
@aweisberg aweisberg self-requested a review January 20, 2026 19:12

void captureTargets()
{
BroadcastLogOffsets current = shard.collectReplicatedOffsets(false);
Contributor:

This gets the offsets to wait for from the local coordinator. That doesn't make a complete happens-before edge between this repair and already acknowledged writes that may not yet have been visible to this coordinator. So this repair could claim everything is replicated at ALL while there are acknowledged writes that may not have been replicated yet.

The repair would need to contact the other replicas and find out the highest known offset for every coordinator they know about and then wait on that.

I think this is called out in the JIRA description as:

that would be collecting mutation offsets from all replicas for relevant shards, compute their union and wait until all these replicas report having received them.
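A rough sketch of that shape (every name here is illustrative, not an existing API):

// Hypothetical sketch: requestWitnessedOffsets, Offsets.Mutable, and
// awaitReplication are illustrative names, not existing APIs.
Offsets.Mutable union = new Offsets.Mutable();
for (InetAddressAndPort replica : shard.replicas())
{
    // Ask each replica for the highest offset it has seen for every
    // coordinator log it knows about in this shard's range.
    BroadcastLogOffsets remote = requestWitnessedOffsets(replica, shard.range());
    union.addAll(remote);
}
// Then block until every replica reports having received the union,
// not just until the local coordinator's view is satisfied.
awaitReplication(union);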

Contributor Author:

Yep, you're right. I think I misunderstood the meaning of "ALL" replicas.

getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets, durable, onHost);

// Notify any registered sync coordinators about the offset update
Set<MutationTrackingSyncCoordinator> coordinators = syncCoordinatorsByKeyspace.get(keyspace);
Contributor (@aweisberg, Jan 20, 2026):

How many shards are there generally going to be? Does this need to be done under this lock?

I am wondering if validating the impact of these offsets being updated can be done outside the lock.

Contributor Author:

I don't think there will be many shards, but it would be good practice to keep the notification logic out of the locks. It doesn't really need to be under a lock to notify!
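Something like this shape, for example (offsetsUpdated is an illustrative callback name, and the actual lock may differ from a plain monitor):

Set<MutationTrackingSyncCoordinator> toNotify;
synchronized (this)
{
    getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets, durable, onHost);
    // Snapshot the registered coordinators while still holding the lock...
    toNotify = Set.copyOf(syncCoordinatorsByKeyspace.getOrDefault(keyspace, Set.of()));
}
// ...then notify outside the lock so callbacks can't block offset updates.
for (MutationTrackingSyncCoordinator coordinator : toNotify)
    coordinator.offsetsUpdated(keyspace, range);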

return range;
}

public Future<Void> awaitCompletion()
Contributor:

Unused

@aparna0522 aparna0522 force-pushed the incremental-repair-for-MT branch from 237e027 to 65471d8 on January 21, 2026 23:34
* Returns the UNION of all witnessed offsets from all participants.
* This represents all offsets that ANY replica has witnessed.
*/
Offsets.Immutable collectUnionOfWitnessedOffsets()
Contributor:

This is operating at the MutationTrackingSyncCoordinator which is just one node's view of the offsets from the coordinators that can be arbitrarily behind or incomplete.

To collect the offsets I think the best thing to do is contact every replica of the shard and ask them for all the coordinators they know about and what the maximum offset seen was and union the result of that.

@bdeggleston do I have this right? I don't think the repair will have a happens-before relationship for acknowledged writes without it.

Member:

yes that's right. We'd need to either proactively contact each replica, or just listen to what offsets the other nodes are broadcasting starting after the IR starts.

You'd need to time out if you don't get offsets from a participant after some amount of time, too. I think this might be a bit easier by listening to incoming offsets and waiting on a future that times out.
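Roughly (the timeout constant is illustrative; the offset listener would be what completes the promise):

// The offset listener completes completionFuture once every participant
// has reported the required offsets; the repair thread waits with a bound.
try
{
    completionFuture.get(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES);
}
catch (TimeoutException e)
{
    // Some participant never reported its offsets; fail instead of hanging.
    throw new RuntimeException("Timed out waiting for replica offsets", e);
}
catch (InterruptedException | ExecutionException e)
{
    throw new RuntimeException(e);
}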

Contributor:

How often will they broadcast if we just listen? In the interest of repairs starting quickly (so things like tests run quickly), I'm wondering whether we should either proactively message or change the broadcast interval for tests.

Contributor @aweisberg left a comment:

Found a few things, TY!

}

@Test
public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable
Contributor:

I think this test ends up being redundant with testSyncCoordinatorWaitsForAllReplicasMutations?

}

@Test
public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable
Contributor:

I think this test doesn't successfully get at what we are checking because it follows the happy path a little too closely and then doesn't validate that MutationTrackingSyncCoordinator actually had to wait.

When you write at CL.ONE it still sends the write to all replicas so you don't actually end up having to wait for mutation tracking to do anything.

I think the thing to do in this scenario is write at CL.ONE on one coordinator and block all messages (there are message filters you can set pretty easily) from that coordinator so you know the write hasn't propagated.

Then create the MutationTrackingSyncCoordinator and have another thread wait on it doing its thing. This should not complete at this point because messages are still blocked. One thing this will highlight is the lack of retries in repair collecting the offsets (if you don't add retries).

After waiting some amount of time to see that it is indeed still waiting, you can remove the message filters, and then you should see the thread waiting on MutationTrackingSyncCoordinator complete.

At that point you need to go in and read the data from each node and check that it actually propagated and matches.

When reading from individual nodes there is a path that skips all the distributed machinery and guarantees you are reading precisely from that node and nothing else. If you read at CL.ONE it's possible that the replica a coordinator selects isn't actually the node coordinating the query.
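A rough outline using the in-jvm dtest message filters (schema details, coordinator construction, and exact assertions are elided; a sketch, not the finished test):

try (Cluster cluster = init(Cluster.build(3).start()))
{
    cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int)"));

    // Block all messages out of node 1 so the CL.ONE write cannot propagate.
    IMessageFilters.Filter drop = cluster.filters().allVerbs().from(1).drop();
    cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"),
                                   ConsistencyLevel.ONE);

    // The sync must not complete while messages are blocked (this is also
    // where missing retries in offset collection would show up).
    Future<Void> sync = coordinator.awaitCompletion(); // coordinator construction elided
    Thread.sleep(5_000);
    Assert.assertFalse(sync.isDone());

    // Unblock, and the sync should now finish.
    drop.off();
    sync.get(1, TimeUnit.MINUTES);

    // executeInternal reads only from that node, skipping coordination,
    // so we can verify the write actually propagated to each replica.
    for (int node = 1; node <= 3; node++)
        assertRows(cluster.get(node).executeInternal(withKeyspace("SELECT v FROM %s.tbl WHERE k = 1")),
                   row(1));
}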

/** Incremental repair task for keyspaces using mutation tracking */
public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
{
private static final long SYNC_TIMEOUT_MINUTES = 30;
Contributor:

This should be configurable, but also there is existing retry and timeout stuff for repair messaging. Maybe that can be re-used without introducing additional configuration.

It might be that we don't have timeouts and things retry forever in repair. I would need to look.

else
{
// Pure mutation tracking - create successful result
resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, List.of()));
Contributor:

I think the list of RepairSessionResult needs to be populated for other things like migration to Accord to work. In general it's pretty suspect to leave that out given there are downstream consumers.

private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();

// Per-shard state: tracks what each node has reported for that shard
private final Map<Range<Token>, ShardSyncState> shardStates = new ConcurrentHashMap<>();
Contributor:

I don't think this needs to be a CHM because it's built once and needs to be built and populated before it can be accessed from multiple threads. More on this in a different comment.

MutationTrackingService.instance.registerSyncCoordinator(this);

// Initialize state for each shard and capture targets
for (Shard shard : overlappingShards)
Contributor:

Move this before registering the sync coordinator so it doesn't need to be a CHM since it will be immutable.

It's also a correctness issue: while shardStates is being populated, it's possible that the listener fires in another thread, sees no shards, and decides the sync is complete.
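i.e., roughly (shard.range() is an assumed accessor; the point is the ordering):

// Populate the per-shard state first, into a plain map...
Map<Range<Token>, ShardSyncState> states = new HashMap<>();
for (Shard shard : overlappingShards)
    states.put(shard.range(), new ShardSyncState(shard));
// ...publish it as an immutable map (assigned to a final field in the
// constructor)...
this.shardStates = Map.copyOf(states);
// ...and only then register, so a listener firing on another thread can
// never observe a partially built map and declare the sync complete.
MutationTrackingService.instance.registerSyncCoordinator(this);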

* Tests that the sync coordinator correctly waits for offset convergence
* across all nodes in a cluster.
*/
public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl
Contributor:

This test is instantiating the coordinator directly and isn't end to end testing repair. There should be an end to end test somewhere.

We also need end to end tests of migration to/from mutation tracking.

I think we also support cancellation of repairs so we need a test that cancellation works, but you should double check how that works.

*/
private static class ShardSyncState
{
private final Shard shard;
Contributor:

This reference to the shard can become stale because Shard.withParticipants can be called. That can also result in logs being added or removed, and in withParticipants being called on the logs themselves.

This ties into how repair handles topology change in general. I think the right behavior for now is to detect it and fail the repair and we can come back and do something more involved later.
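For example, capture the participants when the repair starts and fail fast on any change (participants() is an assumed accessor):

// Captured at repair start.
private final Set<InetAddressAndPort> initialParticipants;

// Called whenever offsets are re-collected; fails the repair on any
// topology change rather than trying to handle it mid-flight.
private void checkTopologyUnchanged(Shard current)
{
    if (!current.participants().equals(initialParticipants))
        completionFuture.tryFailure(new IllegalStateException(
            "Shard participants changed during repair; failing the repair"));
}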
