From da8eb71f7eacaece1cac361a79e62ac44f51fa7a Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Sun, 4 Jan 2026 20:50:13 -0500 Subject: [PATCH 1/2] feat(iceberg): add delete file support to SnapshotProducer This enables SnapshotProducer to accept and process delete files (position deletes and equality deletes) alongside data files. Changes: - Add added_delete_files field to SnapshotProducer - Add validate_added_delete_files() for delete file validation: - Rejects delete files in V1 format - Validates content types (PositionDeletes, EqualityDeletes) - Requires equality_ids for equality delete files - Validates partition spec compatibility - Add write_delete_manifest() to write delete manifests with ManifestContentType::Deletes - Update manifest_file() to include delete manifests - Update summary() to populate delete file metrics - Enhance validate_duplicate_files() for both data and delete files - Add comprehensive unit tests This lays the groundwork for operations like RowDelta that need to atomically commit both data files and delete files. --- crates/iceberg/src/transaction/append.rs | 1 + crates/iceberg/src/transaction/snapshot.rs | 398 +++++++++++++++++++-- 2 files changed, 372 insertions(+), 27 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..b6f1330e03 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -90,6 +90,7 @@ impl TransactionAction for FastAppendAction { self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + vec![], ); // validate added files diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..1faa0b3bff 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -23,10 +23,10 @@ use uuid::Uuid; use crate::error::Result; use crate::spec::{ - DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, - ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, - SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, StructType, Summary, - TableProperties, update_snapshot_summaries, + DataContentType, DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, + ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, + Operation, Snapshot, SnapshotReference, SnapshotRetention, SnapshotSummaryCollector, Struct, + StructType, Summary, TableProperties, update_snapshot_summaries, }; use crate::table::Table; use crate::transaction::ActionCommit; @@ -114,6 +114,7 @@ pub(crate) struct SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -127,6 +128,7 @@ impl<'a> SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, ) -> Self { Self { table, @@ -135,13 +137,14 @@ impl<'a> SnapshotProducer<'a> { key_metadata, snapshot_properties, added_data_files, + added_delete_files, manifest_counter: (0..), } } pub(crate) fn validate_added_data_files(&self) -> Result<()> { for data_file in &self.added_data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { + if data_file.content_type() != DataContentType::Data { return Err(Error::new( ErrorKind::DataInvalid, "Only data content type is allowed for fast append", @@ -163,14 +166,104 @@ impl<'a> SnapshotProducer<'a> { Ok(()) } + /// Validates added delete files. + /// + /// Checks that: + /// - Delete files are not used with format version 1 + /// - Delete files have valid content types (PositionDeletes or EqualityDeletes) + /// - Equality delete files have equality_ids set + /// - Delete files reference valid partition specs + /// - Partition values are compatible with partition types + pub(crate) fn validate_added_delete_files(&self) -> Result<()> { + let format_version = self.table.metadata().format_version(); + if format_version == FormatVersion::V1 && !self.added_delete_files.is_empty() { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete files are not supported in format version 1. Upgrade the table to format version 2 or later.", + )); + } + + for delete_file in &self.added_delete_files { + match delete_file.content_type() { + DataContentType::PositionDeletes => {} + DataContentType::EqualityDeletes => { + let ids = delete_file.equality_ids().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Equality delete files must have equality_ids set", + ) + })?; + if ids.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Equality delete files must have equality_ids set", + )); + } + } + DataContentType::Data => { + return Err(Error::new( + ErrorKind::DataInvalid, + "Data content type is not allowed for delete files", + )); + } + } + + if self.table.metadata().default_partition_spec_id() != delete_file.partition_spec_id { + return Err(Error::new( + ErrorKind::DataInvalid, + "Delete file partition spec id does not match table default partition spec id", + )); + } + Self::validate_partition_value( + delete_file.partition(), + self.table.metadata().default_partition_type(), + )?; + } + + Ok(()) + } + pub(crate) async fn validate_duplicate_files(&self) -> Result<()> { - let new_files: HashSet<&str> = self - .added_data_files - .iter() - .map(|df| df.file_path.as_str()) - .collect(); + let mut seen_data_files: HashSet<&str> = HashSet::new(); + let mut intra_batch_data_duplicates = Vec::new(); + for data_file in &self.added_data_files { + if !seen_data_files.insert(data_file.file_path.as_str()) { + intra_batch_data_duplicates.push(data_file.file_path.clone()); + } + } + if !intra_batch_data_duplicates.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add duplicate data files in the same batch: {}", + intra_batch_data_duplicates.join(", ") + ), + )); + } + + let mut seen_delete_files: HashSet<&str> = HashSet::new(); + let mut intra_batch_delete_duplicates = Vec::new(); + for delete_file in &self.added_delete_files { + if !seen_delete_files.insert(delete_file.file_path.as_str()) { + intra_batch_delete_duplicates.push(delete_file.file_path.clone()); + } + } + if !intra_batch_delete_duplicates.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add duplicate delete files in the same batch: {}", + intra_batch_delete_duplicates.join(", ") + ), + )); + } + + let new_data_files = seen_data_files; + let new_delete_files = seen_delete_files; + + let mut duplicate_data_files = Vec::new(); + let mut duplicate_delete_files = Vec::new(); - let mut referenced_files = Vec::new(); if let Some(current_snapshot) = self.table.metadata().current_snapshot() { let manifest_list = current_snapshot .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) @@ -181,19 +274,34 @@ impl<'a> SnapshotProducer<'a> { .await?; for entry in manifest.entries() { let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); + if entry.is_alive() { + if new_data_files.contains(file_path) { + duplicate_data_files.push(file_path.to_string()); + } + if new_delete_files.contains(file_path) { + duplicate_delete_files.push(file_path.to_string()); + } } } } } - if !referenced_files.is_empty() { + if !duplicate_data_files.is_empty() { return Err(Error::new( ErrorKind::DataInvalid, format!( - "Cannot add files that are already referenced by table, files: {}", - referenced_files.join(", ") + "Cannot add data files that are already referenced by table, files: {}", + duplicate_data_files.join(", ") + ), + )); + } + + if !duplicate_delete_files.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add delete files that are already referenced by table, files: {}", + duplicate_delete_files.join(", ") ), )); } @@ -319,34 +427,62 @@ impl<'a> SnapshotProducer<'a> { writer.write_manifest_file().await } + async fn write_delete_manifest(&mut self) -> Result { + let added_delete_files = std::mem::take(&mut self.added_delete_files); + if added_delete_files.is_empty() { + return Err(Error::new( + ErrorKind::PreconditionFailed, + "No added delete files found when write a delete manifest file", + )); + } + + let snapshot_id = self.snapshot_id; + let format_version = self.table.metadata().format_version(); + let manifest_entries = added_delete_files.into_iter().map(|delete_file| { + let builder = ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(delete_file); + if format_version == FormatVersion::V1 { + builder.snapshot_id(snapshot_id).build() + } else { + builder.build() + } + }); + let mut writer = self.new_manifest_writer(ManifestContentType::Deletes)?; + for entry in manifest_entries { + writer.add_entry(entry)?; + } + writer.write_manifest_file().await + } + async fn manifest_file( &mut self, snapshot_produce_operation: &OP, manifest_process: &MP, ) -> Result> { - // Assert current snapshot producer contains new content to add to new snapshot. - // - // TODO: Allowing snapshot property setup with no added data files is a workaround. - // We should clean it up after all necessary actions are supported. - // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 - if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() { + let has_data_files = !self.added_data_files.is_empty(); + let has_delete_files = !self.added_delete_files.is_empty(); + let has_properties = !self.snapshot_properties.is_empty(); + + if !has_data_files && !has_delete_files && !has_properties { return Err(Error::new( ErrorKind::PreconditionFailed, - "No added data files or added snapshot properties found when write a manifest file", + "No added data files, delete files, or snapshot properties found when write a manifest file", )); } let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; let mut manifest_files = existing_manifests; - // Process added entries. - if !self.added_data_files.is_empty() { + if has_data_files { let added_manifest = self.write_added_manifest().await?; manifest_files.push(added_manifest); } - // # TODO - // Support process delete entries. + if has_delete_files { + let delete_manifest = self.write_delete_manifest().await?; + manifest_files.push(delete_manifest); + } let manifest_files = manifest_process.process_manifests(self, manifest_files); Ok(manifest_files) @@ -383,6 +519,14 @@ impl<'a> SnapshotProducer<'a> { ); } + for delete_file in &self.added_delete_files { + summary_collector.add_file( + delete_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); + } + let previous_snapshot = table_metadata .snapshot_by_id(self.snapshot_id) .and_then(|snapshot| snapshot.parent_snapshot_id()) @@ -511,3 +655,203 @@ impl<'a> SnapshotProducer<'a> { Ok(ActionCommit::new(updates, requirements)) } } + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use crate::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct}; + use crate::transaction::tests::{make_v1_table, make_v2_minimal_table}; + + fn make_position_delete_file(spec_id: i32) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::PositionDeletes) + .file_path("test/delete-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + fn make_equality_delete_file_with_ids(spec_id: i32, equality_ids: Option>) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::EqualityDeletes) + .file_path("test/eq-delete-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .equality_ids(equality_ids) + .build() + .unwrap() + } + + fn make_data_file_as_delete(spec_id: i32) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/data-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + #[test] + fn test_validate_delete_files_rejected_in_v1() { + let table = make_v1_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let delete_file = make_position_delete_file(spec_id); + + let producer = SnapshotProducer::new( + &table, + Uuid::now_v7(), + None, + HashMap::new(), + vec![], + vec![delete_file], + ); + + let result = producer.validate_added_delete_files(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err + .message() + .contains("Delete files are not supported in format version 1")); + } + + #[test] + fn test_validate_equality_delete_requires_equality_ids() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let delete_file = make_equality_delete_file_with_ids(spec_id, None); + + let producer = SnapshotProducer::new( + &table, + Uuid::now_v7(), + None, + HashMap::new(), + vec![], + vec![delete_file], + ); + + let result = producer.validate_added_delete_files(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err + .message() + .contains("Equality delete files must have equality_ids set")); + } + + #[test] + fn test_validate_equality_delete_rejects_empty_equality_ids() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let delete_file = make_equality_delete_file_with_ids(spec_id, Some(vec![])); + + let producer = SnapshotProducer::new( + &table, + Uuid::now_v7(), + None, + HashMap::new(), + vec![], + vec![delete_file], + ); + + let result = producer.validate_added_delete_files(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err + .message() + .contains("Equality delete files must have equality_ids set")); + } + + #[test] + fn test_validate_delete_files_rejects_data_content_type() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let data_file = make_data_file_as_delete(spec_id); + + let producer = SnapshotProducer::new( + &table, + Uuid::now_v7(), + None, + HashMap::new(), + vec![], + vec![data_file], + ); + + let result = producer.validate_added_delete_files(); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err + .message() + .contains("Data content type is not allowed for delete files")); + } + + #[test] + fn test_validate_position_delete_file_succeeds() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let delete_file = make_position_delete_file(spec_id); + + let producer = SnapshotProducer::new( + &table, + Uuid::now_v7(), + None, + HashMap::new(), + vec![], + vec![delete_file], + ); + + let result = producer.validate_added_delete_files(); + assert!(result.is_ok()); + } + + #[test] + fn test_validate_equality_delete_file_with_ids_succeeds() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let delete_file = make_equality_delete_file_with_ids(spec_id, Some(vec![1])); + + let producer = SnapshotProducer::new( + &table, + Uuid::now_v7(), + None, + HashMap::new(), + vec![], + vec![delete_file], + ); + + let result = producer.validate_added_delete_files(); + assert!(result.is_ok()); + } + + #[test] + fn test_empty_delete_files_returns_error() { + let table = make_v2_minimal_table(); + + let mut producer = SnapshotProducer::new( + &table, + Uuid::now_v7(), + None, + HashMap::new(), + vec![], + vec![], + ); + + let result = futures::executor::block_on(producer.write_delete_manifest()); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err + .message() + .contains("No added delete files found when write a delete manifest file")); + } +} From 26003b0b004cd201ac3dc86a795df4b4939f5d39 Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Mon, 5 Jan 2026 00:16:53 -0500 Subject: [PATCH 2/2] fix(snapshot): strengthen delete file validation and wire into FastAppend - Add cross-type duplicate check: reject same path in data and delete files - Reject equality_ids on position delete files (spec compliance) - Remove unreachable V1 code path in write_delete_manifest - Add TODO for partition spec validation strictness (partition evolution) - Wire validate_added_delete_files into FastAppendAction - Add tests for cross-type duplicates and position delete validation - Apply rustfmt formatting --- crates/iceberg/src/transaction/append.rs | 1 + crates/iceberg/src/transaction/snapshot.rs | 237 ++++++++++++++------- 2 files changed, 159 insertions(+), 79 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index b6f1330e03..896e8f140a 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -95,6 +95,7 @@ impl TransactionAction for FastAppendAction { // validate added files snapshot_producer.validate_added_data_files()?; + snapshot_producer.validate_added_delete_files()?; // Checks duplicate files if self.check_duplicate { diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 1faa0b3bff..b7b147e2ad 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -185,7 +185,14 @@ impl<'a> SnapshotProducer<'a> { for delete_file in &self.added_delete_files { match delete_file.content_type() { - DataContentType::PositionDeletes => {} + DataContentType::PositionDeletes => { + if delete_file.equality_ids().is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Position delete files must not have equality_ids set", + )); + } + } DataContentType::EqualityDeletes => { let ids = delete_file.equality_ids().ok_or_else(|| { Error::new( @@ -208,6 +215,9 @@ impl<'a> SnapshotProducer<'a> { } } + // TODO: This validation is too strict for partition evolution scenarios where delete + // files may reference older partition specs. Once manifest-per-spec is implemented, + // relax this to check that the spec_id exists rather than matching the default. if self.table.metadata().default_partition_spec_id() != delete_file.partition_spec_id { return Err(Error::new( ErrorKind::DataInvalid, @@ -261,6 +271,21 @@ impl<'a> SnapshotProducer<'a> { let new_data_files = seen_data_files; let new_delete_files = seen_delete_files; + // Check for cross-type duplicates: same path cannot appear in both data and delete files + let cross_type_duplicates: Vec<_> = new_data_files + .intersection(&new_delete_files) + .map(|s| s.to_string()) + .collect(); + if !cross_type_duplicates.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add the same file path as both a data file and a delete file: {}", + cross_type_duplicates.join(", ") + ), + )); + } + let mut duplicate_data_files = Vec::new(); let mut duplicate_delete_files = Vec::new(); @@ -436,17 +461,11 @@ impl<'a> SnapshotProducer<'a> { )); } - let snapshot_id = self.snapshot_id; - let format_version = self.table.metadata().format_version(); let manifest_entries = added_delete_files.into_iter().map(|delete_file| { - let builder = ManifestEntry::builder() + ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) - .data_file(delete_file); - if format_version == FormatVersion::V1 { - builder.snapshot_id(snapshot_id).build() - } else { - builder.build() - } + .data_file(delete_file) + .build() }); let mut writer = self.new_manifest_writer(ManifestContentType::Deletes)?; for entry in manifest_entries { @@ -677,7 +696,10 @@ mod tests { .unwrap() } - fn make_equality_delete_file_with_ids(spec_id: i32, equality_ids: Option>) -> DataFile { + fn make_equality_delete_file_with_ids( + spec_id: i32, + equality_ids: Option>, + ) -> DataFile { DataFileBuilder::default() .content(DataContentType::EqualityDeletes) .file_path("test/eq-delete-1.parquet".to_string()) @@ -710,21 +732,18 @@ mod tests { let spec_id = table.metadata().default_partition_spec_id(); let delete_file = make_position_delete_file(spec_id); - let producer = SnapshotProducer::new( - &table, - Uuid::now_v7(), - None, - HashMap::new(), - vec![], - vec![delete_file], - ); + let producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![ + delete_file, + ]); let result = producer.validate_added_delete_files(); assert!(result.is_err()); let err = result.unwrap_err(); - assert!(err - .message() - .contains("Delete files are not supported in format version 1")); + assert!( + err.message() + .contains("Delete files are not supported in format version 1") + ); } #[test] @@ -733,21 +752,18 @@ mod tests { let spec_id = table.metadata().default_partition_spec_id(); let delete_file = make_equality_delete_file_with_ids(spec_id, None); - let producer = SnapshotProducer::new( - &table, - Uuid::now_v7(), - None, - HashMap::new(), - vec![], - vec![delete_file], - ); + let producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![ + delete_file, + ]); let result = producer.validate_added_delete_files(); assert!(result.is_err()); let err = result.unwrap_err(); - assert!(err - .message() - .contains("Equality delete files must have equality_ids set")); + assert!( + err.message() + .contains("Equality delete files must have equality_ids set") + ); } #[test] @@ -756,21 +772,18 @@ mod tests { let spec_id = table.metadata().default_partition_spec_id(); let delete_file = make_equality_delete_file_with_ids(spec_id, Some(vec![])); - let producer = SnapshotProducer::new( - &table, - Uuid::now_v7(), - None, - HashMap::new(), - vec![], - vec![delete_file], - ); + let producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![ + delete_file, + ]); let result = producer.validate_added_delete_files(); assert!(result.is_err()); let err = result.unwrap_err(); - assert!(err - .message() - .contains("Equality delete files must have equality_ids set")); + assert!( + err.message() + .contains("Equality delete files must have equality_ids set") + ); } #[test] @@ -779,21 +792,18 @@ mod tests { let spec_id = table.metadata().default_partition_spec_id(); let data_file = make_data_file_as_delete(spec_id); - let producer = SnapshotProducer::new( - &table, - Uuid::now_v7(), - None, - HashMap::new(), - vec![], - vec![data_file], - ); + let producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![ + data_file, + ]); let result = producer.validate_added_delete_files(); assert!(result.is_err()); let err = result.unwrap_err(); - assert!(err - .message() - .contains("Data content type is not allowed for delete files")); + assert!( + err.message() + .contains("Data content type is not allowed for delete files") + ); } #[test] @@ -802,14 +812,10 @@ mod tests { let spec_id = table.metadata().default_partition_spec_id(); let delete_file = make_position_delete_file(spec_id); - let producer = SnapshotProducer::new( - &table, - Uuid::now_v7(), - None, - HashMap::new(), - vec![], - vec![delete_file], - ); + let producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![ + delete_file, + ]); let result = producer.validate_added_delete_files(); assert!(result.is_ok()); @@ -821,37 +827,110 @@ mod tests { let spec_id = table.metadata().default_partition_spec_id(); let delete_file = make_equality_delete_file_with_ids(spec_id, Some(vec![1])); - let producer = SnapshotProducer::new( - &table, - Uuid::now_v7(), - None, - HashMap::new(), - vec![], - vec![delete_file], - ); + let producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![ + delete_file, + ]); let result = producer.validate_added_delete_files(); assert!(result.is_ok()); } #[test] - fn test_empty_delete_files_returns_error() { + fn test_write_delete_manifest_precondition_empty_files() { let table = make_v2_minimal_table(); - let mut producer = SnapshotProducer::new( + let mut producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![]); + + let result = futures::executor::block_on(producer.write_delete_manifest()); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.message() + .contains("No added delete files found when write a delete manifest file") + ); + } + + fn make_data_file_with_path(spec_id: i32, path: &str) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + fn make_position_delete_file_with_path(spec_id: i32, path: &str) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::PositionDeletes) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + #[test] + fn test_validate_cross_type_duplicate_files_rejected() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let shared_path = "test/shared-file.parquet"; + let data_file = make_data_file_with_path(spec_id, shared_path); + let delete_file = make_position_delete_file_with_path(spec_id, shared_path); + + let producer = SnapshotProducer::new( &table, Uuid::now_v7(), None, HashMap::new(), - vec![], - vec![], + vec![data_file], + vec![delete_file], ); - let result = futures::executor::block_on(producer.write_delete_manifest()); + let result = futures::executor::block_on(producer.validate_duplicate_files()); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.message() + .contains("Cannot add the same file path as both a data file and a delete file") + ); + } + + #[test] + fn test_validate_position_delete_rejects_equality_ids() { + let table = make_v2_minimal_table(); + let spec_id = table.metadata().default_partition_spec_id(); + let delete_file = DataFileBuilder::default() + .content(DataContentType::PositionDeletes) + .file_path("test/pos-delete-with-eq-ids.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(spec_id) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .equality_ids(Some(vec![1, 2])) + .build() + .unwrap(); + + let producer = + SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![], vec![ + delete_file, + ]); + + let result = producer.validate_added_delete_files(); assert!(result.is_err()); let err = result.unwrap_err(); - assert!(err - .message() - .contains("No added delete files found when write a delete manifest file")); + assert!( + err.message() + .contains("Position delete files must not have equality_ids set") + ); } }