diff --git a/Cargo.toml b/Cargo.toml index af9b8bab..59adb9a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,13 @@ prost = { version = "0.13" } # compression flate2 = { version = "1" } +# hashing +xxhash-rust = { version = "0.8", features = ["xxh64", "xxh3"] } +md-5 = { version = "0.10" } + +# encoding +base64 = { version = "0.22" } + # testing serial_test = { version = "3" } tempfile = { version = "3" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ee9a0415..9330899c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -79,6 +79,13 @@ prost = { workspace = true } # compression flate2 = { workspace = true } +# hashing +xxhash-rust = { workspace = true } +md-5 = { workspace = true } + +# encoding +base64 = { workspace = true } + # datafusion datafusion = { workspace = true, optional = true } datafusion-expr = { workspace = true, optional = true } diff --git a/crates/core/src/file_group/builder.rs b/crates/core/src/file_group/builder.rs index 21fa65b0..45b9ab47 100644 --- a/crates/core/src/file_group/builder.rs +++ b/crates/core/src/file_group/builder.rs @@ -1162,7 +1162,7 @@ mod tests { mod test_replaced_file_groups_from_replace_commit { use super::super::*; - use crate::table::partition::EMPTY_PARTITION_PATH; + use crate::table::EMPTY_PARTITION_PATH; use serde_json::{Map, Value, json}; #[test] diff --git a/crates/core/src/file_group/file_slice.rs b/crates/core/src/file_group/file_slice.rs index 42f6c0da..28a8fa3f 100644 --- a/crates/core/src/file_group/file_slice.rs +++ b/crates/core/src/file_group/file_slice.rs @@ -139,7 +139,7 @@ impl FileSlice { #[cfg(test)] mod tests { use super::*; - use crate::table::partition::EMPTY_PARTITION_PATH; + use crate::table::EMPTY_PARTITION_PATH; use std::str::FromStr; #[test] diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs index 05293a28..8d69a785 100644 --- a/crates/core/src/file_group/mod.rs +++ b/crates/core/src/file_group/mod.rs @@ -284,7 +284,7 @@ 
impl FileGroup { #[cfg(test)] mod tests { use super::*; - use crate::table::partition::EMPTY_PARTITION_PATH; + use crate::table::EMPTY_PARTITION_PATH; // ============================================================================ // FileGroup tests (v6 tables) diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs index 8de9d865..f419cbdf 100644 --- a/crates/core/src/file_group/reader.rs +++ b/crates/core/src/file_group/reader.rs @@ -29,9 +29,11 @@ use crate::file_group::log_file::scanner::{LogFileScanner, ScanResult}; use crate::file_group::record_batches::RecordBatches; use crate::hfile::{HFileReader, HFileRecord}; use crate::merge::record_merger::RecordMerger; -use crate::metadata::merger::FilesPartitionMerger; +use crate::metadata::merger::{ColumnStatsMerger, FilesPartitionMerger, PartitionStatsMerger}; use crate::metadata::meta_field::MetaField; -use crate::metadata::table_record::FilesPartitionRecord; +use crate::metadata::table_record::{ + ColumnStatsRecord, FilesPartitionRecord, PartitionStatsRecord, +}; use crate::storage::{ParquetReadOptions, Storage}; use crate::table::ReadOptions; use crate::table::builder::OptionResolver; @@ -553,6 +555,174 @@ impl FileGroupReader { let merger = FilesPartitionMerger::new(schema); merger.merge_for_keys(&base_records, &log_records, &hfile_keys) } + + /// Read records from metadata table column_stats partition. + /// + /// # Arguments + /// * `file_slice` - The file slice to read from + /// * `keys` - Only read records with these keys. If empty, reads all records. + /// + /// # Returns + /// Vector of ColumnStatsRecord entries. 
+ pub(crate) async fn read_metadata_table_column_stats_partition( + &self, + file_slice: &FileSlice, + keys: &[&str], + ) -> Result> { + let base_file_path = file_slice.base_file_relative_path()?; + let log_file_paths: Vec = if file_slice.has_log_file() { + file_slice + .log_files + .iter() + .map(|log_file| file_slice.log_file_relative_path(log_file)) + .collect::>>()? + } else { + vec![] + }; + + // Open HFile + let mut hfile_reader = HFileReader::open(&self.storage, &base_file_path) + .await + .map_err(|e| { + ReadFileSliceError(format!( + "Failed to read metadata table base file {base_file_path}: {e:?}" + )) + })?; + + // Get Avro schema from HFile + let schema = hfile_reader + .get_avro_schema() + .map_err(|e| ReadFileSliceError(format!("Failed to get Avro schema: {e:?}")))? + .ok_or_else(|| ReadFileSliceError("No Avro schema found in HFile".to_string()))? + .clone(); + + let hfile_keys: Vec<&str> = if keys.is_empty() { + vec![] + } else { + let mut sorted = keys.to_vec(); + sorted.sort(); + sorted + }; + + let base_records: Vec = if hfile_keys.is_empty() { + hfile_reader.collect_records().map_err(|e| { + ReadFileSliceError(format!("Failed to collect HFile records: {e:?}")) + })? + } else { + hfile_reader + .lookup_records(&hfile_keys) + .map_err(|e| ReadFileSliceError(format!("Failed to lookup HFile records: {e:?}")))? 
+ .into_iter() + .filter_map(|(_, r)| r) + .collect() + }; + + let log_records = if log_file_paths.is_empty() { + vec![] + } else { + let instant_range = self.create_instant_range_for_log_file_scan(); + let scan_result = LogFileScanner::new(self.hudi_configs.clone(), self.storage.clone()) + .scan(log_file_paths, &instant_range) + .await?; + + match scan_result { + ScanResult::HFileRecords(records) => records, + ScanResult::Empty => vec![], + ScanResult::RecordBatches(_) => { + return Err(CoreError::LogBlockError( + "Unexpected RecordBatches in metadata table log file".to_string(), + )); + } + } + }; + + let merger = ColumnStatsMerger::new(schema); + merger.merge_for_keys(&base_records, &log_records, &hfile_keys) + } + + /// Read records from metadata table partition_stats partition. + /// + /// # Arguments + /// * `file_slice` - The file slice to read from + /// * `keys` - Only read records with these keys. If empty, reads all records. + /// + /// # Returns + /// Vector of PartitionStatsRecord entries. + pub(crate) async fn read_metadata_table_partition_stats_partition( + &self, + file_slice: &FileSlice, + keys: &[&str], + ) -> Result> { + let base_file_path = file_slice.base_file_relative_path()?; + let log_file_paths: Vec = if file_slice.has_log_file() { + file_slice + .log_files + .iter() + .map(|log_file| file_slice.log_file_relative_path(log_file)) + .collect::>>()? + } else { + vec![] + }; + + // Open HFile + let mut hfile_reader = HFileReader::open(&self.storage, &base_file_path) + .await + .map_err(|e| { + ReadFileSliceError(format!( + "Failed to read metadata table base file {base_file_path}: {e:?}" + )) + })?; + + // Get Avro schema from HFile + let schema = hfile_reader + .get_avro_schema() + .map_err(|e| ReadFileSliceError(format!("Failed to get Avro schema: {e:?}")))? + .ok_or_else(|| ReadFileSliceError("No Avro schema found in HFile".to_string()))? 
+ .clone(); + + let hfile_keys: Vec<&str> = if keys.is_empty() { + vec![] + } else { + let mut sorted = keys.to_vec(); + sorted.sort(); + sorted + }; + + let base_records: Vec = if hfile_keys.is_empty() { + hfile_reader.collect_records().map_err(|e| { + ReadFileSliceError(format!("Failed to collect HFile records: {e:?}")) + })? + } else { + hfile_reader + .lookup_records(&hfile_keys) + .map_err(|e| ReadFileSliceError(format!("Failed to lookup HFile records: {e:?}")))? + .into_iter() + .filter_map(|(_, r)| r) + .collect() + }; + + let log_records = if log_file_paths.is_empty() { + vec![] + } else { + let instant_range = self.create_instant_range_for_log_file_scan(); + let scan_result = LogFileScanner::new(self.hudi_configs.clone(), self.storage.clone()) + .scan(log_file_paths, &instant_range) + .await?; + + match scan_result { + ScanResult::HFileRecords(records) => records, + ScanResult::Empty => vec![], + ScanResult::RecordBatches(_) => { + return Err(CoreError::LogBlockError( + "Unexpected RecordBatches in metadata table log file".to_string(), + )); + } + } + }; + + let merger = PartitionStatsMerger::new(schema); + merger.merge_for_keys(&base_records, &log_records, &hfile_keys) + } } /// Creates a commit time filtering mask based on the provided configs. diff --git a/crates/core/src/metadata/merger.rs b/crates/core/src/metadata/merger.rs index ff1ba812..a8ca16df 100644 --- a/crates/core/src/metadata/merger.rs +++ b/crates/core/src/metadata/merger.rs @@ -19,12 +19,14 @@ //! Merger for metadata table records. //! //! This module provides functionality to merge HFile records from base files -//! and log files for the metadata table's files partition. +//! and log files for the metadata table's partitions. 
use crate::Result; use crate::hfile::HFileRecord; use crate::metadata::table_record::{ - FilesPartitionRecord, HoodieMetadataFileInfo, decode_files_partition_record_with_schema, + ColumnStatsRecord, FilesPartitionRecord, HoodieMetadataFileInfo, PartitionStatsRecord, + decode_column_stats_record_with_schema, decode_files_partition_record_with_schema, + decode_partition_stats_record_with_schema, }; use apache_avro::Schema as AvroSchema; use std::collections::HashMap; @@ -212,6 +214,166 @@ impl FilesPartitionMerger { } } +/// Merger for column stats records from the metadata table. +/// +/// Column stats records have composite keys (column + partition + file hash). +/// Each HFile record decodes to one or more ColumnStatsRecord entries. +/// The merge semantics are simpler than files partition: +/// - For the same key, newer records replace older ones +/// - Deleted records (is_deleted=true) are kept in the result +pub struct ColumnStatsMerger { + schema: AvroSchema, +} + +impl ColumnStatsMerger { + /// Create a new merger with the given Avro schema. + pub fn new(schema: AvroSchema) -> Self { + Self { schema } + } + + /// Merge base HFile records with log file records for column stats. + /// + /// # Arguments + /// * `base_records` - Records from the base HFile (may be empty) + /// * `log_records` - Records from log files, in chronological order + /// * `keys` - Only return records matching these keys. If empty, return all. + /// + /// # Returns + /// A vector of merged ColumnStatsRecord entries. 
+ pub fn merge_for_keys( + &self, + base_records: &[HFileRecord], + log_records: &[HFileRecord], + keys: &[&str], + ) -> Result> { + let key_set: std::collections::HashSet<&str> = keys.iter().copied().collect(); + let filter_by_keys = !keys.is_empty(); + + // Use a map to track records by (column_name, file_name) for deduplication + let mut merged: HashMap<(String, String), ColumnStatsRecord> = HashMap::new(); + + // Process base records first + for record in base_records { + if filter_by_keys { + if let Some(key) = record.key_as_str() { + if !key_set.contains(key) { + continue; + } + } + } + if let Ok(stats) = self.decode_record(record) { + for stat in stats { + let key = (stat.column_name.clone(), stat.file_name.clone()); + merged.insert(key, stat); + } + } + } + + // Process log records (newer records override older ones) + for record in log_records { + if filter_by_keys { + if let Some(key) = record.key_as_str() { + if !key_set.contains(key) { + continue; + } + } + } + if let Ok(stats) = self.decode_record(record) { + for stat in stats { + let key = (stat.column_name.clone(), stat.file_name.clone()); + merged.insert(key, stat); + } + } + } + + Ok(merged.into_values().collect()) + } + + fn decode_record(&self, record: &HFileRecord) -> Result> { + decode_column_stats_record_with_schema(record, &self.schema) + } +} + +/// Merger for partition stats records from the metadata table. +/// +/// Partition stats records have composite keys (column + partition hash). +/// Each HFile record decodes to one or more PartitionStatsRecord entries. +/// The merge semantics are the same as column stats: +/// - For the same key, newer records replace older ones +/// - Deleted records (is_deleted=true) are kept in the result +pub struct PartitionStatsMerger { + schema: AvroSchema, +} + +impl PartitionStatsMerger { + /// Create a new merger with the given Avro schema. 
+ pub fn new(schema: AvroSchema) -> Self { + Self { schema } + } + + /// Merge base HFile records with log file records for partition stats. + /// + /// # Arguments + /// * `base_records` - Records from the base HFile (may be empty) + /// * `log_records` - Records from log files, in chronological order + /// * `keys` - Only return records matching these keys. If empty, return all. + /// + /// # Returns + /// A vector of merged PartitionStatsRecord entries. + pub fn merge_for_keys( + &self, + base_records: &[HFileRecord], + log_records: &[HFileRecord], + keys: &[&str], + ) -> Result> { + let key_set: std::collections::HashSet<&str> = keys.iter().copied().collect(); + let filter_by_keys = !keys.is_empty(); + + // Use a map to track records by (column_name, partition_path) for deduplication + let mut merged: HashMap<(String, String), PartitionStatsRecord> = HashMap::new(); + + // Process base records first + for record in base_records { + if filter_by_keys { + if let Some(key) = record.key_as_str() { + if !key_set.contains(key) { + continue; + } + } + } + if let Ok(stats) = self.decode_record(record) { + for stat in stats { + let key = (stat.column_name.clone(), stat.partition_path.clone()); + merged.insert(key, stat); + } + } + } + + // Process log records (newer records override older ones) + for record in log_records { + if filter_by_keys { + if let Some(key) = record.key_as_str() { + if !key_set.contains(key) { + continue; + } + } + } + if let Ok(stats) = self.decode_record(record) { + for stat in stats { + let key = (stat.column_name.clone(), stat.partition_path.clone()); + merged.insert(key, stat); + } + } + } + + Ok(merged.into_values().collect()) + } + + fn decode_record(&self, record: &HFileRecord) -> Result> { + decode_partition_stats_record_with_schema(record, &self.schema) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs index 52a64a6e..0f4e09d4 100644 --- 
a/crates/core/src/metadata/mod.rs +++ b/crates/core/src/metadata/mod.rs @@ -35,3 +35,6 @@ pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[ /// The virtual partition field name used in metadata tables. pub const METADATA_TABLE_PARTITION_FIELD: &str = "partition"; + +/// The partition identifier used for non-partitioned tables in the metadata table. +pub const NON_PARTITIONED_NAME: &str = "."; diff --git a/crates/core/src/metadata/table/files.rs b/crates/core/src/metadata/table/files.rs new file mode 100644 index 00000000..e21829a6 --- /dev/null +++ b/crates/core/src/metadata/table/files.rs @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Files partition reading from the metadata table. + +use std::collections::HashMap; + +use arrow_schema::Schema; + +use crate::Result; +use crate::config::read::HudiReadConfig; +use crate::error::CoreError; +use crate::expr::filter::from_str_tuples; +use crate::metadata::METADATA_TABLE_PARTITION_FIELD; +use crate::table::Table; +use crate::table::{FilePruner, PartitionPruner}; + +use super::records::FilesPartitionRecord; + +impl Table { + /// Fetch records from the `files` partition of metadata table + /// with optional data table partition pruning. 
+ /// + /// This is a convenience wrapper that creates a metadata table instance internally. + /// For multiple metadata table operations, prefer creating the metadata table once + /// with [`new_metadata_table`] and calling [`fetch_files_partition_records`] directly. + /// + /// Records are returned with normalized partition keys. For non-partitioned tables, + /// the key is "" (empty string) instead of the internal "." representation. + /// Normalization happens at decode time in [`decode_files_partition_record_with_schema`]. + /// + /// # Errors + /// Returns an error if called on a metadata table instead of a data table. + pub async fn read_metadata_table_files_partition( + &self, + partition_pruner: &PartitionPruner, + ) -> Result> { + self.require_data_table()?; + let metadata_table = self.new_metadata_table().await?; + metadata_table + .fetch_files_partition_records(partition_pruner) + .await + } + + /// Fetch records from the `files` partition with optional partition pruning. + /// + /// For non-partitioned tables, directly fetches the "." record. + /// For partitioned tables with filters, performs partition pruning via `__all_partitions__`. + /// + /// # Arguments + /// * `partition_pruner` - Data table's partition pruner to filter partitions. + /// + /// # Errors + /// Returns an error if called on a data table instead of a metadata table. + pub async fn fetch_files_partition_records( + &self, + partition_pruner: &PartitionPruner, + ) -> Result> { + self.require_metadata_table()?; + + // Non-partitioned table: directly fetch "." record (normalized to "" key) + // + // Note: partition_stats is only enabled for partitioned tables, so partition_stats + // pruning is not available for non-partitioned tables. However, column_stats + // pruning (file-level) is still applied via fs_view. 
+ if !partition_pruner.is_table_partitioned() { + return self + .read_files_partition(&[FilesPartitionRecord::NON_PARTITIONED_NAME]) + .await; + } + + // Partitioned table without filters: read all records + if partition_pruner.is_empty() { + return self.read_files_partition(&[]).await; + } + + // Partitioned table with filters: partition pruning + let all_partitions_records = self + .read_files_partition(&[FilesPartitionRecord::ALL_PARTITIONS_KEY]) + .await?; + + let partition_names: Vec<&str> = all_partitions_records + .get(FilesPartitionRecord::ALL_PARTITIONS_KEY) + .map(|r| r.partition_names()) + .unwrap_or_default(); + + // Step 2: Apply partition pruning + let pruned: Vec<&str> = partition_names + .into_iter() + .filter(|p| partition_pruner.should_include(p)) + .collect(); + + if pruned.is_empty() { + return Ok(HashMap::new()); + } + + // Step 3: Read only the pruned partition records + self.read_files_partition(&pruned).await + } + + /// Fetch specific keys from the files partition. + /// + /// # Errors + /// Returns an error if called on a data table instead of a metadata table. + pub async fn fetch_files_partition_records_by_keys( + &self, + keys: &[&str], + ) -> Result> { + self.require_metadata_table()?; + self.read_files_partition(keys).await + } + + /// Read records from the `files` partition. + /// + /// If keys is empty, reads all records. Otherwise, reads only the specified keys. + /// + /// This is an internal method called after validation. 
+ async fn read_files_partition( + &self, + keys: &[&str], + ) -> Result> { + let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() else { + return Ok(HashMap::new()); + }; + + let timeline_view = self.timeline.create_view_as_of(timestamp).await?; + + let filters = from_str_tuples([( + METADATA_TABLE_PARTITION_FIELD, + "=", + FilesPartitionRecord::PARTITION_NAME, + )])?; + let partition_schema = self.get_partition_schema().await?; + let partition_pruner = + PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?; + + // Use empty file pruner for metadata table - no column stats pruning needed + // Use empty schema since the pruner is empty and won't use the schema + let file_pruner = FilePruner::empty(); + let table_schema = Schema::empty(); + + let file_slices = self + .file_system_view + .get_file_slices_by_storage_listing( + &partition_pruner, + &file_pruner, + &table_schema, + &timeline_view, + ) + .await?; + + if file_slices.len() != 1 { + return Err(CoreError::MetadataTable(format!( + "Expected 1 file slice for {} partition, got {}", + FilesPartitionRecord::PARTITION_NAME, + file_slices.len() + ))); + } + + let file_slice = file_slices.into_iter().next().unwrap(); + let fg_reader = self.create_file_group_reader_with_options([( + HudiReadConfig::FileGroupEndTimestamp, + timestamp, + )])?; + + fg_reader + .read_metadata_table_files_partition(&file_slice, keys) + .await + } +} diff --git a/crates/core/src/metadata/table/mod.rs b/crates/core/src/metadata/table/mod.rs index e5e211cf..87ebb3e0 100644 --- a/crates/core/src/metadata/table/mod.rs +++ b/crates/core/src/metadata/table/mod.rs @@ -21,27 +21,28 @@ //! //! This module provides methods for interacting with Hudi's metadata table, //! which stores file listings and other metadata for efficient table operations. +//! +//! # Module Structure +//! - `records` - Record types for metadata table partitions +//! - `files` - Files partition reading +//! 
- `stats` - Column and partition statistics reading +mod files; pub mod records; +mod stats; -use std::collections::HashMap; - -use arrow_schema::Schema; +pub use stats::{column_stats_records_to_stats_map, partition_stats_records_to_stats_map}; use crate::Result; -use crate::config::read::HudiReadConfig; use crate::config::table::HudiTableConfig::{ MetadataTableEnabled, MetadataTablePartitions, PartitionFields, TableVersion, }; use crate::error::CoreError; -use crate::expr::filter::from_str_tuples; use crate::metadata::METADATA_TABLE_PARTITION_FIELD; use crate::storage::util::join_url_segments; use crate::table::Table; -use crate::table::file_pruner::FilePruner; -use crate::table::partition::PartitionPruner; -use records::FilesPartitionRecord; +use records::{ColumnStatsRecord, FilesPartitionRecord, PartitionStatsRecord}; impl Table { /// Check if this table is a metadata table. @@ -55,6 +56,32 @@ impl Table { crate::util::path::is_metadata_table_path(&base_path) } + /// Validate that this is a data table (not a metadata table). + /// + /// # Errors + /// Returns an error if this is a metadata table. + pub fn require_data_table(&self) -> Result<()> { + if self.is_metadata_table() { + return Err(CoreError::MetadataTable( + "This method must be called on a data table, not a metadata table".to_string(), + )); + } + Ok(()) + } + + /// Validate that this is a metadata table. + /// + /// # Errors + /// Returns an error if this is not a metadata table. + pub fn require_metadata_table(&self) -> Result<()> { + if !self.is_metadata_table() { + return Err(CoreError::MetadataTable( + "This method must be called on a metadata table instance".to_string(), + )); + } + Ok(()) + } + /// Get the list of available metadata table partitions for this table. /// /// Returns the partitions configured in [`MetadataTablePartitions`]. @@ -78,7 +105,6 @@ impl Table { /// even without explicit `hoodie.metadata.enable=true`. 
When metadata table /// is enabled, it must have at least the `files` partition enabled. pub fn is_metadata_table_enabled(&self) -> bool { - // TODO: drop v6 support then no need to check table version here let table_version: isize = self .hudi_configs .get(TableVersion) @@ -107,17 +133,29 @@ impl Table { metadata_explicitly_enabled || has_files_partition } - /// Create a metadata table instance for this data table. + /// Check if the column_stats partition is available in the metadata table. /// - /// TODO: support more partitions. Only "files" is used currently. - /// - /// # Errors + /// Returns `true` if "column_stats" is in the configured [`MetadataTablePartitions`]. + pub fn has_column_stats_partition(&self) -> bool { + self.get_metadata_table_partitions() + .contains(&ColumnStatsRecord::PARTITION_NAME.to_string()) + } + + /// Check if the partition_stats partition is available in the metadata table. /// - /// Returns an error if the metadata table cannot be created or if there are - /// no metadata table partitions configured. + /// Returns `true` if "partition_stats" is in the configured [`MetadataTablePartitions`]. + pub fn has_partition_stats_partition(&self) -> bool { + self.get_metadata_table_partitions() + .contains(&PartitionStatsRecord::PARTITION_NAME.to_string()) + } + + /// Create a metadata table instance for this data table. /// - /// # Note - /// Must be called on a DATA table, not a METADATA table. + /// # Errors + /// Returns an error if: + /// - Called on a metadata table instead of a data table + /// - No metadata table partitions are configured + /// - The metadata table cannot be created pub async fn new_metadata_table(&self) -> Result { if self.is_metadata_table() { return Err(CoreError::MetadataTable( @@ -139,178 +177,32 @@ impl Table { ) .await } - - /// Same as [Table::new_metadata_table], but blocking. - pub fn new_metadata_table_blocking(&self) -> Result
{ - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()? - .block_on(async { self.new_metadata_table().await }) - } - - /// Fetch records from the `files` partition of metadata table - /// with optional data table partition pruning. - /// - /// Records are returned with normalized partition keys. For non-partitioned tables, - /// the key is "" (empty string) instead of the internal "." representation. - /// Normalization happens at decode time in [`decode_files_partition_record_with_schema`]. - /// - /// # Note - /// Must be called on a DATA table, not a METADATA table. - pub async fn read_metadata_table_files_partition( - &self, - partition_pruner: &PartitionPruner, - ) -> Result> { - let metadata_table = self.new_metadata_table().await?; - metadata_table - .fetch_files_partition_records(partition_pruner) - .await - } - - /// Same as [Table::read_metadata_table_files_partition], but blocking. - pub fn read_metadata_table_files_partition_blocking( - &self, - partition_pruner: &PartitionPruner, - ) -> Result> { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()? - .block_on(self.read_metadata_table_files_partition(partition_pruner)) - } - - /// Fetch records from the `files` partition with optional partition pruning. - /// - /// For non-partitioned tables, directly fetches the "." record. - /// For partitioned tables with filters, performs partition pruning via `__all_partitions__`. - /// - /// # Arguments - /// * `partition_pruner` - Data table's partition pruner to filter partitions. - /// - /// # Note - /// Must be called on a METADATA table instance. - pub async fn fetch_files_partition_records( - &self, - partition_pruner: &PartitionPruner, - ) -> Result> { - // Non-partitioned table: directly fetch "." 
record - if !partition_pruner.is_table_partitioned() { - return self - .read_files_partition(&[FilesPartitionRecord::NON_PARTITIONED_NAME]) - .await; - } - - // Partitioned table without filters: read all records - if partition_pruner.is_empty() { - return self.read_files_partition(&[]).await; - } - - // Partitioned table with filters: partition pruning - let all_partitions_records = self - .read_files_partition(&[FilesPartitionRecord::ALL_PARTITIONS_KEY]) - .await?; - - let partition_names: Vec<&str> = all_partitions_records - .get(FilesPartitionRecord::ALL_PARTITIONS_KEY) - .map(|r| r.partition_names()) - .unwrap_or_default(); - - // Step 2: Apply partition pruning - let pruned: Vec<&str> = partition_names - .into_iter() - .filter(|p| partition_pruner.should_include(p)) - .collect(); - - if pruned.is_empty() { - return Ok(HashMap::new()); - } - - // Step 3: Read only the pruned partition records - self.read_files_partition(&pruned).await - } - - /// Read records from the `files` partition. - /// - /// If keys is empty, reads all records. Otherwise, reads only the specified keys. - /// - /// # Note - /// Must be called on a METADATA table instance. 
- async fn read_files_partition( - &self, - keys: &[&str], - ) -> Result> { - let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() else { - return Ok(HashMap::new()); - }; - - let timeline_view = self.timeline.create_view_as_of(timestamp).await?; - - let filters = from_str_tuples([( - METADATA_TABLE_PARTITION_FIELD, - "=", - FilesPartitionRecord::PARTITION_NAME, - )])?; - let partition_schema = self.get_partition_schema().await?; - let partition_pruner = - PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?; - - // Use empty file pruner for metadata table - no column stats pruning needed - // Use empty schema since the pruner is empty and won't use the schema - let file_pruner = FilePruner::empty(); - let table_schema = Schema::empty(); - - let file_slices = self - .file_system_view - .get_file_slices_by_storage_listing( - &partition_pruner, - &file_pruner, - &table_schema, - &timeline_view, - ) - .await?; - - if file_slices.len() != 1 { - return Err(CoreError::MetadataTable(format!( - "Expected 1 file slice for {} partition, got {}", - FilesPartitionRecord::PARTITION_NAME, - file_slices.len() - ))); - } - - let file_slice = file_slices.into_iter().next().unwrap(); - let fg_reader = self.create_file_group_reader_with_options([( - HudiReadConfig::FileGroupEndTimestamp, - timestamp, - )])?; - - fg_reader - .read_metadata_table_files_partition(&file_slice, keys) - .await - } } #[cfg(test)] mod tests { use super::*; use crate::config::table::HudiTableConfig::TableVersion; - use crate::table::partition::PartitionPruner; + use crate::table::PartitionPruner; use hudi_test::{QuickstartTripsTable, SampleTable}; use records::{FilesPartitionRecord, MetadataRecordType}; use std::collections::HashSet; - fn get_data_table() -> Table { + async fn get_data_table() -> Table { let table_path = QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro(); - Table::new_blocking(&table_path).unwrap() + Table::new(&table_path).await.unwrap() } 
- #[test] - fn hudi_table_read_metadata_table_files_partition() { - let data_table = get_data_table(); - let partition_schema = data_table.get_partition_schema_blocking().unwrap(); + #[tokio::test] + async fn hudi_table_read_metadata_table_files_partition() { + let data_table = get_data_table().await; + let partition_schema = data_table.get_partition_schema().await.unwrap(); let partition_pruner = PartitionPruner::new(&[], &partition_schema, data_table.hudi_configs.as_ref()).unwrap(); let records = data_table - .read_metadata_table_files_partition_blocking(&partition_pruner) + .read_metadata_table_files_partition(&partition_pruner) + .await .unwrap(); // Should have 4 records: __all_partitions__ + 3 city partitions @@ -346,9 +238,9 @@ mod tests { assert!(chennai.total_size() > 0); } - #[test] - fn hudi_table_get_metadata_table_partitions() { - let data_table = get_data_table(); + #[tokio::test] + async fn hudi_table_get_metadata_table_partitions() { + let data_table = get_data_table().await; // Verify we can get the metadata table partitions from the data table let partitions = data_table.get_metadata_table_partitions(); @@ -376,11 +268,11 @@ mod tests { } } - #[test] - fn hudi_table_is_metadata_table_enabled() { + #[tokio::test] + async fn hudi_table_is_metadata_table_enabled() { // V8 table with files partition configured should enable metadata table // even without explicit hoodie.metadata.enable=true - let data_table = get_data_table(); + let data_table = get_data_table().await; // Verify it's a v8 table let table_version: isize = data_table @@ -404,11 +296,11 @@ mod tests { ); } - #[test] - fn hudi_table_v6_metadata_table_not_enabled() { + #[tokio::test] + async fn hudi_table_v6_metadata_table_not_enabled() { // V6 tables should NOT have metadata table enabled, even with explicit setting let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); - let hudi_table = Table::new_blocking(base_url.path()).unwrap(); + let hudi_table = 
Table::new(base_url.path()).await.unwrap(); // Verify it's a v6 table let table_version: isize = hudi_table @@ -425,35 +317,35 @@ mod tests { ); } - #[test] - fn hudi_table_is_not_metadata_table() { + #[tokio::test] + async fn hudi_table_is_not_metadata_table() { // A regular data table should not be a metadata table let base_url = SampleTable::V6Nonpartitioned.url_to_cow(); - let hudi_table = Table::new_blocking(base_url.path()).unwrap(); + let hudi_table = Table::new(base_url.path()).await.unwrap(); assert!( !hudi_table.is_metadata_table(), "Regular data table should not be a metadata table" ); } - #[test] - fn hudi_table_metadata_table_is_metadata_table() { + #[tokio::test] + async fn hudi_table_metadata_table_is_metadata_table() { // Create a metadata table and verify it's recognized as such - let data_table = get_data_table(); - let metadata_table = data_table.new_metadata_table_blocking().unwrap(); + let data_table = get_data_table().await; + let metadata_table = data_table.new_metadata_table().await.unwrap(); assert!( metadata_table.is_metadata_table(), "Metadata table should be recognized as a metadata table" ); } - #[test] - fn hudi_table_new_metadata_table_from_metadata_table_errors() { + #[tokio::test] + async fn hudi_table_new_metadata_table_from_metadata_table_errors() { // Trying to create a metadata table from a metadata table should fail - let data_table = get_data_table(); - let metadata_table = data_table.new_metadata_table_blocking().unwrap(); + let data_table = get_data_table().await; + let metadata_table = data_table.new_metadata_table().await.unwrap(); - let result = metadata_table.new_metadata_table_blocking(); + let result = metadata_table.new_metadata_table().await; assert!(result.is_err()); let err = result.unwrap_err(); assert!( diff --git a/crates/core/src/metadata/table/records.rs b/crates/core/src/metadata/table/records.rs index eb235b64..10f77e9e 100644 --- a/crates/core/src/metadata/table/records.rs +++ 
b/crates/core/src/metadata/table/records.rs @@ -35,6 +35,7 @@ use crate::Result; use crate::error::CoreError; use crate::hfile::{HFileReader, HFileRecord}; +use crate::metadata::NON_PARTITIONED_NAME; use apache_avro::Schema as AvroSchema; use apache_avro::types::Value as AvroValue; use std::collections::HashMap; @@ -162,7 +163,8 @@ impl FilesPartitionRecord { /// The key used in metadata table for non-partitioned tables. /// The metadata table stores "." for non-partitioned tables, which maps to "" externally. - pub const NON_PARTITIONED_NAME: &'static str = "."; + /// Uses the shared constant from `crate::metadata::NON_PARTITIONED_NAME`. + pub const NON_PARTITIONED_NAME: &'static str = NON_PARTITIONED_NAME; /// Check if this is an ALL_PARTITIONS record. pub fn is_all_partitions(&self) -> bool { @@ -215,6 +217,146 @@ impl FilesPartitionRecord { } } +// ============================================================================ +// Column Stats Types +// ============================================================================ + +/// Wrapper for values in the column stats Avro union type. +/// +/// The `minValue` and `maxValue` fields in `HoodieMetadataColumnStats` are union types +/// that can contain various primitive and logical types. This enum represents all +/// possible variants for runtime handling. 
+/// +/// Maps to the wrapper types in HoodieMetadata.avsc: +/// - BooleanWrapper, IntWrapper, LongWrapper, FloatWrapper, DoubleWrapper +/// - BytesWrapper, StringWrapper, DateWrapper, DecimalWrapper +/// - TimeMicrosWrapper, TimestampMicrosWrapper, LocalDateWrapper, ArrayWrapper +#[derive(Debug, Clone, PartialEq)] +pub enum WrappedStatValue { + /// Null value (no statistics available) + Null, + /// Boolean value (from BooleanWrapper) + Boolean(bool), + /// 32-bit integer (from IntWrapper) + Int(i32), + /// 64-bit integer (from LongWrapper) + Long(i64), + /// 32-bit float (from FloatWrapper) + Float(f32), + /// 64-bit double (from DoubleWrapper) + Double(f64), + /// UTF-8 string (from StringWrapper) + String(String), + /// Raw bytes (from BytesWrapper) + Bytes(Vec), + /// Date as days since Unix epoch (from DateWrapper) + Date(i32), + /// Decimal with precision and scale (from DecimalWrapper) + /// Note: Schema defines precision=30, scale=15 but actual values may vary + Decimal { + value: Vec, + precision: u8, + scale: i8, + }, + /// Time in microseconds since midnight (from TimeMicrosWrapper) + TimeMicros(i64), + /// Timestamp in microseconds since Unix epoch (from TimestampMicrosWrapper) + TimestampMicros(i64), + /// Local date as days since Unix epoch (from LocalDateWrapper) + /// Note: Semantically same as Date but from a different wrapper type + LocalDate(i32), + /// Array of wrapped values (from ArrayWrapper) + /// Used for array column statistics + Array(Vec), +} + +/// Value type information from HoodieMetadataColumnStats. +/// +/// Contains type metadata that helps with proper interpretation of min/max values. +#[derive(Debug, Clone, PartialEq)] +pub struct ValueTypeInfo { + /// Enum ordinal representing the value type + pub type_ordinal: i32, + /// Optional additional information about the value type (e.g., timezone for timestamps) + pub additional_info: Option, +} + +/// Column statistics record from the metadata table. 
+/// +/// Represents a single column's statistics for a specific file. +/// Maps to `HoodieMetadataColumnStats` in HoodieMetadata.avsc. +/// +/// The key format in the column_stats partition is: +/// `COLUMN_HASH + PARTITION_HASH + FILE_HASH` (see `util::hash` module) +#[derive(Debug, Clone)] +pub struct ColumnStatsRecord { + /// File name this stats entry belongs to + pub file_name: String, + /// Column name + pub column_name: String, + /// Minimum value (wrapped in union type) + pub min_value: Option, + /// Maximum value (wrapped in union type) + pub max_value: Option, + /// Number of values (non-null count + null count) + pub value_count: i64, + /// Number of null values + pub null_count: i64, + /// Total size in bytes + pub total_size: i64, + /// Total uncompressed size in bytes + pub total_uncompressed_size: i64, + /// Whether this record marks a deletion + pub is_deleted: bool, + /// Whether the bounds are tight (exact) vs loose (may not be precise) + pub is_tight_bound: bool, + /// Value type information for proper interpretation of min/max values + pub value_type: Option, +} + +impl ColumnStatsRecord { + /// The partition name in the metadata table for column stats. + pub const PARTITION_NAME: &'static str = "column_stats"; +} + +/// Partition-level statistics record from the metadata table. +/// +/// Represents aggregated column statistics for a specific partition. +/// Uses the same structure as `ColumnStatsRecord` but at partition granularity. 
+/// +/// The key format in the partition_stats partition is: +/// `COLUMN_HASH + PARTITION_HASH` (see `util::hash` module) +#[derive(Debug, Clone)] +pub struct PartitionStatsRecord { + /// Partition path (e.g., "city=chennai" or "" for non-partitioned) + pub partition_path: String, + /// Column name + pub column_name: String, + /// Minimum value across all files in the partition + pub min_value: Option, + /// Maximum value across all files in the partition + pub max_value: Option, + /// Total value count across all files + pub value_count: i64, + /// Total null count across all files + pub null_count: i64, + /// Total size across all files + pub total_size: i64, + /// Whether this record marks a deletion + pub is_deleted: bool, + /// Whether the bounds are tight + pub is_tight_bound: bool, +} + +impl PartitionStatsRecord { + /// The partition name in the metadata table for partition stats. + pub const PARTITION_NAME: &'static str = "partition_stats"; +} + +// ============================================================================ +// Files Partition Decoding +// ============================================================================ + /// Decode an HFile record value from the files partition using Avro. /// /// # Arguments @@ -448,6 +590,470 @@ pub fn get_record_type(avro_value: &AvroValue) -> MetadataRecordType { .unwrap_or(MetadataRecordType::Unknown) } +// ============================================================================ +// Column Stats Decoding +// ============================================================================ + +/// Extract a string from an Avro value (handles union types). +fn extract_string(value: &AvroValue) -> Option { + match value { + AvroValue::String(s) => Some(s.clone()), + AvroValue::Union(_, inner) => extract_string(inner), + _ => None, + } +} + +/// Extract an i32 from an Avro value (handles union types). 
+fn extract_int(value: &AvroValue) -> Option { + match value { + AvroValue::Int(n) => Some(*n), + AvroValue::Union(_, inner) => extract_int(inner), + _ => None, + } +} + +/// Extract a f32 from an Avro value (handles union types). +#[allow(dead_code)] +fn extract_float(value: &AvroValue) -> Option { + match value { + AvroValue::Float(f) => Some(*f), + AvroValue::Union(_, inner) => extract_float(inner), + _ => None, + } +} + +/// Extract a f64 from an Avro value (handles union types). +#[allow(dead_code)] +fn extract_double(value: &AvroValue) -> Option { + match value { + AvroValue::Double(d) => Some(*d), + AvroValue::Union(_, inner) => extract_double(inner), + _ => None, + } +} + +/// Extract bytes from an Avro value (handles union types). +fn extract_bytes(value: &AvroValue) -> Option> { + match value { + AvroValue::Bytes(b) => Some(b.clone()), + AvroValue::Union(_, inner) => extract_bytes(inner), + _ => None, + } +} + +/// Extract a WrappedStatValue from an Avro union type. +/// +/// The `minValue` and `maxValue` fields in column stats are Avro union types +/// that can contain various primitive and logical types. 
+fn extract_wrapped_value(value: &AvroValue) -> Option { + match value { + AvroValue::Null => Some(WrappedStatValue::Null), + AvroValue::Boolean(b) => Some(WrappedStatValue::Boolean(*b)), + AvroValue::Int(n) => Some(WrappedStatValue::Int(*n)), + AvroValue::Long(n) => Some(WrappedStatValue::Long(*n)), + AvroValue::Float(f) => Some(WrappedStatValue::Float(*f)), + AvroValue::Double(d) => Some(WrappedStatValue::Double(*d)), + AvroValue::String(s) => Some(WrappedStatValue::String(s.clone())), + AvroValue::Bytes(b) => Some(WrappedStatValue::Bytes(b.clone())), + AvroValue::Date(d) => Some(WrappedStatValue::Date(*d)), + AvroValue::TimeMicros(t) => Some(WrappedStatValue::TimeMicros(*t)), + AvroValue::TimestampMicros(t) => Some(WrappedStatValue::TimestampMicros(*t)), + AvroValue::Union(_, inner) => extract_wrapped_value(inner), + AvroValue::Array(items) => { + // ArrayWrapper contains an array of wrapped values + let values: Vec = + items.iter().filter_map(extract_wrapped_value).collect(); + Some(WrappedStatValue::Array(values)) + } + AvroValue::Record(fields) => { + // Handle wrapper records like IntWrapper, BooleanWrapper, etc. + // These have a "value" field containing the actual primitive value. + // Also handles DecimalWrapper which has value + precision + scale fields. + // Also handles ArrayWrapper which has a "wrappedValues" field. + extract_value_from_wrapper_record(fields) + } + _ => None, + } +} + +/// Extract a value from an Avro wrapper record structure. +/// +/// Hudi uses wrapper records like IntWrapper, BooleanWrapper, etc. with a "value" field. +/// DecimalWrapper is a special case with value, precision, and scale fields. +/// ArrayWrapper has a "wrappedValues" field containing an array of wrapped values. 
+/// +/// Wrapper records in HoodieMetadata.avsc: +/// - BooleanWrapper { value: boolean } +/// - IntWrapper { value: int } +/// - LongWrapper { value: long } +/// - FloatWrapper { value: float } +/// - DoubleWrapper { value: double } +/// - BytesWrapper { value: bytes } +/// - StringWrapper { value: string } +/// - DateWrapper { value: int } (date logical type) +/// - DecimalWrapper { value: bytes, precision: int, scale: int } +/// - TimeMicrosWrapper { value: long } (time-micros logical type) +/// - TimestampMicrosWrapper { value: long } +/// - LocalDateWrapper { value: int } +/// - ArrayWrapper { wrappedValues: array } +fn extract_value_from_wrapper_record(fields: &[(String, AvroValue)]) -> Option { + // Check if this is a DecimalWrapper (has precision and scale fields) + let has_precision = fields.iter().any(|(n, _)| n == "precision"); + let has_scale = fields.iter().any(|(n, _)| n == "scale"); + + // Check if this is an ArrayWrapper (has wrappedValues field) + let has_wrapped_values = fields.iter().any(|(n, _)| n == "wrappedValues"); + + if has_precision && has_scale { + // DecimalWrapper case + let mut value_bytes = None; + let mut precision = None; + let mut scale = None; + + for (name, val) in fields { + match name.as_str() { + "value" => value_bytes = extract_bytes(val), + "precision" => precision = extract_int(val), + "scale" => scale = extract_int(val), + _ => {} + } + } + + match (value_bytes, precision, scale) { + (Some(v), Some(p), Some(s)) => Some(WrappedStatValue::Decimal { + value: v, + precision: p as u8, + scale: s as i8, + }), + _ => None, + } + } else if has_wrapped_values { + // ArrayWrapper case + for (name, val) in fields { + if name == "wrappedValues" { + return extract_array_wrapper(val); + } + } + None + } else { + // Simple wrapper case - extract the "value" field + for (name, val) in fields { + if name == "value" { + return extract_wrapped_value(val); + } + } + None + } +} + +/// Extract an array of wrapped values from an ArrayWrapper's 
wrappedValues field. +fn extract_array_wrapper(value: &AvroValue) -> Option { + let items = match value { + AvroValue::Array(items) => items, + AvroValue::Union(_, inner) => { + if let AvroValue::Array(items) = inner.as_ref() { + items + } else { + return None; + } + } + AvroValue::Null => return Some(WrappedStatValue::Array(vec![])), + _ => return None, + }; + + let values: Vec = items.iter().filter_map(extract_wrapped_value).collect(); + Some(WrappedStatValue::Array(values)) +} + +/// Decode column stats records from an HFile record. +/// +/// # Arguments +/// * `reader` - The HFile reader (provides the Avro schema) +/// * `record` - The HFile record containing the Avro-serialized value +/// +/// # Returns +/// A vector of `ColumnStatsRecord` entries (one per file in the record). +pub fn decode_column_stats_record( + reader: &HFileReader, + record: &HFileRecord, +) -> Result> { + let schema = reader + .get_avro_schema() + .map_err(|e| CoreError::MetadataTable(format!("Failed to get schema: {e}")))? + .ok_or_else(|| CoreError::MetadataTable("No Avro schema in HFile".to_string()))?; + + decode_column_stats_record_with_schema(record, schema) +} + +/// Decode column stats records using a provided Avro schema. +/// +/// The HoodieMetadataRecord has a `columnStatMetadata` field containing +/// a map of `file_name -> HoodieMetadataColumnStats`. 
+pub fn decode_column_stats_record_with_schema( + record: &HFileRecord, + schema: &AvroSchema, +) -> Result> { + let value = record.value(); + if value.is_empty() { + return Ok(vec![]); // Tombstone record + } + + let avro_value = decode_avro_value(value, schema)?; + + // Verify record type is ColumnStats (type = 3) + let record_type = get_record_type(&avro_value); + if record_type != MetadataRecordType::ColumnStats { + return Err(CoreError::MetadataTable(format!( + "Expected ColumnStats record type (3), got {record_type:?}" + ))); + } + + extract_column_stats_metadata(&avro_value) +} + +/// Extract column stats from the columnStatMetadata field. +fn extract_column_stats_metadata(avro_value: &AvroValue) -> Result> { + let mut records = Vec::new(); + + // Find the columnStatMetadata field in the record + let stat_metadata = match avro_value { + AvroValue::Record(fields) => fields.iter().find_map(|(name, val)| { + if name == "columnStatMetadata" { + extract_map_from_union(val) + } else { + None + } + }), + _ => None, + }; + + if let Some(map) = stat_metadata { + for (file_name, stats_value) in map { + if let Some(stats) = extract_single_column_stats(file_name, stats_value) { + records.push(stats); + } + } + } + + Ok(records) +} + +/// Extract a Map from an Avro value that may be wrapped in a Union. +fn extract_map_from_union( + value: &AvroValue, +) -> Option<&std::collections::HashMap> { + match value { + AvroValue::Map(map) => Some(map), + AvroValue::Union(_, inner) => extract_map_from_union(inner), + _ => None, + } +} + +/// Extract a single ColumnStatsRecord from an Avro value. 
+fn extract_single_column_stats(file_name: &str, value: &AvroValue) -> Option { + let fields = match value { + AvroValue::Record(f) => f, + AvroValue::Union(_, inner) => { + if let AvroValue::Record(f) = inner.as_ref() { + f + } else { + return None; + } + } + _ => return None, + }; + + let mut stats = ColumnStatsRecord { + file_name: file_name.to_string(), + column_name: String::new(), + min_value: None, + max_value: None, + value_count: 0, + null_count: 0, + total_size: 0, + total_uncompressed_size: 0, + is_deleted: false, + is_tight_bound: true, + value_type: None, + }; + + for (field_name, field_value) in fields { + match field_name.as_str() { + "fileName" => { + stats.file_name = extract_string(field_value).unwrap_or(file_name.to_string()) + } + "columnName" => stats.column_name = extract_string(field_value).unwrap_or_default(), + "minValue" => stats.min_value = extract_wrapped_value(field_value), + "maxValue" => stats.max_value = extract_wrapped_value(field_value), + "valueCount" => stats.value_count = extract_long(field_value).unwrap_or(0), + "nullCount" => stats.null_count = extract_long(field_value).unwrap_or(0), + "totalSize" => stats.total_size = extract_long(field_value).unwrap_or(0), + "totalUncompressedSize" => { + stats.total_uncompressed_size = extract_long(field_value).unwrap_or(0) + } + "isDeleted" => stats.is_deleted = extract_bool(field_value).unwrap_or(false), + "isTightBound" => stats.is_tight_bound = extract_bool(field_value).unwrap_or(true), + "valueType" => stats.value_type = extract_value_type_info(field_value), + _ => {} + } + } + + Some(stats) +} + +/// Extract ValueTypeInfo from an Avro value (HoodieValueTypeInfo record). 
+fn extract_value_type_info(value: &AvroValue) -> Option { + let fields = match value { + AvroValue::Record(f) => f, + AvroValue::Union(_, inner) => { + if let AvroValue::Record(f) = inner.as_ref() { + f + } else { + return None; + } + } + AvroValue::Null => return None, + _ => return None, + }; + + let mut type_ordinal = None; + let mut additional_info = None; + + for (name, val) in fields { + match name.as_str() { + "typeOrdinal" => type_ordinal = extract_int(val), + "additionalInfo" => additional_info = extract_string(val), + _ => {} + } + } + + type_ordinal.map(|ordinal| ValueTypeInfo { + type_ordinal: ordinal, + additional_info, + }) +} + +/// Decode partition stats records from an HFile record. +/// +/// # Arguments +/// * `reader` - The HFile reader (provides the Avro schema) +/// * `record` - The HFile record containing the Avro-serialized value +/// +/// # Returns +/// A vector of `PartitionStatsRecord` entries. +pub fn decode_partition_stats_record( + reader: &HFileReader, + record: &HFileRecord, +) -> Result> { + let schema = reader + .get_avro_schema() + .map_err(|e| CoreError::MetadataTable(format!("Failed to get schema: {e}")))? + .ok_or_else(|| CoreError::MetadataTable("No Avro schema in HFile".to_string()))?; + + decode_partition_stats_record_with_schema(record, schema) +} + +/// Decode partition stats records using a provided Avro schema. +/// +/// Partition stats use the same columnStatMetadata structure as column stats, +/// but the key format is different (no file hash). 
+pub fn decode_partition_stats_record_with_schema( + record: &HFileRecord, + schema: &AvroSchema, +) -> Result> { + let value = record.value(); + if value.is_empty() { + return Ok(vec![]); // Tombstone record + } + + let avro_value = decode_avro_value(value, schema)?; + + // Verify record type is PartitionStats (type = 6) + let record_type = get_record_type(&avro_value); + if record_type != MetadataRecordType::PartitionStats { + return Err(CoreError::MetadataTable(format!( + "Expected PartitionStats record type (6), got {record_type:?}" + ))); + } + + extract_partition_stats_metadata(&avro_value) +} + +/// Extract partition stats from the columnStatMetadata field. +/// +/// Partition stats use the same Avro schema field as column stats. +fn extract_partition_stats_metadata(avro_value: &AvroValue) -> Result> { + let mut records = Vec::new(); + + // Partition stats also use columnStatMetadata field + let stat_metadata = match avro_value { + AvroValue::Record(fields) => fields.iter().find_map(|(name, val)| { + if name == "columnStatMetadata" { + extract_map_from_union(val) + } else { + None + } + }), + _ => None, + }; + + if let Some(map) = stat_metadata { + for (partition_key, stats_value) in map { + if let Some(stats) = extract_single_partition_stats(partition_key, stats_value) { + records.push(stats); + } + } + } + + Ok(records) +} + +/// Extract a single PartitionStatsRecord from an Avro value. 
+fn extract_single_partition_stats( + partition_key: &str, + value: &AvroValue, +) -> Option { + let fields = match value { + AvroValue::Record(f) => f, + AvroValue::Union(_, inner) => { + if let AvroValue::Record(f) = inner.as_ref() { + f + } else { + return None; + } + } + _ => return None, + }; + + let mut stats = PartitionStatsRecord { + partition_path: partition_key.to_string(), + column_name: String::new(), + min_value: None, + max_value: None, + value_count: 0, + null_count: 0, + total_size: 0, + is_deleted: false, + is_tight_bound: true, + }; + + for (field_name, field_value) in fields { + match field_name.as_str() { + "columnName" => stats.column_name = extract_string(field_value).unwrap_or_default(), + "minValue" => stats.min_value = extract_wrapped_value(field_value), + "maxValue" => stats.max_value = extract_wrapped_value(field_value), + "valueCount" => stats.value_count = extract_long(field_value).unwrap_or(0), + "nullCount" => stats.null_count = extract_long(field_value).unwrap_or(0), + "totalSize" => stats.total_size = extract_long(field_value).unwrap_or(0), + "isDeleted" => stats.is_deleted = extract_bool(field_value).unwrap_or(false), + "isTightBound" => stats.is_tight_bound = extract_bool(field_value).unwrap_or(true), + _ => {} + } + } + + Some(stats) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/core/src/metadata/table/stats.rs b/crates/core/src/metadata/table/stats.rs new file mode 100644 index 00000000..0d6a28a8 --- /dev/null +++ b/crates/core/src/metadata/table/stats.rs @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Column and partition statistics reading from the metadata table. + +use std::collections::HashMap; + +use arrow_array::ArrayRef; +use arrow_schema::{DataType, Schema}; + +use crate::Result; +use crate::config::read::HudiReadConfig; +use crate::error::CoreError; +use crate::expr::filter::from_str_tuples; +use crate::metadata::METADATA_TABLE_PARTITION_FIELD; +use crate::statistics::{ColumnStatistics, StatisticsContainer, StatsGranularity}; +use crate::table::{FilePruner, PartitionPruner, Table}; +use crate::util::hash::get_column_stats_key; + +use super::records::{ColumnStatsRecord, PartitionStatsRecord, WrappedStatValue}; + +impl Table { + /// Read column statistics for specific files from the metadata table. + /// + /// This method reads column statistics from the `column_stats` partition of the + /// metadata table and returns them grouped by file name. + /// + /// # Arguments + /// * `file_names` - List of file names to get stats for (without path) + /// * `column_names` - List of column names to get stats for + /// * `partition_path` - The partition path these files belong to + /// + /// # Returns + /// A map from file_name to `StatisticsContainer` containing column stats. + /// + /// # Errors + /// Returns an error if called on a metadata table instead of a data table. 
+ pub async fn read_column_stats_for_files( + &self, + file_names: &[&str], + column_names: &[&str], + partition_path: &str, + ) -> Result> { + if !self.has_column_stats_partition() { + return Err(CoreError::MetadataTable( + "column_stats partition not available".to_string(), + )); + } + + let metadata_table = self.new_metadata_table().await?; + + // Generate keys for all (column, file) combinations + let keys: Vec = column_names + .iter() + .flat_map(|col| { + file_names + .iter() + .map(move |file| get_column_stats_key(col, partition_path, file)) + }) + .collect(); + + let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); + + // Read from column_stats partition using the metadata table + let records = metadata_table.fetch_column_stats_records(&key_refs).await?; + + // Convert to StatisticsContainer grouped by file + column_stats_records_to_stats_map(records) + } + + /// Read partition-level statistics from the metadata table. + /// + /// This method reads partition statistics from the `partition_stats` partition of the + /// metadata table and returns them grouped by partition path. + /// + /// # Arguments + /// * `partition_paths` - List of partition paths to get stats for + /// * `column_names` - List of column names to get stats for + /// + /// # Returns + /// A map from partition_path to `StatisticsContainer` containing column stats. + /// + /// # Errors + /// Returns an error if called on a metadata table instead of a data table. 
+ pub async fn read_partition_stats( + &self, + partition_paths: &[&str], + column_names: &[&str], + ) -> Result> { + if !self.has_partition_stats_partition() { + return Err(CoreError::MetadataTable( + "partition_stats partition not available".to_string(), + )); + } + + let metadata_table = self.new_metadata_table().await?; + + // Generate keys for all (column, partition) combinations + let keys: Vec = column_names + .iter() + .flat_map(|col| { + partition_paths + .iter() + .map(move |part| crate::util::hash::get_partition_stats_key(col, part)) + }) + .collect(); + + let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); + + // Read from partition_stats partition using the metadata table + let records = metadata_table + .fetch_partition_stats_records(&key_refs) + .await?; + + // Convert to StatisticsContainer grouped by partition + partition_stats_records_to_stats_map(records) + } + + /// Fetch column stats records from this metadata table's column_stats partition. + /// + /// # Arguments + /// * `keys` - The lookup keys for column stats records + /// + /// # Errors + /// Returns an error if called on a data table instead of a metadata table. 
+ pub async fn fetch_column_stats_records( + &self, + keys: &[&str], + ) -> Result> { + self.require_metadata_table()?; + + let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() else { + return Ok(vec![]); + }; + + let timeline_view = self.timeline.create_view_as_of(timestamp).await?; + + let filters = from_str_tuples([( + METADATA_TABLE_PARTITION_FIELD, + "=", + ColumnStatsRecord::PARTITION_NAME, + )])?; + let partition_schema = self.get_partition_schema().await?; + let partition_pruner = + PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?; + + let file_pruner = FilePruner::empty(); + let table_schema = Schema::empty(); + + let file_slices = match self + .file_system_view + .get_file_slices_by_storage_listing( + &partition_pruner, + &file_pruner, + &table_schema, + &timeline_view, + ) + .await + { + Ok(slices) => slices, + Err(e) => { + // Handle case where column_stats partition has log files without matching base files + // This can happen before compaction creates the base HFile + log::warn!( + "Failed to get file slices for column_stats partition: {e}. \ + This may indicate the partition has not been compacted yet." + ); + return Ok(vec![]); + } + }; + + if file_slices.is_empty() { + return Ok(vec![]); + } + + let fg_reader = self.create_file_group_reader_with_options([( + HudiReadConfig::FileGroupEndTimestamp, + timestamp, + )])?; + + // Read from all file slices and merge results + let mut all_results = Vec::new(); + for file_slice in &file_slices { + match fg_reader + .read_metadata_table_column_stats_partition(file_slice, keys) + .await + { + Ok(records) => all_results.extend(records), + Err(e) => { + log::warn!( + "Failed to read column stats from file slice {:?}: {e}", + file_slice.base_file_relative_path() + ); + } + } + } + + Ok(all_results) + } + + /// Fetch partition stats records from this metadata table's partition_stats partition. 
+ /// + /// # Arguments + /// * `keys` - The lookup keys for partition stats records + /// + /// # Errors + /// Returns an error if called on a data table instead of a metadata table. + pub async fn fetch_partition_stats_records( + &self, + keys: &[&str], + ) -> Result> { + self.require_metadata_table()?; + + let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() else { + return Ok(vec![]); + }; + + let timeline_view = self.timeline.create_view_as_of(timestamp).await?; + + let filters = from_str_tuples([( + METADATA_TABLE_PARTITION_FIELD, + "=", + PartitionStatsRecord::PARTITION_NAME, + )])?; + let partition_schema = self.get_partition_schema().await?; + let partition_pruner = + PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?; + + let file_pruner = FilePruner::empty(); + let table_schema = Schema::empty(); + + let file_slices = match self + .file_system_view + .get_file_slices_by_storage_listing( + &partition_pruner, + &file_pruner, + &table_schema, + &timeline_view, + ) + .await + { + Ok(slices) => slices, + Err(e) => { + // Handle case where partition_stats partition has log files without matching base files + // This can happen before compaction creates the base HFile + log::warn!( + "Failed to get file slices for partition_stats partition: {e}. \ + This may indicate the partition has not been compacted yet." 
+ ); + return Ok(vec![]); + } + }; + + if file_slices.is_empty() { + return Ok(vec![]); + } + + let fg_reader = self.create_file_group_reader_with_options([( + HudiReadConfig::FileGroupEndTimestamp, + timestamp, + )])?; + + // Read from all file slices and merge results + let mut all_results = Vec::new(); + for file_slice in &file_slices { + match fg_reader + .read_metadata_table_partition_stats_partition(file_slice, keys) + .await + { + Ok(records) => all_results.extend(records), + Err(e) => { + log::warn!( + "Failed to read partition stats from file slice {:?}: {e}", + file_slice.base_file_relative_path() + ); + } + } + } + + Ok(all_results) + } +} + +// ============================================================================ +// Conversion helpers (public for use by FileSystemView) +// ============================================================================ + +/// Convert column stats records to a map of file name -> StatisticsContainer. +pub fn column_stats_records_to_stats_map( + records: Vec, +) -> Result> { + let mut result: HashMap = HashMap::new(); + + for record in records { + if record.is_deleted { + continue; + } + + let container = result + .entry(record.file_name.clone()) + .or_insert_with(|| StatisticsContainer::new(StatsGranularity::File)); + + if let Some(col_stats) = convert_to_column_statistics(&record) { + container + .columns + .insert(record.column_name.clone(), col_stats); + } + } + + Ok(result) +} + +/// Convert partition stats records to a map of partition path -> StatisticsContainer. 
+pub fn partition_stats_records_to_stats_map(
+    records: Vec<PartitionStatsRecord>,
+) -> Result<HashMap<String, StatisticsContainer>> {
+    let mut result: HashMap<String, StatisticsContainer> = HashMap::new();
+
+    for record in records {
+        if record.is_deleted {
+            continue;
+        }
+
+        let container = result
+            .entry(record.partition_path.clone())
+            .or_insert_with(|| StatisticsContainer::new(StatsGranularity::File));
+
+        if let Some(col_stats) = convert_partition_stats_to_column_statistics(&record) {
+            container
+                .columns
+                .insert(record.column_name.clone(), col_stats);
+        }
+    }
+
+    Ok(result)
+}
+
+// ============================================================================
+// Conversion helpers
+// ============================================================================
+
+/// Convert a ColumnStatsRecord to ColumnStatistics.
+fn convert_to_column_statistics(record: &ColumnStatsRecord) -> Option<ColumnStatistics> {
+    let data_type = infer_data_type_from_wrapped_value(
+        record.min_value.as_ref().or(record.max_value.as_ref())?,
+    )?;
+
+    Some(ColumnStatistics {
+        column_name: record.column_name.clone(),
+        data_type: data_type.clone(),
+        min_value: record
+            .min_value
+            .as_ref()
+            .and_then(|v| wrapped_value_to_arrow_array(v, &data_type)),
+        max_value: record
+            .max_value
+            .as_ref()
+            .and_then(|v| wrapped_value_to_arrow_array(v, &data_type)),
+    })
+}
+
+/// Convert a PartitionStatsRecord to ColumnStatistics.
+fn convert_partition_stats_to_column_statistics(
+    record: &PartitionStatsRecord,
+) -> Option<ColumnStatistics> {
+    let data_type = infer_data_type_from_wrapped_value(
+        record.min_value.as_ref().or(record.max_value.as_ref())?,
+    )?;
+
+    Some(ColumnStatistics {
+        column_name: record.column_name.clone(),
+        data_type: data_type.clone(),
+        min_value: record
+            .min_value
+            .as_ref()
+            .and_then(|v| wrapped_value_to_arrow_array(v, &data_type)),
+        max_value: record
+            .max_value
+            .as_ref()
+            .and_then(|v| wrapped_value_to_arrow_array(v, &data_type)),
+    })
+}
+
+/// Infer Arrow DataType from a WrappedStatValue.
+fn infer_data_type_from_wrapped_value(value: &WrappedStatValue) -> Option<DataType> {
+    match value {
+        WrappedStatValue::Null => None,
+        WrappedStatValue::Boolean(_) => Some(DataType::Boolean),
+        WrappedStatValue::Int(_) => Some(DataType::Int32),
+        WrappedStatValue::Long(_) => Some(DataType::Int64),
+        WrappedStatValue::Float(_) => Some(DataType::Float32),
+        WrappedStatValue::Double(_) => Some(DataType::Float64),
+        WrappedStatValue::String(_) => Some(DataType::Utf8),
+        WrappedStatValue::Bytes(_) => Some(DataType::Binary),
+        WrappedStatValue::Date(_) => Some(DataType::Date32),
+        WrappedStatValue::TimeMicros(_) => {
+            Some(DataType::Time64(arrow_schema::TimeUnit::Microsecond))
+        }
+        WrappedStatValue::TimestampMicros(_) => Some(DataType::Timestamp(
+            arrow_schema::TimeUnit::Microsecond,
+            None,
+        )),
+        WrappedStatValue::LocalDate(_) => Some(DataType::Date32),
+        WrappedStatValue::Decimal {
+            precision, scale, ..
+        } => Some(DataType::Decimal128(*precision, *scale)),
+        WrappedStatValue::Array(_) => {
+            // Array statistics are complex; return None for now as pruning on arrays
+            // is not commonly supported
+            None
+        }
+    }
+}
+
+/// Convert a WrappedStatValue to a single-element Arrow array.
+fn wrapped_value_to_arrow_array(
+    value: &WrappedStatValue,
+    _data_type: &DataType,
+) -> Option<ArrayRef> {
+    use arrow_array::{
+        BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
+        StringArray, Time64MicrosecondArray, TimestampMicrosecondArray,
+    };
+    use std::sync::Arc;
+
+    match value {
+        WrappedStatValue::Null => None,
+        WrappedStatValue::Boolean(b) => Some(Arc::new(BooleanArray::from(vec![*b])) as ArrayRef),
+        WrappedStatValue::Int(n) => Some(Arc::new(Int32Array::from(vec![*n])) as ArrayRef),
+        WrappedStatValue::Long(n) => Some(Arc::new(Int64Array::from(vec![*n])) as ArrayRef),
+        WrappedStatValue::Float(f) => Some(Arc::new(Float32Array::from(vec![*f])) as ArrayRef),
+        WrappedStatValue::Double(d) => Some(Arc::new(Float64Array::from(vec![*d])) as ArrayRef),
+        WrappedStatValue::String(s) => {
+            Some(Arc::new(StringArray::from(vec![s.as_str()])) as ArrayRef)
+        }
+        WrappedStatValue::Bytes(b) => {
+            Some(Arc::new(BinaryArray::from(vec![b.as_slice()])) as ArrayRef)
+        }
+        WrappedStatValue::Date(d) => Some(Arc::new(Date32Array::from(vec![*d])) as ArrayRef),
+        WrappedStatValue::TimeMicros(t) => {
+            Some(Arc::new(Time64MicrosecondArray::from(vec![*t])) as ArrayRef)
+        }
+        WrappedStatValue::TimestampMicros(t) => {
+            Some(Arc::new(TimestampMicrosecondArray::from(vec![*t])) as ArrayRef)
+        }
+        WrappedStatValue::LocalDate(d) => Some(Arc::new(Date32Array::from(vec![*d])) as ArrayRef),
+        WrappedStatValue::Decimal {
+            value: bytes,
+            precision,
+            scale,
+        } => {
+            // Convert bytes to i128
+            // Decimal bytes are big-endian two's complement
+            if bytes.len() > 16 {
+                return None;
+            }
+            let mut padded = [0u8; 16];
+            let start = 16 - bytes.len();
+            // Handle sign extension
+            if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
+                padded.fill(0xff);
+            }
+            padded[start..].copy_from_slice(bytes);
+            let value = i128::from_be_bytes(padded);
+
+            use arrow_array::Decimal128Array;
+            let array = Decimal128Array::from(vec![value])
+
.with_precision_and_scale(*precision, *scale) + .ok()?; + Some(Arc::new(array) as ArrayRef) + } + WrappedStatValue::Array(_) => { + // Array statistics cannot be converted to a single-element array for pruning + None + } + } +} diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs index 223a86f7..65358e09 100644 --- a/crates/core/src/table/fs_view.rs +++ b/crates/core/src/table/fs_view.rs @@ -21,20 +21,26 @@ use std::collections::HashMap; use std::sync::Arc; use arrow_schema::Schema; +use async_recursion::async_recursion; use crate::Result; use crate::config::HudiConfigs; -use crate::config::table::HudiTableConfig::BaseFileFormat; +use crate::config::table::HudiTableConfig::{BaseFileFormat, MetadataTablePartitions}; use crate::file_group::FileGroup; use crate::file_group::builder::file_groups_from_files_partition_records; use crate::file_group::file_slice::FileSlice; -use crate::metadata::table::records::FilesPartitionRecord; +use crate::metadata::table::records::{ + ColumnStatsRecord, FilesPartitionRecord, PartitionStatsRecord, +}; +use crate::metadata::table::{ + column_stats_records_to_stats_map, partition_stats_records_to_stats_map, +}; use crate::storage::Storage; use crate::table::Table; -use crate::table::file_pruner::FilePruner; use crate::table::listing::FileLister; -use crate::table::partition::PartitionPruner; +use crate::table::{FilePruner, PartitionPruner}; use crate::timeline::view::TimelineView; +use crate::util::hash::{get_column_stats_key, get_partition_stats_key}; use dashmap::DashMap; /// A view of the Hudi table's data files (files stored outside the `.hoodie/` directory) in the file system. It provides APIs to load and @@ -61,76 +67,159 @@ impl FileSystemView { }) } - /// Load file groups from the appropriate source (storage or metadata table records) + /// Check if column_stats partition is available in the metadata table. 
+ fn has_column_stats_partition(&self) -> bool { + let partitions: Vec = self + .hudi_configs + .get_or_default(MetadataTablePartitions) + .into(); + partitions.contains(&ColumnStatsRecord::PARTITION_NAME.to_string()) + } + + /// Check if partition_stats partition is available in the metadata table. + fn has_partition_stats_partition(&self) -> bool { + let partitions: Vec = self + .hudi_configs + .get_or_default(MetadataTablePartitions) + .into(); + partitions.contains(&PartitionStatsRecord::PARTITION_NAME.to_string()) + } + + /// Load file groups from the appropriate source (storage or metadata table) /// and apply stats-based pruning. /// - /// # File Listing Source - /// - If `files_partition_records` is Some: Uses pre-fetched metadata table records - /// - If `files_partition_records` is None: Uses storage listing via FileLister + /// # Metadata Table Path (when `metadata_table` is Some) + /// 1. Fetch files partition records (applies partition value filtering via `should_include`) + /// 2. Enhance partition_pruner with partition_stats (uses __all_partitions__ from fetched records) + /// 3. Apply partition stats filtering on the file groups + /// 4. Apply column_stats pruning on files /// - /// # Stats Pruning Source (for non-empty file_pruner) - /// - Currently: Always extracts stats from Parquet file footers - /// - TODO: Use metadata table partitions when available: - /// - partition_stats: Enhance PartitionPruner to prune partitions before file listing - /// - column_stats: Prune files without reading Parquet footers + /// # Storage Listing Path (when `metadata_table` is None) + /// 1. Use FileLister to list files (applies partition value filtering) + /// 2. 
Use Parquet footers for file-level stats pruning /// /// # Arguments /// * `partition_pruner` - Filters which partitions to include /// * `file_pruner` - Filters files based on column statistics - /// * `table_schema` - Table schema for statistics extraction + /// * `table_schema` - Table schema for statistics extraction from Parquet footers /// * `timeline_view` - The timeline view providing query timestamp and completion time lookups - /// * `files_partition_records` - Optional pre-fetched metadata table records + /// * `metadata_table` - Optional metadata table for file listing and stats access + #[async_recursion] async fn load_file_groups( &self, partition_pruner: &PartitionPruner, file_pruner: &FilePruner, table_schema: &Schema, timeline_view: &TimelineView, - files_partition_records: Option<&HashMap>, + metadata_table: Option<&Table>, ) -> Result<()> { - // TODO: Enhance PartitionPruner with partition_stats support - // - Load partition_stats from metadata table into PartitionPruner - // - PartitionPruner.should_include() will use both partition column values AND partition_stats - // - For non-partitioned tables: check partition_pruner.can_any_partition_match() for early return - - // Step 1: Get file groups from appropriate source - let file_groups_map = if let Some(records) = files_partition_records { - // Use pre-fetched metadata table records - let base_file_format: String = self.hudi_configs.get_or_default(BaseFileFormat).into(); - file_groups_from_files_partition_records(records, &base_file_format, timeline_view)? 
+ // Track if we're using metadata table successfully for stats access + let mut use_metadata_for_stats = false; + + let (file_groups_map, partition_pruner) = if let Some(mdt) = metadata_table { + // === METADATA TABLE PATH === + + // Step 1: Fetch files partition records + // partition_pruner.should_include() applies partition value filtering + match mdt.fetch_files_partition_records(partition_pruner).await { + Ok(records) => { + use_metadata_for_stats = true; + + // Step 2: Get partition list from __all_partitions__ for stats enhancement + let partition_paths: Vec = records + .get(FilesPartitionRecord::ALL_PARTITIONS_KEY) + .map(|r| r.partition_names().iter().map(|s| s.to_string()).collect()) + .unwrap_or_default(); + + // Step 3: Enhance partition_pruner with partition_stats + let partition_pruner = + if self.has_partition_stats_partition() && !file_pruner.is_empty() { + self.enhance_partition_pruner_with_stats( + partition_pruner, + file_pruner, + &partition_paths, + mdt, + ) + .await + } else { + partition_pruner.clone() + }; + + let base_file_format: String = + self.hudi_configs.get_or_default(BaseFileFormat).into(); + let file_groups_map = file_groups_from_files_partition_records( + &records, + &base_file_format, + timeline_view, + )?; + + (file_groups_map, partition_pruner) + } + Err(e) => { + log::warn!( + "Failed to read metadata table files partition: {e}. Falling back to storage listing." 
+ ); + let lister = FileLister::new( + self.hudi_configs.clone(), + self.storage.clone(), + partition_pruner.to_owned(), + ); + let file_groups_map = lister + .list_file_groups_for_relevant_partitions(timeline_view) + .await?; + (file_groups_map, partition_pruner.clone()) + } + } } else { - // Use storage listing + // === STORAGE LISTING PATH === + // FileLister applies partition value filtering during listing let lister = FileLister::new( self.hudi_configs.clone(), self.storage.clone(), partition_pruner.to_owned(), ); - lister + let file_groups_map = lister .list_file_groups_for_relevant_partitions(timeline_view) - .await? + .await?; + (file_groups_map, partition_pruner.clone()) }; - // Step 2: Apply partition pruning (for metadata table path) and stats pruning - // Note: Storage listing path already applies partition pruning via FileLister - // TODO: Check if metadata table column_stats partition is available - // and use that instead of Parquet footers for better performance + // Step 4: Check if column_stats are available for file-level pruning + let use_column_stats = use_metadata_for_stats && self.has_column_stats_partition(); + + // Step 5: Apply partition stats pruning and column stats pruning on files for (partition_path, file_groups) in file_groups_map { - // Skip partitions that don't match the pruner (for metadata table path) - if files_partition_records.is_some() - && !partition_pruner.is_empty() - && !partition_pruner.should_include(&partition_path) - { + // Skip partitions that don't match the enhanced pruner (partition stats filtering) + if !partition_pruner.is_empty() && !partition_pruner.should_include(&partition_path) { continue; } + // Load column stats from metadata table if available + let preloaded_stats = if use_column_stats && !file_pruner.is_empty() { + self.load_column_stats_from_metadata_table( + &file_groups, + file_pruner, + timeline_view.as_of_timestamp(), + &partition_path, + metadata_table + .expect("metadata_table must be Some when 
use_column_stats is true"), + ) + .await + } else { + HashMap::new() + }; + + // Apply unified stats pruning (uses preloaded stats, falls back to footer) let retained = self - .apply_stats_pruning_from_footers( + .apply_stats_pruning( file_groups, file_pruner, table_schema, timeline_view.as_of_timestamp(), + &preloaded_stats, ) .await; + self.partition_to_file_groups .insert(partition_path, retained); } @@ -138,20 +227,166 @@ impl FileSystemView { Ok(()) } - /// Apply file-level stats pruning using Parquet file footers. + /// Enhance PartitionPruner with partition-level statistics from the metadata table. + /// + /// Fetches partition stats for the given partitions and adds them to the pruner. + async fn enhance_partition_pruner_with_stats( + &self, + partition_pruner: &PartitionPruner, + file_pruner: &FilePruner, + partition_paths: &[String], + metadata_table: &Table, + ) -> PartitionPruner { + if file_pruner.is_empty() || partition_paths.is_empty() { + return partition_pruner.clone(); + } + + // Get column names for stats lookup + let column_names = file_pruner.filter_column_names(); + if column_names.is_empty() { + return partition_pruner.clone(); + } + + // Generate keys for partition stats lookup + let keys: Vec = column_names + .iter() + .flat_map(|col| { + partition_paths + .iter() + .map(move |part| get_partition_stats_key(col, part)) + }) + .collect(); + let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); + + // Read partition stats records from metadata table + match metadata_table + .fetch_partition_stats_records(&key_refs) + .await + { + Ok(records) => { + if records.is_empty() { + return partition_pruner.clone(); + } + match partition_stats_records_to_stats_map(records) { + Ok(partition_stats) => { + log::debug!( + "Loaded partition_stats for {} partitions", + partition_paths.len() + ); + partition_pruner + .clone() + .with_partition_stats(partition_stats) + .with_data_filters(file_pruner.filters()) + } + Err(e) => { + log::warn!( + 
"Failed to convert partition_stats: {e}. Continuing without partition stats pruning." + ); + partition_pruner.clone() + } + } + } + Err(e) => { + log::warn!( + "Failed to read partition_stats: {e}. Continuing without partition stats pruning." + ); + partition_pruner.clone() + } + } + } + + /// Load column statistics from the metadata table for files in a partition. + /// + /// Returns a map from file name to StatisticsContainer. On error, returns an + /// empty map (caller will fall back to Parquet footers). + async fn load_column_stats_from_metadata_table( + &self, + file_groups: &[FileGroup], + file_pruner: &FilePruner, + as_of_timestamp: &str, + partition_path: &str, + metadata_table: &Table, + ) -> HashMap { + // Collect file names for stats lookup + let file_names: Vec = file_groups + .iter() + .filter_map(|fg| { + fg.get_file_slice_as_of(as_of_timestamp) + .and_then(|fsl| fsl.base_file_relative_path().ok()) + .map(|path| path.rsplit('/').next().unwrap_or(&path).to_string()) + }) + .collect(); + + if file_names.is_empty() { + return HashMap::new(); + } + + let column_names = file_pruner.filter_column_names(); + + // Generate keys for column stats lookup + let keys: Vec = column_names + .iter() + .flat_map(|col| { + file_names + .iter() + .map(move |file| get_column_stats_key(col, partition_path, file)) + }) + .collect(); + let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect(); + + // Read column stats records from metadata table + match metadata_table.fetch_column_stats_records(&key_refs).await { + Ok(records) => { + if records.is_empty() { + log::debug!( + "No column_stats found for {} files in partition '{partition_path}'", + file_names.len() + ); + return HashMap::new(); + } + match column_stats_records_to_stats_map(records) { + Ok(stats) => { + log::debug!( + "Loaded column_stats for {} files in partition '{partition_path}'", + file_names.len() + ); + stats + } + Err(e) => { + log::warn!( + "Failed to convert column_stats for partition 
'{partition_path}': {e}. Will fall back to Parquet footers." + ); + HashMap::new() + } + } + } + Err(e) => { + log::warn!( + "Failed to read column_stats for partition '{partition_path}': {e}. Will fall back to Parquet footers." + ); + HashMap::new() + } + } + } + + /// Apply file-level stats pruning using column statistics. + /// + /// This is the unified pruning method that works with `StatisticsContainer` from any source: + /// - If pre-loaded stats are available (from metadata table), use them + /// - Otherwise, load stats from Parquet file footers as fallback /// - /// Returns the filtered list of file groups that pass the pruning check. /// Files are included (not pruned) if: /// - The pruner has no filters - /// - The file is not a Parquet file - /// - Column stats cannot be loaded (conservative behavior) + /// - The file is not a Parquet file (for footer fallback) + /// - Stats cannot be loaded from any source (conservative behavior) /// - The file's stats indicate it might contain matching rows - async fn apply_stats_pruning_from_footers( + async fn apply_stats_pruning( &self, file_groups: Vec, file_pruner: &FilePruner, table_schema: &Schema, as_of_timestamp: &str, + preloaded_stats: &HashMap, ) -> Vec { if file_pruner.is_empty() { return file_groups; @@ -172,32 +407,30 @@ impl FileSystemView { } }; - // Case-insensitive check for .parquet extension - if !relative_path.to_lowercase().ends_with(".parquet") { - retained.push(fg); - continue; - } + // Extract file name for stats lookup + let file_name = relative_path.rsplit('/').next().unwrap_or(&relative_path); - // Load column stats from Parquet footer - let stats = match self - .storage - .get_parquet_column_stats(&relative_path, table_schema) - .await - { - Ok(s) => s, - Err(e) => { - log::warn!( - "Failed to load column stats for {relative_path}: {e}. Including file." 
- ); - retained.push(fg); - continue; - } + // Try pre-loaded stats first (from metadata table), then fall back to Parquet footer + let stats = if let Some(s) = preloaded_stats.get(file_name) { + Some(s.clone()) + } else { + // Fall back to loading from Parquet footer + self.load_stats_from_parquet_footer(&relative_path, table_schema) + .await }; - if file_pruner.should_include(&stats) { - retained.push(fg); - } else { - log::debug!("Pruned file {relative_path} based on column stats"); + match stats { + Some(ref s) => { + if file_pruner.should_include(s) { + retained.push(fg); + } else { + log::debug!("Pruned file {relative_path} based on column stats"); + } + } + None => { + // No stats available from any source - include conservatively + retained.push(fg); + } } } else { // No file slice as of timestamp, include the file group @@ -209,6 +442,34 @@ impl FileSystemView { retained } + /// Load column statistics from a Parquet file's footer. + /// + /// Returns None if the file is not a Parquet file or stats cannot be loaded. + async fn load_stats_from_parquet_footer( + &self, + relative_path: &str, + table_schema: &Schema, + ) -> Option { + // Only load stats for Parquet files + if !relative_path.to_lowercase().ends_with(".parquet") { + return None; + } + + match self + .storage + .get_parquet_column_stats(relative_path, table_schema) + .await + { + Ok(stats) => Some(stats), + Err(e) => { + log::warn!( + "Failed to load column stats from footer for {relative_path}: {e}. Including file." + ); + None + } + } + } + /// Collect file slices from loaded file groups using the timeline view. 
async fn collect_file_slices( &self, @@ -253,7 +514,7 @@ impl FileSystemView { /// * `file_pruner` - Filters files based on column statistics /// * `table_schema` - Table schema for statistics extraction /// * `timeline_view` - The timeline view containing query context - /// * `metadata_table` - Optional metadata table instance + /// * `metadata_table` - Optional metadata table for file listing and stats access pub(crate) async fn get_file_slices( &self, partition_pruner: &PartitionPruner, @@ -262,19 +523,12 @@ impl FileSystemView { timeline_view: &TimelineView, metadata_table: Option<&Table>, ) -> Result> { - // Fetch records from metadata table if available - let files_partition_records = if let Some(mdt) = metadata_table { - Some(mdt.fetch_files_partition_records(partition_pruner).await?) - } else { - None - }; - self.load_file_groups( partition_pruner, file_pruner, table_schema, timeline_view, - files_partition_records.as_ref(), + metadata_table, ) .await?; @@ -299,7 +553,8 @@ impl FileSystemView { table_schema: &Schema, timeline_view: &TimelineView, ) -> Result> { - // Pass None to force storage listing (avoids recursion for metadata table) + // Pass None for metadata_table to force storage listing + // and footer-based stats pruning (avoids recursion for metadata table) self.load_file_groups( partition_pruner, file_pruner, diff --git a/crates/core/src/table/listing.rs b/crates/core/src/table/listing.rs index ebf1bb55..532d1435 100644 --- a/crates/core/src/table/listing.rs +++ b/crates/core/src/table/listing.rs @@ -26,7 +26,7 @@ use crate::file_group::base_file::BaseFile; use crate::file_group::log_file::LogFile; use crate::metadata::LAKE_FORMAT_METADATA_DIRS; use crate::storage::{Storage, get_leaf_dirs}; -use crate::table::partition::{ +use crate::table::{ EMPTY_PARTITION_PATH, PARTITION_METAFIELD_PREFIX, PartitionPruner, is_table_partitioned, }; use crate::timeline::completion_time::CompletionTimeView; diff --git a/crates/core/src/table/mod.rs 
b/crates/core/src/table/mod.rs index 153105ef..b0afefaf 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -88,15 +88,41 @@ //! ``` pub mod builder; -pub mod file_pruner; mod fs_view; mod listing; -pub mod partition; +pub mod pruning; mod read_options; mod validation; +pub use pruning::{FilePruner, PartitionPruner}; pub use read_options::ReadOptions; +use crate::config::table::HudiTableConfig::{ + KeyGeneratorClass, PartitionFields as PartitionFieldsConfig, +}; + +pub const PARTITION_METAFIELD_PREFIX: &str = ".hoodie_partition_metadata"; +pub const EMPTY_PARTITION_PATH: &str = ""; + +/// Check if the table is partitioned based on configuration. +pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool { + let has_partition_fields = { + let partition_fields: Vec = + hudi_configs.get_or_default(PartitionFieldsConfig).into(); + !partition_fields.is_empty() + }; + + let uses_non_partitioned_key_gen = hudi_configs + .try_get(KeyGeneratorClass) + .map(|key_gen| { + let key_gen_str: String = key_gen.into(); + key_gen_str == "org.apache.hudi.keygen.NonpartitionedKeyGenerator" + }) + .unwrap_or(false); + + has_partition_fields && !uses_non_partitioned_key_gen +} + use crate::Result; use crate::config::HudiConfigs; use crate::config::read::HudiReadConfig; @@ -108,9 +134,7 @@ use crate::file_group::reader::FileGroupReader; use crate::metadata::METADATA_TABLE_PARTITION_FIELD; use crate::schema::resolver::{resolve_avro_schema, resolve_schema}; use crate::table::builder::TableBuilder; -use crate::table::file_pruner::FilePruner; use crate::table::fs_view::FileSystemView; -use crate::table::partition::PartitionPruner; use crate::timeline::util::format_timestamp; use crate::timeline::{EARLIEST_START_TIMESTAMP, Timeline}; use crate::util::collection::split_into_chunks; @@ -489,14 +513,16 @@ impl Table { // Create file pruner with filters on non-partition columns let file_pruner = FilePruner::new(filters, &table_schema, &partition_schema)?; - // 
Try to create metadata table instance if enabled + // Create metadata table if enabled (for files listing and stats pruning) let metadata_table = if self.is_metadata_table_enabled() { - log::debug!("Using metadata table for file listing"); match self.new_metadata_table().await { - Ok(mdt) => Some(mdt), + Ok(mdt) => { + log::debug!("Using metadata table for file listing and stats pruning"); + Some(mdt) + } Err(e) => { log::warn!( - "Failed to create metadata table, falling back to storage listing: {e}" + "Failed to create metadata table: {e}. Falling back to storage listing." ); None } diff --git a/crates/core/src/table/file_pruner.rs b/crates/core/src/table/pruning/file_pruner.rs similarity index 64% rename from crates/core/src/table/file_pruner.rs rename to crates/core/src/table/pruning/file_pruner.rs index 500a2a3b..1e0f209a 100644 --- a/crates/core/src/table/file_pruner.rs +++ b/crates/core/src/table/pruning/file_pruner.rs @@ -20,15 +20,14 @@ //! File-level pruner for filtering files based on column statistics. use crate::Result; -use crate::expr::ExprOperator; use crate::expr::filter::{Filter, SchemableFilter}; -use crate::statistics::{ColumnStatistics, StatisticsContainer}; +use crate::statistics::StatisticsContainer; -use arrow_array::{ArrayRef, Datum}; -use arrow_ord::cmp; use arrow_schema::Schema; use std::collections::HashSet; +use super::StatsPruner; + /// A file-level pruner that filters files based on column statistics. /// /// This pruner uses min/max statistics from Parquet files to determine if a file @@ -46,7 +45,7 @@ impl FilePruner { /// Filters on partition columns are excluded since they are handled by PartitionPruner. 
/// /// # Arguments - /// * `and_filters` - List of filters to apply + /// * `and_filters` - List of filters to apply (AND semantics) /// * `table_schema` - The table's data schema /// * `partition_schema` - The partition schema (filters on these columns are excluded) pub fn new( @@ -54,7 +53,7 @@ impl FilePruner { table_schema: &Schema, partition_schema: &Schema, ) -> Result { - // Get partition column names to exclude + // Get partition column names to exclude from file pruning let partition_columns: HashSet<&str> = partition_schema .fields() .iter() @@ -83,11 +82,23 @@ impl FilePruner { self.and_filters.is_empty() } + /// Returns the names of columns used in the filter predicates. + pub fn filter_column_names(&self) -> Vec<&str> { + self.and_filters + .iter() + .map(|f| f.field.name().as_str()) + .collect() + } + + /// Returns a clone of the filters for use in partition stats pruning. + pub fn filters(&self) -> Vec { + self.and_filters.clone() + } + /// Returns `true` if the file should be included based on its statistics. /// /// A file is included if ANY of its rows MIGHT match all the filters. /// A file is excluded (pruned) only if we can prove that NO rows can match. - /// /// If statistics are missing or incomplete, the file is included (safe default). pub fn should_include(&self, stats: &StatisticsContainer) -> bool { // If no filters, include everything @@ -95,179 +106,29 @@ impl FilePruner { return true; } - // All filters must pass (AND semantics) - // If any filter definitively excludes the file, return false + // All filters must pass (AND semantics). + // If any filter definitively excludes the file, return false. for filter in &self.and_filters { let col_name = filter.field.name(); - - // Get column statistics. When using StatisticsContainer::from_parquet_metadata(), - // all schema columns will have an entry. However, stats may come from other sources - // (e.g., manually constructed), so we handle missing columns defensively. 
let Some(col_stats) = stats.columns.get(col_name) else { // No stats for this column, cannot prune - include the file continue; }; - // Check if this filter can prune the file - if self.can_prune_by_filter(filter, col_stats) { + if StatsPruner::can_prune_by_filter(filter, col_stats) { return false; // File can be pruned } } - true // File should be included - } - - /// Determines if a file can be pruned based on a single filter and column stats. - /// - /// Returns `true` if the file can definitely be pruned (no rows can match). - fn can_prune_by_filter(&self, filter: &SchemableFilter, col_stats: &ColumnStatistics) -> bool { - // Get the filter value as an ArrayRef - let filter_array = self.extract_filter_array(filter); - let Some(filter_value) = filter_array else { - return false; // Cannot extract value, don't prune - }; - - let min = &col_stats.min_value; - let max = &col_stats.max_value; - - match filter.operator { - ExprOperator::Eq => { - // Prune if: value < min OR value > max - self.can_prune_eq(&filter_value, min, max) - } - ExprOperator::Ne => { - // Prune if: min = max = value (all rows equal the excluded value) - self.can_prune_ne(&filter_value, min, max) - } - ExprOperator::Lt => { - // Prune if: min >= value (all values are >= the threshold) - self.can_prune_lt(&filter_value, min) - } - ExprOperator::Lte => { - // Prune if: min > value - self.can_prune_lte(&filter_value, min) - } - ExprOperator::Gt => { - // Prune if: max <= value (all values are <= the threshold) - self.can_prune_gt(&filter_value, max) - } - ExprOperator::Gte => { - // Prune if: max < value - self.can_prune_gte(&filter_value, max) - } - } - } - - /// Prune for `col = value`: prune if value < min OR value > max - fn can_prune_eq( - &self, - value: &ArrayRef, - min: &Option, - max: &Option, - ) -> bool { - // Need both min and max to make this decision - let Some(min_val) = min else { - return false; - }; - let Some(max_val) = max else { - return false; - }; - - // Prune if value < min 
OR value > max - let value_lt_min = cmp::lt(value, min_val).map(|r| r.value(0)).unwrap_or(false); - let value_gt_max = cmp::gt(value, max_val).map(|r| r.value(0)).unwrap_or(false); - - value_lt_min || value_gt_max - } - - /// Prune for `col != value`: prune if min = max = value - fn can_prune_ne( - &self, - value: &ArrayRef, - min: &Option, - max: &Option, - ) -> bool { - // Need both min and max to make this decision - let Some(min_val) = min else { - return false; - }; - let Some(max_val) = max else { - return false; - }; - - // Prune only if min = max = value (all rows have the excluded value) - let min_eq_max = cmp::eq(min_val, max_val) - .map(|r| r.value(0)) - .unwrap_or(false); - let min_eq_value = cmp::eq(min_val, value).map(|r| r.value(0)).unwrap_or(false); - - min_eq_max && min_eq_value - } - - /// Prune for `col < value`: prune if min >= value - fn can_prune_lt(&self, value: &ArrayRef, min: &Option) -> bool { - let Some(min_val) = min else { - return false; - }; - - // Prune if min >= value - cmp::gt_eq(min_val, value) - .map(|r| r.value(0)) - .unwrap_or(false) - } - - /// Prune for `col <= value`: prune if min > value - fn can_prune_lte(&self, value: &ArrayRef, min: &Option) -> bool { - let Some(min_val) = min else { - return false; - }; - - // Prune if min > value - cmp::gt(min_val, value).map(|r| r.value(0)).unwrap_or(false) - } - - /// Prune for `col > value`: prune if max <= value - fn can_prune_gt(&self, value: &ArrayRef, max: &Option) -> bool { - let Some(max_val) = max else { - return false; - }; - - // Prune if max <= value - cmp::lt_eq(max_val, value) - .map(|r| r.value(0)) - .unwrap_or(false) - } - - /// Prune for `col >= value`: prune if max < value - fn can_prune_gte(&self, value: &ArrayRef, max: &Option) -> bool { - let Some(max_val) = max else { - return false; - }; - - // Prune if max < value - cmp::lt(max_val, value).map(|r| r.value(0)).unwrap_or(false) - } - - /// Extracts the filter value as an ArrayRef. 
- fn extract_filter_array(&self, filter: &SchemableFilter) -> Option { - let (array, is_scalar) = filter.value.get(); - if array.is_empty() { - return None; - } - // Only use scalar values or single-element arrays for min/max pruning. - // Multi-element arrays (e.g., IN lists) cannot be used for simple range pruning. - if is_scalar || array.len() == 1 { - Some(array.slice(0, 1)) - } else { - None - } + true } } #[cfg(test)] mod tests { use super::*; - use arrow_array::{Int64Array, StringArray}; + use crate::statistics::{ColumnStatistics, StatsGranularity}; + use arrow_array::{ArrayRef, Int64Array, StringArray}; use arrow_schema::{DataType, Field}; use std::sync::Arc; @@ -285,7 +146,7 @@ mod tests { } fn create_stats_with_int_range(col_name: &str, min: i64, max: i64) -> StatisticsContainer { - let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File); + let mut stats = StatisticsContainer::new(StatsGranularity::File); stats.columns.insert( col_name.to_string(), ColumnStatistics { @@ -299,7 +160,7 @@ mod tests { } fn create_stats_with_string_range(col_name: &str, min: &str, max: &str) -> StatisticsContainer { - let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File); + let mut stats = StatisticsContainer::new(StatsGranularity::File); stats.columns.insert( col_name.to_string(), ColumnStatistics { @@ -326,11 +187,10 @@ mod tests { let table_schema = create_test_schema(); let partition_schema = create_partition_schema(); - // Filter on partition column should be excluded let filters = vec![Filter::try_from(("date", "=", "2024-01-01")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - assert!(pruner.is_empty()); // Partition column filter should be excluded + assert!(pruner.is_empty()); } #[test] @@ -338,7 +198,6 @@ mod tests { let table_schema = create_test_schema(); let partition_schema = create_partition_schema(); - // Filter on non-partition column should be kept let 
filters = vec![Filter::try_from(("id", ">", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); @@ -353,7 +212,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "=", "5")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id = 5. Should prune (5 < 10). let stats = create_stats_with_int_range("id", 10, 100); assert!(!pruner.should_include(&stats)); } @@ -366,7 +224,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "=", "200")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id = 200. Should prune (200 > 100). let stats = create_stats_with_int_range("id", 10, 100); assert!(!pruner.should_include(&stats)); } @@ -379,7 +236,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id = 50. Should include (10 <= 50 <= 100). let stats = create_stats_with_int_range("id", 10, 100); assert!(pruner.should_include(&stats)); } @@ -392,7 +248,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "!=", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=50, max=50. Filter: id != 50. Should prune (all values are 50). let stats = create_stats_with_int_range("id", 50, 50); assert!(!pruner.should_include(&stats)); } @@ -405,7 +260,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "!=", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id != 50. Should include (has other values). 
let stats = create_stats_with_int_range("id", 10, 100); assert!(pruner.should_include(&stats)); } @@ -418,7 +272,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "<", "10")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id < 10. Should prune (min >= 10). let stats = create_stats_with_int_range("id", 10, 100); assert!(!pruner.should_include(&stats)); } @@ -431,7 +284,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "<", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id < 50. Should include (some values < 50). let stats = create_stats_with_int_range("id", 10, 100); assert!(pruner.should_include(&stats)); } @@ -444,7 +296,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "<=", "5")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id <= 5. Should prune (min > 5). let stats = create_stats_with_int_range("id", 10, 100); assert!(!pruner.should_include(&stats)); } @@ -457,7 +308,6 @@ mod tests { let filters = vec![Filter::try_from(("id", ">", "100")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id > 100. Should prune (max <= 100). let stats = create_stats_with_int_range("id", 10, 100); assert!(!pruner.should_include(&stats)); } @@ -470,7 +320,6 @@ mod tests { let filters = vec![Filter::try_from(("id", ">", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id > 50. Should include (some values > 50). 
let stats = create_stats_with_int_range("id", 10, 100); assert!(pruner.should_include(&stats)); } @@ -483,7 +332,6 @@ mod tests { let filters = vec![Filter::try_from(("id", ">=", "150")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id >= 150. Should prune (max < 150). let stats = create_stats_with_int_range("id", 10, 100); assert!(!pruner.should_include(&stats)); } @@ -496,7 +344,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "<=", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id <= 50. Should include (some values <= 50). let stats = create_stats_with_int_range("id", 10, 100); assert!(pruner.should_include(&stats)); } @@ -509,7 +356,6 @@ mod tests { let filters = vec![Filter::try_from(("id", ">=", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id >= 50. Should include (some values >= 50). let stats = create_stats_with_int_range("id", 10, 100); assert!(pruner.should_include(&stats)); } @@ -522,11 +368,9 @@ mod tests { let filters = vec![Filter::try_from(("name", "=", "zebra")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min="apple", max="banana". Filter: name = "zebra". Should prune. let stats = create_stats_with_string_range("name", "apple", "banana"); assert!(!pruner.should_include(&stats)); - // Stats: min="apple", max="zebra". Filter: name = "banana". Should include. 
let stats2 = create_stats_with_string_range("name", "apple", "zebra"); assert!(pruner.should_include(&stats2)); } @@ -539,7 +383,6 @@ mod tests { let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats for different column - should include (cannot prune without stats) let stats = create_stats_with_int_range("other_column", 1, 10); assert!(pruner.should_include(&stats)); } @@ -552,8 +395,7 @@ mod tests { let filters = vec![Filter::try_from(("id", "=", "50")).unwrap()]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Column exists but has no min/max (e.g., Parquet file with statistics disabled) - let mut stats = StatisticsContainer::new(crate::statistics::StatsGranularity::File); + let mut stats = StatisticsContainer::new(StatsGranularity::File); stats.columns.insert( "id".to_string(), ColumnStatistics { @@ -564,7 +406,6 @@ mod tests { }, ); - // Should include file (cannot prune without min/max values) assert!(pruner.should_include(&stats)); } @@ -579,8 +420,6 @@ mod tests { ]; let pruner = FilePruner::new(&filters, &table_schema, &partition_schema).unwrap(); - // Stats: min=10, max=100. Filter: id > 0 AND id < 5. - // First filter passes (max > 0), second filter prunes (min >= 5). let stats = create_stats_with_int_range("id", 10, 100); assert!(!pruner.should_include(&stats)); } diff --git a/crates/core/src/table/pruning/mod.rs b/crates/core/src/table/pruning/mod.rs new file mode 100644 index 00000000..6a7f65a4 --- /dev/null +++ b/crates/core/src/table/pruning/mod.rs @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Pruning modules for filtering files and partitions based on statistics. +//! +//! This module provides common statistics-based pruning logic that can be used +//! at different granularities (file-level, partition-level). + +mod file_pruner; +mod partition_pruner; + +pub use file_pruner::FilePruner; +pub use partition_pruner::PartitionPruner; + +use arrow_array::ArrayRef; +use arrow_ord::cmp; + +use crate::expr::ExprOperator; +use crate::expr::filter::SchemableFilter; +use crate::statistics::ColumnStatistics; + +/// Common statistics-based pruning logic. +/// +/// These functions determine if a data range can be pruned based on filter predicates +/// and column min/max statistics. They are used by both file-level and partition-level +/// pruners. +pub struct StatsPruner; + +impl StatsPruner { + /// Determines if a data range can be pruned based on a filter and column stats. + /// + /// Returns `true` if the range can definitely be pruned (no rows can match). 
+ pub fn can_prune_by_filter(filter: &SchemableFilter, col_stats: &ColumnStatistics) -> bool { + let Some(filter_value) = Self::extract_filter_array(filter) else { + return false; // Cannot extract value, don't prune + }; + + let min = &col_stats.min_value; + let max = &col_stats.max_value; + + match filter.operator { + ExprOperator::Eq => Self::can_prune_eq(&filter_value, min, max), + ExprOperator::Ne => Self::can_prune_ne(&filter_value, min, max), + ExprOperator::Lt => Self::can_prune_lt(&filter_value, min), + ExprOperator::Lte => Self::can_prune_lte(&filter_value, min), + ExprOperator::Gt => Self::can_prune_gt(&filter_value, max), + ExprOperator::Gte => Self::can_prune_gte(&filter_value, max), + } + } + + /// Extracts the filter value as an ArrayRef. + fn extract_filter_array(filter: &SchemableFilter) -> Option { + use arrow_array::Datum; + let (array, is_scalar) = filter.value.get(); + if array.is_empty() { + return None; + } + if is_scalar || array.len() == 1 { + Some(array.slice(0, 1)) + } else { + None + } + } + + /// Prune for `col = value`: prune if value < min OR value > max + fn can_prune_eq(value: &ArrayRef, min: &Option, max: &Option) -> bool { + // Need both min and max to make this decision + let Some(min_val) = min else { return false }; + let Some(max_val) = max else { return false }; + + // Prune if value is outside the [min, max] range + let value_lt_min = cmp::lt(value, min_val).map(|r| r.value(0)).unwrap_or(false); + let value_gt_max = cmp::gt(value, max_val).map(|r| r.value(0)).unwrap_or(false); + + value_lt_min || value_gt_max + } + + /// Prune for `col != value`: prune if min = max = value (all rows equal the excluded value) + fn can_prune_ne(value: &ArrayRef, min: &Option, max: &Option) -> bool { + // Need both min and max to make this decision + let Some(min_val) = min else { return false }; + let Some(max_val) = max else { return false }; + + // Prune only if all rows have the excluded value (min = max = value) + let min_eq_max = 
cmp::eq(min_val, max_val) + .map(|r| r.value(0)) + .unwrap_or(false); + let min_eq_value = cmp::eq(min_val, value).map(|r| r.value(0)).unwrap_or(false); + + min_eq_max && min_eq_value + } + + /// Prune for `col < value`: prune if min >= value (all values are >= the threshold) + fn can_prune_lt(value: &ArrayRef, min: &Option) -> bool { + let Some(min_val) = min else { return false }; + cmp::gt_eq(min_val, value) + .map(|r| r.value(0)) + .unwrap_or(false) + } + + /// Prune for `col <= value`: prune if min > value (all values are > the threshold) + fn can_prune_lte(value: &ArrayRef, min: &Option) -> bool { + let Some(min_val) = min else { return false }; + cmp::gt(min_val, value).map(|r| r.value(0)).unwrap_or(false) + } + + /// Prune for `col > value`: prune if max <= value (all values are <= the threshold) + fn can_prune_gt(value: &ArrayRef, max: &Option) -> bool { + let Some(max_val) = max else { return false }; + cmp::lt_eq(max_val, value) + .map(|r| r.value(0)) + .unwrap_or(false) + } + + /// Prune for `col >= value`: prune if max < value (all values are < the threshold) + fn can_prune_gte(value: &ArrayRef, max: &Option) -> bool { + let Some(max_val) = max else { return false }; + cmp::lt(max_val, value).map(|r| r.value(0)).unwrap_or(false) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::Int64Array; + use arrow_schema::DataType; + use std::sync::Arc; + + fn make_int_stats(min: i64, max: i64) -> ColumnStatistics { + ColumnStatistics { + column_name: "col".to_string(), + data_type: DataType::Int64, + min_value: Some(Arc::new(Int64Array::from(vec![min])) as ArrayRef), + max_value: Some(Arc::new(Int64Array::from(vec![max])) as ArrayRef), + } + } + + #[test] + fn test_stats_pruner_eq() { + let schema = arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "col", + DataType::Int64, + false, + )]); + + // value=5, range=[10,100] -> prune (5 < 10) + let filter = crate::expr::filter::Filter::try_from(("col", "=", "5")).unwrap(); + let schemable 
= SchemableFilter::try_from((filter, &schema)).unwrap(); + let stats = make_int_stats(10, 100); + assert!(StatsPruner::can_prune_by_filter(&schemable, &stats)); + + // value=50, range=[10,100] -> include + let filter = crate::expr::filter::Filter::try_from(("col", "=", "50")).unwrap(); + let schemable = SchemableFilter::try_from((filter, &schema)).unwrap(); + assert!(!StatsPruner::can_prune_by_filter(&schemable, &stats)); + } + + #[test] + fn test_stats_pruner_gt() { + let schema = arrow_schema::Schema::new(vec![arrow_schema::Field::new( + "col", + DataType::Int64, + false, + )]); + + // col > 100, range=[10,100] -> prune (max <= 100) + let filter = crate::expr::filter::Filter::try_from(("col", ">", "100")).unwrap(); + let schemable = SchemableFilter::try_from((filter, &schema)).unwrap(); + let stats = make_int_stats(10, 100); + assert!(StatsPruner::can_prune_by_filter(&schemable, &stats)); + + // col > 50, range=[10,100] -> include + let filter = crate::expr::filter::Filter::try_from(("col", ">", "50")).unwrap(); + let schemable = SchemableFilter::try_from((filter, &schema)).unwrap(); + assert!(!StatsPruner::can_prune_by_filter(&schemable, &stats)); + } +} diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/pruning/partition_pruner.rs similarity index 71% rename from crates/core/src/table/partition.rs rename to crates/core/src/table/pruning/partition_pruner.rs index 79aa32dd..2f147260 100644 --- a/crates/core/src/table/partition.rs +++ b/crates/core/src/table/pruning/partition_pruner.rs @@ -16,47 +16,44 @@ * specific language governing permissions and limitations * under the License. */ + +//! Partition-level pruner for filtering partitions based on partition values and statistics. 
+ use crate::Result; use crate::config::HudiConfigs; use crate::config::table::HudiTableConfig; use crate::error::CoreError::InvalidPartitionPath; use crate::expr::filter::{Filter, SchemableFilter}; +use crate::statistics::StatisticsContainer; +use crate::table::is_table_partitioned; use arrow_array::{ArrayRef, Scalar}; use arrow_schema::Schema; -use crate::config::table::HudiTableConfig::{KeyGeneratorClass, PartitionFields}; use std::collections::HashMap; use std::sync::Arc; -pub const PARTITION_METAFIELD_PREFIX: &str = ".hoodie_partition_metadata"; -pub const EMPTY_PARTITION_PATH: &str = ""; - -pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool { - let has_partition_fields = { - let partition_fields: Vec = hudi_configs.get_or_default(PartitionFields).into(); - !partition_fields.is_empty() - }; - - let uses_non_partitioned_key_gen = hudi_configs - .try_get(KeyGeneratorClass) - .map(|key_gen| { - let key_gen_str: String = key_gen.into(); - key_gen_str == "org.apache.hudi.keygen.NonpartitionedKeyGenerator" - }) - .unwrap_or(false); - - has_partition_fields && !uses_non_partitioned_key_gen -} +use super::StatsPruner; /// A partition pruner that filters partitions based on the partition path and its filters. +/// +/// The pruner supports two levels of filtering: +/// 1. **Partition column values**: Filters based on the actual partition column values +/// parsed from the partition path (e.g., `city=chennai` → `city = "chennai"`) +/// 2. **Partition statistics** (optional): Filters based on aggregated column statistics +/// at the partition level, loaded from the metadata table's `partition_stats` partition #[derive(Debug, Clone)] pub struct PartitionPruner { schema: Arc, is_hive_style: bool, is_url_encoded: bool, is_partitioned: bool, + /// Filters on partition columns (for partition path value filtering). and_filters: Vec, + /// Filters on data columns (for partition stats filtering). 
+ data_filters: Vec, + /// Optional partition-level statistics for enhanced pruning. + partition_stats: Option>, } impl PartitionPruner { @@ -84,6 +81,8 @@ impl PartitionPruner { is_url_encoded, is_partitioned, and_filters, + data_filters: Vec::new(), + partition_stats: None, }) } @@ -95,12 +94,31 @@ impl PartitionPruner { is_url_encoded: false, is_partitioned: false, and_filters: Vec::new(), + data_filters: Vec::new(), + partition_stats: None, } } + /// Set partition statistics for enhanced pruning. + pub fn with_partition_stats(mut self, stats: HashMap) -> Self { + self.partition_stats = Some(stats); + self + } + + /// Set data column filters for stats-based pruning. + pub fn with_data_filters(mut self, filters: Vec) -> Self { + self.data_filters = filters; + self + } + + /// Check if partition stats are available. + pub fn has_partition_stats(&self) -> bool { + self.partition_stats.is_some() + } + /// Returns `true` if the partition pruner does not have any filters. pub fn is_empty(&self) -> bool { - self.and_filters.is_empty() + self.and_filters.is_empty() && self.data_filters.is_empty() } /// Returns `true` if the table is partitioned. @@ -109,23 +127,48 @@ impl PartitionPruner { } /// Returns `true` if the partition path should be included based on the filters. + /// + /// This method performs two levels of filtering: + /// 1. Partition column value filtering - checks if partition path values match filters + /// 2. 
Partition stats filtering - if partition_stats are available, uses aggregated + /// column statistics to prune partitions based on data column filters pub fn should_include(&self, partition_path: &str) -> bool { + // Level 1: Partition column value filtering let segments = match self.parse_segments(partition_path) { Ok(s) => s, - Err(_) => return true, // Include the partition regardless of parsing error + Err(_) => return true, }; - self.and_filters.iter().all(|filter| { - match segments.get(filter.field.name()) { - Some(segment_value) => { - match filter.apply_comparison(segment_value) { + let partition_filter_pass = + self.and_filters + .iter() + .all(|filter| match segments.get(filter.field.name()) { + Some(segment_value) => match filter.apply_comparison(segment_value) { Ok(scalar) => scalar.value(0), - Err(_) => true, // Include the partition when comparison error occurs + Err(_) => true, + }, + None => true, + }); + + if !partition_filter_pass { + return false; + } + + // Level 2: Partition stats filtering (if available) + if let Some(ref stats_map) = self.partition_stats { + if let Some(stats) = stats_map.get(partition_path) { + for filter in &self.data_filters { + let col_name = filter.field.name(); + if let Some(col_stats) = stats.columns.get(col_name) { + if StatsPruner::can_prune_by_filter(filter, col_stats) { + return false; + } } } - None => true, // Include the partition when filtering field does not match any field in the partition } - }) + } + + true } fn parse_segments(&self, partition_path: &str) -> Result>> { @@ -181,9 +224,9 @@ mod tests { IsHiveStylePartitioning, IsPartitionPathUrlencoded, }; use crate::expr::ExprOperator; - - use arrow::datatypes::{DataType, Field, Schema}; - use arrow_array::Date32Array; + use crate::statistics::{ColumnStatistics, StatsGranularity}; + use arrow::datatypes::{DataType, Field}; + use arrow_array::{Date32Array, Int64Array}; use std::str::FromStr; fn create_test_schema() -> Schema { @@ -200,6 +243,7 @@ mod tests { 
(IsPartitionPathUrlencoded, is_url_encoded.to_string()), ]) } + #[test] fn test_partition_pruner_new() { let schema = create_test_schema(); @@ -372,4 +416,41 @@ mod tests { assert_eq!(filter.operator, ExprOperator::from_str(op).unwrap()); } } + + #[test] + fn test_partition_pruner_with_partition_stats() { + let partition_schema = Schema::new(vec![Field::new("city", DataType::Utf8, false)]); + let data_schema = Schema::new(vec![Field::new("amount", DataType::Int64, false)]); + let configs = create_hudi_configs(true, false); + + let make_stats = |min: i64, max: i64| { + let mut stats = StatisticsContainer::new(StatsGranularity::File); + stats.columns.insert( + "amount".to_string(), + ColumnStatistics { + column_name: "amount".to_string(), + data_type: DataType::Int64, + min_value: Some(Arc::new(Int64Array::from(vec![min])) as ArrayRef), + max_value: Some(Arc::new(Int64Array::from(vec![max])) as ArrayRef), + }, + ); + stats + }; + + let mut stats_map = HashMap::new(); + stats_map.insert("city=A".to_string(), make_stats(100, 300)); + stats_map.insert("city=B".to_string(), make_stats(400, 800)); + + let data_filter = Filter::try_from(("amount", ">", "500")).unwrap(); + let data_schemable = SchemableFilter::try_from((data_filter, &data_schema)).unwrap(); + + let pruner = PartitionPruner::new(&[], &partition_schema, &configs) + .unwrap() + .with_partition_stats(stats_map) + .with_data_filters(vec![data_schemable]); + + assert!(pruner.has_partition_stats()); + assert!(!pruner.should_include("city=A")); // max=300 <= 500, pruned + assert!(pruner.should_include("city=B")); // max=800 > 500, included + } } diff --git a/crates/core/src/util/hash.rs b/crates/core/src/util/hash.rs new file mode 100644 index 00000000..9955d168 --- /dev/null +++ b/crates/core/src/util/hash.rs @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +//! Hash utilities for Hudi metadata table key generation. +//! +//! The metadata table uses composite keys for column stats lookups: +//! - ColumnIndexId: Base64(XXHash64(column_name, seed=0xdabadaba)) = 12 chars +//! - PartitionIndexId: Base64(XXHash64(partition_path, seed=0xdabadaba)) = 12 chars +//! - FileIndexId: Base64(MD5(file_name)) = 24 chars +//! +//! Key format: COLUMN_HASH + PARTITION_HASH + FILE_HASH = 48 chars total + +use base64::Engine; +use base64::engine::general_purpose::STANDARD; +use md5::{Digest, Md5}; + +use crate::metadata::NON_PARTITIONED_NAME; + +/// The seed used for XXHash64 in Hudi metadata table key generation. +const XXHASH_SEED: u64 = 0xdaba_daba; + +/// Total length of a column stats key (12 + 12 + 24 = 48 chars with standard Base64 padding). +pub const COLUMN_STATS_KEY_LENGTH: usize = 48; + +/// Column index ID - 12 character Base64-encoded XXHash64 of column name. +/// +/// Used as the first component of column stats keys in the metadata table. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ColumnIndexId(String); + +impl ColumnIndexId { + /// Create a new ColumnIndexId from a column name. + /// + /// Computes XXHash64 of the column name with seed 0xdabadaba and encodes as Base64. 
+ pub fn new(column_name: &str) -> Self { + let hash = xxhash_rust::xxh64::xxh64(column_name.as_bytes(), XXHASH_SEED); + let encoded = STANDARD.encode(hash.to_be_bytes()); + Self(encoded) + } + + /// Get the Base64-encoded hash as a string slice. + pub fn as_str(&self) -> &str { + &self.0 + } +} + +/// Partition index ID - 12 character Base64-encoded XXHash64 of partition path. +/// +/// Used as the second component of column stats keys in the metadata table. +/// For non-partitioned tables, the partition identifier is "." (not ""). +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PartitionIndexId(String); + +impl PartitionIndexId { + /// Create a new PartitionIndexId from a partition path. + /// + /// Computes XXHash64 of the partition path with seed 0xdabadaba and encodes as Base64. + pub fn new(partition_path: &str) -> Self { + let normalized = if partition_path.is_empty() { + NON_PARTITIONED_NAME + } else { + partition_path + }; + let hash = xxhash_rust::xxh64::xxh64(normalized.as_bytes(), XXHASH_SEED); + let encoded = STANDARD.encode(hash.to_be_bytes()); + Self(encoded) + } + + /// Get the Base64-encoded hash as a string slice. + pub fn as_str(&self) -> &str { + &self.0 + } +} + +/// File index ID - 24 character Base64-encoded MD5 of file name. +/// +/// Used as the third component of column stats keys in the metadata table. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FileIndexId(String); + +impl FileIndexId { + /// Create a new FileIndexId from a file name. + /// + /// Computes MD5 of the file name and encodes as Base64. + pub fn new(file_name: &str) -> Self { + let mut hasher = Md5::new(); + hasher.update(file_name.as_bytes()); + let hash = hasher.finalize(); + let encoded = STANDARD.encode(hash); + Self(encoded) + } + + /// Get the Base64-encoded hash as a string slice. + pub fn as_str(&self) -> &str { + &self.0 + } +} + +/// Generate a column stats key for a specific column, partition, and file. 
+/// +/// The key format is: COLUMN_HASH + PARTITION_HASH + FILE_HASH (48 chars total) +/// +/// # Arguments +/// * `column_name` - The column name +/// * `partition_path` - The partition path (empty string is normalized to "." for non-partitioned tables) +/// * `file_name` - The file name (just the filename, not the full path) +pub fn get_column_stats_key(column_name: &str, partition_path: &str, file_name: &str) -> String { + let col_id = ColumnIndexId::new(column_name); + let part_id = PartitionIndexId::new(partition_path); + let file_id = FileIndexId::new(file_name); + format!( + "{}{}{}", + col_id.as_str(), + part_id.as_str(), + file_id.as_str() + ) +} + +/// Generate a prefix for looking up all column stats for a specific column. +/// +/// Returns: COLUMN_HASH (12 chars) +/// +/// This can be used with HFile prefix lookups to find all stats for a column. +pub fn get_column_stats_prefix_for_column(column_name: &str) -> String { + ColumnIndexId::new(column_name).0 +} + +/// Generate a prefix for looking up all column stats for a column in a partition. +/// +/// Returns: COLUMN_HASH + PARTITION_HASH (24 chars) +/// +/// This can be used with HFile prefix lookups to find all stats for a column +/// within a specific partition. +pub fn get_column_stats_prefix_for_column_and_partition( + column_name: &str, + partition_path: &str, +) -> String { + let col_id = ColumnIndexId::new(column_name); + let part_id = PartitionIndexId::new(partition_path); + format!("{}{}", col_id.as_str(), part_id.as_str()) +} + +/// Generate a partition stats key for a column and partition. +/// +/// The key format is: COLUMN_HASH + PARTITION_HASH (24 chars) +/// +/// Partition stats use the same key structure as the column+partition prefix +/// since they aggregate stats at the partition level rather than file level. 
+pub fn get_partition_stats_key(column_name: &str, partition_path: &str) -> String { + get_column_stats_prefix_for_column_and_partition(column_name, partition_path) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_component_lengths() { + // ColumnIndexId: 64-bit XXHash -> 8 bytes -> 12 chars Base64 (with padding) + assert_eq!(ColumnIndexId::new("test_column").as_str().len(), 12); + assert_eq!(ColumnIndexId::new("id").as_str().len(), 12); + assert_eq!(ColumnIndexId::new("").as_str().len(), 12); + + // PartitionIndexId: 64-bit XXHash -> 8 bytes -> 12 chars Base64 + assert_eq!(PartitionIndexId::new("city=chennai").as_str().len(), 12); + assert_eq!(PartitionIndexId::new("").as_str().len(), 12); + assert_eq!(PartitionIndexId::new(".").as_str().len(), 12); + + // FileIndexId: 128-bit MD5 -> 16 bytes -> 24 chars Base64 + assert_eq!( + FileIndexId::new("abc-0_0-123-456_20231214.parquet") + .as_str() + .len(), + 24 + ); + } + + #[test] + fn test_column_stats_key_total_length() { + // Total: 12 + 12 + 24 = 48 chars + assert_eq!( + get_column_stats_key("id", "city=chennai", "data.parquet").len(), + COLUMN_STATS_KEY_LENGTH + ); + assert_eq!( + get_column_stats_key("column_name", "", "file.parquet").len(), + COLUMN_STATS_KEY_LENGTH + ); + } + + #[test] + fn test_partition_path_normalization() { + // Empty partition path should be normalized to "." (NON_PARTITIONED_NAME) + let empty_partition = PartitionIndexId::new(""); + let dot_partition = PartitionIndexId::new("."); + assert_eq!( + empty_partition.as_str(), + dot_partition.as_str(), + "Empty partition path should be normalized to '.'" + ); + + // Non-empty partition paths should not be modified + let normal_partition = PartitionIndexId::new("city=chennai"); + assert_ne!(normal_partition.as_str(), dot_partition.as_str()); + } + + #[test] + fn test_column_stats_key_with_non_partitioned_table() { + // For non-partitioned tables, using "" or "." 
should produce the same key + let key_empty = get_column_stats_key("id", "", "file.parquet"); + let key_dot = get_column_stats_key("id", ".", "file.parquet"); + assert_eq!( + key_empty, key_dot, + "Non-partitioned table keys should match" + ); + } + + #[test] + fn test_different_inputs_produce_different_hashes() { + // Different column names + let col1 = ColumnIndexId::new("column_a"); + let col2 = ColumnIndexId::new("column_b"); + assert_ne!(col1.as_str(), col2.as_str()); + + // Different partition paths + let part1 = PartitionIndexId::new("city=chennai"); + let part2 = PartitionIndexId::new("city=san_francisco"); + assert_ne!(part1.as_str(), part2.as_str()); + + // Different file names + let file1 = FileIndexId::new("file1.parquet"); + let file2 = FileIndexId::new("file2.parquet"); + assert_ne!(file1.as_str(), file2.as_str()); + } + + #[test] + fn test_same_inputs_produce_same_hashes() { + // Hash functions should be deterministic + assert_eq!( + ColumnIndexId::new("test").as_str(), + ColumnIndexId::new("test").as_str() + ); + assert_eq!( + PartitionIndexId::new("part").as_str(), + PartitionIndexId::new("part").as_str() + ); + assert_eq!( + FileIndexId::new("file.parquet").as_str(), + FileIndexId::new("file.parquet").as_str() + ); + } + + #[test] + fn test_column_prefix_relationship() { + let column_prefix = get_column_stats_prefix_for_column("id"); + let column_partition_prefix = + get_column_stats_prefix_for_column_and_partition("id", "city=chennai"); + let full_key = get_column_stats_key("id", "city=chennai", "file.parquet"); + + // Column prefix should be start of column+partition prefix + assert!(column_partition_prefix.starts_with(&column_prefix)); + + // Column+partition prefix should be start of full key + assert!(full_key.starts_with(&column_partition_prefix)); + + // Prefix lengths + assert_eq!(column_prefix.len(), 12); + assert_eq!(column_partition_prefix.len(), 24); + } + + #[test] + fn test_partition_stats_key_format() { + // Partition stats key = 
column hash + partition hash (24 chars) + let partition_stats_key = get_partition_stats_key("id", "city=chennai"); + assert_eq!(partition_stats_key.len(), 24); + + // Should equal column+partition prefix + let prefix = get_column_stats_prefix_for_column_and_partition("id", "city=chennai"); + assert_eq!(partition_stats_key, prefix); + } + + #[test] + fn test_key_component_ordering() { + // Verify the key structure: COLUMN + PARTITION + FILE + let column = ColumnIndexId::new("id"); + let partition = PartitionIndexId::new("city=chennai"); + let file = FileIndexId::new("data.parquet"); + + let full_key = get_column_stats_key("id", "city=chennai", "data.parquet"); + + // Full key should be concatenation of components in correct order + let expected = format!("{}{}{}", column.as_str(), partition.as_str(), file.as_str()); + assert_eq!(full_key, expected); + } +} diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index 744d992b..e8346861 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -18,4 +18,5 @@ */ pub mod arrow; pub mod collection; +pub mod hash; pub mod path; diff --git a/crates/core/tests/table_read_tests.rs b/crates/core/tests/table_read_tests.rs index fa899d74..0a1b7254 100644 --- a/crates/core/tests/table_read_tests.rs +++ b/crates/core/tests/table_read_tests.rs @@ -1144,7 +1144,7 @@ mod streaming_queries { /// These tests verify MDT-accelerated file listing and partition normalization. mod mdt_enabled_tables { use super::*; - use hudi_core::table::partition::PartitionPruner; + use hudi_core::table::PartitionPruner; mod snapshot_queries { use super::*; @@ -1189,15 +1189,16 @@ mod mdt_enabled_tables { /// The metadata table stores "." as partition key, but external API should see "". /// For non-partitioned tables, we use a fast path that directly fetches "." without /// going through __all_partitions__ lookup. 
- #[test] - fn test_v8_nonpartitioned_mdt_partition_normalization() -> Result<()> { + #[tokio::test] + async fn test_v8_nonpartitioned_mdt_partition_normalization() -> Result<()> { let base_url = SampleTableMdt::V8Nonpartitioned.url_to_mor_avro(); - let hudi_table = Table::new_blocking(base_url.path())?; + let hudi_table = Table::new(base_url.path()).await?; // Read MDT files partition records let partition_pruner = PartitionPruner::empty(); - let records = - hudi_table.read_metadata_table_files_partition_blocking(&partition_pruner)?; + let records = hudi_table + .read_metadata_table_files_partition(&partition_pruner) + .await?; // For non-partitioned tables, the fast path only fetches the files record. // __all_partitions__ is not fetched to avoid redundant HFile lookup. @@ -1227,5 +1228,32 @@ Ok(()) } + + /// Test reading column statistics from metadata table. + #[tokio::test] + async fn test_v8_nonpartitioned_read_column_stats() -> Result<()> { + let base_url = SampleTableMdt::V8Nonpartitioned.url_to_mor_avro(); + let hudi_table = Table::new(base_url.path()).await?; + + // Verify column_stats partition is available + assert!(hudi_table.has_column_stats_partition()); + + // Get file slices and read column stats + let file_slices = hudi_table.get_file_slices(empty_filters()).await?; + let file_names: Vec<String> = file_slices + .iter() + .map(|fs| fs.base_file.file_name()) + .collect(); + let file_name_refs: Vec<&str> = file_names.iter().map(|s| s.as_str()).collect(); + + let stats = hudi_table + .read_column_stats_for_files(&file_name_refs, &["id", "longField"], "") + .await?; + + // API should work - stats may be empty or contain data depending on test data + assert!(stats.is_empty() || stats.values().any(|s| !s.columns.is_empty())); + + Ok(()) + } } }