From ef2892cb5e3588e373e7fb4a87c865db8dbb2511 Mon Sep 17 00:00:00 2001
From: Clement Rey
Date: Thu, 25 Jul 2024 17:00:03 +0200
Subject: [PATCH] introduce new range APIs/cache

---
 crates/store/re_query2/examples/range.rs  |  157 +++
 crates/store/re_query2/src/cache.rs       |   52 +-
 crates/store/re_query2/src/cache_stats.rs |   36 +-
 crates/store/re_query2/src/lib.rs         |    3 +
 crates/store/re_query2/src/range.rs       |  317 ++++++
 crates/store/re_query2/tests/range.rs     | 1080 +++++++++++++++++++++
 6 files changed, 1640 insertions(+), 5 deletions(-)
 create mode 100644 crates/store/re_query2/examples/range.rs
 create mode 100644 crates/store/re_query2/src/range.rs
 create mode 100644 crates/store/re_query2/tests/range.rs

diff --git a/crates/store/re_query2/examples/range.rs b/crates/store/re_query2/examples/range.rs
new file mode 100644
index 000000000000..acfe88500c8c
--- /dev/null
+++ b/crates/store/re_query2/examples/range.rs
@@ -0,0 +1,157 @@
+use std::sync::Arc;
+
+use itertools::{izip, Itertools};
+use re_chunk::{Chunk, RowId};
+use re_chunk_store::{ChunkStore, RangeQuery};
+use re_log_types::example_components::{MyColor, MyLabel, MyPoint, MyPoints};
+use re_log_types::{build_frame_nr, ResolvedTimeRange, TimeType, Timeline};
+use re_types::ComponentBatch;
+use re_types_core::{Archetype as _, Loggable as _};
+
+use re_query2::{clamped_zip_1x2, range_zip_1x2, RangeResults};
+
+// ---
+
+fn main() -> anyhow::Result<()> {
+    let store = store()?;
+    eprintln!("store:\n{store}");
+
+    let entity_path = "points";
+    let timeline = Timeline::new("frame_nr", TimeType::Sequence);
+    let query = RangeQuery::new(timeline, ResolvedTimeRange::EVERYTHING);
+    eprintln!("query:{query:?}");
+
+    let caches = re_query2::Caches::new(&store);
+
+    // First, get the (potentially cached) results for this query.
+    let results: RangeResults = caches.range(
+        &store,
+        &query,
+        &entity_path.into(),
+        MyPoints::all_components().iter().copied(), // no generics!
+    );
+
+    // * `get_required` returns an error if the chunk is missing.
+    // * `get` returns an option.
+    let all_points_chunks = results.get_required(&MyPoint::name())?;
+    let all_colors_chunks = results.get(&MyColor::name());
+    let all_labels_chunks = results.get(&MyLabel::name());
+
+    // You can always use the standard deserialization path.
+    //
+    // The underlying operator is optimized to only pay the cost of downcasting and deserialization
+    // once for the whole column, and will then return references into that data.
+    // This is why you have to process the data in two steps: the deserialized data needs to live
+    // somewhere for the iterators to borrow from.
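+    // (Concretely: the `collect_vec()` calls below keep the per-chunk deserializing iterators
+    // alive, so that the zipped views further down can borrow from them.)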
+    let mut all_points_iters = all_points_chunks
+        .iter()
+        .map(|chunk| chunk.iter_component::<MyPoint>())
+        .collect_vec();
+    let all_points_indexed = {
+        let all_points = all_points_iters.iter_mut().flat_map(|it| it.into_iter());
+        let all_points_indices = all_points_chunks
+            .iter()
+            .flat_map(|chunk| chunk.iter_component_indices(&query.timeline(), &MyPoint::name()));
+        izip!(all_points_indices, all_points)
+    };
+    let mut all_labels_iters = all_labels_chunks
+        .unwrap_or_default()
+        .iter()
+        .map(|chunk| chunk.iter_component::<MyLabel>())
+        .collect_vec();
+    let all_labels_indexed = {
+        let all_labels = all_labels_iters.iter_mut().flat_map(|it| it.into_iter());
+        let all_labels_indices = all_labels_chunks
+            .unwrap_or_default()
+            .iter()
+            .flat_map(|chunk| chunk.iter_component_indices(&query.timeline(), &MyLabel::name()));
+        izip!(all_labels_indices, all_labels)
+    };
+
+    // Or, if you want every last bit of performance you can get, you can manipulate the raw
+    // data directly:
+    let all_colors_indexed = all_colors_chunks
+        .unwrap_or_default()
+        .iter()
+        .flat_map(|chunk| {
+            itertools::izip!(
+                chunk.iter_component_indices(&query.timeline(), &MyColor::name()),
+                chunk.iter_primitive::<u32>(&MyColor::name()),
+            )
+        });
+
+    // Zip the results together using a stateful time-based join.
+    let all_frames = range_zip_1x2(all_points_indexed, all_colors_indexed, all_labels_indexed);
+
+    // And finally inspect our final results:
+    {
+        let color_default_fn = || Some(MyColor(0xFF00FFFF));
+        let label_default_fn = || None;
+
+        eprintln!("results:");
+        for ((data_time, row_id), points, colors, labels) in all_frames {
+            let colors = colors.unwrap_or(&[]).iter().map(|c| Some(MyColor(*c)));
+            let labels = labels.unwrap_or(&[]).iter().cloned().map(Some);
+
+            // Apply your instance-level joining logic, if any:
+            let results =
+                clamped_zip_1x2(points, colors, color_default_fn, labels, label_default_fn)
+                    .collect_vec();
+            eprintln!("{data_time:?} @ {row_id}:\n {results:?}");
+        }
+    }
+
+    Ok(())
+}
+
+// ---
+
+fn store() -> anyhow::Result<ChunkStore> {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+
+    let entity_path = "points";
+
+    {
+        let timepoint = [build_frame_nr(123)];
+
+        let chunk = Chunk::builder(entity_path.into())
+            .with_component_batches(
+                RowId::new(),
+                timepoint,
+                [
+                    &[MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)] as &dyn ComponentBatch, //
+                    &[MyColor::from_rgb(255, 0, 0)],
+                    &[MyLabel("a".into()), MyLabel("b".into())],
+                ],
+            )
+            .build()?;
+
+        store.insert_chunk(&Arc::new(chunk))?;
+    }
+
+    {
+        let timepoint = [build_frame_nr(423)];
+
+        let chunk = Chunk::builder(entity_path.into())
+            .with_component_batches(
+                RowId::new(),
+                timepoint,
+                [
+                    &[
+                        MyPoint::new(10.0, 20.0),
+                        MyPoint::new(30.0, 40.0),
+                        MyPoint::new(50.0, 60.0),
+                    ] as &dyn ComponentBatch, //
+                    &[MyColor::from_rgb(255, 0, 0), MyColor::from_rgb(0, 0, 255)],
+                ],
+            )
+            .build()?;
+
+        store.insert_chunk(&Arc::new(chunk))?;
+    }
+
+    Ok(store)
+}
diff --git a/crates/store/re_query2/src/cache.rs b/crates/store/re_query2/src/cache.rs
index 2be830778a06..f0e123a97a96 100644
--- a/crates/store/re_query2/src/cache.rs
+++ b/crates/store/re_query2/src/cache.rs
@@ -12,7 +12,7 @@ use re_chunk_store::{ChunkStore, ChunkStoreDiff, ChunkStoreEvent, ChunkStoreSubs
 use re_log_types::{EntityPath, ResolvedTimeRange, StoreId, TimeInt, Timeline};
 use re_types_core::{components::ClearIsRecursive, ComponentName, Loggable as _};
 
-use crate::LatestAtCache;
+use crate::{LatestAtCache, RangeCache};
 
 // ---
 
@@ -82,6 +82,9 @@ pub struct Caches {
 
     // NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
     pub(crate) latest_at_per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<LatestAtCache>>>>,
+
+    // NOTE: `Arc` so we can cheaply free the top-level lock early when needed.
+    pub(crate) range_per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<RangeCache>>>>,
 }
 
 impl std::fmt::Debug for Caches {
@@ -90,6 +93,7 @@ impl std::fmt::Debug for Caches {
             store_id,
             might_require_clearing,
             latest_at_per_cache_key,
+            range_per_cache_key,
         } = self;
 
         let mut strings = Vec::new();
@@ -123,6 +127,21 @@ impl std::fmt::Debug for Caches {
             }
         }
 
+        strings.push(format!("[Range @ {store_id}]"));
+        {
+            let range_per_cache_key = range_per_cache_key.read();
+            let range_per_cache_key: BTreeMap<_, _> = range_per_cache_key.iter().collect();
+
+            for (cache_key, cache) in &range_per_cache_key {
+                let cache = cache.read();
+                strings.push(format!(
+                    " [{cache_key:?} (pending_invalidations={:?})]",
+                    cache.pending_invalidations,
+                ));
+                strings.push(indent::indent_all_by(4, format!("{cache:?}")));
+            }
+        }
+
         f.write_str(&strings.join("\n").replace("\n\n", "\n"))
     }
 }
@@ -134,6 +153,7 @@ impl Caches {
             store_id: store.id().clone(),
             might_require_clearing: Default::default(),
             latest_at_per_cache_key: Default::default(),
+            range_per_cache_key: Default::default(),
         }
     }
 
@@ -143,10 +163,12 @@ impl Caches {
             store_id: _,
             might_require_clearing,
             latest_at_per_cache_key,
+            range_per_cache_key,
         } = self;
 
         might_require_clearing.write().clear();
         latest_at_per_cache_key.write().clear();
+        range_per_cache_key.write().clear();
     }
 }
 
@@ -173,6 +195,7 @@ impl ChunkStoreSubscriber for Caches {
         struct CompactedEvents {
             static_: HashMap<(EntityPath, ComponentName), BTreeSet<ChunkId>>,
             temporal_latest_at: HashMap<CacheKey, TimeInt>,
+            temporal_range: HashMap<CacheKey, BTreeSet<ChunkId>>,
         }
 
         let mut compacted = CompactedEvents::default();
@@ -225,6 +248,12 @@ impl ChunkStoreSubscriber for Caches {
                         .entry(key.clone())
                         .and_modify(|time| *time = TimeInt::min(*time, data_time))
                         .or_insert(data_time);
+
+                    compacted
+                        .temporal_range
+                        .entry(key)
+                        .or_default()
+                        .insert(chunk.id());
                 }
             }
         }
@@ -233,6 +262,7 @@ impl ChunkStoreSubscriber for Caches {
             let mut might_require_clearing = self.might_require_clearing.write();
             let caches_latest_at = self.latest_at_per_cache_key.write();
+            let caches_range = self.range_per_cache_key.write();
             // NOTE: Don't release the top-level locks -- even though this cannot happen yet with
             // our current macro-architecture, we want to prevent queries from concurrently
             // running while we're updating the invalidation flags.
 
@@ -244,7 +274,7 @@ impl ChunkStoreSubscriber for Caches {
             // yet another layer of caching indirection.
             // But since this pretty much never happens in practice, let's not go there until we
            // have metrics that show we need to.
-            for ((entity_path, component_name), _chunk_ids) in compacted.static_ {
+            for ((entity_path, component_name), chunk_ids) in compacted.static_ {
                 if component_name == ClearIsRecursive::name() {
                     might_require_clearing.insert(entity_path.clone());
                 }
@@ -254,6 +284,15 @@ impl ChunkStoreSubscriber for Caches {
                         cache.write().pending_invalidation = Some(TimeInt::STATIC);
                     }
                 }
+
+                for (key, cache) in caches_range.iter() {
+                    if key.entity_path == entity_path && key.component_name == component_name {
+                        cache
+                            .write()
+                            .pending_invalidations
+                            .extend(chunk_ids.iter().copied());
+                    }
+                }
             }
         }
 
@@ -270,6 +309,15 @@ impl ChunkStoreSubscriber for Caches {
                     cache.pending_invalidation = Some(time);
                 }
             }
+
+            for (key, chunk_ids) in compacted.temporal_range {
+                if let Some(cache) = caches_range.get(&key) {
+                    cache
+                        .write()
+                        .pending_invalidations
+                        .extend(chunk_ids.iter().copied());
+                }
+            }
         }
     }
 }
diff --git a/crates/store/re_query2/src/cache_stats.rs b/crates/store/re_query2/src/cache_stats.rs
index 859dfed1a469..339ad0cdf6fc 100644
--- a/crates/store/re_query2/src/cache_stats.rs
+++ b/crates/store/re_query2/src/cache_stats.rs
@@ -12,6 +12,7 @@ use crate::{CacheKey, Caches};
 #[derive(Default, Debug, Clone)]
 pub struct CachesStats {
     pub latest_at: BTreeMap<CacheKey, CacheStats>,
+    pub range: BTreeMap<CacheKey, CacheStats>,
 }
 
 impl CachesStats {
@@ -19,14 +20,18 @@ impl CachesStats {
     pub fn total_size_bytes(&self) -> u64 {
         re_tracing::profile_function!();
 
-        let Self { latest_at } = self;
+        let Self { latest_at, range } = self;
 
         let latest_at_size_bytes: u64 = latest_at
             .values()
             .map(|stats| stats.total_actual_size_bytes)
             .sum();
+        let range_size_bytes: u64 = range
+            .values()
+            .map(|stats| stats.total_actual_size_bytes)
+            .sum();
 
-        latest_at_size_bytes
+        latest_at_size_bytes + range_size_bytes
     }
 }
 
@@ -73,6 +78,31 @@ impl Caches {
                 .collect()
         };
 
-        CachesStats { latest_at }
+        let range = {
+            let range = self.range_per_cache_key.read().clone();
+            // Implicitly releasing top-level cache mappings -- concurrent queries can run once again.
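+            // (Mirrors the latest-at stats above: per-key chunk counts plus effective vs. actual
+            // byte sizes.)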
+
+            range
+                .iter()
+                .map(|(key, cache)| {
+                    let cache = cache.read();
+
+                    (
+                        key.clone(),
+                        CacheStats {
+                            total_chunks: cache.chunks.len() as _,
+                            total_effective_size_bytes: cache
+                                .chunks
+                                .values()
+                                .map(|cached| cached.chunk.total_size_bytes())
+                                .sum(),
+                            total_actual_size_bytes: cache.chunks.total_size_bytes(),
+                        },
+                    )
+                })
+                .collect()
+        };
+
+        CachesStats { latest_at, range }
     }
 }
diff --git a/crates/store/re_query2/src/lib.rs b/crates/store/re_query2/src/lib.rs
index b2d92c611488..62bab7a57d7f 100644
--- a/crates/store/re_query2/src/lib.rs
+++ b/crates/store/re_query2/src/lib.rs
@@ -3,6 +3,7 @@
 mod cache;
 mod cache_stats;
 mod latest_at;
+mod range;
 
 pub mod clamped_zip;
 pub mod range_zip;
@@ -11,9 +12,11 @@ pub use self::cache::{CacheKey, Caches};
 pub use self::cache_stats::{CacheStats, CachesStats};
 pub use self::clamped_zip::*;
 pub use self::latest_at::LatestAtResults;
+pub use self::range::RangeResults;
 pub use self::range_zip::*;
 
 pub(crate) use self::latest_at::LatestAtCache;
+pub(crate) use self::range::RangeCache;
 
 pub mod external {
     pub use paste;
diff --git a/crates/store/re_query2/src/range.rs b/crates/store/re_query2/src/range.rs
new file mode 100644
index 000000000000..62f6aa213686
--- /dev/null
+++ b/crates/store/re_query2/src/range.rs
@@ -0,0 +1,317 @@
+use std::{collections::BTreeSet, sync::Arc};
+
+use ahash::HashMap;
+use nohash_hasher::IntMap;
+use parking_lot::RwLock;
+
+use re_chunk::{Chunk, ChunkId};
+use re_chunk_store::{ChunkStore, RangeQuery, TimeInt};
+use re_log_types::{EntityPath, ResolvedTimeRange};
+use re_types_core::{ComponentName, DeserializationError, SizeBytes};
+
+use crate::{CacheKey, Caches};
+
+// --- Public API ---
+
+impl Caches {
+    /// Queries for the given `component_names` using range semantics.
+    ///
+    /// See [`RangeResults`] for more information about how to handle the results.
+    ///
+    /// This is a cached API -- data will be lazily cached upon access.
+    pub fn range(
+        &self,
+        store: &ChunkStore,
+        query: &RangeQuery,
+        entity_path: &EntityPath,
+        component_names: impl IntoIterator<Item = ComponentName>,
+    ) -> RangeResults {
+        re_tracing::profile_function!(entity_path.to_string());
+
+        let mut results = RangeResults::new(query.clone());
+
+        // NOTE: This pre-filtering is extremely important: going through all these query layers
+        // has non-negligible overhead even if the final result ends up being nothing, and our
+        // number of queries for a frame grows linearly with the number of entity paths.
+        let component_names = component_names.into_iter().filter(|component_name| {
+            store.entity_has_component_on_timeline(&query.timeline(), entity_path, component_name)
+        });
+
+        for component_name in component_names {
+            let key = CacheKey::new(entity_path.clone(), query.timeline(), component_name);
+
+            let cache = Arc::clone(
+                self.range_per_cache_key
+                    .write()
+                    .entry(key.clone())
+                    .or_insert_with(|| Arc::new(RwLock::new(RangeCache::new(key.clone())))),
+            );
+
+            let mut cache = cache.write();
+
+            cache.handle_pending_invalidation();
+
+            let cached = cache.range(store, query, entity_path, component_name);
+            if !cached.is_empty() {
+                results.add(component_name, cached);
+            }
+        }
+
+        results
+    }
+}
+
+// --- Results ---
+
+/// Results for a range query.
+///
+/// The data is both deserialized and resolved/converted.
+///
+/// Use [`RangeResults::get`] or [`RangeResults::get_required`] in order to access the results for
+/// each individual component.
+#[derive(Debug)]
+pub struct RangeResults {
+    /// The query that yielded these results.
+    pub query: RangeQuery,
+
+    /// Results for each individual component.
+    pub components: IntMap<ComponentName, Vec<Chunk>>,
+}
+
+impl RangeResults {
+    #[inline]
+    pub fn new(query: RangeQuery) -> Self {
+        Self {
+            query,
+            components: Default::default(),
+        }
+    }
+
+    #[inline]
+    pub fn contains(&self, component_name: &ComponentName) -> bool {
+        self.components.contains_key(component_name)
+    }
+
+    /// Returns the [`Chunk`]s for the specified `component_name`.
+    #[inline]
+    pub fn get(&self, component_name: &ComponentName) -> Option<&[Chunk]> {
+        self.components
+            .get(component_name)
+            .map(|chunks| chunks.as_slice())
+    }
+
+    /// Returns the [`Chunk`]s for the specified `component_name`.
+    ///
+    /// Returns an error if the component is not present.
+    #[inline]
+    pub fn get_required(&self, component_name: &ComponentName) -> crate::Result<&[Chunk]> {
+        if let Some(chunks) = self.components.get(component_name) {
+            Ok(chunks)
+        } else {
+            Err(DeserializationError::MissingComponent {
+                component: *component_name,
+                backtrace: ::backtrace::Backtrace::new_unresolved(),
+            }
+            .into())
+        }
+    }
+}
+
+impl RangeResults {
+    #[doc(hidden)]
+    #[inline]
+    pub fn add(&mut self, component_name: ComponentName, chunks: Vec<Chunk>) {
+        self.components.insert(component_name, chunks);
+    }
+}
+
+// --- Cache implementation ---
+
+/// Caches the results of `Range` queries for a given [`CacheKey`].
+pub struct RangeCache {
+    /// For debugging purposes.
+    pub cache_key: CacheKey,
+
+    /// All the [`Chunk`]s currently cached.
+    ///
+    /// See [`RangeCachedChunk`] for more information.
+    pub chunks: HashMap<ChunkId, RangeCachedChunk>,
+
+    /// Every [`ChunkId`] present in this set has been asynchronously invalidated.
+    ///
+    /// The next time this cache gets queried, it must remove any entry matching any of these IDs.
+    ///
+    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
+    /// time effectively behaves as a natural micro-batching mechanism.
+    pub pending_invalidations: BTreeSet<ChunkId>,
+}
+
+impl RangeCache {
+    #[inline]
+    pub fn new(cache_key: CacheKey) -> Self {
+        Self {
+            cache_key,
+            chunks: HashMap::default(),
+            pending_invalidations: BTreeSet::default(),
+        }
+    }
+
+    /// Returns the time range covered by this [`RangeCache`].
+    ///
+    /// This is extremely slow (`O(n)`), don't use this for anything but debugging.
+    #[inline]
+    pub fn time_range(&self) -> ResolvedTimeRange {
+        self.chunks
+            .values()
+            .filter_map(|cached| {
+                cached
+                    .chunk
+                    .timelines()
+                    .get(&self.cache_key.timeline)
+                    .map(|time_chunk| time_chunk.time_range())
+            })
+            .fold(ResolvedTimeRange::EMPTY, |mut acc, time_range| {
+                acc.set_min(TimeInt::min(acc.min(), time_range.min()));
+                acc.set_max(TimeInt::max(acc.max(), time_range.max()));
+                acc
+            })
+    }
+}
+
+impl std::fmt::Debug for RangeCache {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let Self {
+            cache_key,
+            chunks,
+            pending_invalidations: _,
+        } = self;
+
+        let mut strings: Vec<String> = Vec::new();
+
+        strings.push(format!(
+            "{} ({})",
+            cache_key.timeline.typ().format_range_utc(self.time_range()),
+            re_format::format_bytes(chunks.total_size_bytes() as _),
+        ));
+
+        if strings.is_empty() {
+            return f.write_str("<empty>");
+        }
+
+        f.write_str(&strings.join("\n").replace("\n\n", "\n"))
+    }
+}
+
+pub struct RangeCachedChunk {
+    pub chunk: Chunk,
+
+    /// When a `Chunk` gets cached, it is pre-processed according to the current [`CacheKey`],
+    /// e.g. it is time-sorted on the appropriate timeline.
+    ///
+    /// In the happy case, pre-processing a `Chunk` is a no-op, and the cached `Chunk` is just a
+    /// reference to the real one sitting in the store.
+    /// Otherwise, the cached `Chunk` is a full blown copy of the original one.
+    pub resorted: bool,
+}
+
+impl SizeBytes for RangeCachedChunk {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let Self { chunk, resorted } = self;
+
+        if *resorted {
+            // The chunk had to be post-processed for caching.
+            // Its data was duplicated.
+            Chunk::heap_size_bytes(chunk)
+        } else {
+            // This chunk is just a reference to the one in the store.
+            // Consider it amortized.
+            0
+        }
+    }
+}
+
+impl SizeBytes for RangeCache {
+    #[inline]
+    fn heap_size_bytes(&self) -> u64 {
+        let Self {
+            cache_key,
+            chunks,
+            pending_invalidations,
+        } = self;
+
+        cache_key.heap_size_bytes()
+            + chunks.heap_size_bytes()
+            + pending_invalidations.heap_size_bytes()
+    }
+}
+
+impl RangeCache {
+    /// Queries cached range data for a single component.
+    pub fn range(
+        &mut self,
+        store: &ChunkStore,
+        query: &RangeQuery,
+        entity_path: &EntityPath,
+        component_name: ComponentName,
+    ) -> Vec<Chunk> {
+        re_tracing::profile_scope!("range", format!("{query:?}"));
+
+        debug_assert_eq!(query.timeline(), self.cache_key.timeline);
+
+        // First, we forward the query as-is to the store.
+        //
+        // It's fine to run the query every time -- the index scan itself is not the costly part of a
+        // range query.
+        //
+        // For all relevant chunks that we find, we process them according to the [`CacheKey`], and
+        // cache them.
+
+        let raw_chunks = store.range_relevant_chunks(query, entity_path, component_name);
+        for raw_chunk in &raw_chunks {
+            self.chunks
+                .entry(raw_chunk.id())
+                .or_insert_with(|| RangeCachedChunk {
+                    chunk: raw_chunk
+                        // Pre-sort the cached chunk according to the cache key's timeline.
+                        .sorted_by_timeline_if_unsorted(&self.cache_key.timeline)
+                        // Densify the cached chunk according to the cache key's component, which
+                        // will speed future arrow operations on this chunk.
+                        .densified(component_name),
+                    resorted: !raw_chunk.is_timeline_sorted(&self.cache_key.timeline),
+                });
+        }
+
+        // Second, we simply retrieve from the cache all the relevant `Chunk`s.
+        //
+        // Since these `Chunk`s have already been pre-processed adequately, running a range filter
+        // on them will be quite cheap.
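+        // (Every relevant chunk was just inserted above, so the lookups below are expected to
+        // succeed; `filter_map` merely keeps this robust.)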
+
+        raw_chunks
+            .into_iter()
+            .filter_map(|raw_chunk| self.chunks.get(&raw_chunk.id()))
+            .map(|cached_sorted_chunk| {
+                debug_assert!(cached_sorted_chunk
+                    .chunk
+                    .is_timeline_sorted(&query.timeline()));
+                cached_sorted_chunk.chunk.range(query, component_name)
+            })
+            .filter(|chunk| !chunk.is_empty())
+            .collect()
+    }
+
+    #[inline]
+    pub fn handle_pending_invalidation(&mut self) {
+        re_tracing::profile_function!();
+
+        let Self {
+            cache_key: _,
+            chunks,
+            pending_invalidations,
+        } = self;
+
+        chunks.retain(|chunk_id, _chunk| !pending_invalidations.contains(chunk_id));
+
+        pending_invalidations.clear();
+    }
+}
diff --git a/crates/store/re_query2/tests/range.rs b/crates/store/re_query2/tests/range.rs
new file mode 100644
index 000000000000..b6d86262cfac
--- /dev/null
+++ b/crates/store/re_query2/tests/range.rs
@@ -0,0 +1,1080 @@
+// https://github.com/rust-lang/rust-clippy/issues/10011
+#![cfg(test)]
+
+use std::sync::Arc;
+
+use itertools::Itertools as _;
+
+use re_chunk::{RowId, Timeline};
+use re_chunk_store::{
+    external::re_chunk::Chunk, ChunkStore, ChunkStoreSubscriber as _, RangeQuery,
+    ResolvedTimeRange, TimeInt,
+};
+use re_log_types::{
+    build_frame_nr,
+    example_components::{MyColor, MyPoint, MyPoints},
+    EntityPath, TimePoint,
+};
+use re_query2::Caches;
+use re_types::Archetype;
+use re_types_core::Loggable as _;
+
+// ---
+
+#[test]
+fn simple_range() -> anyhow::Result<()> {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+    let mut caches = Caches::new(&store);
+
+    let entity_path: EntityPath = "point".into();
+
+    let timepoint1 = [build_frame_nr(123)];
+    let row_id1_1 = RowId::new();
+    let points1_1 = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
+    let row_id1_2 = RowId::new();
+    let colors1_2 = vec![MyColor::from_rgb(255, 0, 0)];
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id1_1, timepoint1, &points1_1)
+        .with_component_batch(row_id1_2, timepoint1, &colors1_2)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+
+    let timepoint2 = [build_frame_nr(223)];
+    let row_id2 = RowId::new();
+    let colors2 = vec![MyColor::from_rgb(255, 0, 0)];
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id2, timepoint2, &colors2)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+
+    let timepoint3 = [build_frame_nr(323)];
+    let row_id3 = RowId::new();
+    let points3 = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)];
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id3, timepoint3, &points3)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+
+    // --- First test: `(timepoint1, timepoint3]` ---
+
+    let query = RangeQuery::new(
+        timepoint1[0].0,
+        ResolvedTimeRange::new(timepoint1[0].1.as_i64() + 1, timepoint3[0].1),
+    );
+
+    let expected_points = &[
+        ((TimeInt::new_temporal(323), row_id3), points3.as_slice()), //
+    ];
+    let expected_colors = &[
+        ((TimeInt::new_temporal(223), row_id2), colors2.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path,
+        expected_points,
+        expected_colors,
+    );
+
+    // --- Second test: `[timepoint1, timepoint3]` ---
+
+    let query = RangeQuery::new(
+        timepoint1[0].0,
+        ResolvedTimeRange::new(timepoint1[0].1, timepoint3[0].1),
+    );
+
+    let expected_points = &[
+        (
+            (TimeInt::new_temporal(123), row_id1_1),
+            points1_1.as_slice(),
+        ), //
+        ((TimeInt::new_temporal(323), row_id3), points3.as_slice()), //
+    ];
+    let expected_colors = &[
+        (
+            (TimeInt::new_temporal(123), row_id1_2),
+            colors1_2.as_slice(),
+        ), //
+        ((TimeInt::new_temporal(223), row_id2), colors2.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path,
+        expected_points,
+        expected_colors,
+    );
+
+    Ok(())
+}
+
+#[test]
+fn static_range() -> anyhow::Result<()> {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+    let mut caches = Caches::new(&store);
+
+    let entity_path: EntityPath = "point".into();
+
+    let timepoint1 = [build_frame_nr(123)];
+    let row_id1_1 = RowId::new();
+    let points1_1 = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
+    let row_id1_2 = RowId::new();
+    let colors1_2 = vec![MyColor::from_rgb(255, 0, 0)];
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id1_1, timepoint1, &points1_1)
+        .with_component_batch(row_id1_2, timepoint1, &colors1_2)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+    // Insert statically too!
+    let row_id1_3 = RowId::new();
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id1_3, TimePoint::default(), &colors1_2)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+
+    let timepoint2 = [build_frame_nr(223)];
+    let row_id2_1 = RowId::new();
+    let colors2_1 = vec![MyColor::from_rgb(255, 0, 0)];
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id2_1, timepoint2, &colors2_1)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+    // Insert statically too!
+    let row_id2_2 = RowId::new();
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id2_2, TimePoint::default(), &colors2_1)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+
+    let timepoint3 = [build_frame_nr(323)];
+    // Create some Positions with implicit instances
+    let row_id3 = RowId::new();
+    let points3 = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)];
+    let chunk = Chunk::builder(entity_path.clone())
+        .with_component_batch(row_id3, timepoint3, &points3)
+        .build()?;
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk));
+
+    // --- First test: `(timepoint1, timepoint3]` ---
+
+    let query = RangeQuery::new(
+        timepoint1[0].0,
+        ResolvedTimeRange::new(timepoint1[0].1.as_i64() + 1, timepoint3[0].1),
+    );
+
+    let expected_points = &[
+        ((TimeInt::new_temporal(323), row_id3), points3.as_slice()), //
+    ];
+    let expected_colors = &[
+        ((TimeInt::STATIC, row_id2_2), colors2_1.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path,
+        expected_points,
+        expected_colors,
+    );
+
+    // --- Second test: `[timepoint1, timepoint3]` ---
+
+    // The inclusion of `timepoint1` means latest-at semantics will fall back to timeless data!
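+    // (Since the colors were also logged statically, the static row (`row_id2_2`) shadows every
+    // temporal color, which is why it is the only color expected below.)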
+
+    let query = RangeQuery::new(
+        timepoint1[0].0,
+        ResolvedTimeRange::new(timepoint1[0].1, timepoint3[0].1),
+    );
+
+    let expected_points = &[
+        (
+            (TimeInt::new_temporal(123), row_id1_1),
+            points1_1.as_slice(),
+        ), //
+        ((TimeInt::new_temporal(323), row_id3), points3.as_slice()), //
+    ];
+    let expected_colors = &[
+        ((TimeInt::STATIC, row_id2_2), colors2_1.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path,
+        expected_points,
+        expected_colors,
+    );
+
+    // --- Third test: `[-inf, +inf]` ---
+
+    let query = RangeQuery::new(
+        timepoint1[0].0,
+        ResolvedTimeRange::new(TimeInt::MIN, TimeInt::MAX),
+    );
+
+    // same expectations
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path,
+        expected_points,
+        expected_colors,
+    );
+
+    Ok(())
+}
+
+// Test the case where the user loads a piece of data at the end of the time range, then a piece at
+// the beginning of the range, and finally a piece right in the middle.
+//
+// DATA = ###################################################
+//          |      |     |       |            \_____/
+//          \______/     |       |            query #1
+//          query #2     \_______/
+//                       query #3
+//
+// There is no data invalidation involved, which is what makes this case tricky: the cache must
+// properly keep track of the fact that there are holes in the data -- on purpose.
+#[test]
+fn time_back_and_forth() {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+    let mut caches = Caches::new(&store);
+
+    let entity_path: EntityPath = "point".into();
+
+    let (chunks, points): (Vec<_>, Vec<_>) = (0..10)
+        .map(|i| {
+            let timepoint = [build_frame_nr(i)];
+            let points = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
+            let chunk = Arc::new(
+                Chunk::builder(entity_path.clone())
+                    .with_component_batch(RowId::new(), timepoint, &points.clone())
+                    .build()
+                    .unwrap(),
+            );
+
+            insert_and_react(&mut store, &mut caches, &chunk);
+
+            (chunk, points)
+        })
+        .unzip();
+
+    // --- Query #1: `[8, 10]` ---
+
+    let query = RangeQuery::new(
+        Timeline::new_sequence("frame_nr"),
+        ResolvedTimeRange::new(8, 10),
+    );
+
+    let expected_points = &[
+        (
+            (
+                TimeInt::new_temporal(8),
+                chunks[8].row_id_range().unwrap().0,
+            ), //
+            points[8].as_slice(),
+        ), //
+        (
+            (
+                TimeInt::new_temporal(9),
+                chunks[9].row_id_range().unwrap().0,
+            ), //
+            points[9].as_slice(),
+        ), //
+    ];
+    query_and_compare(&caches, &store, &query, &entity_path, expected_points, &[]);
+
+    // --- Query #2: `[1, 3]` ---
+
+    let query = RangeQuery::new(
+        Timeline::new_sequence("frame_nr"),
+        ResolvedTimeRange::new(1, 3),
+    );
+
+    let expected_points = &[
+        (
+            (
+                TimeInt::new_temporal(1),
+                chunks[1].row_id_range().unwrap().0,
+            ), //
+            points[1].as_slice(),
+        ), //
+        (
+            (
+                TimeInt::new_temporal(2),
+                chunks[2].row_id_range().unwrap().0,
+            ), //
+            points[2].as_slice(),
+        ), //
+        (
+            (
+                TimeInt::new_temporal(3),
+                chunks[3].row_id_range().unwrap().0,
+            ), //
+            points[3].as_slice(),
+        ), //
+    ];
+    query_and_compare(&caches, &store, &query, &entity_path, expected_points, &[]);
+
+    // --- Query #3: `[5, 7]` ---
+
+    let query = RangeQuery::new(
+        Timeline::new_sequence("frame_nr"),
+        ResolvedTimeRange::new(5, 7),
+    );
+
+    let expected_points = &[
+        (
+            (
+                TimeInt::new_temporal(5),
+                chunks[5].row_id_range().unwrap().0,
+            ), //
+            points[5].as_slice(),
+        ), //
+        (
+            (
+                TimeInt::new_temporal(6),
+                chunks[6].row_id_range().unwrap().0,
+            ), //
+            points[6].as_slice(),
+        ), //
+        (
+            (
+                TimeInt::new_temporal(7),
+                chunks[7].row_id_range().unwrap().0,
+            ), //
+            points[7].as_slice(),
+        ), //
+    ];
+    query_and_compare(&caches, &store, &query, &entity_path, expected_points, &[]);
+}
+
+#[test]
+fn invalidation() {
+    let entity_path = "point";
+
+    let test_invalidation = |query: RangeQuery,
+                             present_data_timepoint: TimePoint,
+                             past_data_timepoint: TimePoint,
+                             future_data_timepoint: TimePoint| {
+        let past_timestamp = past_data_timepoint
+            .get(&query.timeline())
+            .copied()
+            .unwrap_or(TimeInt::STATIC);
+        let present_timestamp = present_data_timepoint
+            .get(&query.timeline())
+            .copied()
+            .unwrap_or(TimeInt::STATIC);
+        let future_timestamp = future_data_timepoint
+            .get(&query.timeline())
+            .copied()
+            .unwrap_or(TimeInt::STATIC);
+
+        let mut store = ChunkStore::new(
+            re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+            Default::default(),
+        );
+        let mut caches = Caches::new(&store);
+
+        let row_id1 = RowId::new();
+        let points1 = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
+        let chunk1 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id1, present_data_timepoint.clone(), &points1)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk1));
+
+        let row_id2 = RowId::new();
+        let colors2 = vec![MyColor::from_rgb(1, 2, 3)];
+        let chunk2 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id2, present_data_timepoint.clone(), &colors2)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk2));
+
+        let expected_points = &[
+            ((present_timestamp, row_id1), points1.as_slice()), //
+        ];
+        let expected_colors = &[
+            ((present_timestamp, row_id2), colors2.as_slice()), //
+        ];
+        query_and_compare(
+            &caches,
+            &store,
+            &query,
+            &entity_path.into(),
+            expected_points,
+            expected_colors,
+        );
+
+        // --- Modify present ---
+
+        // Modify the PoV component
+        let row_id3 = RowId::new();
+        let points3 = vec![MyPoint::new(10.0, 20.0), MyPoint::new(30.0, 40.0)];
+        let chunk3 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id3, present_data_timepoint.clone(), &points3)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk3));
+
+        let expected_points = &[
+            ((present_timestamp, row_id1), points1.as_slice()), //
+            ((present_timestamp, row_id3), points3.as_slice()), //
+        ];
+        let expected_colors = &[
+            ((present_timestamp, row_id2), colors2.as_slice()), //
+        ];
+        query_and_compare(
+            &caches,
+            &store,
+            &query,
+            &entity_path.into(),
+            expected_points,
+            expected_colors,
+        );
+
+        // Modify the optional component
+        let row_id4 = RowId::new();
+        let colors4 = vec![MyColor::from_rgb(4, 5, 6), MyColor::from_rgb(7, 8, 9)];
+        let chunk4 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id4, present_data_timepoint.clone(), &colors4)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk4));
+
+        let expected_points = &[
+            ((present_timestamp, row_id1), points1.as_slice()), //
+            ((present_timestamp, row_id3), points3.as_slice()), //
+        ];
+        let expected_colors = &[
+            ((present_timestamp, row_id2), colors2.as_slice()), //
+            ((present_timestamp, row_id4), colors4.as_slice()), //
+        ];
+        query_and_compare(
+            &caches,
+            &store,
+            &query,
+            &entity_path.into(),
+            expected_points,
+            expected_colors,
+        );
+
+        // --- Modify past ---
+
+        // Modify the PoV component
+        let points5 = vec![MyPoint::new(100.0, 200.0), MyPoint::new(300.0, 400.0)];
+        let row_id5 = RowId::new();
+        let chunk5 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id5, past_data_timepoint.clone(), &points5)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk5));
+
+        let expected_points1 = &[
+            ((past_timestamp, row_id5), points5.as_slice()), //
+        ] as &[_];
+        let expected_points2 = &[
+            ((past_timestamp, row_id5), points5.as_slice()), //
+            ((present_timestamp, row_id1), points1.as_slice()), //
+            ((present_timestamp, row_id3), points3.as_slice()), //
+        ] as &[_];
+        let expected_points = if past_data_timepoint.is_static() {
+            expected_points1
+        } else {
+            expected_points2
+        };
+        let expected_colors = &[
+            ((present_timestamp, row_id2), colors2.as_slice()), //
+            ((present_timestamp, row_id4), colors4.as_slice()), //
+        ];
+        query_and_compare(
+            &caches,
+            &store,
+            &query,
+            &entity_path.into(),
+            expected_points,
+            expected_colors,
+        );
+
+        // Modify the optional component
+        let row_id6 = RowId::new();
+        let colors6 = vec![MyColor::from_rgb(10, 11, 12), MyColor::from_rgb(13, 14, 15)];
+        let chunk6 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id6, past_data_timepoint.clone(), &colors6)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk6));
+
+        let expected_colors1 = &[
+            ((past_timestamp, row_id6), colors6.as_slice()), //
+        ] as &[_];
+        let expected_colors2 = &[
+            ((past_timestamp, row_id6), colors6.as_slice()), //
+            ((present_timestamp, row_id2), colors2.as_slice()), //
+            ((present_timestamp, row_id4), colors4.as_slice()), //
+        ] as &[_];
+        let expected_colors = if past_data_timepoint.is_static() {
+            expected_colors1
+        } else {
+            expected_colors2
+        };
+        query_and_compare(
+            &caches,
+            &store,
+            &query,
+            &entity_path.into(),
+            expected_points,
+            expected_colors,
+        );
+
+        // --- Modify future ---
+
+        // Modify the PoV component
+        let row_id7 = RowId::new();
+        let points7 = vec![MyPoint::new(1000.0, 2000.0), MyPoint::new(3000.0, 4000.0)];
+        let chunk7 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id7, future_data_timepoint.clone(), &points7)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk7));
+
+        let expected_points1 = &[
+            ((past_timestamp, row_id5), points5.as_slice()), //
+        ] as &[_];
+        let expected_points2 = &[
+            ((past_timestamp, row_id5), points5.as_slice()), //
+            ((present_timestamp, row_id1), points1.as_slice()), //
+            ((present_timestamp, row_id3), points3.as_slice()), //
+            ((future_timestamp, row_id7), points7.as_slice()), //
+        ] as &[_];
+        let expected_points = if past_data_timepoint.is_static() {
+            expected_points1
+        } else {
+            expected_points2
+        };
+        query_and_compare(
+            &caches,
+            &store,
+            &query,
+            &entity_path.into(),
+            expected_points,
+            expected_colors,
+        );
+
+        // Modify the optional component
+        let row_id8 = RowId::new();
+        let colors8 = vec![MyColor::from_rgb(16, 17, 18)];
+        let chunk8 = Chunk::builder(entity_path.into())
+            .with_component_batch(row_id8, future_data_timepoint.clone(), &colors8)
+            .build()
+            .unwrap();
+        insert_and_react(&mut store, &mut caches, &Arc::new(chunk8));
+
+        let expected_colors1 = &[
+            ((past_timestamp, row_id6), colors6.as_slice()), //
+        ] as &[_];
+        let expected_colors2 = &[
+            ((past_timestamp, row_id6), colors6.as_slice()), //
+            ((present_timestamp, row_id2), colors2.as_slice()), //
+            ((present_timestamp, row_id4), colors4.as_slice()), //
+            ((future_timestamp, row_id8), colors8.as_slice()), //
+        ] as &[_];
+        let expected_colors = if past_data_timepoint.is_static() {
+            expected_colors1
+        } else {
+            expected_colors2
+        };
+        query_and_compare(
+            &caches,
+            &store,
+            &query,
+            &entity_path.into(),
+            expected_points,
+            expected_colors,
+        );
+    };
+
+    let timeless = TimePoint::default();
+    let frame_122 = build_frame_nr(122);
+    let frame_123 = build_frame_nr(123);
+    let frame_124 = build_frame_nr(124);
+
+    test_invalidation(
+        RangeQuery::new(frame_123.0, ResolvedTimeRange::EVERYTHING),
+        [frame_123].into(),
+        [frame_122].into(),
+        [frame_124].into(),
+    );
+
+    test_invalidation(
+        RangeQuery::new(frame_123.0, ResolvedTimeRange::EVERYTHING),
+        [frame_123].into(),
+        timeless,
+        [frame_124].into(),
+    );
+}
+
+// Test the following scenario:
+// ```py
+// rr.log("points", rr.Points3D([1, 2, 3]), static=True)
+//
+// # Do first query here: LatestAt(+inf)
+// # Expected: points=[[1,2,3]] colors=[]
+//
+// rr.set_time(2)
+// rr.log_components("points", rr.components.MyColor(0xFF0000))
+//
+// # Do second query here: LatestAt(+inf)
+// # Expected: points=[[1,2,3]] colors=[0xFF0000]
+//
+// rr.set_time(3)
+// rr.log_components("points", rr.components.MyColor(0x0000FF))
+//
+// # Do third query here: LatestAt(+inf)
+// # Expected: points=[[1,2,3]] colors=[0x0000FF]
+//
+// rr.set_time(3)
+// rr.log_components("points", rr.components.MyColor(0x00FF00))
+//
+// # Do fourth query here: LatestAt(+inf)
+// # Expected: points=[[1,2,3]] colors=[0x00FF00]
+// ```
+#[test]
+fn invalidation_of_future_optionals() {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+    let mut caches = Caches::new(&store);
+
+    let entity_path = "points";
+
+    let timeless = TimePoint::default();
+    let frame2 = [build_frame_nr(2)];
+    let frame3 = [build_frame_nr(3)];
+
+    let query = RangeQuery::new(frame2[0].0, ResolvedTimeRange::EVERYTHING);
+
+    let row_id1 = RowId::new();
+    let points1 = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
+    let chunk1 = Chunk::builder(entity_path.into())
+        .with_component_batch(row_id1, timeless, &points1)
+        .build()
+        .unwrap();
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk1));
+
+    let expected_points = &[
+        ((TimeInt::STATIC, row_id1), points1.as_slice()), //
+    ];
+    let expected_colors = &[];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path.into(),
+        expected_points,
+        expected_colors,
+    );
+
+    let row_id2 = RowId::new();
+    let colors2 = vec![MyColor::from_rgb(255, 0, 0)];
+    let chunk2 = Chunk::builder(entity_path.into())
+        .with_component_batch(row_id2, frame2, &colors2)
+        .build()
+        .unwrap();
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk2));
+
+    let expected_colors = &[
+        ((TimeInt::new_temporal(2), row_id2), colors2.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path.into(),
+        expected_points,
+        expected_colors,
+    );
+
+    let row_id3 = RowId::new();
+    let colors3 = vec![MyColor::from_rgb(0, 0, 255)];
+    let chunk3 = Chunk::builder(entity_path.into())
+        .with_component_batch(row_id3, frame3, &colors3)
+        .build()
+        .unwrap();
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk3));
+
+    let expected_colors = &[
+        ((TimeInt::new_temporal(2), row_id2), colors2.as_slice()), //
+        ((TimeInt::new_temporal(3), row_id3), colors3.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path.into(),
+        expected_points,
+        expected_colors,
+    );
+
+    let row_id4 = RowId::new();
+    let colors4 = vec![MyColor::from_rgb(0, 255, 0)];
+    let chunk4 = Chunk::builder(entity_path.into())
+        .with_component_batch(row_id4, frame3, &colors4)
+        .build()
+        .unwrap();
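+    // Two colors now share frame 3: range semantics keep both rows (ordered by row-id), unlike
+    // latest-at, which would only surface the most recent one.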
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk4));
+
+    let expected_colors = &[
+        ((TimeInt::new_temporal(2), row_id2), colors2.as_slice()), //
+        ((TimeInt::new_temporal(3), row_id3), colors3.as_slice()), //
+        ((TimeInt::new_temporal(3), row_id4), colors4.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path.into(),
+        expected_points,
+        expected_colors,
+    );
+}
+
+#[test]
+fn invalidation_static() {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+    let mut caches = Caches::new(&store);
+
+    let entity_path = "points";
+
+    let timeless = TimePoint::default();
+
+    let frame0 = [build_frame_nr(TimeInt::ZERO)];
+    let query = RangeQuery::new(frame0[0].0, ResolvedTimeRange::EVERYTHING);
+
+    let row_id1 = RowId::new();
+    let points1 = vec![MyPoint::new(1.0, 2.0), MyPoint::new(3.0, 4.0)];
+    let chunk1 = Chunk::builder(entity_path.into())
+        .with_component_batch(row_id1, timeless.clone(), &points1)
+        .build()
+        .unwrap();
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk1));
+
+    let expected_points = &[
+        ((TimeInt::STATIC, row_id1), points1.as_slice()), //
+    ];
+    let expected_colors = &[];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path.into(),
+        expected_points,
+        expected_colors,
+    );
+
+    let row_id2 = RowId::new();
+    let colors2 = vec![MyColor::from_rgb(255, 0, 0)];
+    let chunk2 = Chunk::builder(entity_path.into())
+        .with_component_batch(row_id2, timeless.clone(), &colors2)
+        .build()
+        .unwrap();
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk2));
+
+    let expected_colors = &[
+        ((TimeInt::STATIC, row_id2), colors2.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path.into(),
+        expected_points,
+        expected_colors,
+    );
+
+    let row_id3 = RowId::new();
+    let colors3 = vec![MyColor::from_rgb(0, 0, 255)];
+    let chunk3 = Chunk::builder(entity_path.into())
+        .with_component_batch(row_id3, timeless, &colors3)
+        .build()
+        .unwrap();
+    insert_and_react(&mut store, &mut caches, &Arc::new(chunk3));
+
+    let expected_colors = &[
+        ((TimeInt::STATIC, row_id3), colors3.as_slice()), //
+    ];
+    query_and_compare(
+        &caches,
+        &store,
+        &query,
+        &entity_path.into(),
+        expected_points,
+        expected_colors,
+    );
+}
+
+// See .
+#[test]
+fn concurrent_multitenant_edge_case() {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+    let mut caches = Caches::new(&store);
+
+    let entity_path: EntityPath = "point".into();
+
+    let add_points = |time: i64, point_value: f32| {
+        let timepoint = [build_frame_nr(time)];
+        let points = vec![
+            MyPoint::new(point_value, point_value + 1.0),
+            MyPoint::new(point_value + 2.0, point_value + 3.0),
+        ];
+        let chunk = Arc::new(
+            Chunk::builder(entity_path.clone())
+                .with_component_batch(RowId::new(), timepoint, &points)
+                .build()
+                .unwrap(),
+        );
+        (timepoint, points, chunk)
+    };
+
+    let (timepoint1, points1, chunk1) = add_points(123, 1.0);
+    insert_and_react(&mut store, &mut caches, &chunk1);
+    let (_timepoint2, points2, chunk2) = add_points(223, 2.0);
+    insert_and_react(&mut store, &mut caches, &chunk2);
+    let (_timepoint3, points3, chunk3) = add_points(323, 3.0);
+    insert_and_react(&mut store, &mut caches, &chunk3);
+
+    // --- Tenant #1 queries the data, but doesn't cache the result in the deserialization cache ---
+
+    let query = RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::EVERYTHING);
+
+    eprintln!("{store}");
+
+    {
+        let cached = caches.range(
+            &store,
+            &query,
+            &entity_path,
+            MyPoints::all_components().iter().copied(),
+        );
+
+        let _cached_all_points = cached.get_required(&MyPoint::name()).unwrap();
+    }
+
+    // --- Meanwhile, tenant #2 queries and deserializes the data ---
+
+    let query = RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::EVERYTHING);
+
+    let expected_points = &[
+        (
+            (TimeInt::new_temporal(123), chunk1.row_id_range().unwrap().0),
+            points1.as_slice(),
+        ), //
+        (
+            (TimeInt::new_temporal(223), chunk2.row_id_range().unwrap().0),
+            points2.as_slice(),
+        ), //
+        (
+            (TimeInt::new_temporal(323), chunk3.row_id_range().unwrap().0),
+            points3.as_slice(),
+        ), //
+    ];
+    query_and_compare(&caches, &store, &query, &entity_path, expected_points, &[]);
+}
+
+// See .
+#[test]
+fn concurrent_multitenant_edge_case2() {
+    let mut store = ChunkStore::new(
+        re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
+        Default::default(),
+    );
+    let mut caches = Caches::new(&store);
+
+    let entity_path: EntityPath = "point".into();
+
+    let add_points = |time: i64, point_value: f32| {
+        let timepoint = [build_frame_nr(time)];
+        let points = vec![
+            MyPoint::new(point_value, point_value + 1.0),
+            MyPoint::new(point_value + 2.0, point_value + 3.0),
+        ];
+        let chunk = Arc::new(
+            Chunk::builder(entity_path.clone())
+                .with_component_batch(RowId::new(), timepoint, &points)
+                .build()
+                .unwrap(),
+        );
+        (timepoint, points, chunk)
+    };
+
+    let (timepoint1, points1, chunk1) = add_points(123, 1.0);
+    insert_and_react(&mut store, &mut caches, &chunk1);
+    let (_timepoint2, points2, chunk2) = add_points(223, 2.0);
+    insert_and_react(&mut store, &mut caches, &chunk2);
+    let (_timepoint3, points3, chunk3) = add_points(323, 3.0);
+    insert_and_react(&mut store, &mut caches, &chunk3);
+    let (_timepoint4, points4, chunk4) = add_points(423, 4.0);
+    insert_and_react(&mut store, &mut caches, &chunk4);
+    let (_timepoint5, points5, chunk5) = add_points(523, 5.0);
+    insert_and_react(&mut store, &mut caches, &chunk5);
+
+    // --- Tenant #1 queries the data at (123, 223), but doesn't cache the result in the deserialization cache ---
+
+    let query1 = RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::new(123, 223));
+    {
+        let cached = caches.range(
+            &store,
+            &query1,
+            &entity_path,
+            MyPoints::all_components().iter().copied(),
+        );
+
+        let _cached_all_points = cached.get_required(&MyPoint::name()).unwrap();
+    }
+
+    // --- Tenant #2 queries the data at (423, 523), but doesn't cache the result in the deserialization cache ---
+
+    let query2 = RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::new(423, 523));
+    {
+        let cached = caches.range(
+            &store,
+            &query2,
+            &entity_path,
+            MyPoints::all_components().iter().copied(),
+        );
+
+        let _cached_all_points = cached.get_required(&MyPoint::name()).unwrap();
+    }
+
+    // --- Tenant #2 queries the data at (223, 423) and deserializes it ---
+
+    let query3 = RangeQuery::new(timepoint1[0].0, ResolvedTimeRange::new(223, 423));
+    let expected_points = &[
+        (
+            (TimeInt::new_temporal(223), chunk2.row_id_range().unwrap().0),
+            points2.as_slice(),
+        ), //
+        (
+            (TimeInt::new_temporal(323), chunk3.row_id_range().unwrap().0),
+            points3.as_slice(),
+        ), //
+        (
+            (TimeInt::new_temporal(423), chunk4.row_id_range().unwrap().0),
+            points4.as_slice(),
+        ), //
+    ];
+    query_and_compare(&caches, &store, &query3, &entity_path, expected_points, &[]);
+
+    // --- Tenant #1 finally deserializes its data ---
+
+    let expected_points = &[
+        (
+            (TimeInt::new_temporal(123), chunk1.row_id_range().unwrap().0),
+            points1.as_slice(),
+        ), //
+        (
+            (TimeInt::new_temporal(223), chunk2.row_id_range().unwrap().0),
+            points2.as_slice(),
+        ), //
+    ];
+    query_and_compare(&caches, &store, &query1, &entity_path, expected_points, &[]);
+
+    // --- Tenant #2 finally deserializes its data ---
+
+    let expected_points = &[
+        (
+            (TimeInt::new_temporal(423), chunk4.row_id_range().unwrap().0),
+            points4.as_slice(),
+        ), //
+        (
+            (TimeInt::new_temporal(523), chunk5.row_id_range().unwrap().0),
+            points5.as_slice(),
+        ), //
+    ];
+    query_and_compare(&caches, &store, &query2, &entity_path, expected_points, &[]);
+}
+
+// ---
+
+fn insert_and_react(store: &mut ChunkStore, caches: &mut Caches, chunk: &Arc<Chunk>) {
+    caches.on_events(&store.insert_chunk(chunk).unwrap());
+}
+
+fn query_and_compare(
+    caches: &Caches,
+    store: &ChunkStore,
+    query: &RangeQuery,
+    entity_path: &EntityPath,
+    expected_all_points_indexed: &[((TimeInt, RowId), &[MyPoint])],
+    expected_all_colors_indexed: &[((TimeInt, RowId), &[MyColor])],
+) {
+    re_log::setup_logging();
+
+    for _ in 0..3 {
+        let cached = caches.range(
+            store,
+            query,
+            entity_path,
+            MyPoints::all_components().iter().copied(),
+        );
+
+        let all_points_chunks = cached.get_required(&MyPoint::name()).unwrap();
+        let mut all_points_iters = all_points_chunks
+            .iter()
+            .map(|chunk| chunk.iter_component::<MyPoint>())
+            .collect_vec();
+        let all_points_indexed = {
+            let all_points = all_points_iters.iter_mut().flat_map(|it| it.into_iter());
+            let all_points_indices = all_points_chunks.iter().flat_map(|chunk| {
+                chunk.iter_component_indices(&query.timeline(), &MyPoint::name())
+            });
+            itertools::izip!(all_points_indices, all_points)
+        };
+
+        let all_colors_chunks = cached.get(&MyColor::name()).unwrap_or_default();
+        let mut all_colors_iters = all_colors_chunks
+            .iter()
+            .map(|chunk| chunk.iter_component::<MyColor>())
+            .collect_vec();
+        let all_colors_indexed = {
+            let all_colors = all_colors_iters.iter_mut().flat_map(|it| it.into_iter());
+            let all_colors_indices = all_colors_chunks.iter().flat_map(|chunk| {
+                chunk.iter_component_indices(&query.timeline(), &MyColor::name())
+            });
+            itertools::izip!(all_colors_indices, all_colors)
+        };
+
+        eprintln!("{query:?}");
+        eprintln!("{store}");
+
+        similar_asserts::assert_eq!(
+            expected_all_points_indexed,
+            all_points_indexed.collect_vec(),
+        );
+
+        similar_asserts::assert_eq!(
+            expected_all_colors_indexed,
+            all_colors_indexed.collect_vec(),
+        );
+    }
+}