diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 3db327d48a..80549fae19 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -833,6 +833,9 @@ impl TableMetadataBuilder { // Check if partition field names conflict with schema field names across all schemas self.validate_partition_field_names(&unbound_spec)?; + // Reuse field IDs for equivalent fields from existing partition specs + let unbound_spec = self.reuse_partition_field_ids(unbound_spec)?; + let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)? .with_last_assigned_field_id(self.metadata.last_partition_id) .build()?; @@ -875,6 +878,44 @@ impl TableMetadataBuilder { Ok(self) } + /// Reuse partition field IDs for equivalent fields from existing partition specs. + /// + /// According to the Iceberg spec, partition field IDs must be reused if an existing + /// partition spec contains an equivalent field (same source_id and transform). + fn reuse_partition_field_ids( + &self, + unbound_spec: UnboundPartitionSpec, + ) -> Result { + // Build a map of (source_id, transform) -> field_id from existing specs + let equivalent_field_ids: HashMap<_, _> = self + .metadata + .partition_specs + .values() + .flat_map(|spec| spec.fields()) + .map(|field| ((field.source_id, &field.transform), field.field_id)) + .collect(); + + // Create new fields with reused field IDs where possible + let fields = unbound_spec + .fields + .into_iter() + .map(|mut field| { + if field.field_id.is_none() + && let Some(&existing_field_id) = + equivalent_field_ids.get(&(field.source_id, &field.transform)) + { + field.field_id = Some(existing_field_id); + } + field + }) + .collect(); + + Ok(UnboundPartitionSpec { + spec_id: unbound_spec.spec_id, + fields, + }) + } + /// Set the default partition spec. /// /// # Errors @@ -3505,4 +3546,63 @@ mod tests { let keys = build_result.metadata.encryption_keys_iter(); assert_eq!(keys.len(), 0); } + + #[test] + fn test_partition_field_id_reuse_across_specs() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp)) + .into(), + ]) + .build() + .unwrap(); + + // Create initial table with spec 0: identity(id) -> field_id = 1000 + let initial_spec = UnboundPartitionSpec::builder() + .add_partition_field(1, "id", Transform::Identity) + .unwrap() + .build(); + + let mut metadata = TableMetadataBuilder::new( + schema, + initial_spec, + SortOrder::unsorted_order(), + "s3://bucket/table".to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + // Add spec 1: bucket(data) -> field_id = 1001 + let spec1 = UnboundPartitionSpec::builder() + .add_partition_field(2, "data_bucket", Transform::Bucket(10)) + .unwrap() + .build(); + let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v1.json".to_string())); + let result = builder.add_partition_spec(spec1).unwrap().build().unwrap(); + metadata = result.metadata; + + // Add spec 2: identity(id) + bucket(data) + year(timestamp) + // Should reuse field_id 1000 for identity(id) and 1001 for bucket(data) + let spec2 = UnboundPartitionSpec::builder() + .add_partition_field(1, "id", Transform::Identity) // Should reuse 1000 + .unwrap() + .add_partition_field(2, "data_bucket", Transform::Bucket(10)) // Should reuse 1001 + .unwrap() + .add_partition_field(3, "year", Transform::Year) // Should get new 1002 + .unwrap() + .build(); + let builder = metadata.into_builder(Some("s3://bucket/table/metadata/v2.json".to_string())); + let result = builder.add_partition_spec(spec2).unwrap().build().unwrap(); + + // Verify field ID reuse: spec 2 should reuse IDs from specs 0 and 1, assign new ID for new field + let spec2 = result.metadata.partition_spec_by_id(2).unwrap(); + let field_ids: Vec = spec2.fields().iter().map(|f| f.field_id).collect(); + assert_eq!(field_ids, vec![1000, 1001, 1002]); // Reused 1000, 1001; new 1002 + } } diff --git a/crates/iceberg/src/spec/transform.rs b/crates/iceberg/src/spec/transform.rs index 354dc1889c..026b12613b 100644 --- a/crates/iceberg/src/spec/transform.rs +++ b/crates/iceberg/src/spec/transform.rs @@ -47,7 +47,7 @@ use crate::transform::{BoxedTransformFunction, create_transform_function}; /// predicates and partition predicates. /// /// All transforms must return `null` for a `null` input value. -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] pub enum Transform { /// Source value, unmodified ///