diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index 959d94953d..94844319a8 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -659,8 +659,12 @@ mod datafusion {
                     .map(|sv| sv.to_array())
                     .collect::<Result<Vec<_>, DataFusionError>>()
                     .unwrap();
-                let sa = StructArray::new(fields.clone(), arrays, None);
-                Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
+                if arrays.is_empty() {
+                    Precision::Absent
+                } else {
+                    let sa = StructArray::new(fields.clone(), arrays, None);
+                    Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
+                }
             })
             .unwrap_or(Precision::Absent),
         _ => Precision::Absent,
diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs
index 2005c0ee8f..73c977c17d 100644
--- a/crates/core/src/kernel/snapshot/log_segment.rs
+++ b/crates/core/src/kernel/snapshot/log_segment.rs
@@ -135,6 +135,48 @@ impl LogSegment {
         Ok(segment)
     }
 
+    /// Try to create a new [`LogSegment`] from a slice of the log.
+    ///
+    /// This will create a new [`LogSegment`] from the log with JUST the relevant commit log files,
+    /// starting at `start_version` and limited to at most `max_commits` commits.
+    /// This ignores checkpoint files, allowing a pseudo-changelog to be reconstructed for a quick
+    /// preview of recent data.
+    pub async fn try_recent_commits(
+        log_store: &dyn LogStore,
+        start_version: i64,
+        max_commits: usize,
+    ) -> DeltaResult<Self> {
+        debug!("try_recent_commits: start_version: {start_version}");
+        log_store.refresh().await?;
+        let log_url = log_store.log_root_url();
+        let mut store_root = log_url.clone();
+        store_root.set_path("");
+        let log_path = crate::logstore::object_store_path(&log_url)?;
+
+        let mut commit_files = list_commit_files(
+            &log_store.root_object_store(None),
+            &log_path,
+            None,
+            Some(start_version),
+            &store_root,
+        )
+        .await?;
+
+        // max count of commits to load without starting from a checkpoint
+        commit_files.truncate(max_commits);
+
+        validate(&commit_files, &vec![])?;
+
+        let mut segment = Self {
+            version: start_version,
+            commit_files: commit_files.into_iter().map(|(meta, _)| meta).collect(),
+            checkpoint_files: vec![],
+        };
+        segment.version = segment.file_version().unwrap_or(start_version);
+
+        Ok(segment)
+    }
+
     /// Returns the highest commit version number in the log segment
     pub fn file_version(&self) -> Option<i64> {
         let dummy_url = Url::parse("dummy:///").unwrap();
@@ -553,6 +595,45 @@ pub(super) async fn list_log_files(
     Ok((commit_files, checkpoint_files))
 }
 
+/// List relevant commit files, ignoring checkpoints
+///
+/// See `try_recent_commits` for more details on how this is used
+pub(super) async fn list_commit_files(
+    root_store: &dyn ObjectStore,
+    log_root: &Path,
+    max_version: Option<i64>,
+    start_version: Option<i64>,
+    store_root: &Url,
+) -> DeltaResult<Vec<(ObjectMeta, ParsedLogPath<Url>)>> {
+    let max_version = max_version.unwrap_or(i64::MAX - 1);
+    let start_from = log_root.child(format!("{:020}", start_version.unwrap_or(0)).as_str());
+
+    let mut commit_files = Vec::with_capacity(25);
+
+    for meta in root_store
+        .list_with_offset(Some(log_root), &start_from)
+        .try_collect::<Vec<_>>()
+        .await?
+        .into_iter()
+        .filter_map(|f| {
+            let file_url = store_root.join(f.location.as_ref()).ok()?;
+            let path = ParsedLogPath::try_from(file_url).ok()??;
+            Some((f, path))
+        })
+    {
+        if meta.1.version <= max_version as u64
+            && Some(meta.1.version as i64) >= start_version
+            && matches!(meta.1.file_type, LogPathFileType::Commit) {
+            commit_files.push(meta);
+        }
+    }
+
+    // NOTE this will sort in reverse order
+    commit_files.sort_unstable_by(|a, b| b.0.location.cmp(&a.0.location));
+
+    Ok(commit_files)
+}
+
 #[cfg(test)]
 pub(super) mod tests {
     use delta_kernel::table_features::{ReaderFeature, WriterFeature};
diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs
index 686e6cabdb..f52d0e76c0 100644
--- a/crates/core/src/kernel/snapshot/mod.rs
+++ b/crates/core/src/kernel/snapshot/mod.rs
@@ -15,18 +15,9 @@
 //!
 //!
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::io::{BufRead, BufReader, Cursor};
 
-use ::serde::{Deserialize, Serialize};
-use arrow_array::RecordBatch;
-use delta_kernel::path::{LogPathFileType, ParsedLogPath};
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
-use object_store::path::Path;
-use object_store::ObjectStore;
-use url::Url;
-
 use self::log_segment::LogSegment;
 use self::parse::{read_adds, read_removes};
 use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
@@ -36,9 +27,18 @@ use super::{Action, Add, AddCDCFile, CommitInfo, Metadata, Protocol, Remove, Tra
 use crate::kernel::parse::read_cdf_adds;
 use crate::kernel::transaction::{CommitData, PROTOCOL};
 use crate::kernel::{ActionType, StructType};
-use crate::logstore::LogStore;
+use crate::logstore::{LogStore, LogStoreExt};
 use crate::table::config::TableConfig;
 use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
+use arrow_array::RecordBatch;
+use delta_kernel::path::{LogPathFileType, ParsedLogPath};
+use futures::stream::BoxStream;
+use futures::{StreamExt, TryStreamExt};
+use object_store::path::Path;
+use object_store::{ObjectMeta, ObjectStore};
+use ::serde::{Deserialize, Serialize};
+use tracing::log::{debug, info};
+use url::Url;
 
 pub use self::log_data::*;
@@ -73,7 +73,12 @@ impl Snapshot {
         config: DeltaTableConfig,
         version: Option<i64>,
     ) -> DeltaResult<Self> {
-        let log_segment = LogSegment::try_new(log_store, version).await?;
+        let log_segment = if config.pseudo_cdf && config.max_log_bytes.is_some() {
+            LogSegment::try_new(log_store, None).await?
+        } else {
+            // classic behaviour: only load metadata up to `version`
+            LogSegment::try_new(log_store, version).await?
+        };
         let (protocol, metadata) = log_segment.read_metadata(log_store, &config).await?;
         if metadata.is_none() || protocol.is_none() {
             return Err(DeltaTableError::Generic(
@@ -85,6 +90,102 @@ impl Snapshot {
 
         PROTOCOL.can_read_from_protocol(&protocol)?;
 
+        let mut store_root = log_store.table_root_url().clone();
+        store_root.set_path("");
+
+        // Check if the log segment size exceeds the configured maximum
+        let should_fall_back = if let Some(max_size) = config.max_log_bytes {
+            let total_size: u64 = log_segment
+                .checkpoint_files
+                .iter()
+                .chain(log_segment.commit_files.iter())
+                .map(|f| f.size)
+                .sum();
+            let total_size = total_size as usize;
+            if total_size > max_size {
+                if config.pseudo_cdf {
+                    // Use pseudo_cdf as fallback when the table is too large
+                    info!(
+                        "Log segment size ({} bytes) exceeds max_log_bytes ({} bytes), using pseudo_cdf fallback",
+                        total_size, max_size
+                    );
+                } else {
+                    return Err(DeltaTableError::Generic(format!(
+                        "Table log segment size ({} bytes) exceeds maximum allowed size ({} bytes). \
+                         Consider enabling pseudo_cdf mode or increasing max_log_bytes.",
+                        total_size, max_size
+                    )));
+                }
+            } else {
+                info!(
+                    "Log segment size ({} bytes) is under max_log_bytes ({} bytes), loading full table",
+                    total_size, max_size
+                );
+            };
+            total_size > max_size
+        } else {
+            false
+        };
+
+        // TODO: extract to helper function
+        let log_segment = if config.pseudo_cdf && should_fall_back {
+            // unless provided with a start version, we only load the last commit
+            let start_version = version.unwrap_or(log_segment.version);
+
+            // check if we already have loaded the needed version, discarding VACUUM commits
+            let min_version = log_segment
+                .commit_files
+                .iter()
+                // simple heuristic for ruling out VACUUM commits (in practice ~900 bytes)
+                // if this fails, we would need to use commits_stream() and look explicitly for adds
+                .filter(|f| f.size > 1500)
+                .filter_map(|f| {
+                    let file_url = store_root.join(f.location.as_ref()).ok()?;
+                    let path = ParsedLogPath::try_from(file_url).ok()??;
+                    Some(path.version as i64)
+                })
+                .min()
+                .unwrap_or(i64::MAX);
+
+            if start_version >= min_version {
+                let mut recent_commits: VecDeque<ObjectMeta> = log_segment
+                    .commit_files
+                    .iter()
+                    // go back 2 more commits in order to make sure we skip over VACUUM start & end
+                    .filter_map(|f| {
+                        let file_url = store_root.join(f.location.as_ref()).ok()?;
+                        let path = ParsedLogPath::try_from(file_url).ok()??;
+                        if path.version as i64 >= start_version - 2 {
+                            Some(f)
+                        } else {
+                            None
+                        }
+                    })
+                    .cloned()
+                    .collect();
+                recent_commits.truncate(config.pseudo_cdf_max_commits);
+                debug!("Reusing existing log files: {:?}", recent_commits);
+                LogSegment {
+                    version: log_segment.version,
+                    commit_files: recent_commits,
+                    checkpoint_files: vec![],
+                }
+            } else {
+                // go back 2 more commits in order to make sure we skip over VACUUM start & end
+                let start_version = (start_version - 2).max(0);
+                let segment = LogSegment::try_recent_commits(
+                    log_store,
+                    start_version,
+                    config.pseudo_cdf_max_commits,
+                )
+                .await?;
+                debug!("Version not found after checkpoint, reloading slice from version: {start_version}: {:?}", segment.commit_files);
+                segment
+            }
+        } else {
+            log_segment
+        };
+
         Ok(Self {
             log_segment,
             config,
diff --git a/crates/core/src/operations/cast/mod.rs b/crates/core/src/operations/cast/mod.rs
index b3be011ebc..559531bb9c 100644
--- a/crates/core/src/operations/cast/mod.rs
+++ b/crates/core/src/operations/cast/mod.rs
@@ -213,15 +213,13 @@ pub fn cast_record_batch(
             batch.num_rows(),
         )?
     } else {
-        // Can be simplified with StructArray::try_new_with_length in arrow 55.1
-        let col_arrays = batch.columns().to_owned();
-        let s = if col_arrays.is_empty() {
-            StructArray::new_empty_fields(batch.num_rows(), None)
-        } else {
-            StructArray::new(batch.schema().as_ref().to_owned().fields, col_arrays, None)
-        };
-
-        cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?
+        let s = StructArray::try_new_with_length(
+            batch.schema().as_ref().to_owned().fields,
+            batch.columns().to_owned(),
+            None,
+            batch.num_rows(),
+        )?;
+        cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?
     };
     Ok(RecordBatch::try_new_with_options(
         target_schema,
diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs
index 15a90b7e9e..febab8d31d 100644
--- a/crates/core/src/table/builder.rs
+++ b/crates/core/src/table/builder.rs
@@ -38,6 +38,14 @@ pub struct DeltaTableConfig {
     /// Hence, DeltaTable will be loaded with significant memory reduction.
     pub require_files: bool,
 
+    /// Enables a "pseudo-changelog" by loading only JSON commit files starting from the specified version.
+    /// Note that the meaning of the version (or datestring) is flipped: instead of loading data
+    /// UP TO the version, the table will load data added AFTER this version.
+    pub pseudo_cdf: bool,
+
+    /// Maximum number of commits to load
+    pub pseudo_cdf_max_commits: usize,
+
     /// Controls how many files to buffer from the commit log when updating the table.
     /// This defaults to 4 * number of cpus
     ///
@@ -51,21 +59,30 @@
     /// when processing record batches.
     pub log_batch_size: usize,
 
+    /// Maximum allowed size in bytes for the total log segment (checkpoint + commit files).
+    /// If the log segment exceeds this size, loading the table will fail unless `pseudo_cdf`
+    /// is enabled, in which case it is used as a fallback.
+    /// If `None`, no size limit is enforced.
+    pub max_log_bytes: Option<usize>,
+
     #[serde(skip_serializing, skip_deserializing)]
     #[delta(skip)]
     /// When a runtime handler is provided, all IO tasks are spawn in that handle
     pub io_runtime: Option<IORuntime>,
 
     #[delta(skip)]
-    pub options: HashMap<String, String>
+    pub options: HashMap<String, String>,
 }
 
 impl Default for DeltaTableConfig {
     fn default() -> Self {
         Self {
             require_files: true,
+            pseudo_cdf: false,
+            pseudo_cdf_max_commits: 1024,
             log_buffer_size: num_cpus::get() * 4,
             log_batch_size: 1024,
+            max_log_bytes: None,
             io_runtime: None,
             options: HashMap::new(),
         }
@@ -77,6 +94,9 @@ impl PartialEq for DeltaTableConfig {
         self.require_files == other.require_files
             && self.log_buffer_size == other.log_buffer_size
             && self.log_batch_size == other.log_batch_size
+            && self.pseudo_cdf == other.pseudo_cdf
+            && self.pseudo_cdf_max_commits == other.pseudo_cdf_max_commits
+            && self.max_log_bytes == other.max_log_bytes
     }
 }
 
@@ -149,6 +169,23 @@ impl DeltaTableBuilder {
         self
     }
 
+    /// Sets `pseudo_cdf=true` to the builder
+    pub fn with_pseudo_cdf(mut self) -> Self {
+        self.table_config.pseudo_cdf = true;
+        self
+    }
+
+    /// Sets `pseudo_cdf_max_commits` to the builder
+    pub fn with_pseudo_cdf_max_commits(mut self, max_commits: usize) -> DeltaResult<Self> {
+        if max_commits == 0 {
+            return Err(DeltaTableError::Generic(String::from(
+                "Max number of commits should be positive",
+            )));
+        }
+        self.table_config.pseudo_cdf_max_commits = max_commits;
+        Ok(self)
+    }
+
     /// Sets `version` to the builder
     pub fn with_version(mut self, version: i64) -> Self {
         self.version = DeltaVersion::Version(version);
@@ -166,6 +203,17 @@ impl DeltaTableBuilder {
         Ok(self)
     }
 
+    /// Sets `max_log_bytes` to the builder
+    pub fn with_max_log_bytes(mut self, max_log_size: usize) -> DeltaResult<Self> {
+        if max_log_size == 0 {
+            return Err(DeltaTableError::Generic(String::from(
+                "Max log size should be positive",
+            )));
+        }
+        self.table_config.max_log_bytes = Some(max_log_size);
+        Ok(self)
+    }
+
     /// specify the timestamp given as ISO-8601/RFC-3339 timestamp
     pub fn with_datestring(self, date_string: impl AsRef<str>) -> DeltaResult<Self> {
         let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(
@@ -215,7 +263,14 @@ impl DeltaTableBuilder {
             storage_options
                 .clone()
                 .into_iter()
-                .map(|(k, v)| (k.strip_prefix("deltalake.").map(ToString::to_string).unwrap_or(k), v))
+                .map(|(k, v)| {
+                    (
+                        k.strip_prefix("deltalake.")
+                            .map(ToString::to_string)
+                            .unwrap_or(k),
+                        v,
+                    )
+                })
                 .map(|(k, v)| {
                     let needs_trim = v.starts_with("http://")
                         || v.starts_with("https://")
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9a5ac478c0..538d5c8bb9 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -411,7 +411,11 @@ impl DeltaTable {
                 break;
             }
         }
-        let mut max_version = match self.get_latest_version().await {
+        let mut max_version = match self
+            .log_store
+            .get_latest_version(self.version().unwrap_or(min_version))
+            .await
+        {
            Ok(version) => version,
            Err(DeltaTableError::InvalidVersion(_)) => {
                return Err(DeltaTableError::NotATable(
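
For reference, a minimal sketch (not part of the patch) of how the options introduced above fit together. It assumes `DeltaTableConfig` is reachable at the crate root (it is imported as `crate::DeltaTableConfig` in the snapshot module above); the threshold values are hypothetical and should be tuned per table.

```rust
use deltalake_core::DeltaTableConfig;

fn main() {
    // Opt into the pseudo-changelog fallback: once checkpoint + commit files exceed
    // `max_log_bytes`, only the most recent commits are replayed instead of the full log.
    let config = DeltaTableConfig {
        pseudo_cdf: true,
        // Hypothetical limits: replay at most 32 commits, with a 64 MiB log-segment budget.
        pseudo_cdf_max_commits: 32,
        max_log_bytes: Some(64 * 1024 * 1024),
        ..Default::default()
    };

    assert!(config.pseudo_cdf);
    assert_eq!(config.max_log_bytes, Some(64 * 1024 * 1024));
}
```

The builder methods added in this change (`with_pseudo_cdf()`, `with_max_log_bytes(..)`, `with_pseudo_cdf_max_commits(..)`) set the same fields on `DeltaTableBuilder` before `load()` is called.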