diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 1f23f545..9437616f 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -18,6 +18,16 @@ This is a **maintained fork** of [fjall-rs/fjall](https://github.com/fjall-rs/fj **Prefer issue suggestions over code suggestions for out-of-scope findings.** This keeps PRs focused and reviewable. +## Dependency Policy + +**`lsm-tree` uses `branch = "main"` intentionally.** This fork is in active alpha development alongside `structured-world/lsm-tree`. The `branch = "main"` git dependency is correct because: +- Both repos are under active co-development with frequent cross-repo changes +- Dependabot tracks and updates git branch dependencies +- Pinning to `rev` would block Dependabot and require manual bumps on every lsm-tree commit +- Will switch to a crates.io version once upstream fjall-rs/lsm-tree#265 is released + +**Do NOT suggest** pinning to `rev = "..."` or switching to a crates.io version for `lsm-tree`. This is a deliberate alpha-phase decision. + ## Rust Code Standards - **Unsafe code:** Prefer safe alternatives. `unsafe` requires `// SAFETY:` comment. diff --git a/Cargo.toml b/Cargo.toml index 29e29c64..90ff197d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,10 @@ __internal_whitebox = [] [dependencies] byteorder = { package = "byteorder-lite", version = "0.1.0" } byteview = "~0.10.1" -lsm-tree = { version = "~3.1.1", default-features = false, features = [] } +# Fork with SequenceNumberGenerator trait (structured-world/lsm-tree#10). +# branch=main: active development, Dependabot tracks updates. +# Switch to crates.io once fjall-rs/lsm-tree#265 is released. +lsm-tree = { git = "https://github.com/structured-world/lsm-tree.git", branch = "main", default-features = false, features = [] } log = "0.4.27" tempfile = "3.20.0" dashmap = "6.1.0" diff --git a/src/builder.rs b/src/builder.rs index 64ad4042..9978a3f5 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use crate::{db_config::CompactionFilterAssigner, tx::single_writer::Openable, Config}; -use lsm_tree::{Cache, CompressionType, DescriptorTable}; +use lsm_tree::{Cache, CompressionType, DescriptorTable, SharedSequenceNumberGenerator}; use std::{marker::PhantomData, path::Path, sync::Arc}; /// Database builder @@ -188,4 +188,41 @@ impl Builder { self.inner.compaction_filter_factory_assigner = Some(f); self } + + /// Sets a custom sequence number generator for write operations. + /// + /// Takes `SharedSequenceNumberGenerator` (`Arc`) + /// directly because the generator must be shared across all keyspaces and the + /// internal storage layer. Accepting a generic would add a type parameter to + /// `Builder` that propagates through `Config` and `Database`. + /// + /// By default, the database uses [`crate::SequenceNumberCounter`], + /// a simple atomic counter. Use this to plug in a custom generator, + /// e.g., a Hybrid Logical Clock (HLC) for distributed databases. + /// + /// The generator is shared across all keyspaces and is used for + /// write sequencing (assigning seqnos to new writes). The MVCC + /// visibility watermark is managed internally by the snapshot tracker. + /// + /// # Examples + /// + /// ``` + /// use fjall::Database; + /// use fjall::{SharedSequenceNumberGenerator, SequenceNumberCounter}; + /// use std::sync::Arc; + /// + /// # let folder = tempfile::tempdir()?; + /// let generator: SharedSequenceNumberGenerator = + /// Arc::new(SequenceNumberCounter::default()); + /// + /// let db = Database::builder(&folder) + /// .seqno_generator(generator) + /// .open()?; + /// # Ok::<(), fjall::Error>(()) + /// ``` + #[must_use] + pub fn seqno_generator(mut self, generator: SharedSequenceNumberGenerator) -> Self { + self.inner.seqno_generator = Some(generator); + self + } } diff --git a/src/db.rs b/src/db.rs index 2ff935ec..6535cb9b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -23,7 +23,7 @@ use crate::{ write_buffer_manager::WriteBufferManager, HashMap, Keyspace, KeyspaceCreateOptions, }; -use lsm_tree::{AbstractTree, SequenceNumberCounter}; +use lsm_tree::{AbstractTree, SequenceNumberCounter, SharedSequenceNumberGenerator}; use std::{ fs::remove_dir_all, path::Path, @@ -56,7 +56,7 @@ pub struct DatabaseInner { pub(crate) stats: Arc, - pub(crate) keyspace_id_counter: SequenceNumberCounter, + pub(crate) keyspace_id_counter: SharedSequenceNumberGenerator, pub worker_pool: WorkerPool, @@ -582,6 +582,26 @@ impl Database { self.supervisor.snapshot_tracker.get() } + /// Creates the write seqno generator and a separate MVCC visibility watermark. + /// + /// The write generator comes from the user-supplied config (or falls back to + /// the default [`SequenceNumberCounter`]). The visibility watermark is always + /// a fresh counter — it tracks "up to which seqno are writes visible", not + /// "generate the next seqno". + fn make_seqno_generators( + config: &Config, + ) -> (SharedSequenceNumberGenerator, SharedSequenceNumberGenerator) { + let seqno: SharedSequenceNumberGenerator = config + .seqno_generator + .clone() + .unwrap_or_else(|| Arc::new(SequenceNumberCounter::default())); + + let visible_seqno: SharedSequenceNumberGenerator = + Arc::new(SequenceNumberCounter::default()); + + (seqno, visible_seqno) + } + fn check_version>(path: P) -> crate::Result<()> { let bytes = std::fs::read(path.as_ref().join(VERSION_MARKER))?; @@ -631,15 +651,14 @@ impl Database { let journal_manager = JournalManager::new(); - let seqno = SequenceNumberCounter::default(); - let visible_seqno = SequenceNumberCounter::default(); + let (seqno, visible_seqno) = Self::make_seqno_generators(&config); let keyspaces = Arc::new(RwLock::new(Keyspaces::with_capacity_and_hasher( 10, xxhash_rust::xxh3::Xxh3Builder::new(), ))); - let meta_tree = lsm_tree::Config::new( + let meta_tree = lsm_tree::Config::new_with_generators( config.path.join(KEYSPACES_FOLDER).join("0"), seqno.clone(), visible_seqno.clone(), @@ -690,7 +709,7 @@ impl Database { let inner = DatabaseInner { supervisor, worker_pool: WorkerPool::prepare(), - keyspace_id_counter: SequenceNumberCounter::new(1), + keyspace_id_counter: Arc::new(SequenceNumberCounter::new(1)), meta_keyspace: meta_keyspace.clone(), config, stop_signal: lsm_tree::stop_signal::StopSignal::default(), @@ -877,15 +896,14 @@ impl Database { fsync_directory(&keyspaces_folder_path)?; fsync_directory(&config.path)?; - let seqno = SequenceNumberCounter::default(); - let visible_seqno = SequenceNumberCounter::default(); + let (seqno, visible_seqno) = Self::make_seqno_generators(&config); let keyspaces = Arc::new(RwLock::new(Keyspaces::with_capacity_and_hasher( 10, xxhash_rust::xxh3::Xxh3Builder::new(), ))); - let meta_tree = lsm_tree::Config::new( + let meta_tree = lsm_tree::Config::new_with_generators( config.path.join(KEYSPACES_FOLDER).join("0"), seqno.clone(), visible_seqno.clone(), @@ -935,7 +953,7 @@ impl Database { let inner = DatabaseInner { supervisor, worker_pool: WorkerPool::prepare(), - keyspace_id_counter: SequenceNumberCounter::new(1), + keyspace_id_counter: Arc::new(SequenceNumberCounter::new(1)), meta_keyspace, config, stop_signal: lsm_tree::stop_signal::StopSignal::default(), diff --git a/src/db_config.rs b/src/db_config.rs index abc3a7dd..b112ad41 100644 --- a/src/db_config.rs +++ b/src/db_config.rs @@ -3,7 +3,7 @@ // (found in the LICENSE-* files in the repository) use crate::path::absolute_path; -use lsm_tree::{Cache, CompressionType, DescriptorTable}; +use lsm_tree::{Cache, CompressionType, DescriptorTable, SharedSequenceNumberGenerator}; use std::{ path::{Path, PathBuf}, sync::Arc, @@ -50,6 +50,12 @@ pub struct Config { // pub(crate) journal_recovery_mode: RecoveryMode, // pub(crate) compaction_filter_factory_assigner: Option, + + /// Custom sequence number generator. + /// + /// When set, this generator is used instead of the default + /// [`lsm_tree::SequenceNumberCounter`]. + pub(crate) seqno_generator: Option, } const DEFAULT_CPU_CORES: usize = 4; @@ -92,6 +98,7 @@ impl Config { cache: Arc::new(Cache::with_capacity_bytes(/* 32 MiB */ 32 * 1_024 * 1_024)), compaction_filter_factory_assigner: None, + seqno_generator: None, } } } diff --git a/src/keyspace/mod.rs b/src/keyspace/mod.rs index 9a843014..83c894dd 100644 --- a/src/keyspace/mod.rs +++ b/src/keyspace/mod.rs @@ -338,7 +338,7 @@ impl Keyspace { std::fs::create_dir_all(&base_folder)?; - let base_config = lsm_tree::Config::new( + let base_config = lsm_tree::Config::new_with_generators( base_folder, db.supervisor.seqno.clone(), db.supervisor.snapshot_tracker.get_ref(), diff --git a/src/lib.rs b/src/lib.rs index e648743b..3f689233 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -169,7 +169,8 @@ pub use tx::optimistic::{ pub use lsm_tree::{AbstractTree, AnyTree, Error as LsmError, TreeType}; pub use lsm_tree::{ - CompressionType, KvPair, KvSeparationOptions, SeqNo, Slice, UserKey, UserValue, + CompressionType, KvPair, KvSeparationOptions, SeqNo, SequenceNumberCounter, + SequenceNumberGenerator, SharedSequenceNumberGenerator, Slice, UserKey, UserValue, }; /// Utility functions diff --git a/src/meta_keyspace.rs b/src/meta_keyspace.rs index 049425e6..721f308f 100644 --- a/src/meta_keyspace.rs +++ b/src/meta_keyspace.rs @@ -4,7 +4,7 @@ use crate::{db::Keyspaces, keyspace::InternalKeyspaceId, Keyspace}; use byteview::StrView; -use lsm_tree::{AbstractTree, AnyTree, SeqNo, SequenceNumberCounter, UserValue}; +use lsm_tree::{AbstractTree, AnyTree, SeqNo, SharedSequenceNumberGenerator, UserValue}; use std::sync::{Arc, RwLock, RwLockWriteGuard}; pub fn encode_config_key( @@ -30,16 +30,16 @@ pub struct MetaKeyspace { #[doc(hidden)] pub keyspaces: Arc>, - seqno_generator: SequenceNumberCounter, - visible_seqno: SequenceNumberCounter, + seqno_generator: SharedSequenceNumberGenerator, + visible_seqno: SharedSequenceNumberGenerator, } impl MetaKeyspace { pub(crate) fn new( inner: AnyTree, keyspaces: Arc>, - seqno_generator: SequenceNumberCounter, - visible_seqno: SequenceNumberCounter, + seqno_generator: SharedSequenceNumberGenerator, + visible_seqno: SharedSequenceNumberGenerator, ) -> Self { Self { inner, diff --git a/src/recovery.rs b/src/recovery.rs index a0fb9253..527787aa 100644 --- a/src/recovery.rs +++ b/src/recovery.rs @@ -92,7 +92,7 @@ pub fn recover_keyspaces(db: &Database, meta_keyspace: &MetaKeyspace) -> crate:: recovered_config = recovered_config.with_compaction_filter_factory(f); } - let base_config = lsm_tree::Config::new( + let base_config = lsm_tree::Config::new_with_generators( path, db.supervisor.seqno.clone(), db.supervisor.snapshot_tracker.get_ref(), diff --git a/src/snapshot_tracker.rs b/src/snapshot_tracker.rs index 3d68fec2..ba25172f 100644 --- a/src/snapshot_tracker.rs +++ b/src/snapshot_tracker.rs @@ -4,12 +4,12 @@ use crate::{snapshot_nonce::SnapshotNonce, SeqNo}; use dashmap::DashMap; -use lsm_tree::SequenceNumberCounter; +use lsm_tree::SharedSequenceNumberGenerator; use std::sync::{atomic::AtomicU64, Arc, RwLock}; /// Keeps track of open snapshots pub struct SnapshotTrackerInner { - seqno: SequenceNumberCounter, + seqno: SharedSequenceNumberGenerator, gc_lock: RwLock<()>, @@ -33,7 +33,7 @@ impl std::ops::Deref for SnapshotTracker { } impl SnapshotTracker { - pub fn new(seqno: SequenceNumberCounter) -> Self { + pub fn new(seqno: SharedSequenceNumberGenerator) -> Self { Self(Arc::new(SnapshotTrackerInner { data: DashMap::default(), freed_count: AtomicU64::default(), @@ -43,7 +43,7 @@ impl SnapshotTracker { })) } - pub fn get_ref(&self) -> SequenceNumberCounter { + pub fn get_ref(&self) -> SharedSequenceNumberGenerator { self.seqno.clone() } @@ -183,6 +183,7 @@ impl SnapshotTracker { #[expect(clippy::unwrap_used)] mod tests { use super::*; + use lsm_tree::SequenceNumberCounter; use test_log::test; #[test] @@ -285,7 +286,7 @@ mod tests { fn snapshot_tracker_basic() { let global_seqno = SequenceNumberCounter::default(); - let map = SnapshotTracker::new(global_seqno.clone()); + let map = SnapshotTracker::new(global_seqno.clone().into()); let nonce = map.open(); assert_eq!(0, nonce.instant); @@ -304,7 +305,7 @@ mod tests { fn snapshot_tracker_increase_watermark() { let global_seqno = SequenceNumberCounter::default(); - let map = SnapshotTracker::new(global_seqno.clone()); + let map = SnapshotTracker::new(global_seqno.clone().into()); // Simulates some tx committing for _ in 0..100_000 { @@ -320,7 +321,7 @@ mod tests { fn snapshot_tracker_prevent_watermark() { let global_seqno = SequenceNumberCounter::default(); - let map = SnapshotTracker::new(global_seqno.clone()); + let map = SnapshotTracker::new(global_seqno.clone().into()); // This nonce prevents the watermark from increasing let _old_nonce = map.open(); @@ -342,7 +343,7 @@ mod tests { #[test] fn snapshot_tracker_close_never_opened_does_not_underflow_or_panic() { let global_seqno = SequenceNumberCounter::default(); - let map = SnapshotTracker::new(global_seqno); + let map = SnapshotTracker::new(global_seqno.into()); assert_eq!(map.len(), 0); map.close_raw(42); @@ -352,7 +353,7 @@ mod tests { #[test] fn snapshot_tracker_concurrent_open_same_seqno_counts_correctly() { let global_seqno = SequenceNumberCounter::default(); - let map = SnapshotTracker::new(global_seqno); + let map = SnapshotTracker::new(global_seqno.into()); // make sure seqno doesn't change between two opens let n1 = map.open(); @@ -372,7 +373,7 @@ mod tests { #[test] fn snapshot_tracker_publish_moves_seqno_forward_and_ignores_older() { let global_seqno = SequenceNumberCounter::default(); - let map = SnapshotTracker::new(global_seqno); + let map = SnapshotTracker::new(global_seqno.into()); let big = 100u64; map.publish(big); @@ -386,7 +387,7 @@ mod tests { #[test] fn snapshot_tracker_clone_snapshot_behaves_like_second_open() { let global_seqno = SequenceNumberCounter::default(); - let map = SnapshotTracker::new(global_seqno); + let map = SnapshotTracker::new(global_seqno.into()); let orig = map.open(); let clone = map.clone_snapshot(&orig); diff --git a/src/supervisor.rs b/src/supervisor.rs index a3206111..4567e2d1 100644 --- a/src/supervisor.rs +++ b/src/supervisor.rs @@ -2,7 +2,7 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) -use lsm_tree::SequenceNumberCounter; +use lsm_tree::SharedSequenceNumberGenerator; use crate::{ db::Keyspaces, @@ -23,7 +23,7 @@ pub struct SupervisorInner { pub(crate) write_buffer_size: WriteBufferManager, pub(crate) flush_manager: FlushManager, - pub seqno: SequenceNumberCounter, + pub seqno: SharedSequenceNumberGenerator, pub snapshot_tracker: SnapshotTracker, diff --git a/tests/custom_seqno_generator.rs b/tests/custom_seqno_generator.rs new file mode 100644 index 00000000..3b4e1c93 --- /dev/null +++ b/tests/custom_seqno_generator.rs @@ -0,0 +1,145 @@ +use fjall::{Database, KeyspaceCreateOptions, SeqNo, SequenceNumberGenerator}; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +/// A tracking generator that records how many times each method was called. +#[derive(Debug)] +struct TrackingGenerator { + inner: AtomicU64, + next_count: AtomicU64, + get_count: AtomicU64, + set_count: AtomicU64, + fetch_max_count: AtomicU64, +} + +impl TrackingGenerator { + fn new() -> Self { + Self { + inner: AtomicU64::new(0), + next_count: AtomicU64::new(0), + get_count: AtomicU64::new(0), + set_count: AtomicU64::new(0), + fetch_max_count: AtomicU64::new(0), + } + } +} + +impl SequenceNumberGenerator for TrackingGenerator { + // NOTE: fetch_add returns the *previous* value — this matches SequenceNumberCounter's + // semantics where next() returns the current seqno then advances the counter. + fn next(&self) -> SeqNo { + self.next_count.fetch_add(1, Ordering::Relaxed); + self.inner.fetch_add(1, Ordering::AcqRel) + } + + fn get(&self) -> SeqNo { + self.get_count.fetch_add(1, Ordering::Relaxed); + self.inner.load(Ordering::Acquire) + } + + fn set(&self, value: SeqNo) { + self.set_count.fetch_add(1, Ordering::Relaxed); + self.inner.store(value, Ordering::Release); + } + + fn fetch_max(&self, value: SeqNo) { + self.fetch_max_count.fetch_add(1, Ordering::Relaxed); + self.inner.fetch_max(value, Ordering::AcqRel); + } +} + +#[test] +fn custom_generator_basic() -> fjall::Result<()> { + let folder = tempfile::tempdir()?; + + let generator = Arc::new(TrackingGenerator::new()); + let db = Database::builder(&folder) + .seqno_generator(generator.clone()) + .open()?; + + let tree = db.keyspace("default", KeyspaceCreateOptions::default)?; + + // Capture baseline after keyspace creation — metadata writes may have + // already advanced the generator before our inserts. + let baseline = generator.next_count.load(Ordering::Relaxed); + + tree.insert("a", "hello")?; + tree.insert("b", "world")?; + + let after_inserts = generator.next_count.load(Ordering::Relaxed); + assert!( + after_inserts - baseline >= 2, + "Custom generator should be called at least twice for two inserts (baseline={baseline}, after={after_inserts})", + ); + + assert_eq!(b"hello", &*tree.get("a")?.unwrap()); + assert_eq!(b"world", &*tree.get("b")?.unwrap()); + + Ok(()) +} + +#[test] +fn custom_generator_recovery() -> fjall::Result<()> { + let folder = tempfile::tempdir()?; + + // Write with custom generator + { + let generator = Arc::new(TrackingGenerator::new()); + let db = Database::builder(&folder) + .seqno_generator(generator.clone()) + .open()?; + + let tree = db.keyspace("default", KeyspaceCreateOptions::default)?; + tree.insert("key1", "value1")?; + tree.insert("key2", "value2")?; + + db.persist(fjall::PersistMode::SyncAll)?; + } + + // Recover with a new custom generator — recovery should call .set()/.fetch_max() + { + let generator = Arc::new(TrackingGenerator::new()); + let db = Database::builder(&folder) + .seqno_generator(generator.clone()) + .open()?; + + let tree = db.keyspace("default", KeyspaceCreateOptions::default)?; + + // Data survived recovery + assert_eq!(b"value1", &*tree.get("key1")?.unwrap()); + assert_eq!(b"value2", &*tree.get("key2")?.unwrap()); + + // Recovery advanced the generator state (via fetch_max or set) + let fetch_max_calls = generator.fetch_max_count.load(Ordering::Relaxed); + let set_calls = generator.set_count.load(Ordering::Relaxed); + assert!( + fetch_max_calls > 0 || set_calls > 0, + "Recovery should advance the custom generator state via fetch_max()/set()", + ); + + // Can write after recovery + tree.insert("key3", "value3")?; + assert_eq!(b"value3", &*tree.get("key3")?.unwrap()); + } + + Ok(()) +} + +#[test] +fn default_generator_unchanged() -> fjall::Result<()> { + let folder = tempfile::tempdir()?; + + // Without custom generator, everything works as before + let db = Database::builder(&folder).open()?; + let tree = db.keyspace("default", KeyspaceCreateOptions::default)?; + + tree.insert("a", "1")?; + tree.insert("b", "2")?; + + assert_eq!(b"1", &*tree.get("a")?.unwrap()); + assert_eq!(b"2", &*tree.get("b")?.unwrap()); + + Ok(()) +}