From 9a5cdcb673a56080148c77491df0fae2809f3d02 Mon Sep 17 00:00:00 2001
From: Andrei Dragomir
Date: Wed, 11 Jun 2025 13:57:39 +0300
Subject: [PATCH 1/5] [HSTACK] - add support for pseudo-CDF (load recent commits & skip checkpoints)

Signed-off-by: Adrian Tanase
---
 .../core/src/kernel/snapshot/log_segment.rs | 81 +++++++++++++++++
 crates/core/src/kernel/snapshot/mod.rs      | 86 +++++++++++++++++--
 crates/core/src/table/builder.rs            | 38 +++++++-
 3 files changed, 197 insertions(+), 8 deletions(-)

diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs
index 2005c0ee8f..73c977c17d 100644
--- a/crates/core/src/kernel/snapshot/log_segment.rs
+++ b/crates/core/src/kernel/snapshot/log_segment.rs
@@ -135,6 +135,48 @@ impl LogSegment {
         Ok(segment)
     }
 
+    /// Try to create a new [`LogSegment`] from a slice of the log.
+    ///
+    /// This creates a [`LogSegment`] containing only the relevant commit log files,
+    /// starting at `start_version` and limited to `max_commits` entries.
+    /// Checkpoint files are ignored, allowing a pseudo-changelog to be reconstructed
+    /// for a quick preview of recent data.
+    pub async fn try_recent_commits(
+        log_store: &dyn LogStore,
+        start_version: i64,
+        max_commits: usize,
+    ) -> DeltaResult<Self> {
+        debug!("try_recent_commits: start_version: {start_version}");
+        log_store.refresh().await?;
+        let log_url = log_store.log_root_url();
+        let mut store_root = log_url.clone();
+        store_root.set_path("");
+        let log_path = crate::logstore::object_store_path(&log_url)?;
+
+        let mut commit_files = list_commit_files(
+            &log_store.root_object_store(None),
+            &log_path,
+            None,
+            Some(start_version),
+            &store_root,
+        )
+        .await?;
+
+        // max count of commits to load without starting from a checkpoint
+        commit_files.truncate(max_commits);
+
+        validate(&commit_files, &vec![])?;
+
+        let mut segment = Self {
+            version: start_version,
+            commit_files: commit_files.into_iter().map(|(meta, _)| meta).collect(),
+            checkpoint_files: vec![],
+        };
+        segment.version = segment.file_version().unwrap_or(start_version);
+
+        Ok(segment)
+    }
+
     /// Returns the highest commit version number in the log segment
     pub fn file_version(&self) -> Option<i64> {
         let dummy_url = Url::parse("dummy:///").unwrap();
@@ -553,6 +595,45 @@ pub(super) async fn list_log_files(
     Ok((commit_files, checkpoint_files))
 }
 
+/// List relevant commit files, ignoring checkpoints.
+///
+/// See `try_recent_commits` for more details on how this is used.
+pub(super) async fn list_commit_files(
+    root_store: &dyn ObjectStore,
+    log_root: &Path,
+    max_version: Option<i64>,
+    start_version: Option<i64>,
+    store_root: &Url,
+) -> DeltaResult<Vec<(ObjectMeta, ParsedLogPath<Url>)>> {
+    let max_version = max_version.unwrap_or(i64::MAX - 1);
+    let start_from = log_root.child(format!("{:020}", start_version.unwrap_or(0)).as_str());
+
+    let mut commit_files = Vec::with_capacity(25);
+
+    for meta in root_store
+        .list_with_offset(Some(log_root), &start_from)
+        .try_collect::<Vec<_>>()
+        .await?
+        .into_iter()
+        .filter_map(|f| {
+            let file_url = store_root.join(f.location.as_ref()).ok()?;
+            let path = ParsedLogPath::try_from(file_url).ok()??;
+            Some((f, path))
+        })
+    {
+        if meta.1.version <= max_version as u64
+            && Some(meta.1.version as i64) >= start_version
+            && matches!(meta.1.file_type, LogPathFileType::Commit) {
+            commit_files.push(meta);
+        }
+    }
+
+    // NOTE this will sort in reverse order
+    commit_files.sort_unstable_by(|a, b| b.0.location.cmp(&a.0.location));
+
+    Ok(commit_files)
+}
+
 #[cfg(test)]
 pub(super) mod tests {
     use delta_kernel::table_features::{ReaderFeature, WriterFeature};
diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index 686e6cabdb..f4966ff4aa 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -15,18 +15,18 @@
 //!
 //!
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::io::{BufRead, BufReader, Cursor};
 
-use ::serde::{Deserialize, Serialize};
 use arrow_array::RecordBatch;
 use delta_kernel::path::{LogPathFileType, ParsedLogPath};
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
-use object_store::ObjectStore;
+use object_store::{ObjectMeta, ObjectStore};
+use ::serde::{Deserialize, Serialize};
+use tracing::log::{debug, info};
 use url::Url;
-
 use self::log_segment::LogSegment;
 use self::parse::{read_adds, read_removes};
 use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
@@ -36,7 +36,7 @@ use super::{Action, Add, AddCDCFile, CommitInfo, Metadata, Protocol, Remove, Tra
 use crate::kernel::parse::read_cdf_adds;
 use crate::kernel::transaction::{CommitData, PROTOCOL};
 use crate::kernel::{ActionType, StructType};
-use crate::logstore::LogStore;
+use crate::logstore::{LogStore, LogStoreExt};
 use crate::table::config::TableConfig;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
 
@@ -73,7 +73,17 @@ impl Snapshot {
         config: DeltaTableConfig,
         version: Option<i64>,
     ) -> DeltaResult<Self> {
-        let log_segment = LogSegment::try_new(log_store, version).await?;
+        let log_segment = if config.pseudo_cdf {
+            // we need the full changes for metadata loading
+            info!(
+                "Loading table with pseudo-cdf, target version is: {:?}",
+                version
+            );
+            LogSegment::try_new(log_store, None).await?
+        } else {
+            // classic behaviour, only load metadata up to version
+            LogSegment::try_new(log_store, version).await?
+        };
         let (protocol, metadata) = log_segment.read_metadata(log_store, &config).await?;
         if metadata.is_none() || protocol.is_none() {
             return Err(DeltaTableError::Generic(
@@ -85,6 +95,70 @@ impl Snapshot {
 
         PROTOCOL.can_read_from_protocol(&protocol)?;
+
+        let mut store_root = log_store.table_root_url().clone();
+        store_root.set_path("");
+
+        // TODO: extract to helper function
+        let log_segment = if config.pseudo_cdf {
+            // unless provided with a start version, we only load the last commit
+            let start_version = version.unwrap_or(log_segment.version);
+
+            // check whether the needed version is already loaded, discarding VACUUM commits
+            let min_version = log_segment
+                .commit_files
+                .iter()
+                // simple heuristic for ruling out VACUUM commits (in practice ~900 bytes);
+                // if this fails, we would need to use commits_stream() and look explicitly for adds
+                .filter(|f| f.size > 1500)
+                .filter_map(|f| {
+                    let file_url = store_root.join(f.location.as_ref()).ok()?;
+                    let path = ParsedLogPath::try_from(file_url).ok()??;
+                    Some(path.version as i64)
+                })
+                .min()
+                .unwrap_or(i64::MAX);
+
+            if start_version >= min_version {
+                let mut recent_commits: VecDeque<ObjectMeta> = log_segment
+                    .commit_files
+                    .iter()
+                    // go back 2 more commits in order to make sure we skip over VACUUM start & end
+                    .filter_map(|f| {
+                        let file_url = store_root.join(f.location.as_ref()).ok()?;
+                        let path = ParsedLogPath::try_from(file_url).ok()??;
+                        if path.version as i64 >= start_version - 2 {
+                            Some(f)
+                        } else {
+                            None
+                        }
+                    })
+                    .cloned()
+                    .collect();
+                recent_commits.truncate(config.pseudo_cdf_max_commits);
+                debug!("Reusing existing log files: {:?}", recent_commits);
+                LogSegment {
+                    version: log_segment.version,
+                    commit_files: recent_commits,
+                    checkpoint_files: vec![],
+                }
+            } else {
+                // go back 2 more commits in order to make sure we skip over VACUUM start & end
+                let start_version = (start_version - 2).max(0);
+                let segment = LogSegment::try_recent_commits(
+                    log_store,
+                    start_version,
+                    config.pseudo_cdf_max_commits,
+                )
+                .await?;
+                debug!("Version not found after checkpoint, reloading slice from version: {start_version}: {:?}", segment.commit_files);
+                segment
+            }
+        } else {
+            log_segment
+        };
+
         Ok(Self {
             log_segment,
             config,
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 15a90b7e9e..2952974a25 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -38,6 +38,14 @@ pub struct DeltaTableConfig {
     /// Hence, DeltaTable will be loaded with significant memory reduction.
     pub require_files: bool,
 
+    /// Enables a "pseudo-changelog" by only loading JSON commit files starting with the specified version.
+    /// Note that the meaning of the version or datestring is flipped: instead of loading data
+    /// UP TO the version, the table will contain data added AFTER that version.
+    pub pseudo_cdf: bool,
+
+    /// Max number of commits to load
+    pub pseudo_cdf_max_commits: usize,
+
     /// Controls how many files to buffer from the commit log when updating the table.
     /// This defaults to 4 * number of cpus
     ///
@@ -57,13 +65,15 @@ pub struct DeltaTableConfig {
     pub io_runtime: Option<IORuntime>,
 
     #[delta(skip)]
-    pub options: HashMap<String, String>
+    pub options: HashMap<String, String>,
 }
 
 impl Default for DeltaTableConfig {
     fn default() -> Self {
         Self {
             require_files: true,
+            pseudo_cdf: false,
+            pseudo_cdf_max_commits: 1024,
             log_buffer_size: num_cpus::get() * 4,
             log_batch_size: 1024,
             io_runtime: None,
@@ -149,6 +159,23 @@ impl DeltaTableBuilder {
         self
     }
 
+    /// Sets `pseudo_cdf=true` to the builder
+    pub fn with_pseudo_cdf(mut self) -> Self {
+        self.table_config.pseudo_cdf = true;
+        self
+    }
+
+    /// Sets `pseudo_cdf_max_commits` to the builder
+    pub fn with_pseudo_cdf_max_commits(mut self, max_commits: usize) -> DeltaResult<Self> {
+        if max_commits == 0 {
+            return Err(DeltaTableError::Generic(String::from(
+                "Max number of commits should be positive",
+            )));
+        }
+        self.table_config.pseudo_cdf_max_commits = max_commits;
+        Ok(self)
+    }
+
     /// Sets `version` to the builder
     pub fn with_version(mut self, version: i64) -> Self {
         self.version = DeltaVersion::Version(version);
@@ -215,7 +242,14 @@ impl DeltaTableBuilder {
             storage_options
                 .clone()
                 .into_iter()
-                .map(|(k, v)| (k.strip_prefix("deltalake.").map(ToString::to_string).unwrap_or(k), v))
+                .map(|(k, v)| {
+                    (
+                        k.strip_prefix("deltalake.")
+                            .map(ToString::to_string)
+                            .unwrap_or(k),
+                        v,
+                    )
+                })
                 .map(|(k, v)| {
                     let needs_trim = v.starts_with("http://")
                         || v.starts_with("https://")
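For orientation, here is a minimal usage sketch of the builder API introduced in this patch (illustrative only, not part of the series). It assumes a `DeltaTableBuilder` for the target table obtained in the usual way; the cap of 16 commits is an arbitrary example value.

    use deltalake::{DeltaResult, DeltaTable, DeltaTableBuilder};

    /// Illustrative only: open a table in pseudo-CDF mode, replaying at most the
    /// 16 most recent JSON commits instead of the full checkpoint + log history.
    async fn open_recent_changes(builder: DeltaTableBuilder) -> DeltaResult<DeltaTable> {
        builder
            .with_pseudo_cdf()
            .with_pseudo_cdf_max_commits(16)?
            .load()
            .await
    }

Note that `with_pseudo_cdf_max_commits` rejects a zero value and therefore returns a `DeltaResult`, hence the `?`, while `with_pseudo_cdf` is infallible.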
From 0733bd542d29fb6906bfb2419727b77663d29d5a Mon Sep 17 00:00:00 2001
From: Adrian Tanase
Date: Wed, 11 Jun 2025 12:53:59 +0300
Subject: [PATCH 2/5] [HSTACK] - fix crash on missing statistics

Signed-off-by: Adrian Tanase
---
 crates/core/src/kernel/snapshot/log_data.rs |  8 ++++++--
 crates/core/src/operations/cast/mod.rs      | 16 +++++++---------
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index 959d94953d..94844319a8 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -659,8 +659,12 @@ mod datafusion {
                     .map(|sv| sv.to_array())
                     .collect::<Result<Vec<_>, DataFusionError>>()
                     .unwrap();
-                let sa = StructArray::new(fields.clone(), arrays, None);
-                Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
+                if arrays.is_empty() {
+                    Precision::Absent
+                } else {
+                    let sa = StructArray::new(fields.clone(), arrays, None);
+                    Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
+                }
             })
             .unwrap_or(Precision::Absent),
         _ => Precision::Absent,
diff --git a/crates/core/src/operations/cast/mod.rs b/crates/core/src/operations/cast/mod.rs
index b3be011ebc..559531bb9c 100644
--- a/crates/core/src/operations/cast/mod.rs
+++ b/crates/core/src/operations/cast/mod.rs
@@ -213,15 +213,13 @@ pub fn cast_record_batch(
             batch.num_rows(),
         )?
     } else {
-        // Can be simplified with StructArray::try_new_with_length in arrow 55.1
-        let col_arrays = batch.columns().to_owned();
-        let s = if col_arrays.is_empty() {
-            StructArray::new_empty_fields(batch.num_rows(), None)
-        } else {
-            StructArray::new(batch.schema().as_ref().to_owned().fields, col_arrays, None)
-        };
-
-        cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?
+        let s = StructArray::try_new_with_length(
+            batch.schema().as_ref().to_owned().fields,
+            batch.columns().to_owned(),
+            None,
+            batch.num_rows(),
+        )?;
+        cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?
     };
     Ok(RecordBatch::try_new_with_options(
         target_schema,

From 009c85e555075fb2f34603faca850c6d603c79b2 Mon Sep 17 00:00:00 2001
From: Adrian Tanase
Date: Tue, 17 Jun 2025 13:37:47 +0300
Subject: [PATCH 3/5] [HSTACK] - fix load_with_datetime max_version

Signed-off-by: Adrian Tanase
---
 crates/core/src/table/mod.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9a5ac478c0..538d5c8bb9 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -411,7 +411,11 @@ impl DeltaTable {
                 break;
             }
         }
-        let mut max_version = match self.get_latest_version().await {
+        let mut max_version = match self
+            .log_store
+            .get_latest_version(self.version().unwrap_or(min_version))
+            .await
+        {
             Ok(version) => version,
             Err(DeltaTableError::InvalidVersion(_)) => {
                 return Err(DeltaTableError::NotATable(

From 9e36485132f6ceeefa3d02aae37eef22b30fd849 Mon Sep 17 00:00:00 2001
From: Adrian Tanase
Date: Tue, 17 Jun 2025 17:05:47 +0300
Subject: [PATCH 4/5] [HSTACK] - add max_log_size hard limit + fallback to pseudo_cdf

---
 crates/core/src/kernel/snapshot/mod.rs | 45 +++++++++++++++++++-------
 crates/core/src/table/builder.rs       | 21 ++++++++++++
 2 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index f4966ff4aa..1a0fa362b5 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -18,15 +18,6 @@
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::io::{BufRead, BufReader, Cursor};
 
-use arrow_array::RecordBatch;
-use delta_kernel::path::{LogPathFileType, ParsedLogPath};
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
-use object_store::path::Path;
-use object_store::{ObjectMeta, ObjectStore};
-use ::serde::{Deserialize, Serialize};
-use tracing::log::{debug, info};
-use url::Url;
 use self::log_segment::LogSegment;
 use self::parse::{read_adds, read_removes};
 use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
@@ -39,6 +30,15 @@ use crate::kernel::{ActionType, StructType};
 use crate::logstore::{LogStore, LogStoreExt};
 use crate::table::config::TableConfig;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
+use arrow_array::RecordBatch;
+use delta_kernel::path::{LogPathFileType, ParsedLogPath};
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
+use ::serde::{Deserialize, Serialize};
+use tracing::log::{debug, info};
+use url::Url;
 
 pub use self::log_data::*;
 
@@ -95,10 +95,33 @@ impl Snapshot {
 
         PROTOCOL.can_read_from_protocol(&protocol)?;
-
         let mut store_root = log_store.table_root_url().clone();
         store_root.set_path("");
 
+        // Check if the log segment size exceeds the configured maximum
+        if let Some(max_size) = config.max_log_bytes {
+            let total_size: u64 = log_segment
+                .checkpoint_files
+                .iter()
+                .chain(log_segment.commit_files.iter())
+                .map(|f| f.size)
+                .sum();
+            if total_size > max_size as u64 {
+                if config.pseudo_cdf {
+                    // Use pseudo_cdf as fallback when table is too large
+                    info!(
+                        "Log segment size ({} bytes) exceeds max_log_bytes ({} bytes), using pseudo_cdf fallback",
+                        total_size, max_size
+                    );
+                } else {
+                    return Err(DeltaTableError::Generic(format!(
+                        "Table log segment size ({} bytes) exceeds maximum allowed size ({} bytes). \
+                         Consider enabling pseudo_cdf mode or increasing max_log_bytes.",
+                        total_size, max_size
+                    )));
+                }
+            }
+        }
 
         // TODO: extract to helper function
         let log_segment = if config.pseudo_cdf {
             // unless provided with a start version, we only load the last commit
@@ -137,7 +160,7 @@ impl Snapshot {
                     .cloned()
                     .collect();
                 recent_commits.truncate(config.pseudo_cdf_max_commits);
-                debug!("Reusing existing log files: {:?}", recent_commits);
+                debug!("Reusing existing log files: {:?}", recent_commits);
                 LogSegment {
                     version: log_segment.version,
                     commit_files: recent_commits,
                     checkpoint_files: vec![],
                 }
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 2952974a25..febab8d31d 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -59,6 +59,12 @@ pub struct DeltaTableConfig {
     /// when processing record batches.
     pub log_batch_size: usize,
 
+    /// Maximum allowed size in bytes for the total log segment (checkpoint + commit files).
+    /// If the log segment exceeds this size, loading the table will fail unless `pseudo_cdf`
+    /// is enabled, in which case pseudo-CDF is used as a fallback.
+    /// If `None`, no size limit is enforced.
+    pub max_log_bytes: Option<usize>,
+
     #[serde(skip_serializing, skip_deserializing)]
     #[delta(skip)]
     /// When a runtime handler is provided, all IO tasks are spawn in that handle
@@ -76,6 +82,7 @@ impl Default for DeltaTableConfig {
             pseudo_cdf_max_commits: 1024,
             log_buffer_size: num_cpus::get() * 4,
             log_batch_size: 1024,
+            max_log_bytes: None,
             io_runtime: None,
             options: HashMap::new(),
         }
@@ -87,6 +94,9 @@ impl PartialEq for DeltaTableConfig {
         self.require_files == other.require_files
             && self.log_buffer_size == other.log_buffer_size
             && self.log_batch_size == other.log_batch_size
+            && self.pseudo_cdf == other.pseudo_cdf
+            && self.pseudo_cdf_max_commits == other.pseudo_cdf_max_commits
+            && self.max_log_bytes == other.max_log_bytes
     }
 }
 
@@ -193,6 +203,17 @@ impl DeltaTableBuilder {
         Ok(self)
     }
 
+    /// Sets `max_log_bytes` to the builder
+    pub fn with_max_log_bytes(mut self, max_log_size: usize) -> DeltaResult<Self> {
+        if max_log_size == 0 {
+            return Err(DeltaTableError::Generic(String::from(
+                "Max log size should be positive",
+            )));
+        }
+        self.table_config.max_log_bytes = Some(max_log_size);
+        Ok(self)
+    }
+
     /// specify the timestamp given as ISO-8601/RFC-3339 timestamp
     pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
         let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
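A similar illustrative sketch for the size cap added in this patch (again not part of the series): with `max_log_bytes` set, a table whose combined checkpoint and commit files exceed the cap either fails to load (when `pseudo_cdf` is off) or takes the pseudo-CDF path (when it is on). The 64 MiB figure is an arbitrary example, and `builder` is again assumed to be a `DeltaTableBuilder` for the target table.

    use deltalake::{DeltaResult, DeltaTable, DeltaTableBuilder};

    /// Illustrative only: cap the log segment at 64 MiB and let pseudo-CDF serve
    /// as the fallback for tables whose log is larger than that.
    async fn open_with_log_cap(builder: DeltaTableBuilder) -> DeltaResult<DeltaTable> {
        builder
            .with_pseudo_cdf()
            .with_max_log_bytes(64 * 1024 * 1024)?
            .load()
            .await
    }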
From c865d517eb8f477e9e2afe66787adc0100936873 Mon Sep 17 00:00:00 2001
From: Adrian Tanase
Date: Wed, 18 Jun 2025 07:45:29 +0300
Subject: [PATCH 5/5] Only use fallback if over max_log_bytes

Signed-off-by: Adrian Tanase
---
 crates/core/src/kernel/snapshot/mod.rs | 26 +++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index 1a0fa362b5..f52d0e76c0 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -73,12 +73,7 @@ impl Snapshot {
         config: DeltaTableConfig,
         version: Option<i64>,
     ) -> DeltaResult<Self> {
-        let log_segment = if config.pseudo_cdf {
-            // we need the full changes for metadata loading
-            info!(
-                "Loading table with pseudo-cdf, target version is: {:?}",
-                version
-            );
+        let log_segment = if config.pseudo_cdf && config.max_log_bytes.is_some() {
             LogSegment::try_new(log_store, None).await?
         } else {
             // classic behaviour, only load metadata up to version
             LogSegment::try_new(log_store, version).await?
@@ -99,14 +94,15 @@ impl Snapshot {
         store_root.set_path("");
 
         // Check if the log segment size exceeds the configured maximum
-        if let Some(max_size) = config.max_log_bytes {
+        let should_fall_back = if let Some(max_size) = config.max_log_bytes {
             let total_size: u64 = log_segment
                 .checkpoint_files
                 .iter()
                 .chain(log_segment.commit_files.iter())
                 .map(|f| f.size)
                 .sum();
-            if total_size > max_size as u64 {
+            let total_size = total_size as usize;
+            if total_size > max_size {
                 if config.pseudo_cdf {
                     // Use pseudo_cdf as fallback when table is too large
                     info!(
@@ -120,11 +116,19 @@ impl Snapshot {
                         total_size, max_size
                     )));
                 }
-            }
-        }
+            } else {
+                info!(
+                    "Log segment size ({} bytes) is under max_log_bytes ({} bytes), loading full table",
+                    total_size, max_size
+                );
+            };
+            total_size > max_size
+        } else {
+            false
+        };
 
         // TODO: extract to helper function
-        let log_segment = if config.pseudo_cdf {
+        let log_segment = if config.pseudo_cdf && should_fall_back {
             // unless provided with a start version, we only load the last commit
             let start_version = version.unwrap_or(log_segment.version);
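To restate the decision this last patch arrives at, the sketch below (not the patch code itself) distills it into a standalone helper: fall back to pseudo-CDF only when a cap is configured and the combined size of checkpoint and commit files exceeds it. Error handling for the non-pseudo-CDF case is omitted, and `ObjectMeta::size` is assumed to be a `u64`, as in the snapshot code above.

    use object_store::ObjectMeta;

    /// Minimal restatement of the fallback decision (assumptions noted above).
    fn should_fall_back(
        checkpoint_files: &[ObjectMeta],
        commit_files: &[ObjectMeta],
        max_log_bytes: Option<usize>,
    ) -> bool {
        match max_log_bytes {
            Some(max_size) => {
                // Total size of the log segment: every checkpoint file plus every commit file.
                let total: u64 = checkpoint_files
                    .iter()
                    .chain(commit_files.iter())
                    .map(|f| f.size)
                    .sum();
                total as usize > max_size
            }
            None => false,
        }
    }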