Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ prost = { version = "0.13" }
# compression
flate2 = { version = "1" }

# hashing
xxhash-rust = { version = "0.8", features = ["xxh64", "xxh3"] }
md-5 = { version = "0.10" }

# encoding
base64 = { version = "0.22" }

# testing
serial_test = { version = "3" }
tempfile = { version = "3" }
Expand Down
7 changes: 7 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ prost = { workspace = true }
# compression
flate2 = { workspace = true }

# hashing
xxhash-rust = { workspace = true }
md-5 = { workspace = true }

# encoding
base64 = { workspace = true }

# datafusion
datafusion = { workspace = true, optional = true }
datafusion-expr = { workspace = true, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/file_group/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,7 +1162,7 @@ mod tests {

mod test_replaced_file_groups_from_replace_commit {
use super::super::*;
use crate::table::partition::EMPTY_PARTITION_PATH;
use crate::table::EMPTY_PARTITION_PATH;
use serde_json::{Map, Value, json};

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/file_group/file_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl FileSlice {
#[cfg(test)]
mod tests {
use super::*;
use crate::table::partition::EMPTY_PARTITION_PATH;
use crate::table::EMPTY_PARTITION_PATH;
use std::str::FromStr;

#[test]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/file_group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl FileGroup {
#[cfg(test)]
mod tests {
use super::*;
use crate::table::partition::EMPTY_PARTITION_PATH;
use crate::table::EMPTY_PARTITION_PATH;

// ============================================================================
// FileGroup tests (v6 tables)
Expand Down
174 changes: 172 additions & 2 deletions crates/core/src/file_group/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ use crate::file_group::log_file::scanner::{LogFileScanner, ScanResult};
use crate::file_group::record_batches::RecordBatches;
use crate::hfile::{HFileReader, HFileRecord};
use crate::merge::record_merger::RecordMerger;
use crate::metadata::merger::FilesPartitionMerger;
use crate::metadata::merger::{ColumnStatsMerger, FilesPartitionMerger, PartitionStatsMerger};
use crate::metadata::meta_field::MetaField;
use crate::metadata::table_record::FilesPartitionRecord;
use crate::metadata::table_record::{
ColumnStatsRecord, FilesPartitionRecord, PartitionStatsRecord,
};
use crate::storage::{ParquetReadOptions, Storage};
use crate::table::ReadOptions;
use crate::table::builder::OptionResolver;
Expand Down Expand Up @@ -553,6 +555,174 @@ impl FileGroupReader {
let merger = FilesPartitionMerger::new(schema);
merger.merge_for_keys(&base_records, &log_records, &hfile_keys)
}

/// Read records from metadata table column_stats partition.
///
/// # Arguments
/// * `file_slice` - The file slice to read from
/// * `keys` - Only read records with these keys. If empty, reads all records.
///
/// # Returns
/// Vector of ColumnStatsRecord entries.
pub(crate) async fn read_metadata_table_column_stats_partition(
&self,
file_slice: &FileSlice,
keys: &[&str],
) -> Result<Vec<ColumnStatsRecord>> {
let base_file_path = file_slice.base_file_relative_path()?;
let log_file_paths: Vec<String> = if file_slice.has_log_file() {
file_slice
.log_files
.iter()
.map(|log_file| file_slice.log_file_relative_path(log_file))
.collect::<Result<Vec<String>>>()?
} else {
vec![]
};

// Open HFile
let mut hfile_reader = HFileReader::open(&self.storage, &base_file_path)
.await
.map_err(|e| {
ReadFileSliceError(format!(
"Failed to read metadata table base file {base_file_path}: {e:?}"
))
})?;

// Get Avro schema from HFile
let schema = hfile_reader
.get_avro_schema()
.map_err(|e| ReadFileSliceError(format!("Failed to get Avro schema: {e:?}")))?
.ok_or_else(|| ReadFileSliceError("No Avro schema found in HFile".to_string()))?
.clone();

let hfile_keys: Vec<&str> = if keys.is_empty() {
vec![]
} else {
let mut sorted = keys.to_vec();
sorted.sort();
sorted
};

let base_records: Vec<HFileRecord> = if hfile_keys.is_empty() {
hfile_reader.collect_records().map_err(|e| {
ReadFileSliceError(format!("Failed to collect HFile records: {e:?}"))
})?
} else {
hfile_reader
.lookup_records(&hfile_keys)
.map_err(|e| ReadFileSliceError(format!("Failed to lookup HFile records: {e:?}")))?
.into_iter()
.filter_map(|(_, r)| r)
.collect()
};

let log_records = if log_file_paths.is_empty() {
vec![]
} else {
let instant_range = self.create_instant_range_for_log_file_scan();
let scan_result = LogFileScanner::new(self.hudi_configs.clone(), self.storage.clone())
.scan(log_file_paths, &instant_range)
.await?;

match scan_result {
ScanResult::HFileRecords(records) => records,
ScanResult::Empty => vec![],
ScanResult::RecordBatches(_) => {
return Err(CoreError::LogBlockError(
"Unexpected RecordBatches in metadata table log file".to_string(),
));
}
}
};

let merger = ColumnStatsMerger::new(schema);
merger.merge_for_keys(&base_records, &log_records, &hfile_keys)
}

/// Read records from metadata table partition_stats partition.
///
/// # Arguments
/// * `file_slice` - The file slice to read from
/// * `keys` - Only read records with these keys. If empty, reads all records.
///
/// # Returns
/// Vector of PartitionStatsRecord entries.
pub(crate) async fn read_metadata_table_partition_stats_partition(
&self,
file_slice: &FileSlice,
keys: &[&str],
) -> Result<Vec<PartitionStatsRecord>> {
let base_file_path = file_slice.base_file_relative_path()?;
let log_file_paths: Vec<String> = if file_slice.has_log_file() {
file_slice
.log_files
.iter()
.map(|log_file| file_slice.log_file_relative_path(log_file))
.collect::<Result<Vec<String>>>()?
} else {
vec![]
};

// Open HFile
let mut hfile_reader = HFileReader::open(&self.storage, &base_file_path)
.await
.map_err(|e| {
ReadFileSliceError(format!(
"Failed to read metadata table base file {base_file_path}: {e:?}"
))
})?;

// Get Avro schema from HFile
let schema = hfile_reader
.get_avro_schema()
.map_err(|e| ReadFileSliceError(format!("Failed to get Avro schema: {e:?}")))?
.ok_or_else(|| ReadFileSliceError("No Avro schema found in HFile".to_string()))?
.clone();

let hfile_keys: Vec<&str> = if keys.is_empty() {
vec![]
} else {
let mut sorted = keys.to_vec();
sorted.sort();
sorted
};

let base_records: Vec<HFileRecord> = if hfile_keys.is_empty() {
hfile_reader.collect_records().map_err(|e| {
ReadFileSliceError(format!("Failed to collect HFile records: {e:?}"))
})?
} else {
hfile_reader
.lookup_records(&hfile_keys)
.map_err(|e| ReadFileSliceError(format!("Failed to lookup HFile records: {e:?}")))?
.into_iter()
.filter_map(|(_, r)| r)
.collect()
};

let log_records = if log_file_paths.is_empty() {
vec![]
} else {
let instant_range = self.create_instant_range_for_log_file_scan();
let scan_result = LogFileScanner::new(self.hudi_configs.clone(), self.storage.clone())
.scan(log_file_paths, &instant_range)
.await?;

match scan_result {
ScanResult::HFileRecords(records) => records,
ScanResult::Empty => vec![],
ScanResult::RecordBatches(_) => {
return Err(CoreError::LogBlockError(
"Unexpected RecordBatches in metadata table log file".to_string(),
));
}
}
};

let merger = PartitionStatsMerger::new(schema);
merger.merge_for_keys(&base_records, &log_records, &hfile_keys)
}
}

/// Creates a commit time filtering mask based on the provided configs.
Expand Down
Loading
Loading