Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions crates/iceberg/src/spec/table_metadata_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,9 @@ impl TableMetadataBuilder {
// Check if partition field names conflict with schema field names across all schemas
self.validate_partition_field_names(&unbound_spec)?;

// Reuse field IDs for equivalent fields from existing partition specs
let unbound_spec = self.reuse_partition_field_ids(unbound_spec)?;

let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
.with_last_assigned_field_id(self.metadata.last_partition_id)
.build()?;
Expand Down Expand Up @@ -875,6 +878,44 @@ impl TableMetadataBuilder {
Ok(self)
}

/// Reuse partition field IDs for equivalent fields from existing partition specs.
///
/// According to the Iceberg spec, partition field IDs must be reused if an existing
/// partition spec contains an equivalent field (same source_id and transform).
///
/// Fields of `unbound_spec` that already carry an explicit `field_id` are returned
/// unchanged. Fields without one receive the ID of an equivalent field found in the
/// existing specs, or keep `None` when no equivalent field exists.
///
/// If the existing specs disagree on the ID of an equivalent field, the smallest ID
/// is used, so the outcome does not depend on the order in which the existing specs
/// are visited.
///
/// # Errors
///
/// This function currently always returns `Ok`; the `Result` return type lets
/// callers keep using `?`.
fn reuse_partition_field_ids(
    &self,
    unbound_spec: UnboundPartitionSpec,
) -> Result<UnboundPartitionSpec> {
    // Build a map of (source_id, transform) -> field_id from existing specs.
    // On a duplicate key keep the smallest ID instead of "whichever spec was seen
    // last", which would make the choice depend on map iteration order.
    let mut equivalent_field_ids: HashMap<_, i32> = HashMap::new();
    for spec in self.metadata.partition_specs.values() {
        for field in spec.fields() {
            equivalent_field_ids
                .entry((field.source_id, &field.transform))
                .and_modify(|known_id| *known_id = (*known_id).min(field.field_id))
                .or_insert(field.field_id);
        }
    }

    let UnboundPartitionSpec { spec_id, fields } = unbound_spec;

    // Fill in missing field IDs where an equivalent field is already known.
    let fields = fields
        .into_iter()
        .map(|mut field| {
            if field.field_id.is_none() {
                // A failed lookup yields `None`, leaving the field without an ID.
                field.field_id = equivalent_field_ids
                    .get(&(field.source_id, &field.transform))
                    .copied();
            }
            field
        })
        .collect();

    Ok(UnboundPartitionSpec { spec_id, fields })
}

/// Set the default partition spec.
///
/// # Errors
Expand Down Expand Up @@ -3505,4 +3546,63 @@ mod tests {
let keys = build_result.metadata.encryption_keys_iter();
assert_eq!(keys.len(), 0);
}

#[test]
fn test_partition_field_id_reuse_across_specs() {
    // Schema with three columns; each one is the source of a partition field below.
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(3, "timestamp", Type::Primitive(PrimitiveType::Timestamp))
                .into(),
        ])
        .build()
        .unwrap();

    // Spec 0 (table creation): identity(id), expected to receive field ID 1000.
    let identity_only = UnboundPartitionSpec::builder()
        .add_partition_field(1, "id", Transform::Identity)
        .unwrap()
        .build();
    let metadata_v0 = TableMetadataBuilder::new(
        schema,
        identity_only,
        SortOrder::unsorted_order(),
        String::from("s3://bucket/table"),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap()
    .build()
    .unwrap()
    .metadata;

    // Spec 1: bucket(data), expected to receive field ID 1001.
    let bucket_only = UnboundPartitionSpec::builder()
        .add_partition_field(2, "data_bucket", Transform::Bucket(10))
        .unwrap()
        .build();
    let metadata_v1 = metadata_v0
        .into_builder(Some(String::from("s3://bucket/table/metadata/v1.json")))
        .add_partition_spec(bucket_only)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Spec 2 repeats both earlier fields and adds year(timestamp):
    // the repeated fields should keep 1000 and 1001, the new one should get 1002.
    let combined = UnboundPartitionSpec::builder()
        .add_partition_field(1, "id", Transform::Identity)
        .unwrap()
        .add_partition_field(2, "data_bucket", Transform::Bucket(10))
        .unwrap()
        .add_partition_field(3, "year", Transform::Year)
        .unwrap()
        .build();
    let metadata_v2 = metadata_v1
        .into_builder(Some(String::from("s3://bucket/table/metadata/v2.json")))
        .add_partition_spec(combined)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Collect the IDs assigned to spec 2 in declaration order and compare.
    let assigned_ids: Vec<i32> = metadata_v2
        .partition_spec_by_id(2)
        .unwrap()
        .fields()
        .iter()
        .map(|partition_field| partition_field.field_id)
        .collect();
    assert_eq!(assigned_ids, vec![1000, 1001, 1002]);
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::transform::{BoxedTransformFunction, create_transform_function};
/// predicates and partition predicates.
///
/// All transforms must return `null` for a `null` input value.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)]
pub enum Transform {
/// Source value, unmodified
///
Expand Down
Loading