crates/core/src/kernel/snapshot/log_data.rs (8 changes: 6 additions & 2 deletions)
@@ -659,8 +659,12 @@ mod datafusion {
.map(|sv| sv.to_array())
.collect::<Result<Vec<_>, DataFusionError>>()
.unwrap();
- let sa = StructArray::new(fields.clone(), arrays, None);
- Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
+ if arrays.is_empty() {
+     Precision::Absent
+ } else {
+     let sa = StructArray::new(fields.clone(), arrays, None);
+     Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
+ }
})
.unwrap_or(Precision::Absent),
_ => Precision::Absent,
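
A self-contained sketch of the guard above, with the imports the fragment relies on spelled out (the helper name struct_precision is illustrative and assumes the arrow and datafusion_common crates): when no per-column statistic arrays were collected, the statistic is reported as unknown instead of being wrapped in an empty struct scalar.

use std::sync::Arc;

use arrow_array::{ArrayRef, StructArray};
use arrow_schema::Fields;
use datafusion_common::stats::Precision;
use datafusion_common::ScalarValue;

// Only build a struct scalar when at least one child array exists; with zero
// children there is nothing meaningful to report, so the statistic stays Absent.
fn struct_precision(fields: Fields, arrays: Vec<ArrayRef>) -> Precision<ScalarValue> {
    if arrays.is_empty() {
        Precision::Absent
    } else {
        let sa = StructArray::new(fields, arrays, None);
        Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
    }
}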
crates/core/src/kernel/snapshot/log_segment.rs (81 changes: 81 additions & 0 deletions)
@@ -135,6 +135,48 @@ impl LogSegment {
Ok(segment)
}

/// Try to create a new [`LogSegment`] from a slice of the log.
///
/// This will create a new [`LogSegment`] from the log containing only the relevant commit files,
/// starting at `start_version` and including at most `max_commits` commits.
/// Checkpoint files are ignored, which allows reconstructing a pseudo-changelog for a quick
/// preview of recent data.
pub async fn try_recent_commits(
log_store: &dyn LogStore,
start_version: i64,
max_commits: usize,
) -> DeltaResult<Self> {
debug!("try_recent_commits: start_version: {start_version}",);
log_store.refresh().await?;
let log_url = log_store.log_root_url();
let mut store_root = log_url.clone();
store_root.set_path("");
let log_path = crate::logstore::object_store_path(&log_url)?;

let mut commit_files = list_commit_files(
&log_store.root_object_store(None),
&log_path,
None,
Some(start_version),
&store_root,
)
.await?;

// maximum number of commits to load without starting from a checkpoint
commit_files.truncate(max_commits);

validate(&commit_files, &vec![])?;

let mut segment = Self {
version: start_version,
commit_files: commit_files.into_iter().map(|(meta, _)| meta).collect(),
checkpoint_files: vec![],
};
segment.version = segment.file_version().unwrap_or(start_version);

Ok(segment)
}

/// Returns the highest commit version number in the log segment
pub fn file_version(&self) -> Option<i64> {
let dummy_url = Url::parse("dummy:///").unwrap();
@@ -553,6 +595,45 @@ pub(super) async fn list_log_files(
Ok((commit_files, checkpoint_files))
}

/// List relevant commit files, ignoring checkpoints
///
/// See `try_recent_commits` for more details on how this is used
pub(super) async fn list_commit_files(
root_store: &dyn ObjectStore,
log_root: &Path,
max_version: Option<i64>,
start_version: Option<i64>,
store_root: &Url,
) -> DeltaResult<Vec<(ObjectMeta, ParsedLogPath<Url>)>> {
let max_version = max_version.unwrap_or(i64::MAX - 1);
let start_from = log_root.child(format!("{:020}", start_version.unwrap_or(0)).as_str());

let mut commit_files = Vec::with_capacity(25);

for meta in root_store
.list_with_offset(Some(log_root), &start_from)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.filter_map(|f| {
let file_url = store_root.join(f.location.as_ref()).ok()?;
let path = ParsedLogPath::try_from(file_url).ok()??;
Some((f, path))
})
{
if meta.1.version <= max_version as u64
&& Some(meta.1.version as i64) >= start_version
&& matches!(meta.1.file_type, LogPathFileType::Commit) {
commit_files.push(meta);
}
}

// NOTE this will sort in reverse order
commit_files.sort_unstable_by(|a, b| b.0.location.cmp(&a.0.location));

Ok(commit_files)
}
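
To illustrate how the new entry point is meant to be driven, here is a minimal sketch of internal usage, mirroring the fallback added to Snapshot::try_new later in this diff; preview_segment, the commit budget of 10, and the two-commit back-off are illustrative assumptions, not part of the change.

use crate::kernel::snapshot::log_segment::LogSegment;
use crate::logstore::LogStore;
use crate::DeltaResult;

// Build a checkpoint-free segment covering only the most recent commits.
// Backing off two versions mirrors the Snapshot fallback, which does so to
// make sure VACUUM START/END commits can be skipped over.
async fn preview_segment(
    log_store: &dyn LogStore,
    latest_version: i64,
) -> DeltaResult<LogSegment> {
    let start_version = (latest_version - 2).max(0);
    LogSegment::try_recent_commits(log_store, start_version, 10).await
}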

#[cfg(test)]
pub(super) mod tests {
use delta_kernel::table_features::{ReaderFeature, WriterFeature};
crates/core/src/kernel/snapshot/mod.rs (125 changes: 113 additions & 12 deletions)
@@ -15,18 +15,9 @@
//!
//!

- use std::collections::{HashMap, HashSet};
+ use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader, Cursor};
-
- use ::serde::{Deserialize, Serialize};
- use arrow_array::RecordBatch;
- use delta_kernel::path::{LogPathFileType, ParsedLogPath};
- use futures::stream::BoxStream;
- use futures::{StreamExt, TryStreamExt};
- use object_store::path::Path;
- use object_store::ObjectStore;
- use url::Url;

use self::log_segment::LogSegment;
use self::parse::{read_adds, read_removes};
use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
@@ -36,9 +27,18 @@ use super::{Action, Add, AddCDCFile, CommitInfo, Metadata, Protocol, Remove, Tra
use crate::kernel::parse::read_cdf_adds;
use crate::kernel::transaction::{CommitData, PROTOCOL};
use crate::kernel::{ActionType, StructType};
- use crate::logstore::LogStore;
+ use crate::logstore::{LogStore, LogStoreExt};
use crate::table::config::TableConfig;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
+ use arrow_array::RecordBatch;
+ use delta_kernel::path::{LogPathFileType, ParsedLogPath};
+ use futures::stream::BoxStream;
+ use futures::{StreamExt, TryStreamExt};
+ use object_store::path::Path;
+ use object_store::{ObjectMeta, ObjectStore};
+ use ::serde::{Deserialize, Serialize};
+ use tracing::log::{debug, info};
+ use url::Url;

pub use self::log_data::*;

Expand Down Expand Up @@ -73,7 +73,12 @@ impl Snapshot {
config: DeltaTableConfig,
version: Option<i64>,
) -> DeltaResult<Self> {
- let log_segment = LogSegment::try_new(log_store, version).await?;
+ let log_segment = if config.pseudo_cdf && config.max_log_bytes.is_some() {
+     LogSegment::try_new(log_store, None).await?
+ } else {
+     // classic behaviour, only load metadata up to version
+     LogSegment::try_new(log_store, version).await?
+ };
let (protocol, metadata) = log_segment.read_metadata(log_store, &config).await?;
if metadata.is_none() || protocol.is_none() {
return Err(DeltaTableError::Generic(
@@ -85,6 +90,102 @@

PROTOCOL.can_read_from_protocol(&protocol)?;

let mut store_root = log_store.table_root_url().clone();
store_root.set_path("");

// Check if the log segment size exceeds the configured maximum
let should_fall_back = if let Some(max_size) = config.max_log_bytes {
let total_size: u64 = log_segment
.checkpoint_files
.iter()
.chain(log_segment.commit_files.iter())
.map(|f| f.size)
.sum();
let total_size = total_size as usize;
if total_size > max_size {
if config.pseudo_cdf {
// Use pseudo_cdf as fallback when table is too large
info!(
"Log segment size ({} bytes) exceeds max_log_size ({} bytes), using pseudo_cdf fallback",
total_size, max_size
);
} else {
return Err(DeltaTableError::Generic(format!(
"Table log segment size ({} bytes) exceeds maximum allowed size ({} bytes). \
Consider enabling pseudo_cdf mode or increasing max_log_bytes.",
total_size, max_size
)));
}
} else {
info!(
"Log segment size ({} bytes) is under max_log_size ({} bytes), loading full table",
total_size, max_size
);
};
total_size > max_size
} else {
false
};

// TODO: extract to helper function
let log_segment = if config.pseudo_cdf && should_fall_back {
// unless provided with a start version, we only load the last commit
let start_version = version.unwrap_or(log_segment.version);

// check if we have already loaded the needed version, discarding VACUUM commits
let min_version = log_segment
.commit_files
.iter()
// simple heuristic for ruling out VACUUM commits (in practice ~900 bytes)
// if this fails, we would need to use commits_stream() and look explicitly for adds
.filter(|f| f.size > 1500)
.filter_map(|f| {
let file_url = store_root.join(f.location.as_ref()).ok()?;
let path = ParsedLogPath::try_from(file_url).ok()??;
Some(path.version as i64)
})
.min()
.unwrap_or(i64::MAX);

if start_version >= min_version {
let mut recent_commits: VecDeque<ObjectMeta> = log_segment
.commit_files
.iter()
// go back 2 more commits in order to make sure we skip over VACUUM start & end
.filter_map(|f| {
let file_url = store_root.join(f.location.as_ref()).ok()?;
let path = ParsedLogPath::try_from(file_url).ok()??;
if path.version as i64 >= start_version - 2 {
Some(f)
} else {
None
}
})
.cloned()
.collect();
recent_commits.truncate(config.pseudo_cdf_max_commits);
debug!("Reusing existing log files: {:?}", recent_commits);
LogSegment {
version: log_segment.version,
commit_files: recent_commits,
checkpoint_files: vec![],
}
} else {
// go back 2 more commits in order to make sure we skip over VACUUM start & end
let start_version = (start_version - 2).max(0);
let segment = LogSegment::try_recent_commits(
log_store,
start_version,
config.pseudo_cdf_max_commits,
)
.await?;
debug!("Version not found after checkpoint, reloading slice from version: {start_version}: {:?}", segment.commit_files);
segment
}
} else {
log_segment
};

Ok(Self {
log_segment,
config,
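
The fallback above is driven entirely by the three DeltaTableConfig fields it references. A hedged sketch of how a caller might combine them; the field names and types are inferred from their usage in this diff, and the struct-update syntax assumes the config keeps public fields and a Default implementation.

use deltalake_core::DeltaTableConfig;

fn preview_config() -> DeltaTableConfig {
    DeltaTableConfig {
        // fall back to a commit-only pseudo-changelog when the log is too large
        pseudo_cdf: true,
        // treat a log segment above 64 MiB as too large to replay in full
        max_log_bytes: Some(64 * 1024 * 1024),
        // and in that case only look at the 5 most recent commits
        pseudo_cdf_max_commits: 5,
        ..Default::default()
    }
}

With max_log_bytes unset the snapshot loads exactly as before; with it set and pseudo_cdf disabled, an oversized log becomes a hard error rather than a silent fallback.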
crates/core/src/operations/cast/mod.rs (16 changes: 7 additions & 9 deletions)
@@ -213,15 +213,13 @@ pub fn cast_record_batch(
batch.num_rows(),
)?
} else {
- // Can be simplified with StructArray::try_new_with_length in arrow 55.1
- let col_arrays = batch.columns().to_owned();
- let s = if col_arrays.is_empty() {
-     StructArray::new_empty_fields(batch.num_rows(), None)
- } else {
-     StructArray::new(batch.schema().as_ref().to_owned().fields, col_arrays, None)
- };
-
- cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?
+ let s = StructArray::try_new_with_length(
+     batch.schema().as_ref().to_owned().fields,
+     batch.columns().to_owned(),
+     None,
+     batch.num_rows(),
+ )?;
+ cast_struct(&s, target_schema.fields(), &cast_options, add_missing)?
};
Ok(RecordBatch::try_new_with_options(
target_schema,
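
For reference, a self-contained sketch of the simplified conversion (the helper name batch_as_struct is illustrative): with zero columns a struct's row count cannot be inferred from its children, which is what the removed new_empty_fields special case handled; try_new_with_length, available since arrow 55.1, takes the row count explicitly and covers both the empty and non-empty cases in one call.

use arrow_array::{RecordBatch, StructArray};
use arrow_schema::ArrowError;

// Convert a RecordBatch (possibly with zero columns) into a StructArray of the
// same length, letting the caller cast it afterwards.
fn batch_as_struct(batch: &RecordBatch) -> Result<StructArray, ArrowError> {
    StructArray::try_new_with_length(
        batch.schema().as_ref().to_owned().fields,
        batch.columns().to_owned(),
        None,
        batch.num_rows(),
    )
}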