CEP-45: Incremental Repair Blocking Wait for offsets #4569
base: cep-45-mutation-tracking
Conversation
    void captureTargets()
    {
        BroadcastLogOffsets current = shard.collectReplicatedOffsets(false);
This gets the offsets to wait for from the local coordinator. That doesn't form a complete happens-before edge between this repair and already-acknowledged writes that may not have been visible to this coordinator. So this repair could claim everything is replicated at ALL while there are acknowledged writes that haven't been replicated yet.
The repair would need to contact the other replicas, find the highest known offset for every coordinator they know about, and then wait on that.
I think this is called out in the JIRA description as:
"that would be collecting mutation offsets from all replicas for relevant shards, compute their union and wait until all these replicas report having received them."
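A rough sketch of the shape the JIRA describes; `Offsets.Mutable`, `requestOffsetsFrom`, and `shard.participants()` are illustrative stand-ins, not the branch's actual API:

```java
// Sketch only: ask every replica of the shard for the highest offset it has
// seen from each coordinator, union the answers, and use that union as the
// wait target. All helper names here are hypothetical.
Offsets.Immutable collectTargetsFromAllReplicas(Shard shard)
{
    Offsets.Mutable union = new Offsets.Mutable();
    for (InetAddressAndPort replica : shard.participants())
    {
        // In practice this would be a verb handler + callback, with retries.
        BroadcastLogOffsets remote = requestOffsetsFrom(replica, shard.range());
        union.addAll(remote.offsets());
    }
    // Waiting until every replica reports having received this union gives the
    // repair a happens-before edge with all previously acknowledged writes.
    return union.toImmutable();
}
```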
Yep, you're right; I think I misunderstood the meaning of "ALL" replicas.
    getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets, durable, onHost);

    // Notify any registered sync coordinators about the offset update
    Set<MutationTrackingSyncCoordinator> coordinators = syncCoordinatorsByKeyspace.get(keyspace);
How many shards will there generally be? Does this need to be done under this lock?
I am wondering whether acting on these offset updates can be done outside the lock.
I don't think there will be many shards, but it would be good practice to keep the notification logic outside the lock. It doesn't actually need to be under the lock to notify.
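For example, a minimal sketch (the listener method name is made up; the rest follows the diff context):

```java
// Snapshot the registered coordinators while holding the lock, then notify
// after releasing it so listener callbacks can't stall offset updates.
List<MutationTrackingSyncCoordinator> toNotify;
synchronized (this)
{
    getOrCreateShards(keyspace).updateReplicatedOffsets(range, offsets, durable, onHost);
    Set<MutationTrackingSyncCoordinator> coordinators = syncCoordinatorsByKeyspace.get(keyspace);
    toNotify = coordinators == null ? List.of() : List.copyOf(coordinators);
}
for (MutationTrackingSyncCoordinator coordinator : toNotify)
    coordinator.onOffsetsUpdated(keyspace, range, offsets); // hypothetical listener method
```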
        return range;
    }

    public Future<Void> awaitCompletion()
Unused
Force-pushed from 237e027 to 65471d8
     * Returns the UNION of all witnessed offsets from all participants.
     * This represents all offsets that ANY replica has witnessed.
     */
    Offsets.Immutable collectUnionOfWitnessedOffsets()
This is operating at the MutationTrackingSyncCoordinator, which is just one node's view of the offsets from the coordinators and can be arbitrarily behind or incomplete.
To collect the offsets, I think the best thing to do is contact every replica of the shard, ask for all the coordinators they know about and the maximum offset seen for each, and union the results.
@bdeggleston do I have this right? I don't think the repair will have a happens-before relationship with acknowledged writes without it.
Yes, that's right. We'd need to either proactively contact each replica, or just listen to the offsets the other nodes broadcast starting after the IR starts.
You'd also need to time out if you don't get offsets from a participant after some amount of time. I think this might be a bit easier by listening to incoming offsets and waiting on a future that times out.
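Something along these lines, reusing the AsyncPromise from the diff (`allTargetsReached` is a hypothetical comparison helper):

```java
// Sketch: complete the promise when broadcast offsets catch up to the
// captured targets, and bound the wait so a silent participant can't hang
// the repair forever.
private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();

// Called whenever a replica broadcasts its replicated offsets.
void onOffsetsUpdated(BroadcastLogOffsets reported)
{
    if (allTargetsReached(reported)) // hypothetical: every target offset now replicated
        completionFuture.trySuccess(null);
}

// The repair task blocks with a deadline instead of waiting indefinitely.
completionFuture.get(SYNC_TIMEOUT_MINUTES, TimeUnit.MINUTES);
```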
How often will they broadcast if we just listen? In the interest of repairs starting quickly (so things like tests run quickly), we should either proactively message or maybe change the broadcast interval for tests.
aweisberg left a comment:
Found a few things, TY!
    }

    @Test
    public void testSyncCoordinatorCompletesAfterDataSync() throws Throwable
I think this test ends up being redundant with testSyncCoordinatorWaitsForAllReplicasMutations?
    }

    @Test
    public void testSyncCoordinatorWaitsForAllReplicasMutations() throws Throwable
I think this test doesn't successfully get at what we are checking, because it follows the happy path a little too closely and doesn't validate that MutationTrackingSyncCoordinator actually had to wait.
When you write at CL.ONE the coordinator still sends the write to all replicas, so you don't actually end up having to wait for mutation tracking to do anything.
I think the thing to do in this scenario is to write at CL.ONE on one coordinator and block all messages from that coordinator (there are message filters you can set pretty easily) so you know the write hasn't propagated.
Then create the MutationTrackingSyncCoordinator and have another thread wait on it doing its thing. This should not complete at this point because messages are still blocked. One thing this will highlight is the lack of retries in repair collecting the offsets (if you don't add retries).
After waiting some amount of time to confirm it is indeed waiting, you can remove the message filters, and then you should see the thread waiting on MutationTrackingSyncCoordinator complete.
At that point you need to go in and read the data from each node and check that it actually propagated and matches.
When reading from individual nodes there is a path that skips all the distributed machinery and guarantees you are reading precisely from that node and nothing else. If you read at CL.ONE, it's possible the replica a coordinator selects isn't actually the node coordinating the query.
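Roughly, using the in-JVM dtest message filters (a sketch of the test shape; `startSyncCoordinatorAsync` is a made-up helper for whatever drives the coordinator):

```java
@Test
public void testSyncWaitsForBlockedWrite() throws Throwable
{
    try (Cluster cluster = init(Cluster.build(3).start()))
    {
        cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int PRIMARY KEY, v int)"));

        // Drop mutations leaving node 1 so the CL.ONE write cannot propagate.
        IMessageFilters.Filter filter = cluster.filters().verbs(Verb.MUTATION_REQ.id).from(1).drop();
        cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (k, v) VALUES (1, 1)"),
                                       ConsistencyLevel.ONE);

        // Drive the sync coordinator from another thread; it must still be
        // waiting, since the write is known only to node 1.
        Future<?> sync = startSyncCoordinatorAsync(cluster); // hypothetical helper
        TimeUnit.SECONDS.sleep(5);
        assertFalse(sync.isDone());

        // Unblock messages; the coordinator should now observe convergence.
        filter.off();
        sync.get(1, TimeUnit.MINUTES);

        // executeInternal reads strictly from that node, skipping coordination,
        // so this verifies the data actually propagated everywhere.
        for (int node = 1; node <= 3; node++)
            assertRows(cluster.get(node).executeInternal(withKeyspace("SELECT v FROM %s.tbl WHERE k = 1")),
                       row(1));
    }
}
```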
/** Incremental repair task for keyspaces using mutation tracking */
public class MutationTrackingIncrementalRepairTask extends AbstractRepairTask
{
    private static final long SYNC_TIMEOUT_MINUTES = 30;
This should be configurable, but there is also existing retry and timeout machinery for repair messaging; maybe that can be reused without introducing additional configuration.
It may be that we don't have timeouts and things retry forever in repair. I would need to look.
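If the existing repair timeout machinery doesn't fit, even a system property would make this tunable for tests (the property name here is made up):

```java
// Sketch: make the bound tunable so tests can shorten it; falls back to the
// current 30-minute default. The property name is hypothetical.
private static final long SYNC_TIMEOUT_MINUTES =
    Long.getLong("cassandra.mutation_tracking.sync_timeout_minutes", 30L);
```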
    else
    {
        // Pure mutation tracking - create successful result
        resultPromise.trySuccess(CoordinatedRepairResult.create(rangeCollections, List.of()));
I think the list of RepairSessionResult needs to be populated for other things, like migration to Accord, to work. In general it's pretty suspect to leave that out given there are downstream consumers.
    private final AsyncPromise<Void> completionFuture = new AsyncPromise<>();

    // Per-shard state: tracks what each node has reported for that shard
    private final Map<Range<Token>, ShardSyncState> shardStates = new ConcurrentHashMap<>();
I don't think this needs to be a CHM, because it's built once and needs to be fully populated before it can be accessed from multiple threads. More on this in a different comment.
    MutationTrackingService.instance.registerSyncCoordinator(this);

    // Initialize state for each shard and capture targets
    for (Shard shard : overlappingShards)
Move this before registering the sync coordinator; then shardStates is effectively immutable and doesn't need to be a CHM.
It's also a correctness issue: while shardStates is being populated it's possible that the listener fires in another thread, sees no shards, and decides the sync is complete.
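i.e. something like this (sketch built from the surrounding diff; `shard.range()` is an assumed accessor):

```java
// Build and populate the per-shard state first, into a plain immutable map...
Map<Range<Token>, ShardSyncState> states = new HashMap<>();
for (Shard shard : overlappingShards)
{
    ShardSyncState state = new ShardSyncState(shard);
    state.captureTargets();
    states.put(shard.range(), state); // range accessor assumed
}
this.shardStates = Map.copyOf(states);

// ...and only register once the map is complete, so a concurrent offset
// update can never observe an empty shardStates and declare success early.
MutationTrackingService.instance.registerSyncCoordinator(this);
```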
 * Tests that the sync coordinator correctly waits for offset convergence
 * across all nodes in a cluster.
 */
public class MutationTrackingSyncCoordinatorTest extends TestBaseImpl
This test instantiates the coordinator directly and isn't end-to-end testing repair. There should be an end-to-end test somewhere.
We also need end-to-end tests of migration to/from mutation tracking.
I think we also support cancellation of repairs, so we need a test that cancellation works, but you should double-check how that works.
 */
private static class ShardSyncState
{
    private final Shard shard;
This reference to the shard can become stale because Shard.withParticipants can get called. That can also add or remove logs, and call withParticipants on the logs themselves.
This ties into how repair handles topology change in general. I think the right behavior for now is to detect it and fail the repair; we can come back and do something more involved later.
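Detection could be as simple as snapshotting the participants up front and comparing (a sketch; `shard.participants()` is an assumed accessor):

```java
// Sketch: remember who the participants were when the sync started and fail
// the repair if the shard's membership changes underneath us.
private final Set<InetAddressAndPort> initialParticipants;

ShardSyncState(Shard shard)
{
    this.shard = shard;
    this.initialParticipants = Set.copyOf(shard.participants()); // assumed accessor
}

void checkTopologyUnchanged()
{
    if (!initialParticipants.equals(Set.copyOf(shard.participants())))
        throw new IllegalStateException("Shard participants changed during repair; failing the session");
}
```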
patch by Aparna Naik; reviewed by [Reviewers] for CASSANDRA-21098