4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/Schema.java
@@ -105,6 +105,10 @@ public Schema(List<NestedField> columns, Set<Integer> identifierFieldIds, TypeUtil.GetID getId) {
this(DEFAULT_SCHEMA_ID, columns, identifierFieldIds, getId);
}

public Schema(List<NestedField> columns, TypeUtil.GetID getId) {
this(DEFAULT_SCHEMA_ID, columns, ImmutableSet.of(), getId);
}

public Schema(int schemaId, List<NestedField> columns) {
this(schemaId, columns, ImmutableSet.of());
}
46 changes: 46 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -601,6 +601,52 @@ public interface GetID {
int get(int oldId);
}

/**
* Creates a function that reassigns specified field IDs.
*
* <p>This is useful for merging schemas where some field IDs in one schema might conflict with
* IDs already in use by another schema. The function will reassign the provided IDs to new unused
* IDs, while preserving other IDs.
*
* @param conflictingIds the set of conflicting field IDs that should be reassigned
* @param allUsedIds the set of field IDs that are already in use and cannot be reused
* @return a function that reassigns conflicting field IDs while preserving others
*/
public static GetID reassignConflictingIds(Set<Integer> conflictingIds, Set<Integer> allUsedIds) {
return new ReassignConflictingIds(conflictingIds, allUsedIds);
}

private static class ReassignConflictingIds implements GetID {
private final Set<Integer> conflictingIds;
private final Set<Integer> allUsedIds;
private final AtomicInteger nextId;

private ReassignConflictingIds(Set<Integer> conflictingIds, Set<Integer> allUsedIds) {
this.conflictingIds = conflictingIds;
this.allUsedIds = allUsedIds;
this.nextId = new AtomicInteger();
}

@Override
public int get(int oldId) {
if (conflictingIds.contains(oldId)) {
return nextAvailableId();
} else {
return oldId;
}
}

private int nextAvailableId() {
int candidateId = nextId.incrementAndGet();

while (allUsedIds.contains(candidateId)) {
candidateId = nextId.incrementAndGet();
}

return candidateId;
}
}

public static class SchemaVisitor<T> {
public void beforeField(Types.NestedField field) {}

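As a rough usage sketch of the new constructor and helper introduced above (the field names and IDs below are made up for illustration and are not part of this PR):

import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class ReassignConflictingIdsExample {
  public static void main(String[] args) {
    // Hypothetical fields whose IDs (1 and 2) collide with IDs already used elsewhere.
    Types.NestedField id = Types.NestedField.required(1, "id", Types.IntegerType.get());
    Types.NestedField name = Types.NestedField.optional(2, "name", Types.StringType.get());

    // IDs 1-4 are assumed to be taken by existing schemas; 1 and 2 must move.
    TypeUtil.GetID getId = TypeUtil.reassignConflictingIds(Set.of(1, 2), Set.of(1, 2, 3, 4));

    // The reassignment function hands out the first unused IDs (5, 6, ...) for
    // conflicting fields and leaves any non-conflicting ID untouched.
    Schema schema = new Schema(List.of(id, name), getId);
    System.out.println(schema);
  }
}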
SparkSchemaUtil.java
@@ -42,6 +42,7 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Column;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/** Helper methods for working with Spark/Hive metadata. */
Expand Down Expand Up @@ -369,4 +370,8 @@ public static Map<Integer, String> indexQuotedNameById(Schema schema) {
Function<String, String> quotingFunc = name -> String.format("`%s`", name.replace("`", "``"));
return TypeUtil.indexQuotedNameById(schema.asStruct(), quotingFunc);
}

public static StructType toStructType(List<StructField> fields) {
return new StructType(fields.toArray(new StructField[0]));
}
}
SparkScanBuilder.java
@@ -22,9 +22,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.FileScanTask;
@@ -49,7 +47,6 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
@@ -60,6 +57,7 @@
import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.TypeUtil.GetID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
@@ -97,10 +95,10 @@ public class SparkScanBuilder
private final Table table;
private final CaseInsensitiveStringMap options;
private final SparkReadConf readConf;
private final List<String> metaColumns = Lists.newArrayList();
private final Set<String> metaFieldNames = Sets.newLinkedHashSet();
private final InMemoryMetricsReporter metricsReporter;

private Schema schema;
private Schema projection;
Contributor Author: I am later going to keep the entire schema here as well. This is a projection, defaulted to the schema initially.

private boolean caseSensitive;
private List<Expression> filterExpressions = null;
private Predicate[] pushedPredicates = NO_PREDICATES;
@@ -114,7 +112,7 @@ public class SparkScanBuilder
CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
this.schema = schema;
this.projection = schema;
Member: i like the rename

this.options = options;
this.readConf = new SparkReadConf(spark, table, branch, options);
this.caseSensitive = readConf.caseSensitive();
@@ -169,7 +167,7 @@ public Predicate[] pushPredicates(Predicate[] predicates) {

if (expr != null) {
// try binding the expression to ensure it can be pushed down
Binder.bind(schema.asStruct(), expr, caseSensitive);
Binder.bind(projection.asStruct(), expr, caseSensitive);
expressions.add(expr);
pushableFilters.add(predicate);
}
@@ -211,7 +209,7 @@ public boolean pushAggregation(Aggregation aggregation) {
try {
Expression expr = SparkAggregates.convert(aggregateFunc);
if (expr != null) {
Expression bound = Binder.bind(schema.asStruct(), expr, caseSensitive);
Expression bound = Binder.bind(projection.asStruct(), expr, caseSensitive);
expressions.add((BoundAggregate<?, ?>) bound);
} else {
LOG.info(
Expand All @@ -232,7 +230,7 @@ public boolean pushAggregation(Aggregation aggregation) {
}

org.apache.iceberg.Scan scan =
buildIcebergBatchScan(true /* include Column Stats */, schemaWithMetadataColumns());
buildIcebergBatchScan(true /* include Column Stats */, projectionWithMetadataColumns());

try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
for (FileScanTask task : fileScanTasks) {
@@ -321,74 +319,63 @@ private boolean metricsModeSupportsAggregatePushDown(List<BoundAggregate<?, ?>>

@Override
public void pruneColumns(StructType requestedSchema) {
StructType requestedProjection =
new StructType(
Stream.of(requestedSchema.fields())
.filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
.toArray(StructField[]::new));

// the projection should include all columns that will be returned, including those only used in
// filters
this.schema =
SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive);

Stream.of(requestedSchema.fields())
.map(StructField::name)
.filter(MetadataColumns::isMetadataColumn)
.distinct()
.forEach(metaColumns::add);
}

private Schema schemaWithMetadataColumns() {
// metadata columns
List<Types.NestedField> metadataFields =
metaColumns.stream()
.distinct()
.map(name -> MetadataColumns.metadataColumn(table, name))
.collect(Collectors.toList());
Schema metadataSchema = calculateMetadataSchema(metadataFields);

// schema or rows returned by readers
return TypeUtil.join(schema, metadataSchema);
}

private Schema calculateMetadataSchema(List<Types.NestedField> metaColumnFields) {
Optional<Types.NestedField> partitionField =
Contributor Author: A lot of this logic is not specific to Spark and it was a bit harder to navigate.

metaColumnFields.stream()
.filter(f -> MetadataColumns.PARTITION_COLUMN_ID == f.fieldId())
.findFirst();

// only calculate potential column id collision if partition metadata column was requested
if (!partitionField.isPresent()) {
return new Schema(metaColumnFields);
}

Set<Integer> idsToReassign =
TypeUtil.indexById(partitionField.get().type().asStructType()).keySet();

// Calculate used ids by union metadata columns with all base table schemas
Set<Integer> currentlyUsedIds =
metaColumnFields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet());
Set<Integer> allUsedIds =
table.schemas().values().stream()
.map(currSchema -> TypeUtil.indexById(currSchema.asStruct()).keySet())
.reduce(currentlyUsedIds, Sets::union);

// Reassign selected ids to deduplicate with used ids.
AtomicInteger nextId = new AtomicInteger();
return new Schema(
metaColumnFields,
ImmutableSet.of(),
oldId -> {
if (!idsToReassign.contains(oldId)) {
return oldId;
}
int candidate = nextId.incrementAndGet();
while (allUsedIds.contains(candidate)) {
candidate = nextId.incrementAndGet();
}
return candidate;
});
List<StructField> dataFields = Lists.newArrayList();

for (StructField field : requestedSchema.fields()) {
if (MetadataColumns.isMetadataColumn(field.name())) {
metaFieldNames.add(field.name());
} else {
dataFields.add(field);
}
}

StructType requestedProjection = SparkSchemaUtil.toStructType(dataFields);
this.projection = prune(projection, requestedProjection);
}

// the projection should include all columns that will be returned,
// including those only used in filters
private Schema prune(Schema schema, StructType requestedSchema) {
return SparkSchemaUtil.prune(schema, requestedSchema, filterExpression(), caseSensitive);
}

// schema of rows that must be returned by readers
protected Schema projectionWithMetadataColumns() {
Contributor (@dramaticlly, Feb 17, 2026): Sorry, I was OOTO last week. Curious, we changed visibility from private schemaWithMetadataColumns to protected projectionWithMetadataColumns, is it due to the bigger refactoring in #15240?

return TypeUtil.join(projection, calculateMetadataSchema());
}

// computes metadata schema avoiding conflicts between partition and data field IDs
private Schema calculateMetadataSchema() {
List<Types.NestedField> metaFields = metaFields();
Optional<Types.NestedField> partitionField = findPartitionField(metaFields);

if (partitionField.isEmpty()) {
return new Schema(metaFields);
}

Types.StructType partitionType = partitionField.get().type().asStructType();
Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType);
GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds());
return new Schema(metaFields, getId);
}

private List<Types.NestedField> metaFields() {
return metaFieldNames.stream()
.map(name -> MetadataColumns.metadataColumn(table, name))
.collect(Collectors.toList());
}

private Optional<Types.NestedField> findPartitionField(List<Types.NestedField> fields) {
return fields.stream()
.filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId())
.findFirst();
}

// collects used data field IDs across all known table schemas
private Set<Integer> allUsedFieldIds() {
Contributor Author: We don't need to track used metadata column IDs here. They start from the end of the int range and can't conflict by definition. If they do, something is fundamentally wrong.

return table.schemas().values().stream()
.flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
.collect(Collectors.toSet());
}
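
A tiny sketch of the reasoning in the comment above (illustrative only; the exact reserved constants are defined in MetadataColumns): metadata column IDs are allocated from the top of the int range, while data field IDs count up from 1, so the two ranges should never meet.

import org.apache.iceberg.MetadataColumns;

public class MetadataColumnIdRange {
  public static void main(String[] args) {
    // Metadata column IDs such as PARTITION_COLUMN_ID sit just below Integer.MAX_VALUE,
    // far above any realistic data field ID, which is why allUsedFieldIds() above
    // does not need to track them.
    System.out.println(MetadataColumns.PARTITION_COLUMN_ID);
    System.out.println(Integer.MAX_VALUE - MetadataColumns.PARTITION_COLUMN_ID);
  }
}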

@Override
@@ -401,7 +388,7 @@ public Scan build() {
}

private Scan buildBatchScan() {
Schema expectedSchema = schemaWithMetadataColumns();
Schema expectedSchema = projectionWithMetadataColumns();
return new SparkBatchQueryScan(
spark,
table,
@@ -573,7 +560,7 @@ public Scan buildChangelogScan() {
}
}

Schema expectedSchema = schemaWithMetadataColumns();
Schema expectedSchema = projectionWithMetadataColumns();

IncrementalChangelogScan scan =
table
@@ -642,7 +629,7 @@ public Scan buildMergeOnReadScan() {
table,
null,
readConf,
schemaWithMetadataColumns(),
projectionWithMetadataColumns(),
filterExpressions,
metricsReporter::scanReport);
}
@@ -655,7 +642,7 @@ public Scan buildMergeOnReadScan() {
SparkReadConf adjustedReadConf =
new SparkReadConf(spark, table, readConf.branch(), adjustedOptions);

Schema expectedSchema = schemaWithMetadataColumns();
Schema expectedSchema = projectionWithMetadataColumns();

BatchScan scan =
newBatchScan()
@@ -685,12 +672,12 @@ public Scan buildCopyOnWriteScan() {
spark,
table,
readConf,
schemaWithMetadataColumns(),
projectionWithMetadataColumns(),
filterExpressions,
metricsReporter::scanReport);
}

Schema expectedSchema = schemaWithMetadataColumns();
Schema expectedSchema = projectionWithMetadataColumns();

BatchScan scan =
newBatchScan()