diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md
index 411be9352..949b1cf3d 100644
--- a/.github/copilot-instructions.md
+++ b/.github/copilot-instructions.md
@@ -62,10 +62,24 @@ cargo fmt --all -- --check # Format check
 | `bytes_1` | Use `bytes` crate for Slice type |
 | `metrics` | Expose prometheus metrics |
 
+## Design Decision Analysis (CRITICAL)
+
+**Before flagging any potential issue, trace the FULL call chain:**
+
+1. **Read the caller** — does the caller already handle the case you're about to flag? If a function returns a value that looks problematic in isolation, check every call site. The caller may handle the edge case explicitly (e.g., empty collections, `None` vs `Some(empty)`).
+2. **Check type-level guarantees** — does the type system prevent the issue? Different enum variants, wrapper types, or visibility modifiers may make a "collision" or "misuse" structurally impossible.
+3. **Read adjacent comments** — comments starting with "NOTE:", "Use X instead of Y because...", or explaining WHY a specific approach was chosen document deliberate design decisions. If the comment accurately describes the code behavior, the design is intentional.
+
+**Only flag an issue if it survives all three checks.** A finding that looks like a bug in one function but is handled by its caller is not a bug — it is a protocol between the two functions.
+
 ## Architecture Notes
 
 - `src/table/block/` — On-disk block format (header + compressed payload)
 - `src/vlog/blob_file/` — Value log for large values (separate from LSM blocks)
 - `src/compaction/` — Compaction strategies (leveled, FIFO, tiered)
 - `src/seqno.rs` — Sequence number generator (MVCC versioning)
+- `src/range_tombstone.rs` — Range tombstone data model and serialization
+- `src/range_tombstone_filter.rs` — MVCC-aware range tombstone filtering for iterators
+- `src/active_tombstone_set.rs` — Tracks active range tombstones during compaction
+- `src/memtable/interval_tree.rs` — Interval tree for memtable range tombstone queries
 - Compression is pluggable via `CompressionType` enum with `#[cfg(feature)]` variants
diff --git a/.github/instructions/rust.instructions.md b/.github/instructions/rust.instructions.md
index c3bcf0684..3fec2e21d 100644
--- a/.github/instructions/rust.instructions.md
+++ b/.github/instructions/rust.instructions.md
@@ -43,6 +43,10 @@ Focus review effort on real bugs, not cosmetics. Stop after finding issues in hi
 
 These are not actionable review findings. Do not raise them:
 
+- **Caller-handled edge cases**: Before flagging a function for not handling an edge case (empty collection, `None` vs `Some(empty)`, missing guard), read ALL call sites. If every caller already handles the case, the function's behavior is part of a deliberate contract — not a bug. Only flag if the edge case is truly unhandled end-to-end.
+- **Type-system-prevented issues**: Before flagging a potential collision, overlap, or misuse, check whether distinct enum variants, wrapper types, or visibility modifiers make the issue structurally impossible. A `WeakTombstone` variant that never appears in user-facing merge paths cannot collide with user data regardless of key/seqno overlap.
+- **Documented design decisions**: When code has a comment explaining WHY a specific approach was chosen, trust the documented reasoning. Flag only if the comment contradicts the actual code behavior — not if you would have chosen a different approach.
+- **Comment wording vs code behavior**: If a comment says "flush when full" but the threshold is checked with `>=` not `>`, the intent is clear — the boundary condition is a design choice. Do not suggest rewording comments to match exact comparison operators.
 - **Comment precision**: "returns the block" when it technically returns `Result` — the comment conveys meaning, not type signature.
 - **Magic numbers with context**: `4` in `assert_eq!(header.len(), 4, "expected u32 checksum")` — the assertion message provides the context. Do not suggest a named constant when the value is used once in a test with an explanatory message.
diff --git a/src/abstract_tree.rs b/src/abstract_tree.rs
index c6f1aee9b..a4984b923 100644
--- a/src/abstract_tree.rs
+++ b/src/abstract_tree.rs
@@ -76,7 +76,9 @@ pub trait AbstractTree {
         _lock: &MutexGuard<'_, ()>,
         seqno_threshold: SeqNo,
     ) -> crate::Result> {
-        use crate::{compaction::stream::CompactionStream, merge::Merger};
+        use crate::{
+            compaction::stream::CompactionStream, merge::Merger, range_tombstone::RangeTombstone,
+        };
 
         let version_history = self.get_version_history_lock();
         let latest = version_history.latest_version();
@@ -93,6 +95,13 @@ pub trait AbstractTree {
 
         let flushed_size = latest.sealed_memtables.iter().map(|mt| mt.size()).sum();
 
+        // Collect range tombstones from sealed memtables
+        let mut range_tombstones: Vec<RangeTombstone> = Vec::new();
+        for mt in latest.sealed_memtables.iter() {
+            range_tombstones.extend(mt.range_tombstones_sorted());
+        }
+        range_tombstones.sort();
+
         let merger = Merger::new(
             latest
                 .sealed_memtables
@@ -104,7 +113,21 @@ pub trait AbstractTree {
 
         drop(version_history);
 
-        if let Some((tables, blob_files)) = self.flush_to_tables(stream)? {
+        // Clone needed: flush_to_tables_with_rt consumes the Vec, but on the
+        // RT-only path (no KV data, tables.is_empty()) we re-insert RTs into the
+        // active memtable. Flush is infrequent and RT count is small.
+        if let Some((tables, blob_files)) =
+            self.flush_to_tables_with_rt(stream, range_tombstones.clone())?
+        {
+            // If no tables were produced (RT-only memtable), re-insert RTs
+            // into active memtable so they aren't lost
+            if tables.is_empty() && !range_tombstones.is_empty() {
+                let active = self.active_memtable();
+                for rt in &range_tombstones {
+                    active.insert_range_tombstone(rt.start.clone(), rt.end.clone(), rt.seqno);
+                }
+            }
+
             self.register_tables(
                 &tables,
                 blob_files.as_deref(),
@@ -216,10 +239,23 @@ pub trait AbstractTree {
     /// # Errors
     ///
     /// Will return `Err` if an IO error occurs.
-    #[warn(clippy::type_complexity)]
     fn flush_to_tables(
         &self,
         stream: impl Iterator<Item = crate::Result<InternalValue>>,
+    ) -> crate::Result> {
+        self.flush_to_tables_with_rt(stream, Vec::new())
+    }
+
+    /// Like [`AbstractTree::flush_to_tables`], but also writes range tombstones.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    #[doc(hidden)]
+    fn flush_to_tables_with_rt(
+        &self,
+        stream: impl Iterator<Item = crate::Result<InternalValue>>,
+        range_tombstones: Vec<RangeTombstone>,
     ) -> crate::Result>;
 
     /// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
@@ -680,4 +716,34 @@ pub trait AbstractTree {
     /// Will return `Err` if an IO error occurs.
     #[doc(hidden)]
     fn remove_weak<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u64, u64);
+
+    /// Deletes all keys in the range `[start, end)` by inserting a range tombstone.
+    ///
+    /// This is much more efficient than deleting keys individually when
+    /// removing a contiguous range of keys.
+    ///
+    /// Returns the approximate size added to the memtable.
+    /// Returns 0 if `start >= end` (invalid interval is silently ignored).
+    fn remove_range<K: AsRef<[u8]>>(&self, start: K, end: K, seqno: SeqNo) -> u64;
+
+    /// Deletes all keys with the given prefix by inserting a range tombstone.
+    ///
+    /// This is sugar over [`AbstractTree::remove_range`] using prefix bounds.
+    ///
+    /// Returns the approximate size added to the memtable.
+    /// Returns 0 for empty prefixes or all-`0xFF` prefixes (cannot form valid half-open range).
+    fn remove_prefix<K: AsRef<[u8]>>(&self, prefix: K, seqno: SeqNo) -> u64 {
+        use crate::range::prefix_to_range;
+        use std::ops::Bound;
+
+        let (lo, hi) = prefix_to_range(prefix.as_ref());
+
+        let Bound::Included(start) = lo else { return 0 };
+
+        // Bound::Unbounded means the prefix is all 0xFF — no representable
+        // exclusive upper bound exists, so we cannot form a valid range tombstone.
+        let Bound::Excluded(end) = hi else { return 0 };
+
+        self.remove_range(start, end, seqno)
+    }
 }
diff --git a/src/active_tombstone_set.rs b/src/active_tombstone_set.rs
new file mode 100644
index 000000000..87e029003
--- /dev/null
+++ b/src/active_tombstone_set.rs
@@ -0,0 +1,392 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+//! Active tombstone sets for tracking range tombstones during iteration.
+//!
+//! During forward or reverse scans, range tombstones must be activated when
+//! the scan enters their range and expired when it leaves. These sets use
+//! a seqno multiset (`BTreeMap<SeqNo, usize>`) for O(log t) max-seqno queries,
+//! and a heap for efficient expiry tracking.
+//!
+//! A unique monotonic `id` on each heap entry ensures total ordering in the
+//! heap (no equality on the tuple), which makes expiry deterministic.
+
+use crate::{range_tombstone::RangeTombstone, SeqNo, UserKey};
+use std::cmp::Reverse;
+use std::collections::{BTreeMap, BinaryHeap};
+
+/// Tracks active range tombstones during forward iteration.
+///
+/// Tombstones are activated when the scan reaches their `start` key, and
+/// expired when the scan reaches or passes their `end` key.
+///
+/// Uses a min-heap (via `Reverse`) keyed by `(end, id, seqno)` so the
+/// tombstone expiring soonest (smallest `end`) is at the top.
+pub struct ActiveTombstoneSet {
+    seqno_counts: BTreeMap<SeqNo, usize>,
+    pending_expiry: BinaryHeap<Reverse<(UserKey, u64, SeqNo)>>,
+    cutoff_seqno: SeqNo,
+    next_id: u64,
+}
+
+impl ActiveTombstoneSet {
+    /// Creates a new forward active tombstone set.
+    ///
+    /// Only tombstones with `seqno < cutoff_seqno` will be activated.
+    #[must_use]
+    pub fn new(cutoff_seqno: SeqNo) -> Self {
+        Self {
+            seqno_counts: BTreeMap::new(),
+            pending_expiry: BinaryHeap::new(),
+            cutoff_seqno,
+            next_id: 0,
+        }
+    }
+
+    /// Activates a range tombstone, adding it to the active set.
+    ///
+    /// The tombstone is only activated if it is visible at the cutoff seqno
+    /// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno
+    /// from different sources) are handled correctly via multiset accounting.
+    pub fn activate(&mut self, rt: &RangeTombstone) {
+        if !rt.visible_at(self.cutoff_seqno) {
+            return;
+        }
+        let id = self.next_id;
+        self.next_id += 1;
+        *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1;
+        self.pending_expiry
+            .push(Reverse((rt.end.clone(), id, rt.seqno)));
+    }
+
+    /// Expires tombstones whose `end <= current_key`.
+    ///
+    /// In the half-open convention `[start, end)`, a tombstone stops covering
+    /// keys at `end`.
+    /// So when `current_key >= end`, the tombstone no longer
+    /// applies and is removed from the active set.
+    ///
+    /// # Panics
+    ///
+    /// Panics if an expiry pop has no matching activation in the seqno multiset.
+    pub fn expire_until(&mut self, current_key: &[u8]) {
+        while let Some(Reverse((ref end, _, seqno))) = self.pending_expiry.peek() {
+            if current_key >= end.as_ref() {
+                let seqno = *seqno;
+                self.pending_expiry.pop();
+                #[expect(
+                    clippy::expect_used,
+                    reason = "expiry pop must have matching activation"
+                )]
+                let count = self
+                    .seqno_counts
+                    .get_mut(&seqno)
+                    .expect("expiry pop must have matching activation");
+                *count -= 1;
+                if *count == 0 {
+                    self.seqno_counts.remove(&seqno);
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    /// Returns the highest seqno among all active tombstones, or `None` if
+    /// no tombstones are active.
+    #[must_use]
+    pub fn max_active_seqno(&self) -> Option<SeqNo> {
+        self.seqno_counts.keys().next_back().copied()
+    }
+
+    /// Returns `true` if a KV with the given seqno is suppressed by any
+    /// active tombstone (i.e., there exists an active tombstone with a
+    /// higher seqno).
+    #[must_use]
+    pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool {
+        self.max_active_seqno().is_some_and(|max| key_seqno < max)
+    }
+
+    /// Bulk-activates tombstones at a seek position.
+    ///
+    /// # Invariant
+    ///
+    /// At any iterator position, the active set contains only tombstones
+    /// where `start <= current_key < end` (and visible at `cutoff_seqno`).
+    /// Seek prefill must collect truly overlapping tombstones
+    /// (`start <= key < end`); `expire_until` immediately enforces the
+    /// `end` bound.
+    #[expect(dead_code, reason = "used by iterator initialization logic")]
+    pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) {
+        for rt in tombstones {
+            self.activate(&rt);
+        }
+    }
+
+    /// Returns `true` if there are no active tombstones.
+    #[expect(dead_code, reason = "helper for callers to inspect active tombstones")]
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.seqno_counts.is_empty()
+    }
+}
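Reviewer note: the activate/expire protocol above is easiest to verify end-to-end on a toy input. Below is a minimal, self-contained sketch of one forward sweep; plain `Vec<u8>` keys and inlined multiset/heap bookkeeping stand in for `UserKey` and the set type above (illustrative assumptions, not the crate's API).

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};

fn main() {
    // One tombstone [b"b", b"d") at seqno 10, pre-sorted by start.
    let tombstones = [(b"b".to_vec(), b"d".to_vec(), 10u64)];
    let mut next = 0; // index of the next tombstone to activate

    let mut counts: BTreeMap<u64, usize> = BTreeMap::new();
    let mut expiry: BinaryHeap<Reverse<(Vec<u8>, u64, u64)>> = BinaryHeap::new();

    for key in [b"a".as_slice(), b"b", b"c", b"d"] {
        // Activate tombstones whose start <= key.
        while next < tombstones.len() && tombstones[next].0.as_slice() <= key {
            let (_, end, seqno) = &tombstones[next];
            *counts.entry(*seqno).or_insert(0) += 1;
            expiry.push(Reverse((end.clone(), next as u64, *seqno)));
            next += 1;
        }
        // Expire tombstones whose end <= key (half-open interval).
        while let Some(Reverse((end, _, seqno))) = expiry.peek() {
            if end.as_slice() <= key {
                let seqno = *seqno;
                expiry.pop();
                let c = counts.get_mut(&seqno).unwrap();
                *c -= 1;
                if *c == 0 {
                    counts.remove(&seqno);
                }
            } else {
                break;
            }
        }
        // A KV at seqno 5 is suppressed iff some active tombstone is newer.
        let suppressed = counts.keys().next_back().is_some_and(|max| 5 < *max);
        println!("{}: {suppressed}", String::from_utf8_lossy(key));
        // prints: a: false, b: true, c: true, d: false
    }
}
```

Note how `b` and `c` are suppressed but `d` is not: expiry fires exactly at the exclusive bound, matching the forward set's `current_key >= end` check.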
+
+/// Tracks active range tombstones during reverse iteration.
+///
+/// During reverse scans, tombstones are activated when the scan reaches
+/// a key < `end` (strict `>`: `rt.end > current_key`), and expired when
+/// `current_key < rt.start`.
+///
+/// Uses a max-heap keyed by `(start, id, seqno)` so the tombstone
+/// expiring soonest (largest `start`) is at the top.
+pub struct ActiveTombstoneSetReverse {
+    seqno_counts: BTreeMap<SeqNo, usize>,
+    pending_expiry: BinaryHeap<(UserKey, u64, SeqNo)>,
+    cutoff_seqno: SeqNo,
+    next_id: u64,
+}
+
+impl ActiveTombstoneSetReverse {
+    /// Creates a new reverse active tombstone set.
+    ///
+    /// Only tombstones with `seqno < cutoff_seqno` will be activated.
+    #[must_use]
+    pub fn new(cutoff_seqno: SeqNo) -> Self {
+        Self {
+            seqno_counts: BTreeMap::new(),
+            pending_expiry: BinaryHeap::new(),
+            cutoff_seqno,
+            next_id: 0,
+        }
+    }
+
+    /// Activates a range tombstone, adding it to the active set.
+    ///
+    /// The tombstone is only activated if it is visible at the cutoff seqno
+    /// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno
+    /// from different sources) are handled correctly via multiset accounting.
+    ///
+    /// For reverse iteration, activation uses strict `>`: tombstones with
+    /// `rt.end > current_key` are activated. `key == end` is NOT covered
+    /// (half-open).
+    pub fn activate(&mut self, rt: &RangeTombstone) {
+        if !rt.visible_at(self.cutoff_seqno) {
+            return;
+        }
+        let id = self.next_id;
+        self.next_id += 1;
+        *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1;
+        self.pending_expiry.push((rt.start.clone(), id, rt.seqno));
+    }
+
+    /// Expires tombstones whose `start > current_key`.
+    ///
+    /// During reverse iteration, when the scan moves to a key that is
+    /// before a tombstone's start, that tombstone no longer applies.
+    ///
+    /// # Panics
+    ///
+    /// Panics if an expiry pop has no matching activation in the seqno multiset.
+    pub fn expire_until(&mut self, current_key: &[u8]) {
+        while let Some((ref start, _, seqno)) = self.pending_expiry.peek() {
+            if current_key < start.as_ref() {
+                let seqno = *seqno;
+                self.pending_expiry.pop();
+                #[expect(
+                    clippy::expect_used,
+                    reason = "expiry pop must have matching activation"
+                )]
+                let count = self
+                    .seqno_counts
+                    .get_mut(&seqno)
+                    .expect("expiry pop must have matching activation");
+                *count -= 1;
+                if *count == 0 {
+                    self.seqno_counts.remove(&seqno);
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    /// Returns the highest seqno among all active tombstones, or `None` if
+    /// no tombstones are active.
+    #[must_use]
+    pub fn max_active_seqno(&self) -> Option<SeqNo> {
+        self.seqno_counts.keys().next_back().copied()
+    }
+
+    /// Returns `true` if a KV with the given seqno is suppressed by any
+    /// active tombstone (i.e., there exists an active tombstone with a
+    /// higher seqno).
+    #[must_use]
+    pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool {
+        self.max_active_seqno().is_some_and(|max| key_seqno < max)
+    }
+
+    /// Bulk-activates tombstones at a seek position (for reverse).
+    #[expect(dead_code, reason = "used by iterator initialization logic")]
+    pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) {
+        for rt in tombstones {
+            self.activate(&rt);
+        }
+    }
+
+    /// Returns `true` if there are no active tombstones.
+    #[expect(dead_code, reason = "helper for callers to inspect active tombstones")]
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.seqno_counts.is_empty()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::UserKey;
+
+    fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
+        RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
+    }
+
+    // ──── Forward tests ────
+
+    #[test]
+    fn forward_activate_and_suppress() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        assert!(set.is_suppressed(5));
+        assert!(!set.is_suppressed(10));
+        assert!(!set.is_suppressed(15));
+    }
+
+    #[test]
+    fn forward_expire_at_end() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        assert!(set.is_suppressed(5));
+        set.expire_until(b"m"); // key == end, tombstone expires
+        assert!(!set.is_suppressed(5));
+    }
+
+    #[test]
+    fn forward_expire_past_end() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        set.expire_until(b"z");
+        assert!(set.is_empty());
+    }
+
+    #[test]
+    fn forward_not_expired_before_end() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        set.expire_until(b"l");
+        assert!(set.is_suppressed(5)); // still active
+    }
+
+    #[test]
+    fn forward_invisible_tombstone_not_activated() {
+        let mut set = ActiveTombstoneSet::new(5);
+        set.activate(&rt(b"a", b"m", 10)); // seqno 10 > cutoff 5
+        assert!(!set.is_suppressed(1));
+        assert!(set.is_empty());
+    }
+
+    #[test]
+    fn forward_multiple_tombstones_max_seqno() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        set.activate(&rt(b"b", b"n", 20));
+        assert_eq!(set.max_active_seqno(), Some(20));
+        assert!(set.is_suppressed(15)); // 15 < 20
+    }
+
+    #[test]
+    fn forward_duplicate_end_seqno_accounting() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        set.activate(&rt(b"b", b"m", 10));
+        assert_eq!(set.max_active_seqno(), Some(10));
+
+        set.expire_until(b"m");
+        assert_eq!(set.max_active_seqno(), None);
+        assert!(set.is_empty());
+    }
+
+    #[test]
+    fn forward_initialize_from() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]);
+        assert_eq!(set.max_active_seqno(), Some(20));
+    }
+
+    #[test]
+    fn forward_initialize_and_expire() {
+        let mut set = ActiveTombstoneSet::new(100);
+        set.initialize_from(vec![rt(b"a", b"d", 10), rt(b"b", b"f", 20)]);
+        set.expire_until(b"e"); // expires [a,d) but not [b,f)
+        assert_eq!(set.max_active_seqno(), Some(20));
+        set.expire_until(b"f"); // expires [b,f)
+        assert!(set.is_empty());
+    }
+
+    // ──── Reverse tests ────
+
+    #[test]
+    fn reverse_activate_and_suppress() {
+        let mut set = ActiveTombstoneSetReverse::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        assert!(set.is_suppressed(5));
+        assert!(!set.is_suppressed(10));
+    }
+
+    #[test]
+    fn reverse_expire_before_start() {
+        let mut set = ActiveTombstoneSetReverse::new(100);
+        set.activate(&rt(b"d", b"m", 10));
+
+        set.expire_until(b"c");
+        assert!(set.is_empty());
+    }
+
+    #[test]
+    fn reverse_not_expired_at_start() {
+        let mut set = ActiveTombstoneSetReverse::new(100);
+        set.activate(&rt(b"d", b"m", 10));
+
+        set.expire_until(b"d");
+        assert!(set.is_suppressed(5));
+    }
+
+    #[test]
+    fn reverse_invisible_tombstone_not_activated() {
+        let mut set = ActiveTombstoneSetReverse::new(5);
+        set.activate(&rt(b"a", b"m", 10));
+        assert!(set.is_empty());
+    }
+
+    #[test]
+    fn reverse_duplicate_end_seqno_accounting() {
+        let mut set = ActiveTombstoneSetReverse::new(100);
+        set.activate(&rt(b"d", b"m", 10));
+        set.activate(&rt(b"d", b"n", 10));
+        assert_eq!(set.max_active_seqno(), Some(10));
+
+        set.expire_until(b"c");
+        assert_eq!(set.max_active_seqno(), None);
+        assert!(set.is_empty());
+    }
+
+    #[test]
+    fn reverse_multiple_tombstones() {
+        let mut set = ActiveTombstoneSetReverse::new(100);
+        set.activate(&rt(b"a", b"m", 10));
+        set.activate(&rt(b"f", b"z", 20));
+        assert_eq!(set.max_active_seqno(), Some(20));
+
+        set.expire_until(b"e");
+        assert_eq!(set.max_active_seqno(), Some(10));
+    }
+}
diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs
index 58777024c..d803a8f87 100644
--- a/src/blob_tree/mod.rs
+++ b/src/blob_tree/mod.rs
@@ -356,9 +356,10 @@ impl AbstractTree for BlobTree {
     }
 
     #[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")]
-    fn flush_to_tables(
+    fn flush_to_tables_with_rt(
         &self,
         stream: impl Iterator<Item = crate::Result<InternalValue>>,
+        range_tombstones: Vec<RangeTombstone>,
     ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
         use crate::{
             coding::Encode, file::BLOBS_FOLDER, file::TABLES_FOLDER,
@@ -439,6 +440,11 @@ impl AbstractTree for BlobTree {
 
         let separation_threshold = kv_opts.separation_threshold;
 
+        // Set range tombstones BEFORE writing KV items so that if MultiWriter
+        // rotates to a new table during the write loop, earlier tables already
+        // carry the RT metadata.
+        table_writer.set_range_tombstones(range_tombstones);
+
         for item in stream {
             let item = item?;
 
@@ -503,6 +509,9 @@ impl AbstractTree for BlobTree {
             })
             .collect::<crate::Result<Vec<_>>>()?;
 
+        // Return Some even when tables is empty (RT-only flush): the caller
+        // (AbstractTree::flush) handles empty tables by re-inserting RTs into
+        // the active memtable and still needs to delete sealed memtables.
         Ok(Some((tables, Some(blob_files))))
     }
 
@@ -643,4 +652,8 @@ impl AbstractTree for BlobTree {
     fn remove_weak<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
         self.index.remove_weak(key, seqno)
     }
+
+    fn remove_range<K: AsRef<[u8]>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
+        self.index.remove_range(start, end, seqno)
+    }
 }
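With the `BlobTree` delegation in place, the user-facing flow looks roughly like this. A hedged sketch: tree construction and the seqno source are elided, and the `lsm_tree` crate and export names are assumed from this diff rather than verified against the full codebase.

```rust
use lsm_tree::{AbstractTree, SeqNo};

// Drop a whole keyspace slice with one tombstone instead of N point deletes.
fn purge_user_keys(tree: &impl AbstractTree, seqno: SeqNo) {
    // Half-open range: deletes every "user/..." key, but not "user0" itself
    // (b'/' is 0x2F and b'0' is 0x30, so "user0" is the tight exclusive bound).
    let bytes_added = tree.remove_range("user/", "user0", seqno);

    // Equivalent sugar over prefix bounds; returns 0 for an empty or
    // all-0xFF prefix, for which no valid half-open range exists.
    let bytes_added_2 = tree.remove_prefix("user/", seqno + 1);

    let _ = (bytes_added, bytes_added_2);
}
```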
diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs
index 9a5b34723..816a4a42b 100644
--- a/src/compaction/flavour.rs
+++ b/src/compaction/flavour.rs
@@ -8,6 +8,7 @@ use crate::coding::{Decode, Encode};
 use crate::compaction::worker::Options;
 use crate::compaction::Input as CompactionPayload;
 use crate::file::TABLES_FOLDER;
+use crate::range_tombstone::RangeTombstone;
 use crate::table::multi_writer::MultiWriter;
 use crate::version::{SuperVersions, Version};
 use crate::vlog::blob_file::scanner::ScanEntry;
@@ -77,7 +78,9 @@ pub(super) fn prepare_table_writer(
         opts.table_id_generator.clone(),
         payload.target_size,
         payload.dest_level,
-    )?;
+    )?
+    // Compaction consumes input tables, so clip RTs to each output table's key range.
+    .use_clip_range_tombstones();
 
     if index_partitioning {
         table_writer = table_writer.use_partitioned_index();
@@ -120,7 +123,9 @@ pub(super) trait CompactionFlavour {
     fn write(&mut self, item: InternalValue) -> crate::Result<()>;
 
-    #[warn(clippy::too_many_arguments)]
+    /// Writes range tombstones to the current output table.
+    fn write_range_tombstones(&mut self, tombstones: &[RangeTombstone]);
+
     fn finish(
         self: Box<Self>,
         super_version: &mut SuperVersions,
@@ -164,6 +169,10 @@ impl RelocatingCompaction {
 }
 
 impl CompactionFlavour for RelocatingCompaction {
+    fn write_range_tombstones(&mut self, tombstones: &[RangeTombstone]) {
+        self.inner.write_range_tombstones(tombstones);
+    }
+
     fn write(&mut self, item: InternalValue) -> crate::Result<()> {
         if item.key.value_type.is_indirection() {
             let mut reader = &item.value[..];
@@ -371,6 +380,10 @@ impl StandardCompaction {
 }
 
 impl CompactionFlavour for StandardCompaction {
+    fn write_range_tombstones(&mut self, tombstones: &[RangeTombstone]) {
+        self.table_writer.set_range_tombstones(tombstones.to_vec());
+    }
+
     fn write(&mut self, item: InternalValue) -> crate::Result<()> {
         let indirection = if item.key.value_type.is_indirection() {
             Some({
diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs
index 951fadbbf..46cf4a1f1 100644
--- a/src/compaction/worker.rs
+++ b/src/compaction/worker.rs
@@ -369,6 +369,16 @@ fn merge_tables(
         return Ok(());
     };
 
+    // Collect range tombstones from input tables before they are moved.
+    // Canonicalize to avoid duplicate RTs across input tables (MultiWriter
+    // rotation copies the same RT into every output table during flush).
+    let mut input_range_tombstones: Vec<RangeTombstone> = tables
+        .iter()
+        .flat_map(|t| t.range_tombstones().iter().cloned())
+        .collect();
+    input_range_tombstones.sort();
+    input_range_tombstones.dedup();
+
     let mut blob_frag_map = FragmentationMap::default();
 
     let Some(mut merge_iter) = create_compaction_stream(
@@ -489,6 +499,29 @@ fn merge_tables(
     drop(compaction_state);
 
     hidden_guard(payload, opts, || {
+        // Propagate range tombstones to output tables BEFORE writing KV items,
+        // so that if the compactor rotates tables during the merge loop,
+        // earlier tables already carry the RT metadata.
+        // At last level, evict tombstones below GC watermark.
+        if !input_range_tombstones.is_empty() {
+            let surviving: Vec<_> = if is_last_level {
+                input_range_tombstones
+                    .into_iter()
+                    .filter(|rt| rt.seqno >= opts.mvcc_gc_watermark)
+                    .collect()
+            } else {
+                input_range_tombstones
+            };
+
+            if !surviving.is_empty() {
+                log::debug!(
+                    "Propagating {} range tombstones to compaction output",
+                    surviving.len(),
+                );
+                compactor.write_range_tombstones(&surviving);
+            }
+        }
+
         for (idx, item) in merge_iter.enumerate() {
             let item = item?;
diff --git a/src/lib.rs b/src/lib.rs
index ea505e657..6c0ea7d95 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -125,6 +125,10 @@ mod path;
 #[doc(hidden)]
 pub mod range;
 
+pub(crate) mod active_tombstone_set;
+pub(crate) mod range_tombstone;
+pub(crate) mod range_tombstone_filter;
+
 #[doc(hidden)]
 pub mod table;
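The next hunk adds the interval tree that backs memtable range-tombstone queries. For orientation, here is a minimal standalone sketch (toy `u8` keys and invented names, not the crate's types) of the two pruning rules its stabbing query relies on: skip any subtree whose augmented `max_end` cannot reach past the probe key, and skip the right child whenever the node's `start` (the BST key) already exceeds the key.

```rust
struct Iv {
    start: u8,
    end: u8,     // exclusive
    max_end: u8, // max `end` in this subtree (the augmentation)
    left: Option<Box<Iv>>,
    right: Option<Box<Iv>>,
}

fn stab(n: Option<&Iv>, key: u8, hits: &mut Vec<(u8, u8)>) {
    let Some(n) = n else { return };
    if n.max_end <= key {
        return; // prune: nothing in this subtree ends past `key`
    }
    stab(n.left.as_deref(), key, hits);
    if n.start <= key {
        if key < n.end {
            hits.push((n.start, n.end));
        }
        stab(n.right.as_deref(), key, hits); // right starts could still be <= key
    }
}

fn main() {
    // Root [10, 30) with left child [5, 15) and right child [20, 40).
    let tree = Iv {
        start: 10, end: 30, max_end: 40,
        left: Some(Box::new(Iv { start: 5, end: 15, max_end: 15, left: None, right: None })),
        right: Some(Box::new(Iv { start: 20, end: 40, max_end: 40, left: None, right: None })),
    };
    let mut hits = Vec::new();
    stab(Some(&tree), 12, &mut hits);
    assert_eq!(hits, vec![(5, 15), (10, 30)]); // 12 is not covered by [20, 40)
}
```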
diff --git a/src/memtable/interval_tree.rs b/src/memtable/interval_tree.rs
new file mode 100644
index 000000000..0cc070b03
--- /dev/null
+++ b/src/memtable/interval_tree.rs
@@ -0,0 +1,488 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+//! AVL-balanced interval tree for efficient range tombstone queries in memtables.
+//!
+//! Keyed by `start`, augmented with `subtree_max_end`, `subtree_max_seqno`,
+//! and `subtree_min_seqno` for pruning during queries.
+
+use crate::range_tombstone::CoveringRt;
+use crate::range_tombstone::RangeTombstone;
+use crate::{SeqNo, UserKey};
+use std::cmp::Ordering;
+
+/// An AVL-balanced BST keyed by range tombstone `start`, augmented with
+/// subtree-level metadata for efficient interval queries.
+pub struct IntervalTree {
+    root: Option<Box<Node>>,
+    len: usize,
+}
+
+struct Node {
+    tombstone: RangeTombstone,
+
+    // AVL metadata
+    height: i32,
+    left: Option<Box<Node>>,
+    right: Option<Box<Node>>,
+
+    // Augmented metadata
+    subtree_max_end: UserKey,
+    subtree_max_seqno: SeqNo,
+    subtree_min_seqno: SeqNo,
+}
+
+impl Node {
+    fn new(tombstone: RangeTombstone) -> Self {
+        let subtree_max_end = tombstone.end.clone();
+        let seqno = tombstone.seqno;
+        Self {
+            tombstone,
+            height: 1,
+            left: None,
+            right: None,
+            subtree_max_end,
+            subtree_max_seqno: seqno,
+            subtree_min_seqno: seqno,
+        }
+    }
+
+    fn update_augmentation(&mut self) {
+        self.subtree_max_end = self.tombstone.end.clone();
+        self.subtree_max_seqno = self.tombstone.seqno;
+        self.subtree_min_seqno = self.tombstone.seqno;
+        self.height = 1;
+
+        if let Some(ref left) = self.left {
+            if left.subtree_max_end > self.subtree_max_end {
+                self.subtree_max_end = left.subtree_max_end.clone();
+            }
+            if left.subtree_max_seqno > self.subtree_max_seqno {
+                self.subtree_max_seqno = left.subtree_max_seqno;
+            }
+            if left.subtree_min_seqno < self.subtree_min_seqno {
+                self.subtree_min_seqno = left.subtree_min_seqno;
+            }
+            self.height = left.height + 1;
+        }
+
+        if let Some(ref right) = self.right {
+            if right.subtree_max_end > self.subtree_max_end {
+                self.subtree_max_end = right.subtree_max_end.clone();
+            }
+            if right.subtree_max_seqno > self.subtree_max_seqno {
+                self.subtree_max_seqno = right.subtree_max_seqno;
+            }
+            if right.subtree_min_seqno < self.subtree_min_seqno {
+                self.subtree_min_seqno = right.subtree_min_seqno;
+            }
+            let rh = right.height + 1;
+            if rh > self.height {
+                self.height = rh;
+            }
+        }
+    }
+
+    fn balance_factor(&self) -> i32 {
+        let lh = self.left.as_ref().map_or(0, |n| n.height);
+        let rh = self.right.as_ref().map_or(0, |n| n.height);
+        lh - rh
+    }
+}
+
+#[expect(
+    clippy::expect_used,
+    reason = "rotation invariant: left child must exist"
+)]
+fn rotate_right(mut node: Box<Node>) -> Box<Node> {
+    let mut new_root = node.left.take().expect("rotate_right requires left child");
+    node.left = new_root.right.take();
+    node.update_augmentation();
+    new_root.right = Some(node);
+    new_root.update_augmentation();
+    new_root
+}
+
+#[expect(
+    clippy::expect_used,
+    reason = "rotation invariant: right child must exist"
+)]
+fn rotate_left(mut node: Box<Node>) -> Box<Node> {
+    let mut new_root = node.right.take().expect("rotate_left requires right child");
+    node.right = new_root.left.take();
+    node.update_augmentation();
+    new_root.left = Some(node);
+    new_root.update_augmentation();
+    new_root
+}
+
+#[expect(
+    clippy::expect_used,
+    reason = "balance factor guarantees child existence"
+)]
+fn balance(mut node: Box<Node>) -> Box<Node> {
+    node.update_augmentation();
+    let bf = node.balance_factor();
+
+    if bf > 1 {
+        // Left-heavy
+        if let Some(ref left) = node.left {
+            if left.balance_factor() < 0 {
+                // Left-Right case
+                node.left = Some(rotate_left(node.left.take().expect("just checked")));
+            }
+        }
+        return rotate_right(node);
+    }
+
+    if bf < -1 {
+        // Right-heavy
+        if let Some(ref right) = node.right {
+            if right.balance_factor() > 0 {
+                // Right-Left case
+                node.right = Some(rotate_right(node.right.take().expect("just checked")));
+            }
+        }
+        return rotate_left(node);
+    }
+
+    node
+}
+
+/// Returns `(node, was_new)` — `was_new` is false when a duplicate was replaced.
+fn insert_node(node: Option<Box<Node>>, tombstone: RangeTombstone) -> (Box<Node>, bool) {
+    let Some(mut node) = node else {
+        return (Box::new(Node::new(tombstone)), true);
+    };
+
+    let was_new;
+    match tombstone.cmp(&node.tombstone) {
+        Ordering::Less => {
+            let (child, new) = insert_node(node.left.take(), tombstone);
+            node.left = Some(child);
+            was_new = new;
+        }
+        Ordering::Greater => {
+            let (child, new) = insert_node(node.right.take(), tombstone);
+            node.right = Some(child);
+            was_new = new;
+        }
+        Ordering::Equal => {
+            // Duplicate — replace (shouldn't normally happen)
+            node.tombstone = tombstone;
+            node.update_augmentation();
+            return (node, false);
+        }
+    }
+
+    (balance(node), was_new)
+}
+
+/// Like `collect_overlapping`, but returns `true` as soon as any overlapping
+/// tombstone with `seqno > key_seqno` is found. Avoids Vec allocation on the
+/// hot read path.
+fn any_overlapping_suppresses(
+    node: Option<&Node>,
+    key: &[u8],
+    key_seqno: SeqNo,
+    read_seqno: SeqNo,
+) -> bool {
+    let Some(n) = node else { return false };
+
+    if n.subtree_min_seqno >= read_seqno {
+        return false;
+    }
+
+    if n.subtree_max_end.as_ref() <= key {
+        return false;
+    }
+
+    if any_overlapping_suppresses(n.left.as_deref(), key, key_seqno, read_seqno) {
+        return true;
+    }
+
+    if n.tombstone.start.as_ref() <= key {
+        if n.tombstone.contains_key(key)
+            && n.tombstone.visible_at(read_seqno)
+            && n.tombstone.seqno > key_seqno
+        {
+            return true;
+        }
+        return any_overlapping_suppresses(n.right.as_deref(), key, key_seqno, read_seqno);
+    }
+
+    false
+}
+
+/// In-order traversal to produce sorted output.
+fn inorder(node: Option<&Node>, result: &mut Vec<RangeTombstone>) {
+    let Some(n) = node else { return };
+    inorder(n.left.as_deref(), result);
+    result.push(n.tombstone.clone());
+    inorder(n.right.as_deref(), result);
+}
+
+/// Collects tombstones that fully cover `[min, max]` and are visible at `read_seqno`.
+fn collect_covering(
+    node: Option<&Node>,
+    min: &[u8],
+    max: &[u8],
+    read_seqno: SeqNo,
+    best: &mut Option<CoveringRt>,
+) {
+    let Some(n) = node else { return };
+
+    // Prune: no tombstone visible at this read_seqno
+    if n.subtree_min_seqno >= read_seqno {
+        return;
+    }
+
+    // Prune: max_end <= max means no interval in subtree can fully cover [min, max]
+    // (need end > max, i.e., max_end > max for half-open covering)
+    if n.subtree_max_end.as_ref() <= max {
+        return;
+    }
+
+    // Recurse left
+    collect_covering(n.left.as_deref(), min, max, read_seqno, best);
+
+    // Check current node: must have start <= min AND max < end
+    if n.tombstone.start.as_ref() <= min
+        && n.tombstone.fully_covers(min, max)
+        && n.tombstone.visible_at(read_seqno)
+    {
+        let dominated = best.as_ref().is_some_and(|b| n.tombstone.seqno <= b.seqno);
+        if !dominated {
+            *best = Some(CoveringRt::from(&n.tombstone));
+        }
+    }
+
+    // Only go right if some right-subtree entry might have start <= min
+    if n.tombstone.start.as_ref() <= min {
+        collect_covering(n.right.as_deref(), min, max, read_seqno, best);
+    }
+}
+
+impl IntervalTree {
+    /// Creates a new empty interval tree.
+    #[must_use]
+    pub fn new() -> Self {
+        Self { root: None, len: 0 }
+    }
+
+    /// Inserts a range tombstone into the tree. O(log n).
+    pub fn insert(&mut self, tombstone: RangeTombstone) {
+        let (root, was_new) = insert_node(self.root.take(), tombstone);
+        self.root = Some(root);
+        if was_new {
+            self.len += 1;
+        }
+    }
+
+    /// Returns `true` if the given key at the given seqno is suppressed by
+    /// any range tombstone visible at `read_seqno`.
+    ///
+    /// O(log n + k) where k is the number of overlapping tombstones.
+    /// Uses early-exit traversal to avoid allocating a Vec.
+    pub fn query_suppression(&self, key: &[u8], key_seqno: SeqNo, read_seqno: SeqNo) -> bool {
+        any_overlapping_suppresses(self.root.as_deref(), key, key_seqno, read_seqno)
+    }
+
+    /// Returns the highest-seqno visible tombstone that fully covers `[min, max]`,
+    /// or `None` if no such tombstone exists.
+    ///
+    /// Used for table-skip decisions.
+    #[expect(dead_code, reason = "used for table-skip decisions")]
+    pub fn query_covering_rt_for_range(
+        &self,
+        min: &[u8],
+        max: &[u8],
+        read_seqno: SeqNo,
+    ) -> Option<CoveringRt> {
+        let mut best = None;
+        collect_covering(self.root.as_deref(), min, max, read_seqno, &mut best);
+        best
+    }
+
+    /// Returns all tombstones in sorted order (by `RangeTombstone::Ord`).
+    ///
+    /// Used for flush.
+    pub fn iter_sorted(&self) -> Vec<RangeTombstone> {
+        let mut result = Vec::with_capacity(self.len);
+        inorder(self.root.as_deref(), &mut result);
+        result
+    }
+
+    /// Returns the number of tombstones in the tree.
+    #[must_use]
+    pub fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Returns `true` if the tree is empty.
+    #[expect(
+        dead_code,
+        reason = "tree may have tombstones but is_empty not called in all paths"
+    )]
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.len == 0
+    }
+}
+
+impl Default for IntervalTree {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[cfg(test)]
+#[expect(clippy::unwrap_used, clippy::indexing_slicing)]
+mod tests {
+    use super::*;
+
+    fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
+        RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
+    }
+
+    #[test]
+    fn empty_tree_no_suppression() {
+        let tree = IntervalTree::new();
+        assert!(!tree.query_suppression(b"key", 5, 100));
+    }
+
+    #[test]
+    fn single_tombstone_suppresses() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"b", b"y", 10));
+        assert!(tree.query_suppression(b"c", 5, 100));
+    }
+
+    #[test]
+    fn single_tombstone_no_suppress_newer_kv() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"b", b"y", 10));
+        assert!(!tree.query_suppression(b"c", 15, 100));
+    }
+
+    #[test]
+    fn single_tombstone_exclusive_end() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"b", b"y", 10));
+        assert!(!tree.query_suppression(b"y", 5, 100));
+    }
+
+    #[test]
+    fn single_tombstone_before_start() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"b", b"y", 10));
+        assert!(!tree.query_suppression(b"a", 5, 100));
+    }
+
+    #[test]
+    fn tombstone_not_visible() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"b", b"y", 10));
+        assert!(!tree.query_suppression(b"c", 5, 9));
+    }
+
+    #[test]
+    fn multiple_tombstones() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"a", b"f", 10));
+        tree.insert(rt(b"d", b"m", 20));
+        tree.insert(rt(b"p", b"z", 5));
+
+        assert!(tree.query_suppression(b"e", 15, 100));
+        assert!(tree.query_suppression(b"e", 5, 100));
+        assert!(!tree.query_suppression(b"e", 25, 100));
+
+        assert!(tree.query_suppression(b"q", 3, 100));
+        assert!(!tree.query_suppression(b"q", 10, 100));
+    }
+
+    #[test]
+    fn covering_rt_found() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"a", b"z", 50));
+        tree.insert(rt(b"c", b"g", 10));
+
+        let crt = tree.query_covering_rt_for_range(b"b", b"y", 100);
+        assert!(crt.is_some());
+        let crt = crt.unwrap();
+        assert_eq!(crt.seqno, 50);
+    }
+
+    #[test]
+    fn covering_rt_not_found_partial() {
+        let mut tree = IntervalTree::new();
+        tree.insert(rt(b"c", b"g", 10));
b"g", 10)); + + let crt = tree.query_covering_rt_for_range(b"b", b"y", 100); + assert!(crt.is_none()); + } + + #[test] + fn covering_rt_highest_seqno() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"z", 50)); + tree.insert(rt(b"a", b"z", 100)); + + let crt = tree.query_covering_rt_for_range(b"b", b"y", 200); + assert!(crt.is_some()); + assert_eq!(crt.unwrap().seqno, 100); + } + + #[test] + fn iter_sorted_empty() { + let tree = IntervalTree::new(); + assert!(tree.iter_sorted().is_empty()); + } + + #[test] + fn iter_sorted_multiple() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"d", b"f", 10)); + tree.insert(rt(b"a", b"c", 20)); + tree.insert(rt(b"m", b"z", 5)); + + let sorted = tree.iter_sorted(); + assert_eq!(sorted.len(), 3); + assert_eq!(sorted[0].start.as_ref(), b"a"); + assert_eq!(sorted[1].start.as_ref(), b"d"); + assert_eq!(sorted[2].start.as_ref(), b"m"); + } + + #[test] + fn avl_balance_maintained() { + let mut tree = IntervalTree::new(); + for i in 0u8..20 { + let s = vec![i]; + let e = vec![i + 1]; + tree.insert(rt(&s, &e, u64::from(i))); + } + assert_eq!(tree.len(), 20); + if let Some(ref root) = tree.root { + assert!(root.height <= 6, "AVL height too large: {}", root.height); + } + } + + #[test] + fn seqno_pruning() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"z", 100)); + tree.insert(rt(b"b", b"y", 200)); + + assert!(!tree.query_suppression(b"c", 5, 50)); + } + + #[test] + fn max_end_pruning() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"c", 10)); + tree.insert(rt(b"b", b"d", 10)); + + assert!(!tree.query_suppression(b"e", 5, 100)); + } +} diff --git a/src/memtable/mod.rs b/src/memtable/mod.rs index ced9c12b7..4187e8b1e 100644 --- a/src/memtable/mod.rs +++ b/src/memtable/mod.rs @@ -2,14 +2,18 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) +pub mod interval_tree; + use crate::key::InternalKey; +use crate::range_tombstone::RangeTombstone; use crate::{ value::{InternalValue, SeqNo, UserValue}, - ValueType, + UserKey, ValueType, }; use crossbeam_skiplist::SkipMap; use std::ops::RangeBounds; use std::sync::atomic::{AtomicBool, AtomicU64}; +use std::sync::Mutex; pub use crate::tree::inner::MemtableId; @@ -24,6 +28,12 @@ pub struct Memtable { #[doc(hidden)] pub items: SkipMap, + /// Range tombstones stored in an interval tree. + /// + /// Protected by a Mutex since `IntervalTree` is not lock-free. + /// Contention is expected to be low — range deletes are infrequent. + pub(crate) range_tombstones: Mutex, + /// Approximate active memtable size. /// /// If this grows too large, a flush is triggered. @@ -61,6 +71,7 @@ impl Memtable { Self { id, items: SkipMap::default(), + range_tombstones: Mutex::new(interval_tree::IntervalTree::new()), approximate_size: AtomicU64::default(), highest_seqno: AtomicU64::default(), requested_rotation: AtomicBool::default(), @@ -135,10 +146,10 @@ impl Memtable { self.items.len() } - /// Returns `true` if the memtable is empty. + /// Returns `true` if the memtable has no KV items and no range tombstones. #[must_use] pub fn is_empty(&self) -> bool { - self.items.is_empty() + self.items.is_empty() && self.range_tombstone_count() == 0 } /// Inserts an item into the memtable @@ -166,6 +177,96 @@ impl Memtable { (item_size, size_before + item_size) } + /// Inserts a range tombstone covering `[start, end)` at the given seqno. + /// + /// Returns the approximate size added to the memtable. 
+    ///
+    /// Returns 0 if `start >= end` or if either bound exceeds `u16::MAX` bytes.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the internal mutex is poisoned.
+    pub fn insert_range_tombstone(&self, start: UserKey, end: UserKey, seqno: SeqNo) -> u64 {
+        // Reject invalid intervals in release builds (debug_assert is not enough)
+        if start >= end {
+            return 0;
+        }
+
+        // On-disk RT format writes key lengths as u16, enforce at insertion time.
+        // Emit a warning when rejecting an oversized bound so this failure is diagnosable.
+        if u16::try_from(start.len()).is_err() || u16::try_from(end.len()).is_err() {
+            log::warn!(
+                "insert_range_tombstone: rejecting oversized range tombstone \
+                 bounds (start_len = {}, end_len = {}, max = {})",
+                start.len(),
+                end.len(),
+                u16::MAX,
+            );
+            return 0;
+        }
+
+        let size = (start.len() + end.len() + std::mem::size_of::<SeqNo>()) as u64;
+
+        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
+        self.range_tombstones
+            .lock()
+            .expect("lock is poisoned")
+            .insert(RangeTombstone::new(start, end, seqno));
+
+        self.approximate_size
+            .fetch_add(size, std::sync::atomic::Ordering::AcqRel);
+
+        self.highest_seqno
+            .fetch_max(seqno, std::sync::atomic::Ordering::AcqRel);
+
+        size
+    }
+
+    /// Returns `true` if the key at `key_seqno` is suppressed by a range tombstone
+    /// visible at `read_seqno`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the internal mutex is poisoned.
+    pub(crate) fn is_key_suppressed_by_range_tombstone(
+        &self,
+        key: &[u8],
+        key_seqno: SeqNo,
+        read_seqno: SeqNo,
+    ) -> bool {
+        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
+        self.range_tombstones
+            .lock()
+            .expect("lock is poisoned")
+            .query_suppression(key, key_seqno, read_seqno)
+    }
+
+    /// Returns all range tombstones in sorted order (for flush).
+    ///
+    /// # Panics
+    ///
+    /// Panics if the internal mutex is poisoned.
+    pub(crate) fn range_tombstones_sorted(&self) -> Vec<RangeTombstone> {
+        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
+        self.range_tombstones
+            .lock()
+            .expect("lock is poisoned")
+            .iter_sorted()
+    }
+
+    /// Returns the number of range tombstones.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the internal mutex is poisoned.
+    pub fn range_tombstone_count(&self) -> usize {
+        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
+        self.range_tombstones
+            .lock()
+            .expect("lock is poisoned")
+            .len()
+    }
+
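How a point read would consult this API: the sketch below is a hypothetical crate-internal helper, not part of this diff. `Memtable::get` and its exact signature are assumed for illustration.

```rust
use crate::{memtable::Memtable, SeqNo, UserValue};

// Hypothetical helper: combine the skip-map lookup with the range
// tombstone check added above.
fn point_read(mt: &Memtable, key: &[u8], read_seqno: SeqNo) -> Option<UserValue> {
    // Newest version of `key` visible at the snapshot (assumed existing API).
    let item = mt.get(key, read_seqno)?;

    // A range tombstone wins only if it is newer than the KV *and* visible
    // at the snapshot; both checks happen inside the interval tree query.
    if mt.is_key_suppressed_by_range_tombstone(key, item.key.seqno, read_seqno) {
        return None; // deleted by a range tombstone
    }

    // Point tombstones are still handled by the existing value-type checks.
    Some(item.value)
}
```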
+    /// Returns the highest sequence number in the memtable.
     pub fn get_highest_seqno(&self) -> Option<SeqNo> {
         if self.is_empty() {
diff --git a/src/range.rs b/src/range.rs
index c0cd5df94..b8d58b19e 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -7,6 +7,8 @@ use crate::{
     memtable::Memtable,
     merge::Merger,
     mvcc_stream::MvccStream,
+    range_tombstone::RangeTombstone,
+    range_tombstone_filter::RangeTombstoneFilter,
     run_reader::RunReader,
     value::{SeqNo, UserKey},
     version::SuperVersion,
@@ -96,6 +98,10 @@ impl DoubleEndedIterator for TreeIter {
     }
 }
 
 impl TreeIter {
+    #[expect(
+        clippy::too_many_lines,
+        reason = "create_range wires up multiple iterator sources, filters, and tombstone handling; splitting further would reduce clarity"
+    )]
     pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
         guard: IterState,
         range: R,
@@ -145,6 +151,53 @@ impl TreeIter {
 
         let range = (lo, hi);
 
+        // Cheap pre-check: count total range tombstones before cloning
+        let rt_count = lock.version.active_memtable.range_tombstone_count()
+            + lock
+                .version
+                .sealed_memtables
+                .iter()
+                .map(|mt| mt.range_tombstone_count())
+                .sum::<usize>()
+            + lock
+                .ephemeral
+                .as_ref()
+                .map_or(0, |(mt, _)| mt.range_tombstone_count())
+            + lock
+                .version
+                .version
+                .iter_levels()
+                .flat_map(|lvl| lvl.iter())
+                .flat_map(|run| run.iter())
+                .map(|t| t.range_tombstones().len())
+                .sum::<usize>();
+
+        // Only collect/clone tombstones when the total count is non-zero
+        let all_range_tombstones = if rt_count > 0 {
+            let mut rts: Vec<RangeTombstone> = Vec::with_capacity(rt_count);
+
+            rts.extend(lock.version.active_memtable.range_tombstones_sorted());
+            for mt in lock.version.sealed_memtables.iter() {
+                rts.extend(mt.range_tombstones_sorted());
+            }
+            if let Some((mt, _)) = &lock.ephemeral {
+                rts.extend(mt.range_tombstones_sorted());
+            }
+            for table in lock
+                .version
+                .version
+                .iter_levels()
+                .flat_map(|lvl| lvl.iter())
+                .flat_map(|run| run.iter())
+            {
+                rts.extend(table.range_tombstones().iter().cloned());
+            }
+            // No sort needed here — RangeTombstoneFilter::new sorts internally
+            rts
+        } else {
+            Vec::new()
+        };
+
         let mut iters: Vec> = Vec::with_capacity(5);
 
         for run in lock
 
             #[expect(clippy::expect_used, reason = "we checked for length")]
             let table = run.first().expect("should exist");
 
-            if table.check_key_range_overlap(&(
-                range.start_bound().map(|x| &*x.user_key),
-                range.end_bound().map(|x| &*x.user_key),
-            )) {
+            // Table-skip: if a range tombstone fully covers this table
+            // with a higher seqno, skip it entirely (avoid I/O).
+            // NOTE: O(tables * rt_count) scan — acceptable for typical RT counts;
+            // pre-filtering visible RTs or indexing by range is a future optimization.
+            // key_range.max() is inclusive, fully_covers uses half-open: max < rt.end
+            let is_covered = all_range_tombstones.iter().any(|rt| {
+                rt.visible_at(seqno)
+                    && rt.fully_covers(
+                        table.metadata.key_range.min(),
+                        table.metadata.key_range.max(),
+                    )
+                    && rt.seqno > table.get_highest_seqno()
+            });
+
+            if !is_covered
+                && table.check_key_range_overlap(&(
+                    range.start_bound().map(|x| &*x.user_key),
+                    range.end_bound().map(|x| &*x.user_key),
+                ))
+            {
                 let reader = table
                     .range((
                         range.start_bound().map(|x| &x.user_key).cloned(),
@@ -227,10 +296,17 @@ impl TreeIter {
 
         let merged = Merger::new(iters);
         let iter = MvccStream::new(merged);
 
-        Box::new(iter.filter(|x| match x {
+        let iter = iter.filter(|x| match x {
             Ok(value) => !value.key.is_tombstone(),
             Err(_) => true,
-        }))
+        });
+
+        // Fast path: skip filter wrapping when no tombstone is visible at this read seqno
+        if all_range_tombstones.iter().all(|rt| !rt.visible_at(seqno)) {
+            Box::new(iter)
+        } else {
+            Box::new(RangeTombstoneFilter::new(iter, all_range_tombstones, seqno))
+        }
     })
 }
 }
diff --git a/src/range_tombstone.rs b/src/range_tombstone.rs
new file mode 100644
index 000000000..a3612e5d2
--- /dev/null
+++ b/src/range_tombstone.rs
@@ -0,0 +1,370 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+use crate::{SeqNo, UserKey};
+use std::cmp::Reverse;
+
+/// A range tombstone that deletes all keys in `[start, end)` at a given sequence number.
+///
+/// Half-open interval: `start` is inclusive, `end` is exclusive.
+/// A key `k` is covered iff `start <= k < end`.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct RangeTombstone {
+    /// Inclusive start bound
+    pub start: UserKey,
+    /// Exclusive end bound
+    pub end: UserKey,
+    /// Sequence number at which this tombstone was written
+    pub seqno: SeqNo,
+}
+
+impl RangeTombstone {
+    /// Creates a new range tombstone for `[start, end)` at the given seqno.
+    ///
+    /// # Panics (debug only)
+    ///
+    /// Debug-asserts that `start < end`. Callers must validate untrusted input
+    /// before constructing a `RangeTombstone`.
+    #[must_use]
+    pub fn new(start: UserKey, end: UserKey, seqno: SeqNo) -> Self {
+        debug_assert!(start < end, "range tombstone start must be < end");
+        Self { start, end, seqno }
+    }
+
+    /// Returns `true` if `key` is within `[start, end)`.
+    #[must_use]
+    pub fn contains_key(&self, key: &[u8]) -> bool {
+        self.start.as_ref() <= key && key < self.end.as_ref()
+    }
+
+    /// Returns `true` if this tombstone is visible at the given read seqno.
+    ///
+    /// Uses exclusive boundary (`self.seqno < read_seqno`) consistent with
+    /// the codebase convention where `seqno` is an exclusive snapshot boundary.
+    #[must_use]
+    pub fn visible_at(&self, read_seqno: SeqNo) -> bool {
+        self.seqno < read_seqno
+    }
+
+    /// Returns `true` if this tombstone should suppress a KV with the given seqno
+    /// at the given read snapshot.
+    ///
+    /// Suppress iff: `kv_seqno < self.seqno AND self.contains_key(key) AND self.visible_at(read_seqno)`
+    #[must_use]
+    pub fn should_suppress(&self, key: &[u8], kv_seqno: SeqNo, read_seqno: SeqNo) -> bool {
+        self.visible_at(read_seqno) && self.contains_key(key) && kv_seqno < self.seqno
+    }
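As a worked example of this three-way predicate, the following test-style sketch (crate-internal; the same cases appear in the unit tests further down) spells out which conjunct fails in each case:

```rust
#[test]
fn should_suppress_walkthrough() {
    let t = RangeTombstone::new(UserKey::from(&b"b"[..]), UserKey::from(&b"d"[..]), 10);

    assert!(t.should_suppress(b"c", 5, 11));   // covered, KV older, visible at 11
    assert!(!t.should_suppress(b"c", 5, 10));  // not visible: snapshot bound is exclusive
    assert!(!t.should_suppress(b"c", 15, 20)); // KV is newer than the tombstone
    assert!(!t.should_suppress(b"d", 5, 11));  // `d` is outside the half-open range
}
```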
+
+    /// Returns the intersection of this tombstone with `[min, max)`, or `None`
+    /// if the ranges do not overlap.
+    ///
+    /// The resulting tombstone has the same seqno as `self`.
+    #[must_use]
+    pub fn intersect_opt(&self, min: &[u8], max: &[u8]) -> Option<Self> {
+        let new_start_ref = if self.start.as_ref() > min {
+            self.start.as_ref()
+        } else {
+            min
+        };
+        let new_end_ref = if self.end.as_ref() < max {
+            self.end.as_ref()
+        } else {
+            max
+        };
+
+        if new_start_ref < new_end_ref {
+            Some(Self {
+                start: UserKey::from(new_start_ref),
+                end: UserKey::from(new_end_ref),
+                seqno: self.seqno,
+            })
+        } else {
+            None
+        }
+    }
+
+    /// Returns `true` if this tombstone fully covers the key range `[min, max]`.
+    ///
+    /// "Fully covers" means `self.start <= min` AND `max < self.end`.
+    /// This uses the half-open convention: the inclusive `max` must be
+    /// strictly less than the exclusive `end`.
+    #[must_use]
+    pub fn fully_covers(&self, min: &[u8], max: &[u8]) -> bool {
+        self.start.as_ref() <= min && max < self.end.as_ref()
+    }
+}
+
+/// Ordered by `(start asc, seqno desc, end asc)`.
+///
+/// The `end` tiebreaker ensures deterministic ordering for debug output
+/// and property tests.
+impl Ord for RangeTombstone {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        (&self.start, Reverse(self.seqno), &self.end).cmp(&(
+            &other.start,
+            Reverse(other.seqno),
+            &other.end,
+        ))
+    }
+}
+
+impl PartialOrd for RangeTombstone {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// Information about a covering range tombstone, used for table-skip decisions.
+///
+/// A covering tombstone fully covers a table's key range and has a seqno
+/// greater than the table's max seqno, meaning the entire table can be skipped.
+#[derive(Clone, Debug)]
+pub struct CoveringRt {
+    /// The start key of the covering tombstone (inclusive)
+    pub start: UserKey,
+    /// The end key of the covering tombstone (exclusive)
+    pub end: UserKey,
+    /// The seqno of the covering tombstone
+    pub seqno: SeqNo,
+}
+
+impl CoveringRt {
+    /// Returns `true` if this covering tombstone fully covers the given
+    /// key range `[min, max]` and has a higher seqno than the table's max.
+    #[must_use]
+    #[expect(dead_code, reason = "wired up in table-skip optimization")]
+    pub fn covers_table(&self, table_min: &[u8], table_max: &[u8], table_max_seqno: SeqNo) -> bool {
+        self.start.as_ref() <= table_min
+            && table_max < self.end.as_ref()
+            && self.seqno > table_max_seqno
+    }
+}
+
+impl From<&RangeTombstone> for CoveringRt {
+    fn from(rt: &RangeTombstone) -> Self {
+        Self {
+            start: rt.start.clone(),
+            end: rt.end.clone(),
+            seqno: rt.seqno,
+        }
+    }
+}
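For context, this is the table-skip decision `CoveringRt` exists for, as a test-style sketch with invented key ranges and seqnos (the real wiring lands in `src/range.rs` in this PR):

```rust
#[test]
fn table_skip_walkthrough() {
    let rt = RangeTombstone::new(UserKey::from(&b"a"[..]), UserKey::from(&b"z"[..]), 100);
    let covering = CoveringRt::from(&rt);

    // Table spans [b"b", b"y"] (inclusive max) and its newest entry is seqno 50:
    // fully covered by a strictly newer tombstone, so it never needs to be opened.
    assert!(covering.covers_table(b"b", b"y", 50));

    // A table holding a newer write (seqno 150) must still be read.
    assert!(!covering.covers_table(b"b", b"y", 150));
}
```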
+
+/// Computes the upper bound exclusive key for use in range queries.
+///
+/// Given a key, returns the next key in lexicographic order by appending `0x00`.
+/// This is useful for converting inclusive upper bounds to exclusive ones
+/// in range-cover queries.
+///
+/// Returns `None` if the key is already at the maximum allowed length
+/// (`u16::MAX`), since appending a byte would violate the u16 key-length
+/// invariant used in the on-disk RT block format.
+#[must_use]
+pub fn upper_bound_exclusive(key: &[u8]) -> Option<UserKey> {
+    // The codebase enforces that user keys fit in a u16 length
+    // (see `InternalKey::new`). Appending a byte to a max-length key
+    // would overflow that limit and corrupt on-disk encodings.
+    if key.len() >= usize::from(u16::MAX) {
+        return None;
+    }
+
+    let mut result = Vec::with_capacity(key.len() + 1);
+    result.extend_from_slice(key);
+    result.push(0x00);
+    Some(UserKey::from(result))
+}
+
+#[cfg(test)]
+#[expect(clippy::unwrap_used)]
+mod tests {
+    use super::*;
+
+    fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
+        RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
+    }
+
+    #[test]
+    fn contains_key_inclusive_start() {
+        let t = rt(b"b", b"d", 10);
+        assert!(t.contains_key(b"b"));
+    }
+
+    #[test]
+    fn contains_key_exclusive_end() {
+        let t = rt(b"b", b"d", 10);
+        assert!(!t.contains_key(b"d"));
+    }
+
+    #[test]
+    fn contains_key_middle() {
+        let t = rt(b"b", b"d", 10);
+        assert!(t.contains_key(b"c"));
+    }
+
+    #[test]
+    fn contains_key_before_start() {
+        let t = rt(b"b", b"d", 10);
+        assert!(!t.contains_key(b"a"));
+    }
+
+    #[test]
+    fn not_visible_at_equal() {
+        // Exclusive boundary: tombstone@10 is NOT visible at read_seqno=10
+        let t = rt(b"a", b"z", 10);
+        assert!(!t.visible_at(10));
+    }
+
+    #[test]
+    fn visible_at_higher() {
+        let t = rt(b"a", b"z", 10);
+        assert!(t.visible_at(20));
+    }
+
+    #[test]
+    fn not_visible_at_lower() {
+        let t = rt(b"a", b"z", 10);
+        assert!(!t.visible_at(9));
+    }
+
+    #[test]
+    fn should_suppress_yes() {
+        let t = rt(b"b", b"d", 10);
+        // read_seqno=11 (exclusive: tombstone@10 visible at 11)
+        assert!(t.should_suppress(b"c", 5, 11));
+    }
+
+    #[test]
+    fn should_suppress_no_at_equal_seqno() {
+        let t = rt(b"b", b"d", 10);
+        // read_seqno=10: tombstone@10 NOT visible (exclusive boundary)
+        assert!(!t.should_suppress(b"c", 5, 10));
+    }
+
+    #[test]
+    fn should_suppress_no_newer_kv() {
+        let t = rt(b"b", b"d", 10);
+        assert!(!t.should_suppress(b"c", 15, 20));
+    }
+
+    #[test]
+    fn should_suppress_no_not_visible() {
+        let t = rt(b"b", b"d", 10);
+        assert!(!t.should_suppress(b"c", 5, 9));
+    }
+
+    #[test]
+    fn should_suppress_no_outside_range() {
+        let t = rt(b"b", b"d", 10);
+        assert!(!t.should_suppress(b"e", 5, 11));
+    }
+
+    #[test]
+    fn ordering_by_start_asc() {
+        let a = rt(b"a", b"z", 10);
+        let b = rt(b"b", b"z", 10);
+        assert!(a < b);
+    }
+
+    #[test]
+    fn ordering_by_seqno_desc() {
+        let a = rt(b"a", b"z", 20);
+        let b = rt(b"a", b"z", 10);
+        assert!(a < b); // higher seqno comes first
+    }
+
+    #[test]
+    fn ordering_by_end_asc_tiebreaker() {
+        let a = rt(b"a", b"m", 10);
+        let b = rt(b"a", b"z", 10);
+        assert!(a < b);
+    }
+
+    #[test]
+    fn intersect_overlap() {
+        let t = rt(b"b", b"y", 10);
+        let clipped = t.intersect_opt(b"d", b"g").unwrap();
+        assert_eq!(clipped.start.as_ref(), b"d");
+        assert_eq!(clipped.end.as_ref(), b"g");
+        assert_eq!(clipped.seqno, 10);
+    }
+
+    #[test]
+    fn intersect_no_overlap() {
+        let t = rt(b"b", b"d", 10);
+        assert!(t.intersect_opt(b"e", b"g").is_none());
+    }
+
+    #[test]
+    fn intersect_partial_left() {
+        let t = rt(b"b", b"f", 10);
+        let clipped = t.intersect_opt(b"a", b"d").unwrap();
+        assert_eq!(clipped.start.as_ref(), b"b");
+        assert_eq!(clipped.end.as_ref(), b"d");
+    }
+
+    #[test]
+    fn intersect_partial_right() {
+        let t = rt(b"b", b"f", 10);
+        let clipped = t.intersect_opt(b"d", b"z").unwrap();
+        assert_eq!(clipped.start.as_ref(), b"d");
+        assert_eq!(clipped.end.as_ref(), b"f");
+    }
+
+    #[test]
+    fn fully_covers_yes() {
+        let t = rt(b"a", b"z", 10);
+        assert!(t.fully_covers(b"b", b"y"));
+    }
+
+    #[test]
+    fn fully_covers_exact_start() {
+        let t = rt(b"a", b"z", 10);
+        assert!(t.fully_covers(b"a", b"y"));
+    }
+
+    #[test]
+    fn fully_covers_no_end_equal() {
+        let t = rt(b"a", b"z", 10);
+        assert!(!t.fully_covers(b"a", b"z"));
+    }
+
+    #[test]
+    fn fully_covers_no_start_before() {
+        let t = rt(b"b", b"z", 10);
+        assert!(!t.fully_covers(b"a", b"y"));
+    }
+
+    #[test]
+    fn covering_rt_covers_table() {
+        let crt = CoveringRt {
+            start: UserKey::from(b"a" as &[u8]),
+            end: UserKey::from(b"z" as &[u8]),
+            seqno: 100,
+        };
+        assert!(crt.covers_table(b"b", b"y", 50));
+    }
+
+    #[test]
+    fn covering_rt_no_cover_seqno_too_low() {
+        let crt = CoveringRt {
+            start: UserKey::from(b"a" as &[u8]),
+            end: UserKey::from(b"z" as &[u8]),
+            seqno: 50,
+        };
+        assert!(!crt.covers_table(b"b", b"y", 100));
+    }
+
+    #[test]
+    fn upper_bound_exclusive_appends_zero() {
+        let key = b"hello";
+        let result = upper_bound_exclusive(key).unwrap();
+        assert_eq!(result.as_ref(), b"hello\x00");
+    }
+
+    #[test]
+    fn upper_bound_exclusive_returns_none_for_max_length_key() {
+        let key = vec![0xAA; u16::MAX as usize];
+        assert!(upper_bound_exclusive(&key).is_none());
+    }
+}
diff --git a/src/range_tombstone_filter.rs b/src/range_tombstone_filter.rs
new file mode 100644
index 000000000..061c834a7
--- /dev/null
+++ b/src/range_tombstone_filter.rs
@@ -0,0 +1,262 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+//! Bidirectional range tombstone filter for iteration.
+//!
+//! Wraps a sorted KV stream and suppresses entries covered by range tombstones.
+//! Forward: tombstones sorted by `(start asc, seqno desc)`, activated when
+//! `start <= key`, expired when `end <= key`.
+//! Reverse: tombstones sorted by `(end desc, seqno desc)`, activated when
+//! `end > key`, expired when `key < start`.
+
+use crate::active_tombstone_set::{ActiveTombstoneSet, ActiveTombstoneSetReverse};
+use crate::range_tombstone::RangeTombstone;
+use crate::{InternalValue, SeqNo};
+
+/// Wraps a bidirectional KV stream and suppresses entries covered by range tombstones.
+pub struct RangeTombstoneFilter<I> {
+    inner: I,
+
+    // Forward state
+    fwd_tombstones: Vec<RangeTombstone>,
+    fwd_idx: usize,
+    fwd_active: ActiveTombstoneSet,
+
+    // Reverse state
+    rev_tombstones: Vec<RangeTombstone>,
+    rev_idx: usize,
+    rev_active: ActiveTombstoneSetReverse,
+}
+
+impl<I> RangeTombstoneFilter<I> {
+    /// Creates a new bidirectional filter.
+    ///
+    /// `fwd_tombstones` must be sorted by `(start asc, seqno desc, end asc)` (the natural Ord).
+    /// Internally, a second copy sorted by `(end desc, seqno desc)` is created for reverse.
+    #[must_use]
+    pub fn new(inner: I, mut fwd_tombstones: Vec<RangeTombstone>, read_seqno: SeqNo) -> Self {
+        // Ensure forward tombstones are sorted by natural order (start asc, seqno desc, end asc)
+        fwd_tombstones.sort();
+
+        // Build reverse-sorted copy: (end desc, seqno desc)
+        let mut rev_tombstones = fwd_tombstones.clone();
+        rev_tombstones.sort_by(|a, b| (&b.end, &b.seqno).cmp(&(&a.end, &a.seqno)));
+
+        Self {
+            inner,
+            fwd_tombstones,
+            fwd_idx: 0,
+            fwd_active: ActiveTombstoneSet::new(read_seqno),
+            rev_tombstones,
+            rev_idx: 0,
+            rev_active: ActiveTombstoneSetReverse::new(read_seqno),
+        }
+    }
+
+    /// Activates forward tombstones whose start <= `current_key`.
+    fn fwd_activate_up_to(&mut self, key: &[u8]) {
+        while let Some(rt) = self.fwd_tombstones.get(self.fwd_idx) {
+            if rt.start.as_ref() <= key {
+                self.fwd_active.activate(rt);
+                self.fwd_idx += 1;
+            } else {
+                break;
+            }
+        }
+    }
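One filter instance carries independent forward and reverse state, so the same tombstone list serves both scan directions. A test-style sketch (crate-internal, assuming the `kv`/`rt` helpers defined in the test module below):

```rust
#[test]
fn bidirectional_walkthrough() {
    let mk = || -> Vec<crate::Result<InternalValue>> {
        vec![Ok(kv(b"a", 5)), Ok(kv(b"b", 5)), Ok(kv(b"c", 5)), Ok(kv(b"d", 5))]
    };
    let rts = vec![rt(b"b", b"d", 10)]; // deletes b and c; d survives (half-open)

    let fwd: Vec<_> = RangeTombstoneFilter::new(mk().into_iter(), rts.clone(), SeqNo::MAX)
        .flatten()
        .map(|v| v.key.user_key)
        .collect(); // yields a, d

    let rev: Vec<_> = RangeTombstoneFilter::new(mk().into_iter(), rts, SeqNo::MAX)
        .rev() // exercises the DoubleEndedIterator impl and the reverse set
        .flatten()
        .map(|v| v.key.user_key)
        .collect(); // yields d, a

    assert_eq!(fwd.len(), 2);
    assert_eq!(rev.len(), 2);
}
```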
+    fn rev_activate_up_to(&mut self, key: &[u8]) {
+        while let Some(rt) = self.rev_tombstones.get(self.rev_idx) {
+            if rt.end.as_ref() > key {
+                self.rev_active.activate(rt);
+                self.rev_idx += 1;
+            } else {
+                break;
+            }
+        }
+    }
+}
+
+impl<I: Iterator<Item = crate::Result<InternalValue>>> Iterator for RangeTombstoneFilter<I> {
+    type Item = crate::Result<InternalValue>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        loop {
+            let item = self.inner.next()?;
+
+            let Ok(kv) = &item else { return Some(item) };
+
+            let key = kv.key.user_key.as_ref();
+            let kv_seqno = kv.key.seqno;
+
+            // Activate tombstones whose start <= this key
+            self.fwd_activate_up_to(key);
+
+            // Expire tombstones whose end <= this key
+            self.fwd_active.expire_until(key);
+
+            // Check suppression
+            if self.fwd_active.is_suppressed(kv_seqno) {
+                continue;
+            }
+
+            return Some(item);
+        }
+    }
+}
+
+impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
+    for RangeTombstoneFilter<I>
+{
+    fn next_back(&mut self) -> Option<Self::Item> {
+        loop {
+            let item = self.inner.next_back()?;
+
+            let Ok(kv) = &item else { return Some(item) };
+
+            let key = kv.key.user_key.as_ref();
+            let kv_seqno = kv.key.seqno;
+
+            // Activate tombstones whose end > this key (strict >)
+            self.rev_activate_up_to(key);
+
+            // Expire tombstones whose start > this key (key < start)
+            self.rev_active.expire_until(key);
+
+            // Check suppression
+            if self.rev_active.is_suppressed(kv_seqno) {
+                continue;
+            }
+
+            return Some(item);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{UserKey, ValueType};
+
+    fn kv(key: &[u8], seqno: SeqNo) -> InternalValue {
+        InternalValue::from_components(key, b"v", seqno, ValueType::Value)
+    }
+
+    fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
+        RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
+    }
+
+    #[test]
+    fn no_tombstones() {
+        let items: Vec<crate::Result<InternalValue>> =
+            vec![Ok(kv(b"a", 1)), Ok(kv(b"b", 2)), Ok(kv(b"c", 3))];
+
+        let filter = RangeTombstoneFilter::new(items.into_iter(), vec![], SeqNo::MAX);
+        let results: Vec<_> = filter.flatten().collect();
+        assert_eq!(results.len(), 3);
+    }
+
+    #[test]
+    fn basic_suppression() {
+        let items: Vec<crate::Result<InternalValue>> = vec![
+            Ok(kv(b"a", 5)),
+            Ok(kv(b"b", 5)),
+            Ok(kv(b"c", 5)),
+            Ok(kv(b"d", 5)),
+            Ok(kv(b"e", 5)),
+        ];
+
+        let tombstones = vec![rt(b"b", b"d", 10)];
+        let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+        let results: Vec<_> = filter.flatten().collect();
+
+        let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+        assert_eq!(keys, vec![b"a".as_ref(), b"d", b"e"]);
+    }
+
+    #[test]
+    fn tombstone_does_not_suppress_newer_kv() {
+        let items: Vec<crate::Result<InternalValue>> = vec![Ok(kv(b"b", 10)), Ok(kv(b"c", 3))];
+
+        let tombstones = vec![rt(b"a", b"z", 5)];
+        let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+        let results: Vec<_> = filter.flatten().collect();
+
+        let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+        assert_eq!(keys, vec![b"b".as_ref()]);
+    }
+
+    #[test]
+    fn half_open_end_exclusive() {
+        let items: Vec<crate::Result<InternalValue>> =
+            vec![Ok(kv(b"b", 5)), Ok(kv(b"c", 5)), Ok(kv(b"d", 5))];
+
+        let tombstones = vec![rt(b"b", b"d", 10)];
+        let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+        let results: Vec<_> = filter.flatten().collect();
+
+        let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+        assert_eq!(keys, vec![b"d".as_ref()]);
+    }
+
+    #[test]
+    fn multiple_overlapping_tombstones() {
+        let items: Vec<crate::Result<InternalValue>> = vec![
+            Ok(kv(b"a", 1)),
+            Ok(kv(b"b", 3)),
+            Ok(kv(b"c", 6)),
+            Ok(kv(b"d", 1)),
+        ];
+
+        let tombstones = vec![rt(b"a", b"c", 5), rt(b"b", b"e", 4)];
vec![rt(b"a", b"c", 5), rt(b"b", b"e", 4)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + assert_eq!(keys, vec![b"c".as_ref()]); + } + + #[test] + fn tombstone_not_visible_at_read_seqno() { + let items: Vec> = vec![Ok(kv(b"b", 3))]; + + let tombstones = vec![rt(b"a", b"z", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, 5); + let results: Vec<_> = filter.flatten().collect(); + + assert_eq!(results.len(), 1); + } + + #[test] + fn reverse_basic_suppression() { + let items: Vec> = vec![ + Ok(kv(b"a", 5)), + Ok(kv(b"b", 5)), + Ok(kv(b"c", 5)), + Ok(kv(b"d", 5)), + Ok(kv(b"e", 5)), + ]; + + let tombstones = vec![rt(b"b", b"d", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.rev().flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + assert_eq!(keys, vec![b"e".as_ref(), b"d", b"a"]); + } + + #[test] + fn reverse_half_open() { + let items: Vec> = + vec![Ok(kv(b"a", 5)), Ok(kv(b"l", 5)), Ok(kv(b"m", 5))]; + + let tombstones = vec![rt(b"a", b"m", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.rev().flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + assert_eq!(keys, vec![b"m".as_ref()]); + } +} diff --git a/src/table/block/type.rs b/src/table/block/type.rs index 82eadf11b..1a90a0104 100644 --- a/src/table/block/type.rs +++ b/src/table/block/type.rs @@ -8,6 +8,7 @@ pub enum BlockType { Index, Filter, Meta, + RangeTombstone, } impl From for u8 { @@ -17,6 +18,7 @@ impl From for u8 { BlockType::Index => 1, BlockType::Filter => 2, BlockType::Meta => 3, + BlockType::RangeTombstone => 4, } } } @@ -30,6 +32,7 @@ impl TryFrom for BlockType { 1 => Ok(Self::Index), 2 => Ok(Self::Filter), 3 => Ok(Self::Meta), + 4 => Ok(Self::RangeTombstone), _ => Err(crate::Error::InvalidTag(("BlockType", value))), } } diff --git a/src/table/inner.rs b/src/table/inner.rs index 5611ca135..e59d47170 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -9,6 +9,7 @@ use super::{block_index::BlockIndexImpl, meta::ParsedMeta, regions::ParsedRegion use crate::{ cache::Cache, file_accessor::FileAccessor, + range_tombstone::RangeTombstone, table::{filter::block::FilterBlock, IndexBlock}, tree::inner::TreeId, Checksum, GlobalTableId, SeqNo, @@ -65,6 +66,9 @@ pub struct Inner { /// Cached sum of referenced blob file bytes for this table. /// Lazily computed on first access to avoid repeated I/O in compaction decisions. pub(crate) cached_blob_bytes: OnceLock, + + /// Range tombstones stored in this table. Loaded on open. 
+    pub(crate) range_tombstones: Vec<RangeTombstone>,
 }
 
 impl Inner {
diff --git a/src/table/iter.rs b/src/table/iter.rs
index b8114c2b5..53e4767c8 100644
--- a/src/table/iter.rs
+++ b/src/table/iter.rs
@@ -119,9 +119,13 @@ pub struct Iter {
 }
 
 impl Iter {
-    #[expect(
-        clippy::too_many_arguments,
-        reason = "cfg(metrics) adds an extra parameter"
+    // cfg_attr: expect only fires when metrics feature adds the extra parameter
+    #[cfg_attr(
+        feature = "metrics",
+        expect(
+            clippy::too_many_arguments,
+            reason = "metrics adds the extra parameter; without that feature this stays at the lint threshold"
+        )
     )]
     pub fn new(
         table_id: GlobalTableId,
diff --git a/src/table/mod.rs b/src/table/mod.rs
index 10ac8a2eb..02a1fef42 100644
--- a/src/table/mod.rs
+++ b/src/table/mod.rs
@@ -31,6 +31,7 @@ use crate::{
     cache::Cache,
     descriptor_table::DescriptorTable,
     file_accessor::FileAccessor,
+    range_tombstone::RangeTombstone,
     table::{
         block::{BlockType, ParsedItem},
         block_index::{BlockIndex, FullBlockIndex, TwoLevelBlockIndex, VolatileBlockIndex},
@@ -566,6 +567,23 @@ impl Table {
             None
         };
 
+        // Load range tombstones (if present)
+        let range_tombstones = if let Some(rt_handle) = regions.range_tombstones {
+            log::trace!("Loading range tombstone block, with rt_ptr={rt_handle:?}");
+            let block = Block::from_file(&file, rt_handle, crate::CompressionType::None)?;
+
+            if block.header.block_type != BlockType::RangeTombstone {
+                return Err(crate::Error::InvalidTag((
+                    "BlockType",
+                    block.header.block_type.into(),
+                )));
+            }
+
+            Self::decode_range_tombstones(&block)?
+        } else {
+            Vec::new()
+        };
+
         log::debug!(
             "Recovered table #{} from {}",
             metadata.id,
@@ -598,6 +616,7 @@ impl Table {
             metrics,
 
             cached_blob_bytes: std::sync::OnceLock::new(),
+            range_tombstones,
         })))
     }
 
@@ -606,6 +625,86 @@ impl Table {
         self.0.checksum
     }
 
+    /// Decodes range tombstones from a raw block.
+    ///
+    /// Wire format (repeated): `[start_len:u16_le][start][end_len:u16_le][end][seqno:u64_le]`
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if the block data is malformed.
+    fn decode_range_tombstones(block: &Block) -> crate::Result<Vec<RangeTombstone>> {
+        use byteorder::{ReadBytesExt, LE};
+        use std::io::{Cursor, Read};
+
+        let mut tombstones = Vec::new();
+        let data = block.data.as_ref();
+        let mut cursor = Cursor::new(data);
+
+        #[expect(
+            clippy::cast_possible_truncation,
+            reason = "block size always fits in usize"
+        )]
+        while (cursor.position() as usize) < data.len() {
+            let start_len = cursor
+                .read_u16::<LE>()
+                .map_err(|_| crate::Error::Unrecoverable)? as usize;
+
+            // Validate length against remaining data before allocating
+            let remaining = data.len() - cursor.position() as usize;
+            if start_len > remaining {
+                log::error!(
+                    "Range tombstone block: start_len {start_len} exceeds remaining {remaining}"
+                );
+                return Err(crate::Error::Unrecoverable);
+            }
+
+            let mut start_buf = vec![0u8; start_len];
+            cursor
+                .read_exact(&mut start_buf)
+                .map_err(|_| crate::Error::Unrecoverable)?;
+
+            let end_len = cursor
+                .read_u16::<LE>()
+                .map_err(|_| crate::Error::Unrecoverable)? as usize;
+
+            let remaining = data.len() - cursor.position() as usize;
+            if end_len > remaining {
+                log::error!(
+                    "Range tombstone block: end_len {end_len} exceeds remaining {remaining}"
+                );
+                return Err(crate::Error::Unrecoverable);
+            }
+
+            let mut end_buf = vec![0u8; end_len];
+            cursor
+                .read_exact(&mut end_buf)
+                .map_err(|_| crate::Error::Unrecoverable)?;
+
+            let seqno = cursor
+                .read_u64::<LE>()
+                .map_err(|_| crate::Error::Unrecoverable)?;
+
+            let start = UserKey::from(start_buf);
+            let end = UserKey::from(end_buf);
+
+            // Validate invariant: start < end (reject corrupted data)
+            if start >= end {
+                log::error!("Range tombstone block: invalid interval (start >= end)");
+                return Err(crate::Error::Unrecoverable);
+            }
+
+            tombstones.push(RangeTombstone::new(start, end, seqno));
+        }
+
+        Ok(tombstones)
+    }
+
+    /// Returns the range tombstones stored in this table.
+    #[must_use]
+    pub(crate) fn range_tombstones(&self) -> &[RangeTombstone] {
+        &self.0.range_tombstones
+    }
+
     pub(crate) fn mark_as_deleted(&self) {
         self.0
             .is_deleted
diff --git a/src/table/multi_writer.rs b/src/table/multi_writer.rs
index fefed7ddf..93417329f 100644
--- a/src/table/multi_writer.rs
+++ b/src/table/multi_writer.rs
@@ -4,8 +4,9 @@
 use super::{filter::BloomConstructionPolicy, writer::Writer};
 use crate::{
-    blob_tree::handle::BlobIndirection, table::writer::LinkedFile, value::InternalValue,
-    vlog::BlobFileId, Checksum, CompressionType, HashMap, SequenceNumberCounter, TableId, UserKey,
+    blob_tree::handle::BlobIndirection, range_tombstone::RangeTombstone, table::writer::LinkedFile,
+    value::InternalValue, vlog::BlobFileId, Checksum, CompressionType, HashMap,
+    SequenceNumberCounter, TableId, UserKey,
 };
 use std::path::PathBuf;
@@ -46,6 +47,16 @@ pub struct MultiWriter {
     linked_blobs: HashMap<BlobFileId, LinkedFile>,
 
+    /// Range tombstones to distribute across output tables.
+    /// During compaction these are clipped to each table's key range;
+    /// during flush they are written unmodified (they must cover keys in older SSTs).
+    range_tombstones: Vec<RangeTombstone>,
+
+    /// When true, range tombstones are clipped to each output table's KV key range
+    /// via `intersect_opt`. This is correct for compaction (input tables are consumed)
+    /// but wrong for flush (RTs must cover keys in older SSTs outside the memtable's range).
+    clip_range_tombstones: bool,
+
     /// Level the tables are written to
     initial_level: u8,
 }
@@ -91,9 +102,72 @@ impl MultiWriter {
             current_key: None,
 
             linked_blobs: HashMap::default(),
+            range_tombstones: Vec::new(),
+            clip_range_tombstones: false,
         })
     }
 
+    /// Enables RT clipping: each tombstone is intersected with the output
+    /// table's KV key range. Use this for compaction where input tables are
+    /// consumed; do NOT use for flush where RTs must cover older SSTs.
+    pub fn use_clip_range_tombstones(mut self) -> Self {
+        self.clip_range_tombstones = true;
+        self
+    }
+
+    /// Sets range tombstones to be distributed across output tables.
+    pub fn set_range_tombstones(&mut self, tombstones: Vec<RangeTombstone>) {
+        self.range_tombstones = tombstones;
+    }
+
+    /// Writes range tombstones to the given writer, respecting the clip mode.
+    ///
+    /// - **clip=true** (compaction): intersect each RT with the table's KV key range.
+    /// - **clip=false** (flush): write ALL RTs unmodified, with no overlap filter,
+    ///   so they cover keys in older SSTs outside this memtable's key range.
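+    ///
+    /// Illustrative example (hypothetical keys): with clip=true, an RT
+    /// `[b, y)@10` written into a table whose KV key range is `[d, g]` is
+    /// clipped to `[d, g\x00)` via `intersect_opt` (the exclusive bound comes
+    /// from `upper_bound_exclusive`); with clip=false the same RT is written
+    /// as `[b, y)@10`, untouched.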
+    fn write_rts_to_writer(tombstones: &[RangeTombstone], clip: bool, writer: &mut Writer) {
+        if let (Some(first_key), Some(last_key)) =
+            (writer.meta.first_key.clone(), writer.meta.last_key.clone())
+        {
+            if clip {
+                // Compaction mode: clip RTs to this table's key range.
+                if let Some(max_exclusive) =
+                    crate::range_tombstone::upper_bound_exclusive(last_key.as_ref())
+                {
+                    for rt in tombstones {
+                        if let Some(clipped) =
+                            rt.intersect_opt(first_key.as_ref(), max_exclusive.as_ref())
+                        {
+                            writer.write_range_tombstone(clipped);
+                        }
+                    }
+                } else {
+                    // last_key is already at the maximum key length (u16::MAX bytes),
+                    // so no exclusive upper bound exists; write overlapping RTs unclipped.
+                    for rt in tombstones {
+                        if rt.start.as_ref() <= last_key.as_ref()
+                            && rt.end.as_ref() > first_key.as_ref()
+                        {
+                            writer.write_range_tombstone(rt.clone());
+                        }
+                    }
+                }
+            } else {
+                // Flush mode: write ALL RTs without clipping so they cover keys
+                // in older SSTs outside this memtable's key range. No overlap
+                // filter — an RT disjoint from this table's KV range (e.g.,
+                // delete_range on keys only in older SSTs) must still be persisted.
+                for rt in tombstones {
+                    writer.write_range_tombstone(rt.clone());
+                }
+            }
+        } else {
+            // RT-only table (no KV items yet) — write all tombstones unclipped.
+            for rt in tombstones {
+                writer.write_range_tombstone(rt.clone());
+            }
+        }
+    }
+
     pub fn register_blob(&mut self, indirection: BlobIndirection) {
         self.linked_blobs
             .entry(indirection.vhandle.blob_file_id)
@@ -202,6 +276,19 @@ impl MultiWriter {
         let mut old_writer = std::mem::replace(&mut self.writer, new_writer);
 
+        // Write range tombstones to the finishing writer.
+        // In flush mode (clip=false) tombstones are written unmodified because
+        // they must cover keys in older SSTs outside this memtable's key range.
+        // In compaction mode (clip=true) tombstones are clipped to the output
+        // table's KV range because the input tables are consumed.
+        if !self.range_tombstones.is_empty() {
+            Self::write_rts_to_writer(
+                &self.range_tombstones,
+                self.clip_range_tombstones,
+                &mut old_writer,
+            );
+        }
+
         for linked in self.linked_blobs.values() {
             old_writer.link_blob_file(
                 linked.blob_file_id,
@@ -240,6 +327,15 @@ impl MultiWriter {
     ///
     /// Returns the metadata of created tables
     pub fn finish(mut self) -> crate::Result> {
+        // Write range tombstones to the last writer (same logic as rotate).
+        if !self.range_tombstones.is_empty() {
+            Self::write_rts_to_writer(
+                &self.range_tombstones,
+                self.clip_range_tombstones,
+                &mut self.writer,
+            );
+        }
+
         for linked in self.linked_blobs.values() {
             self.writer.link_blob_file(
                 linked.blob_file_id,
diff --git a/src/table/regions.rs b/src/table/regions.rs
index 5644d2f2b..135cc6700 100644
--- a/src/table/regions.rs
+++ b/src/table/regions.rs
@@ -47,6 +47,7 @@ pub struct ParsedRegions {
     pub index: Option<BlockHandle>,
     pub filter_tli: Option<BlockHandle>,
     pub filter: Option<BlockHandle>,
+    pub range_tombstones: Option<BlockHandle>,
     pub linked_blob_files: Option<BlockHandle>,
     pub metadata: BlockHandle,
 }
@@ -64,6 +65,7 @@ impl ParsedRegions {
         })?,
         index: toc.section(b"index").map(toc_entry_to_handle),
         filter: toc.section(b"filter").map(toc_entry_to_handle),
+        range_tombstones: toc.section(b"range_tombstones").map(toc_entry_to_handle),
         linked_blob_files: toc.section(b"linked_blob_files").map(toc_entry_to_handle),
         metadata: toc
             .section(b"meta")
diff --git a/src/table/util.rs b/src/table/util.rs
index a85b31e2e..f4f2082b9 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -28,9 +28,13 @@ pub struct SliceIndexes(pub usize, pub usize);
 
 /// Loads a block from disk or block cache, if cached.
 ///
 /// Also handles file descriptor opening and caching.
-#[expect(
-    clippy::too_many_arguments,
-    reason = "block loading requires many context parameters"
+// cfg_attr: expect only fires when metrics feature adds the extra parameter
+#[cfg_attr(
+    feature = "metrics",
+    expect(
+        clippy::too_many_arguments,
+        reason = "metrics adds the extra parameter; without that feature this stays at the lint threshold"
+    )
 )]
 pub fn load_block(
     table_id: GlobalTableId,
@@ -56,7 +60,7 @@ pub fn load_block(
             BlockType::Index => {
                 metrics.index_block_load_cached.fetch_add(1, Relaxed);
             }
-            BlockType::Data | BlockType::Meta => {
+            BlockType::Data | BlockType::Meta | BlockType::RangeTombstone => {
                 metrics.data_block_load_cached.fetch_add(1, Relaxed);
             }
         }
@@ -103,7 +107,7 @@ pub fn load_block(
                 .index_block_io_requested
                 .fetch_add(handle.size().into(), Relaxed);
         }
-        BlockType::Data | BlockType::Meta => {
+        BlockType::Data | BlockType::Meta | BlockType::RangeTombstone => {
             metrics.data_block_load_io.fetch_add(1, Relaxed);
 
             metrics
diff --git a/src/table/writer/mod.rs b/src/table/writer/mod.rs
index f85d8c845..c71926e68 100644
--- a/src/table/writer/mod.rs
+++ b/src/table/writer/mod.rs
@@ -14,6 +14,7 @@ use crate::{
     checksum::{ChecksumType, ChecksummedWriter},
     coding::Encode,
     file::fsync_directory,
+    range_tombstone::RangeTombstone,
     table::{
         writer::{
             filter::{FilterWriter, FullFilterWriter},
@@ -91,6 +92,9 @@ pub struct Writer {
     linked_blob_files: Vec<LinkedFile>,
 
+    /// Range tombstones to be written as a separate block
+    range_tombstones: Vec<RangeTombstone>,
+
     initial_level: u8,
 }
@@ -140,6 +144,7 @@ impl Writer {
             previous_item: None,
 
             linked_blob_files: Vec::new(),
+            range_tombstones: Vec::new(),
         })
     }
 
@@ -233,6 +238,13 @@ impl Writer {
         self
     }
 
+    /// Adds a range tombstone to be written into this table's RT block.
+    pub fn write_range_tombstone(&mut self, rt: RangeTombstone) {
+        self.meta.lowest_seqno = self.meta.lowest_seqno.min(rt.seqno);
+        self.meta.highest_seqno = self.meta.highest_seqno.max(rt.seqno);
+        self.range_tombstones.push(rt);
+    }
+
     /// Writes an item.
     ///
     /// # Note
@@ -373,12 +385,65 @@ impl Writer {
         self.spill_block()?;
 
-        // No items written! Just delete table file and return nothing
-        if self.meta.item_count == 0 {
+        // No items and no range tombstones — delete the empty table file.
+        if self.meta.item_count == 0 && self.range_tombstones.is_empty() {
             std::fs::remove_file(&self.path)?;
             return Ok(None);
         }
 
+        // If we have range tombstones but no KV items, write a synthetic
+        // weak tombstone at the smallest RT start key to produce a valid index.
+        // Preserve seqno bounds from the range tombstones (the sentinel at
+        // MAX_SEQNO must not distort the table's recorded seqno range). Also
+        // ensure the table metadata key range conservatively covers all range
+        // tombstones.
+        if self.meta.item_count == 0 {
+            // Compute the coverage of all range tombstones.
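+            // Worked example (hypothetical values): for RTs [a, d)@7 and
+            // [c, k)@9, min_start is "a" and max_end is "k"; the sentinel is
+            // then written at "a"@MAX_SEQNO, and first_key/last_key become
+            // "a"/"k" so the table's key range covers both tombstones.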
+            let mut min_start: Option<UserKey> = None;
+            let mut max_end: Option<UserKey> = None;
+            let mut max_rt_seqno: crate::SeqNo = 0;
+            for rt in &self.range_tombstones {
+                match &min_start {
+                    None => min_start = Some(rt.start.clone()),
+                    Some(cur_min) if rt.start < *cur_min => min_start = Some(rt.start.clone()),
+                    _ => {}
+                }
+                match &max_end {
+                    None => max_end = Some(rt.end.clone()),
+                    Some(cur_max) if rt.end > *cur_max => max_end = Some(rt.end.clone()),
+                    _ => {}
+                }
+                if rt.seqno > max_rt_seqno {
+                    max_rt_seqno = rt.seqno;
+                }
+            }
+
+            if let (Some(start), Some(end)) = (min_start, max_end) {
+                let saved_lo = self.meta.lowest_seqno;
+                let saved_hi = self.meta.highest_seqno;
+
+                // Write a sentinel key at min(rt.start) to force index block
+                // creation in RT-only tables. InternalKey Eq/Ord ignores
+                // value_type, so the sentinel's (user_key, seqno) pair must be
+                // globally unique. MAX_SEQNO is reserved and never assigned to
+                // real writes, guaranteeing no collision during merge/compaction.
+                // The sentinel is also invisible to all reads (read_seqno < MAX_SEQNO).
+                let sentinel_seqno = crate::seqno::MAX_SEQNO;
+                self.write(InternalValue::new_weak_tombstone(
+                    start.clone(),
+                    sentinel_seqno,
+                ))?;
+                self.spill_block()?;
+
+                // Restore seqno bounds — sentinel is an implementation detail
+                self.meta.lowest_seqno = saved_lo;
+                self.meta.highest_seqno = saved_hi;
+
+                // Ensure the table's key range covers all range tombstones.
+                self.meta.first_key = Some(start);
+                self.meta.last_key = Some(end);
+            }
+        }
+
         // Write index
         log::trace!("Finishing index writer");
         let index_block_count = self.index_writer.finish(&mut self.file_writer)?;
 
         log::trace!("Finishing filter writer");
         let filter_block_count = self.filter_writer.finish(&mut self.file_writer)?;
 
+        // Write range tombstones block (if any)
+        if !self.range_tombstones.is_empty() {
+            use byteorder::{WriteBytesExt, LE};
+
+            self.file_writer.start("range_tombstones")?;
+
+            // Wire format (repeated): [start_len:u16_le][start][end_len:u16_le][end][seqno:u64_le]
+            self.block_buffer.clear();
+            for rt in &self.range_tombstones {
+                let start_len = u16::try_from(rt.start.len()).map_err(|_| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        "range tombstone start key length exceeds u16::MAX",
+                    )
+                })?;
+                let end_len = u16::try_from(rt.end.len()).map_err(|_| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        "range tombstone end key length exceeds u16::MAX",
+                    )
+                })?;
+
+                self.block_buffer.write_u16::<LE>(start_len)?;
+                self.block_buffer.extend_from_slice(&rt.start);
+                self.block_buffer.write_u16::<LE>(end_len)?;
+                self.block_buffer.extend_from_slice(&rt.end);
+                self.block_buffer.write_u64::<LE>(rt.seqno)?;
+            }
+
+            Block::write_into(
+                &mut self.file_writer,
+                &self.block_buffer,
+                crate::table::block::BlockType::RangeTombstone,
+                CompressionType::None,
+            )?;
+        }
+
         if !self.linked_blob_files.is_empty() {
             use byteorder::{WriteBytesExt, LE};
 
@@ -466,6 +568,10 @@
             meta("key_count", &(self.meta.key_count as u64).to_le_bytes()),
             meta("prefix_truncation#data", &[1]), // NOTE: currently prefix truncation can not be disabled
             meta("prefix_truncation#index", &[1]), // NOTE: currently prefix truncation can not be disabled
+            meta(
+                "range_tombstone_count",
+                &(self.range_tombstones.len() as u64).to_le_bytes(),
+            ),
             meta(
                 "restart_interval#data",
                 &self.data_block_restart_interval.to_le_bytes(),
diff --git a/src/tree/mod.rs b/src/tree/mod.rs
index e63b61031..3bd882392 100644
--- a/src/tree/mod.rs
+++ b/src/tree/mod.rs
@@ -338,9 +338,10 @@ impl AbstractTree for Tree {
         .len()
     }
 
-    fn flush_to_tables(
+    fn flush_to_tables_with_rt(
         &self,
         stream: impl Iterator<Item = crate::Result<InternalValue>>,
+        range_tombstones: Vec<RangeTombstone>,
     ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
         use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
         use std::time::Instant;
@@ -396,6 +397,11 @@ impl AbstractTree for Tree {
             table_writer = table_writer.use_partitioned_filter();
         }
 
+        // Set range tombstones BEFORE writing KV items so that if MultiWriter
+        // rotates to a new table during the write loop, earlier tables already
+        // carry the RT metadata.
+        table_writer.set_range_tombstones(range_tombstones);
+
         for item in stream {
             table_writer.write(item?)?;
         }
@@ -426,6 +432,9 @@ impl AbstractTree for Tree {
             })
             .collect::<crate::Result<Vec<_>>>()?;
 
+        // Return Some even when tables is empty (RT-only flush): the caller
+        // (AbstractTree::flush) handles empty tables by re-inserting RTs into
+        // the active memtable and still needs to delete sealed memtables.
         Ok(Some((tables, None)))
     }
 
@@ -681,6 +690,20 @@ impl AbstractTree for Tree {
         let value = InternalValue::new_weak_tombstone(key, seqno);
         self.append_entry(value)
     }
+
+    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
+        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
+        let memtable = Arc::clone(
+            &self
+                .version_history
+                .read()
+                .expect("lock is poisoned")
+                .latest_version()
+                .active_memtable,
+        );
+
+        memtable.insert_range_tombstone(start.into(), end.into(), seqno)
+    }
 }
 
 impl Tree {
@@ -719,18 +742,95 @@ impl Tree {
         seqno: SeqNo,
     ) -> crate::Result<Option<InternalValue>> {
         if let Some(entry) = super_version.active_memtable.get(key, seqno) {
-            return Ok(ignore_tombstone_value(entry));
+            let Some(entry) = ignore_tombstone_value(entry) else {
+                return Ok(None);
+            };
+
+            // Check if any range tombstone suppresses this entry
+            if Self::is_suppressed_by_range_tombstones(super_version, key, entry.key.seqno, seqno) {
+                return Ok(None);
+            }
+            return Ok(Some(entry));
         }
 
         // Now look in sealed memtables
         if let Some(entry) =
             Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
         {
-            return Ok(ignore_tombstone_value(entry));
+            let Some(entry) = ignore_tombstone_value(entry) else {
+                return Ok(None);
+            };
+
+            if Self::is_suppressed_by_range_tombstones(super_version, key, entry.key.seqno, seqno) {
+                return Ok(None);
+            }
+            return Ok(Some(entry));
         }
 
         // Now look in tables... this may involve disk I/O
-        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
+        let entry = Self::get_internal_entry_from_tables(&super_version.version, key, seqno)?;
+
+        if let Some(entry) = entry {
+            // NOTE: For normal tables (global_seqno=0) the entry's seqno is the
+            // true write seqno, so the RT suppression comparison is correct.
+            // For bulk-ingested tables (global_seqno > 0) the Table::get iterator
+            // translates seqnos by adding global_seqno, so entry.key.seqno is
+            // already in the global seqno space. However, if an RT was written
+            // with a seqno between the local and global values, suppression may
+            // be incorrect. This is a known limitation of bulk ingest + range
+            // tombstones — acceptable because ingest assigns a single global
+            // seqno and users rarely combine ingest with delete_range.
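+            // Hypothetical example of the limitation: an ingested table with
+            // global_seqno=100 surfaces an entry written locally at seqno 2 as
+            // seqno 102; an RT at seqno 50 meant to cover the pre-ingest data
+            // then fails the "tombstone newer than entry" check (50 < 102) and
+            // does not suppress it.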
+            if Self::is_suppressed_by_range_tombstones(super_version, key, entry.key.seqno, seqno) {
+                return Ok(None);
+            }
+            return Ok(Some(entry));
+        }
+
+        Ok(None)
+    }
+
+    /// Checks if a key at `key_seqno` is suppressed by any range tombstone
+    /// in the active memtable, sealed memtables, or SST tables, visible at `read_seqno`.
+    fn is_suppressed_by_range_tombstones(
+        super_version: &SuperVersion,
+        key: &[u8],
+        key_seqno: SeqNo,
+        read_seqno: SeqNo,
+    ) -> bool {
+        // Check active memtable range tombstones
+        if super_version
+            .active_memtable
+            .is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno)
+        {
+            return true;
+        }
+
+        // Check sealed memtable range tombstones
+        for mt in super_version.sealed_memtables.iter().rev() {
+            if mt.is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno) {
+                return true;
+            }
+        }
+
+        // Check SST table range tombstones
+        // NOTE: We cannot filter by key_range.contains_key here: flushed tables
+        // carry unclipped RTs (see MultiWriter), so a table's tombstone coverage
+        // may extend beyond its KV key range.
+        // RangeTombstone::should_suppress already checks contains_key internally.
+        for table in super_version
+            .version
+            .iter_levels()
+            .flat_map(|lvl| lvl.iter())
+            .flat_map(|run| run.iter())
+            .filter(|t| !t.range_tombstones().is_empty())
+        {
+            for rt in table.range_tombstones() {
+                if rt.should_suppress(key, key_seqno, read_seqno) {
+                    return true;
+                }
+            }
+        }
+
+        false
     }
 
     fn get_internal_entry_from_tables(
diff --git a/tests/range_tombstone.rs b/tests/range_tombstone.rs
new file mode 100644
index 000000000..f9d66bcfa
--- /dev/null
+++ b/tests/range_tombstone.rs
@@ -0,0 +1,668 @@
+// NOTE: the `Guard` trait import is required for the `.key()` method on
+// iterator items (the `IterGuard` trait).
+use lsm_tree::{get_tmp_folder, AbstractTree, AnyTree, Config, Guard, SequenceNumberCounter};
+use test_log::test;
+
+fn open_tree(path: &std::path::Path) -> AnyTree {
+    Config::new(
+        path,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()
+    .expect("should open")
+}
+
+/// Helper to collect keys from a forward iterator.
+/// Returns `Vec<Vec<u8>>`, which compares correctly with `vec![b"a", b"b"]`
+/// via the standard `PartialEq` impls between `Vec<T>` and byte arrays.
+fn collect_keys(tree: &AnyTree, seqno: u64) -> lsm_tree::Result<Vec<Vec<u8>>> {
+    let mut keys = Vec::new();
+    for item in tree.iter(seqno, None) {
+        let k = item.key()?;
+        keys.push(k.to_vec());
+    }
+    Ok(keys)
+}
+
+/// Helper to collect keys from a reverse iterator.
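+/// Usage mirrors `collect_keys`, e.g.
+/// `assert_eq!(collect_keys_rev(&tree, 11)?, vec![b"e", b"d", b"a"]);`
+/// (hypothetical data).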
+fn collect_keys_rev(tree: &AnyTree, seqno: u64) -> lsm_tree::Result<Vec<Vec<u8>>> {
+    let mut keys = Vec::new();
+    for item in tree.iter(seqno, None).rev() {
+        let k = item.key()?;
+        keys.push(k.to_vec());
+    }
+    Ok(keys)
+}
+
+// --- Test A: Point reads suppressed by memtable range tombstone ---
+#[test]
+fn range_tombstone_suppresses_point_read_in_memtable() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "val_a", 1);
+    tree.insert("b", "val_b", 2);
+    tree.insert("c", "val_c", 3);
+    tree.insert("d", "val_d", 4);
+
+    // Range tombstone [b, d) at seqno 10 suppresses b and c
+    tree.remove_range("b", "d", 10);
+
+    // a is outside range — visible
+    assert_eq!(Some("val_a".as_bytes().into()), tree.get("a", 11)?);
+    // b is inside range — suppressed
+    assert_eq!(None, tree.get("b", 11)?);
+    // c is inside range — suppressed
+    assert_eq!(None, tree.get("c", 11)?);
+    // d is at exclusive end — visible
+    assert_eq!(Some("val_d".as_bytes().into()), tree.get("d", 11)?);
+
+    Ok(())
+}
+
+// --- Test B: Range tombstone respects MVCC ---
+#[test]
+fn range_tombstone_mvcc_visibility() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "val_a", 1);
+    tree.insert("b", "val_b", 2);
+
+    // Range tombstone at seqno 10
+    tree.remove_range("a", "z", 10);
+
+    // Reading at seqno 5 — tombstone not visible (seqno 10 > 5)
+    assert_eq!(Some("val_a".as_bytes().into()), tree.get("a", 5)?);
+    assert_eq!(Some("val_b".as_bytes().into()), tree.get("b", 5)?);
+
+    // Reading at seqno 11 — tombstone visible, values suppressed
+    assert_eq!(None, tree.get("a", 11)?);
+    assert_eq!(None, tree.get("b", 11)?);
+
+    Ok(())
+}
+
+// --- Test C: Range tombstone does not suppress newer values ---
+#[test]
+fn range_tombstone_does_not_suppress_newer_values() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "old_a", 1);
+    tree.remove_range("a", "z", 5);
+    tree.insert("a", "new_a", 10);
+
+    // new_a at seqno 10 is newer than tombstone at seqno 5
+    assert_eq!(Some("new_a".as_bytes().into()), tree.get("a", 11)?);
+
+    Ok(())
+}
+
+// --- Test D: Range iteration suppressed by range tombstone ---
+#[test]
+fn range_tombstone_suppresses_range_iteration() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.insert("d", "4", 4);
+    tree.insert("e", "5", 5);
+
+    // Delete [b, d) at seqno 10
+    tree.remove_range("b", "d", 10);
+
+    let keys = collect_keys(&tree, 11)?;
+    assert_eq!(keys, vec![b"a", b"d", b"e"]);
+
+    Ok(())
+}
+
+// --- Test E: Reverse iteration with range tombstone ---
+#[test]
+fn range_tombstone_suppresses_reverse_iteration() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.insert("d", "4", 4);
+    tree.insert("e", "5", 5);
+
+    tree.remove_range("b", "d", 10);
+
+    let keys = collect_keys_rev(&tree, 11)?;
+    assert_eq!(keys, vec![b"e", b"d", b"a"]);
+
+    Ok(())
+}
+
+// --- Test F: Range tombstone in memtable suppresses SST data ---
+#[test]
+fn range_tombstone_suppresses_across_memtable_and_sst() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Insert data and flush to SST
+    tree.insert("a", "val_a", 1);
tree.insert("b", "val_b", 2); + tree.insert("c", "val_c", 3); + tree.flush_active_memtable(0)?; + + // Range tombstone in memtable suppresses SST data + tree.remove_range("a", "d", 10); + + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + assert_eq!(None, tree.get("c", 11)?); + + Ok(()) +} + +// --- Test G: Range tombstone in sealed memtable --- +#[test] +fn range_tombstone_in_sealed_memtable() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Insert range tombstone then seal the memtable + tree.remove_range("a", "z", 10); + assert!( + tree.rotate_memtable().is_some(), + "memtable with RT should seal" + ); + assert!(tree.sealed_memtable_count() > 0); + + // Insert new data in active memtable (lower seqno) + tree.insert("b", "val_b", 5); + + // b@5 is suppressed by sealed tombstone@10 + assert_eq!(None, tree.get("b", 11)?); + + // Insert newer data + tree.insert("b", "val_b_new", 15); + // b@15 survives (newer than tombstone@10) + assert_eq!(Some("val_b_new".as_bytes().into()), tree.get("b", 16)?); + + Ok(()) +} + +// --- Test H: remove_prefix --- +#[test] +fn remove_prefix_suppresses_matching_keys() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("user:1", "alice", 1); + tree.insert("user:2", "bob", 2); + tree.insert("user:3", "carol", 3); + tree.insert("order:1", "pizza", 4); + + // Delete all "user:" prefixed keys + tree.remove_prefix("user:", 10); + + assert_eq!(None, tree.get("user:1", 11)?); + assert_eq!(None, tree.get("user:2", 11)?); + assert_eq!(None, tree.get("user:3", 11)?); + // "order:" is not affected + assert_eq!(Some("pizza".as_bytes().into()), tree.get("order:1", 11)?); + + Ok(()) +} + +// --- Test I: Overlapping range tombstones --- +#[test] +fn overlapping_range_tombstones() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.insert("d", "4", 4); + tree.insert("e", "5", 5); + + // Two overlapping tombstones + tree.remove_range("a", "c", 10); // [a, c) + tree.remove_range("b", "e", 15); // [b, e) + + // a: suppressed by [a,c)@10 + assert_eq!(None, tree.get("a", 20)?); + // b: suppressed by both + assert_eq!(None, tree.get("b", 20)?); + // c: suppressed by [b,e)@15 only + assert_eq!(None, tree.get("c", 20)?); + // d: suppressed by [b,e)@15 + assert_eq!(None, tree.get("d", 20)?); + // e: NOT suppressed (exclusive end of [b,e)) + assert_eq!(Some("5".as_bytes().into()), tree.get("e", 20)?); + + Ok(()) +} + +// --- Test J: Range iteration with sealed tombstone and SST data --- +#[test] +fn range_iteration_with_sealed_tombstone_and_sst_data() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Data in SST + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.insert("d", "4", 4); + tree.flush_active_memtable(0)?; + + // Range tombstone in sealed memtable + tree.remove_range("b", "d", 10); + tree.rotate_memtable(); + + // New data in active memtable + tree.insert("e", "5", 11); + + let keys = collect_keys(&tree, 12)?; + assert_eq!(keys, vec![b"a", b"d", b"e"]); + + Ok(()) +} + +// --- Test K: Range tombstone persists through flush to SST --- +#[test] +fn range_tombstone_persists_through_flush() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Insert 
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+
+    // Insert range tombstone in same memtable
+    tree.remove_range("a", "c", 10);
+
+    // Flush everything to SST (both data and range tombstone)
+    tree.flush_active_memtable(0)?;
+
+    // After flush: range tombstone should still suppress from SST
+    assert_eq!(None, tree.get("a", 11)?);
+    assert_eq!(None, tree.get("b", 11)?);
+    assert_eq!(Some("3".as_bytes().into()), tree.get("c", 11)?); // c is at exclusive end
+
+    // Verify via range iteration too
+    let keys = collect_keys(&tree, 11)?;
+    assert_eq!(keys, vec![b"c"]);
+
+    Ok(())
+}
+
+// --- Test K2: Range tombstone survives compaction ---
+#[test]
+fn range_tombstone_survives_compaction() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Batch 1: data + range tombstone in same memtable
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.insert("d", "4", 4);
+    tree.remove_range("b", "d", 10);
+    tree.flush_active_memtable(0)?;
+
+    // Batch 2: more data to force a second table
+    tree.insert("e", "5", 11);
+    tree.flush_active_memtable(0)?;
+
+    // Both tables in L0 — compact them
+    assert_eq!(2, tree.table_count());
+    tree.major_compact(64_000_000, 0)?;
+
+    // After compaction, range tombstone should still suppress
+    assert_eq!(Some("1".as_bytes().into()), tree.get("a", 12)?);
+    assert_eq!(None, tree.get("b", 12)?);
+    assert_eq!(None, tree.get("c", 12)?);
+    assert_eq!(Some("4".as_bytes().into()), tree.get("d", 12)?);
+    assert_eq!(Some("5".as_bytes().into()), tree.get("e", 12)?);
+
+    // Verify via iteration
+    let keys = collect_keys(&tree, 12)?;
+    assert_eq!(keys, vec![b"a", b"d", b"e"]);
+
+    Ok(())
+}
+
+// --- Test L: Range tombstone persists through recovery ---
+#[test]
+fn range_tombstone_persists_through_recovery() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    {
+        let tree = open_tree(folder.path());
+        tree.insert("a", "1", 1);
+        tree.insert("b", "2", 2);
+        tree.insert("c", "3", 3);
+        tree.remove_range("a", "c", 10);
+        tree.flush_active_memtable(0)?;
+    }
+
+    // Reopen the tree — range tombstones should be recovered from SST
+    {
+        let tree = open_tree(folder.path());
+        assert_eq!(None, tree.get("a", 11)?);
+        assert_eq!(None, tree.get("b", 11)?);
+        assert_eq!(Some("3".as_bytes().into()), tree.get("c", 11)?);
+    }
+
+    Ok(())
+}
+
+// --- Test M: RT-only memtable flush creates a valid table ---
+#[test]
+fn range_tombstone_only_flush() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // First: insert data and flush
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.flush_active_memtable(0)?;
+
+    let tables_before = tree.table_count();
+
+    // Second: insert only a range tombstone and flush
+    // RT-only flush writes synthetic sentinel tombstones to create a valid SST
+    tree.remove_range("a", "c", 10);
+    tree.flush_active_memtable(0)?;
+
+    assert!(
+        tree.table_count() > tables_before,
+        "RT-only flush should produce a table with sentinel tombstones"
+    );
+
+    // The range tombstone in the SST should suppress
+    assert_eq!(None, tree.get("a", 11)?);
+    assert_eq!(None, tree.get("b", 11)?);
+    assert_eq!(Some("3".as_bytes().into()), tree.get("c", 11)?);
+
+    Ok(())
+}
+
+// --- Test N: GC eviction at bottom level ---
+#[test]
+fn range_tombstone_gc_eviction_at_bottom_level() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.remove_range("a", "d", 10);
+    tree.flush_active_memtable(0)?;
+
+    // Before GC: range tombstone suppresses all
+    assert_eq!(None, tree.get("a", 11)?);
+
+    // Major compact with GC watermark ABOVE the tombstone seqno
+    // This should evict the range tombstone at the bottom level
+    tree.major_compact(64_000_000, 11)?;
+
+    // After GC: both data and tombstone are evicted (all seqno < 11)
+    // Insert new data — should be visible (no lingering tombstone)
+    tree.insert("a", "new_a", 15);
+    assert_eq!(Some("new_a".as_bytes().into()), tree.get("a", 16)?);
+
+    Ok(())
+}
+
+// --- Test O: Prefix iteration with range tombstone in SST ---
+#[test]
+fn range_tombstone_prefix_iteration_with_sst() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("user:1", "alice", 1);
+    tree.insert("user:2", "bob", 2);
+    tree.insert("user:3", "carol", 3);
+    tree.insert("order:1", "pizza", 4);
+    tree.remove_prefix("user:", 10);
+    tree.flush_active_memtable(0)?;
+
+    // Prefix iteration over "user:" should yield nothing
+    let mut user_keys = Vec::new();
+    for item in tree.prefix("user:", 11, None) {
+        let k = item.key()?;
+        user_keys.push(k.to_vec());
+    }
+    assert!(user_keys.is_empty());
+
+    // Prefix iteration over "order:" should yield "order:1"
+    let mut order_keys = Vec::new();
+    for item in tree.prefix("order:", 11, None) {
+        let k = item.key()?;
+        order_keys.push(k.to_vec());
+    }
+    assert_eq!(order_keys, vec![b"order:1"]);
+
+    Ok(())
+}
+
+// --- Test P: Compaction with MultiWriter rotation preserves RTs across tables ---
+#[test]
+fn range_tombstone_survives_compaction_with_rotation() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    // A small target_size passed to major_compact below forces MultiWriter
+    // rotation during compaction
+    let tree = Config::new(
+        folder.path(),
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    // Insert enough data to produce multiple tables on compaction
+    for i in 0u8..20 {
+        let key = format!("key_{i:03}");
+        let val = "x".repeat(4000);
+        tree.insert(key.as_bytes(), val.as_bytes(), u64::from(i));
+    }
+    // Range tombstone covering a subset
+    tree.remove_range("key_005", "key_015", 50);
+    tree.flush_active_memtable(0)?;
+
+    // Force compaction with small target_size to trigger rotation
+    tree.major_compact(1024, 0)?;
+
+    // After compaction: keys inside [key_005, key_015) should be suppressed
+    assert_eq!(None, tree.get("key_005", 51)?);
+    assert_eq!(None, tree.get("key_010", 51)?);
+    assert_eq!(None, tree.get("key_014", 51)?);
+
+    // Keys outside range should survive
+    assert!(tree.get("key_000", 51)?.is_some());
+    assert!(tree.get("key_015", 51)?.is_some());
+    assert!(tree.get("key_019", 51)?.is_some());
+
+    Ok(())
+}
+
+// --- Test Q: Table-skip optimization triggers for fully-covered tables ---
+#[test]
+fn range_tombstone_table_skip_optimization() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Create a table with keys a-c
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.flush_active_memtable(0)?;
+
+    // Create a range tombstone that fully covers the table's key range
+    // with higher seqno than any key in the table
+    tree.remove_range("a", "d", 100);
+
+    // The table [a,c] is fully covered by [a,d)@100 (100 > max_seqno=3)
+    // Table-skip should allow skipping the entire table during iteration
+    let keys = collect_keys(&tree, 101)?;
+    assert!(keys.is_empty());
+
+    // Reverse iteration should also skip
+    let keys = collect_keys_rev(&tree, 101)?;
+    assert!(keys.is_empty());
+
+    Ok(())
+}
+
+// --- Test R: BlobTree range tombstone support ---
+#[test]
+fn range_tombstone_blob_tree() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        folder.path(),
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .with_kv_separation(Some(
+        lsm_tree::KvSeparationOptions::default()
+            .separation_threshold(1)
+            .compression(lsm_tree::CompressionType::None),
+    ))
+    .open()?;
+
+    tree.insert("a", "value_a", 1);
+    tree.insert("b", "value_b", 2);
+    tree.insert("c", "value_c", 3);
+
+    // Range tombstone in BlobTree
+    tree.remove_range("a", "c", 10);
+
+    assert_eq!(None, tree.get("a", 11)?);
+    assert_eq!(None, tree.get("b", 11)?);
+    assert_eq!(Some("value_c".as_bytes().into()), tree.get("c", 11)?);
+
+    // Flush and verify persistence
+    tree.flush_active_memtable(0)?;
+
+    assert_eq!(None, tree.get("a", 11)?);
+    assert_eq!(None, tree.get("b", 11)?);
+    assert_eq!(Some("value_c".as_bytes().into()), tree.get("c", 11)?);
+
+    Ok(())
+}
+
+// --- Test S: Invalid interval silently returns 0 ---
+#[test]
+fn range_tombstone_invalid_interval() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "1", 1);
+
+    // start >= end — should be silently ignored
+    let size = tree.remove_range("z", "a", 10);
+    assert_eq!(0, size);
+
+    // Equal start and end — also invalid
+    let size = tree.remove_range("a", "a", 10);
+    assert_eq!(0, size);
+
+    // Data should still be visible
+    assert_eq!(Some("1".as_bytes().into()), tree.get("a", 11)?);
+
+    Ok(())
+}
+
+// --- Test T: Multiple compaction rounds preserve range tombstones ---
+#[test]
+fn range_tombstone_multiple_compaction_rounds() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Round 1: data + RT + flush + compact
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.remove_range("a", "c", 10);
+    tree.flush_active_memtable(0)?;
+    tree.major_compact(64_000_000, 0)?;
+
+    // Round 2: add more data + flush + compact again
+    tree.insert("d", "4", 11);
+    tree.flush_active_memtable(0)?;
+    tree.major_compact(64_000_000, 0)?;
+
+    // RT should survive both compaction rounds
+    assert_eq!(None, tree.get("a", 12)?);
+    assert_eq!(None, tree.get("b", 12)?);
+    assert_eq!(Some("3".as_bytes().into()), tree.get("c", 12)?);
+    assert_eq!(Some("4".as_bytes().into()), tree.get("d", 12)?);
+
+    Ok(())
+}
+
+// --- Test: RT disjoint from memtable KV range persists through flush ---
+// Regression test: delete_range targeting keys only in older SSTs must not be
+// dropped during flush just because it doesn't overlap the memtable's KV range.
+#[test]
+fn range_tombstone_disjoint_from_flush_kv_range() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Write keys [x, y, z] and flush to SST (older data)
+    tree.insert("x", "1", 1);
+    tree.insert("y", "2", 2);
+    tree.insert("z", "3", 3);
+    tree.flush_active_memtable(0)?;
+
+    // Now write keys [a, b] + delete_range("x", "zz") in a new memtable.
+    // The RT is disjoint from the KV range [a, b] of this memtable.
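+    // (With clipping, this RT would intersect the table's KV range to nothing
+    // and be dropped; flush therefore writes RTs unclipped, see
+    // MultiWriter::write_rts_to_writer.)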
+ tree.insert("a", "4", 4); + tree.insert("b", "5", 5); + tree.remove_range("x", "zz", 10); + tree.flush_active_memtable(0)?; + + // The RT must have survived flush and suppress [x, y, z] in the older SST + assert_eq!(Some("4".as_bytes().into()), tree.get("a", 11)?); + assert_eq!(Some("5".as_bytes().into()), tree.get("b", 11)?); + assert_eq!(None, tree.get("x", 11)?); + assert_eq!(None, tree.get("y", 11)?); + assert_eq!(None, tree.get("z", 11)?); + + Ok(()) +} + +// --- Test: RT disjoint from KV range survives compaction --- +// After flush preserves the RT, compaction should merge it with the older SST +// and either suppress the keys or propagate the RT. +#[test] +fn range_tombstone_disjoint_survives_compaction() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Older data in SST + tree.insert("x", "1", 1); + tree.insert("y", "2", 2); + tree.flush_active_memtable(0)?; + + // New memtable: KV in [a, b], RT covering [x, z) — disjoint from KV + tree.insert("a", "3", 3); + tree.insert("b", "4", 4); + tree.remove_range("x", "z", 10); + tree.flush_active_memtable(0)?; + + // Compact everything + tree.major_compact(64_000_000, 0)?; + + // After compaction, [x, y] should still be suppressed + assert_eq!(Some("3".as_bytes().into()), tree.get("a", 11)?); + assert_eq!(Some("4".as_bytes().into()), tree.get("b", 11)?); + assert_eq!(None, tree.get("x", 11)?); + assert_eq!(None, tree.get("y", 11)?); + + Ok(()) +}