draft: RFC for Horizontal scalability for ahnlich (#276)
Horizontal scalability for ahnlich
Test Results: 63 tests, 63 ✅, 2m 39s ⏱️. Results for commit 1f15cc5.
> They are (NOTE: these names are tentative):
> - `LogStore` - This is where the logs from the Raft cluster activities will be stored. Here is an in-memory impl from the openraft guys that they used in their example: <https://github.com/databendlabs/openraft/blob/4f0fd5fa034413d2f367306da4a0016f7603fb7e/examples/mem-log/src/log_store.rs>, and I believe we can easily co-opt this into a write-to-disk-log file service, or any other log storage we settle on.
What are the considerations for storing raft cluster activity on disk vs in-memory?
It's pretty much durability vs speed. On-disk allows for easier recovery from logs and snapshots, but is slower to write and read. In-memory can still recover, but only if there's at least 1 healthy node to recover from; if all nodes go down, all data is lost. It is, however, faster to write and read. I think the question for us to answer is which of these guarantees will be more important to users horizontally scaling their ahnlich.
Currently (without clustering), we run primarily in-memory but we have a background persistence thread that takes snapshots at intervals
Wondering if that behaviour would be easy to also propagate here
Also I'm curious: what API controls potentially loading a snapshot from disk when a replica starts up?
> Currently (without clustering), we run primarily in-memory but we have a background persistence thread that takes snapshots at intervals
> Wondering if that behaviour would be easy to also propagate here
Makes sense. Snapshots are stored on disk, yes?
> Also I'm curious, what API controls potentially loading a snapshot from disk when a replica starts up?
There's an `install_snapshot()` in the `StateMachineStore`:
```rust
#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), io::Error> {
    tracing::info!("install snapshot");

    let new_snapshot = StoredSnapshot {
        meta: meta.clone(),
        data: snapshot,
    };

    // Update the state machine.
    {
        let d: pb::StateMachineData = prost::Message::decode(new_snapshot.data.as_ref())
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
        let mut state_machine = self.state_machine.lock().await;
        *state_machine = d;
    }

    // Update current snapshot.
    let mut current_snapshot = self.current_snapshot.lock().unwrap();
    *current_snapshot = Some(new_snapshot);
    Ok(())
}
```
Ohhhh nice nice .... we can somewhat reuse our existing snapshot in this case then
> - `StateMachineStore` - This is where the last known state (snapshot) is stored and read from. They have a neat impl here as well: <https://github.com/databendlabs/openraft/blob/4f0fd5fa034413d2f367306da4a0016f7603fb7e/examples/raft-kv-memstore-grpc/src/store/mod.rs>, and I think the bit we need to figure out are where we want to 'store' the state machine.
What is a snapshot in this sense with respect to the state? All the application data in the store in some serializable format?
Affirmative. In their example impl, they just saved it as a `Vec<u8>`:
```rust
openraft::declare_raft_types!(
    /// Declare the type configuration, for example, K/V store.
    pub TypeConfig:
        D = pb::SetRequest,
        R = pb::Response,
        LeaderId = pb::LeaderId,
        Vote = pb::Vote,
        Entry = pb::Entry,
        Node = pb::Node,
        SnapshotData = Vec<u8>, // <- Note here
);
```

Here's the `SnapshotMeta`:
```rust
pub struct SnapshotMeta<NID, N>
where
    NID: NodeId,
    N: Node,
{
    pub last_log_id: Option<LogId<NID>>,
    pub last_membership: StoredMembership<NID, N>,
    pub snapshot_id: SnapshotId,
}
```

Here's the `StoredSnapshot`:
```rust
#[derive(Debug)]
pub struct StoredSnapshot {
    pub meta: SnapshotMeta, // <- Note here
    /// The data of the state machine at the time of this snapshot.
    pub data: SnapshotData, // <- Note here
}
```

Here's the `StateMachineData` proto:
```proto
// All the data in a state machine, including user-defined data and membership data.
message StateMachineData {
  // The last log ID that has been applied to the state machine
  LogId last_applied = 1;
  // User data in a map
  map<string, string> data = 2;
  // The ID of the last membership config log entry that is applied.
  LogId last_membership_log_id = 3;
  // The last membership config that is applied.
  Membership last_membership = 4;
}
```

And here's the usage in the `StateMachineStore`:
```rust
/// Defines a state machine for the Raft cluster. This state machine represents a copy of the
/// data for this node. Additionally, it is responsible for storing the last snapshot of the data.
#[derive(Debug, Default)]
pub struct StateMachineStore {
    /// The Raft state machine.
    pub state_machine: tokio::sync::Mutex<pb::StateMachineData>, // <- Note here
    snapshot_idx: Mutex<u64>,
    /// The last received snapshot.
    current_snapshot: Mutex<Option<StoredSnapshot>>, // <- Note here
}
```
> - `Network` - This is the communication layer for the nodes. They have a tonic gRPC impl here: <https://github.com/databendlabs/openraft/blob/4f0fd5fa034413d2f367306da4a0016f7603fb7e/examples/raft-kv-memstore-grpc/src/network/mod.rs>, we can adopt from.
Nice... how will this play with our already existing public facing gRPC endpoints
From what I can see, it should be fine. We only need to make sure that the channels (endpoints) used for Raft don't collide with any used by the AI and DB clients (or any other services running on the machine), which can be solved with `os_select_port()`.
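As a rough sketch of that idea (assuming `os_select_port()` boils down to letting the OS assign a port; the helper name below is hypothetical), binding to port 0 yields a currently-free port for the Raft channel:

```rust
use std::net::TcpListener;

// Hypothetical helper mirroring the `os_select_port()` idea: bind to port 0
// and let the OS pick a free port, so the Raft endpoint cannot collide with
// the ports already claimed by the AI/DB client services.
fn pick_free_port() -> std::io::Result<u16> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let port = listener.local_addr()?.port();
    // Dropping the listener frees the port for the Raft server to bind.
    drop(listener);
    Ok(port)
}

fn main() -> std::io::Result<()> {
    let raft_port = pick_free_port()?;
    println!("raft port: {raft_port}");
    Ok(())
}
```

Note there is a small race window between dropping the probe listener and the Raft server rebinding the port, so handing the bound listener itself to the server would be the safer variant.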
> - `AppService` - This is the client/application gRPC service. Here, the agent/app running the Raft cluster can issue commands to change the state, behaviour and roles of the nodes in the cluster. Impl here: <https://github.com/databendlabs/openraft/blob/4f0fd5fa034413d2f367306da4a0016f7603fb7e/examples/raft-kv-memstore-grpc/src/grpc/app_service.rs>.
> - `Server` - This is the server to add the above services to, and essentially listen for the requests. It's a `tonic` Server, so we're just importing from tonic, adding our services and giving it a port to listen to requests at, like they do in their example here: <https://github.com/databendlabs/openraft/blob/4f0fd5fa034413d2f367306da4a0016f7603fb7e/examples/raft-kv-memstore-grpc/src/app.rs#L47>
Oh neat... this shows that `impl AppService` and `impl RaftService` are the two large parts of the puzzle, where the former is the public-facing API. If we are in clustering mode, we should then ensure that the former uses `self.raft` for write operations (and the direct path otherwise).
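A minimal sketch of that routing (all names here are hypothetical stand-ins, not ahnlich's actual types): the public-facing write handler checks the mode and only submits through the Raft handle when clustering is on.

```rust
// Sketch only: `Mode`, `App`, and the string results are placeholders.
// In a real impl the Cluster arm would go through `self.raft` (e.g. a
// client-write call) so the entry is replicated before being applied.
#[derive(Clone, Copy)]
enum Mode {
    Standalone,
    Cluster,
}

struct App {
    mode: Mode,
}

impl App {
    fn set(&self, key: &str, value: &str) -> String {
        match self.mode {
            // Standalone: apply directly to the local store, as today.
            Mode::Standalone => format!("local write {key}={value}"),
            // Cluster: submit through Raft so the write is replicated
            // and applied via the state machine before acknowledging.
            Mode::Cluster => format!("raft write {key}={value}"),
        }
    }
}

fn main() {
    let app = App { mode: Mode::Cluster };
    println!("{}", app.set("k", "v")); // raft write k=v
}
```

Reads could still be served locally in either mode (with the usual caveats about stale reads on followers).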
> For the App and Raft Service, they have a bunch of types defined in protobuf, as seen here: <https://github.com/databendlabs/openraft/tree/4f0fd5fa034413d2f367306da4a0016f7603fb7e/examples/raft-kv-memstore-grpc/proto>, and they seem to be importing them directly (without a pre-generation step) into their Rust code using tonic, as shown here: <https://github.com/databendlabs/openraft/blob/4f0fd5fa034413d2f367306da4a0016f7603fb7e/examples/raft-kv-memstore-grpc/src/lib.rs#L8>
Yeah, `tonic::include_proto` can do that and is quite helpful... we went the build-file route for ours, as we had a bit of postprocessing to do and that felt more natural in that regard.
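For reference, the build-file route looks roughly like this (the proto path is hypothetical, not ahnlich's actual layout); postprocessing of the generated code can then be hooked in around the build step:

```rust
// build.rs (sketch): generate the tonic/prost code at compile time.
// The path "proto/raft.proto" is a placeholder.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/raft.proto")?;
    Ok(())
}
```

The generated module is then pulled in with `tonic::include_proto!`, keyed by the proto `package` name, which is the route the openraft example uses in its `lib.rs`.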
> `AhnlichRaftService` - this is where we are going to plug in Ahnlich, by adding some service that will instantiate the Raft service based on some config/commands passed by the CLI. I'm not sure exactly what goes here yet, but I think some of the things we would want to do here are:
> - Allow for Raft nodes to be created based on some config/command issued via the CLI
I think first step is deciding if we are in cluster-mode or not ... where the default is NOT
Yep 💯
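Sketched as a CLI decision (the flag name is hypothetical), with non-cluster as the default:

```rust
// Sketch: clustering is opt-in via a flag; the default stays standalone.
// The flag `--enable-cluster` is a placeholder, not ahnlich's actual CLI.
#[derive(Debug, PartialEq)]
enum RunMode {
    Standalone,
    Cluster,
}

fn parse_mode(args: &[&str]) -> RunMode {
    if args.contains(&"--enable-cluster") {
        RunMode::Cluster
    } else {
        RunMode::Standalone // default: NOT clustered
    }
}

fn main() {
    assert_eq!(parse_mode(&[]), RunMode::Standalone);
    assert_eq!(parse_mode(&["--enable-cluster"]), RunMode::Cluster);
    println!("defaults verified");
}
```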
> - Allow for the cluster to be restarted
> The next step I think is answering the following questions.
Also interested in the question of could we reuse a ton of their log and snapshot infrastructure to play nicely with our need for persistence? (even outside clustering mode as we know that in clustering mode that's a given)
I think the in-memory examples are largely helpful, but not all the way, as clients would be able to enable persistence. I see some implementation insight in the rocksdb example, which shows an implementation of `log_store`.
But perhaps, taking a step back, it would largely be a lot to grok at once... and we can figure out a reasonable way to dissect the change such that we first implement clustering and then make the persistence bits nicer, or vice versa.
Yep, I can see how it should be possible. Their examples have placeholders for where they expect us to do any persistence stuff we want to add, for example, these lines from here: https://github.com/databendlabs/openraft/blob/956d6f6a6c344d63a92b5bebf81ee9051a1aace2/examples/raft-kv-memstore-grpc/src/store/mod.rs#L91, see below:
```rust
// Emulation of storing snapshot locally
{
    let mut current_snapshot = self.current_snapshot.lock().unwrap();
    *current_snapshot = Some(stored);
}
```

so basically we should be able to just plug in some persistence function in there.
For the logs, they are saved in memory as a `BTreeMap`:
```rust
pub struct LogStoreInner<C: RaftTypeConfig> {
    /// The last purged log id.
    last_purged_log_id: Option<LogIdOf<C>>,
    /// The Raft log.
    log: BTreeMap<u64, C::Entry>, // <- Note here
    /// The commit log id.
    committed: Option<LogIdOf<C>>,
    /// The current granted vote.
    vote: Option<VoteOf<C>>,
}
```

And appended with this function:
```rust
async fn append<I>(&mut self, entries: I, callback: IOFlushed<C>) -> Result<(), io::Error>
where
    I: IntoIterator<Item = C::Entry>,
{
    // Simple implementation that calls the flush-before-return `append_to_log`.
    for entry in entries {
        self.log.insert(entry.index(), entry); // <- Note here
    }
    callback.io_completed(Ok(())).await;
    Ok(())
}
```

We should be able to add a persistence function call inside `append()`.
Okay so in the case where a replica goes down and comes back up... does it receive the logs from other nodes in the cluster or does it load from its possibly outdated logs?
I'm guessing it's the former... if so, then we may not need to persist Raft logs on disk, and could stick to persisting only the snapshots as we currently do
> I think the in-memory examples are largely helpful but not all the way as clients would be able to enable persistence. I see some implementation insight in the rocksdb example which shows implementation of log_store
> But perhaps taking a step back it would largely be a lot to grok at once... and we can figure out a reasonable way to dissect the change such that we can first of all implement clustering and then try make the persistence bits nicer or vice versa
I agree with this as well, and I'll say we should do the clustering first, then figure out what we want to persist, before adding persistence
> Okay so in the case where a replica goes down and comes back up... does it receive the logs from other nodes in the cluster or does it load from its possibly outdated logs
> I'm guessing it's the former... if so then we may not need to persist Raft logs on disk and we could stick to instead persisting only the snapshots as we currently do
Yeah, it would only make sense to be the former (I would have to look inside openraft-rs itself to be sure how it's updating the log `BTreeMap`), but can you explain a bit how/why this would reduce the need for persisting the logs?
Cuz if it receives the logs from other nodes in the cluster, then on startup it shouldn't absolutely need to recover from disk, as it would get the logs and snapshots, right?
Assuming there's at least one other node in a good state
> Cuz if it receives the logs from other nodes in the cluster, then on startup it shouldn't absolutely need to recover from disk as it would get the logs and snapshots, right?
> Assuming there's at least one other node in a good state
Ahh, I understand now
> 1. Where/how are we storing the logs?
> 2. Where/how are we storing the state machine snapshots?
Where/how we store the logs/state machine would largely depend on how they affect the hot request path, and also on whether or not they are safe to keep in-memory (i.e. acceptable to potentially lose).
💯