-
Notifications
You must be signed in to change notification settings - Fork 3k
Spark 4.1: Simplify handling of metadata columns #15297
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,9 +22,7 @@ | |
| import java.util.List; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
| import org.apache.iceberg.BaseTable; | ||
| import org.apache.iceberg.BatchScan; | ||
| import org.apache.iceberg.FileScanTask; | ||
|
|
@@ -49,7 +47,6 @@ | |
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.metrics.InMemoryMetricsReporter; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.iceberg.spark.Spark3Util; | ||
|
|
@@ -60,6 +57,7 @@ | |
| import org.apache.iceberg.spark.SparkV2Filters; | ||
| import org.apache.iceberg.types.Type; | ||
| import org.apache.iceberg.types.TypeUtil; | ||
| import org.apache.iceberg.types.TypeUtil.GetID; | ||
| import org.apache.iceberg.types.Types; | ||
| import org.apache.iceberg.util.SnapshotUtil; | ||
| import org.apache.spark.sql.SparkSession; | ||
|
|
@@ -97,10 +95,10 @@ public class SparkScanBuilder | |
| private final Table table; | ||
| private final CaseInsensitiveStringMap options; | ||
| private final SparkReadConf readConf; | ||
| private final List<String> metaColumns = Lists.newArrayList(); | ||
| private final Set<String> metaFieldNames = Sets.newLinkedHashSet(); | ||
| private final InMemoryMetricsReporter metricsReporter; | ||
|
|
||
| private Schema schema; | ||
| private Schema projection; | ||
| private boolean caseSensitive; | ||
| private List<Expression> filterExpressions = null; | ||
| private Predicate[] pushedPredicates = NO_PREDICATES; | ||
|
|
@@ -114,7 +112,7 @@ public class SparkScanBuilder | |
| CaseInsensitiveStringMap options) { | ||
| this.spark = spark; | ||
| this.table = table; | ||
| this.schema = schema; | ||
| this.projection = schema; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I like the rename. |
||
| this.options = options; | ||
| this.readConf = new SparkReadConf(spark, table, branch, options); | ||
| this.caseSensitive = readConf.caseSensitive(); | ||
|
|
@@ -169,7 +167,7 @@ public Predicate[] pushPredicates(Predicate[] predicates) { | |
|
|
||
| if (expr != null) { | ||
| // try binding the expression to ensure it can be pushed down | ||
| Binder.bind(schema.asStruct(), expr, caseSensitive); | ||
| Binder.bind(projection.asStruct(), expr, caseSensitive); | ||
| expressions.add(expr); | ||
| pushableFilters.add(predicate); | ||
| } | ||
|
|
@@ -211,7 +209,7 @@ public boolean pushAggregation(Aggregation aggregation) { | |
| try { | ||
| Expression expr = SparkAggregates.convert(aggregateFunc); | ||
| if (expr != null) { | ||
| Expression bound = Binder.bind(schema.asStruct(), expr, caseSensitive); | ||
| Expression bound = Binder.bind(projection.asStruct(), expr, caseSensitive); | ||
| expressions.add((BoundAggregate<?, ?>) bound); | ||
| } else { | ||
| LOG.info( | ||
|
|
@@ -232,7 +230,7 @@ public boolean pushAggregation(Aggregation aggregation) { | |
| } | ||
|
|
||
| org.apache.iceberg.Scan scan = | ||
| buildIcebergBatchScan(true /* include Column Stats */, schemaWithMetadataColumns()); | ||
| buildIcebergBatchScan(true /* include Column Stats */, projectionWithMetadataColumns()); | ||
|
|
||
| try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) { | ||
| for (FileScanTask task : fileScanTasks) { | ||
|
|
@@ -321,74 +319,63 @@ private boolean metricsModeSupportsAggregatePushDown(List<BoundAggregate<?, ?>> | |
|
|
||
| @Override | ||
| public void pruneColumns(StructType requestedSchema) { | ||
| StructType requestedProjection = | ||
| new StructType( | ||
| Stream.of(requestedSchema.fields()) | ||
| .filter(field -> MetadataColumns.nonMetadataColumn(field.name())) | ||
| .toArray(StructField[]::new)); | ||
|
|
||
| // the projection should include all columns that will be returned, including those only used in | ||
| // filters | ||
| this.schema = | ||
| SparkSchemaUtil.prune(schema, requestedProjection, filterExpression(), caseSensitive); | ||
|
|
||
| Stream.of(requestedSchema.fields()) | ||
| .map(StructField::name) | ||
| .filter(MetadataColumns::isMetadataColumn) | ||
| .distinct() | ||
| .forEach(metaColumns::add); | ||
| } | ||
|
|
||
| private Schema schemaWithMetadataColumns() { | ||
| // metadata columns | ||
| List<Types.NestedField> metadataFields = | ||
| metaColumns.stream() | ||
| .distinct() | ||
| .map(name -> MetadataColumns.metadataColumn(table, name)) | ||
| .collect(Collectors.toList()); | ||
| Schema metadataSchema = calculateMetadataSchema(metadataFields); | ||
|
|
||
| // schema or rows returned by readers | ||
| return TypeUtil.join(schema, metadataSchema); | ||
| } | ||
|
|
||
| private Schema calculateMetadataSchema(List<Types.NestedField> metaColumnFields) { | ||
| Optional<Types.NestedField> partitionField = | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A lot of this logic is not specific to Spark and it was a bit harder to navigate. |
||
| metaColumnFields.stream() | ||
| .filter(f -> MetadataColumns.PARTITION_COLUMN_ID == f.fieldId()) | ||
| .findFirst(); | ||
|
|
||
| // only calculate potential column id collision if partition metadata column was requested | ||
| if (!partitionField.isPresent()) { | ||
| return new Schema(metaColumnFields); | ||
| } | ||
|
|
||
| Set<Integer> idsToReassign = | ||
| TypeUtil.indexById(partitionField.get().type().asStructType()).keySet(); | ||
|
|
||
| // Calculate used ids by union metadata columns with all base table schemas | ||
| Set<Integer> currentlyUsedIds = | ||
| metaColumnFields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet()); | ||
| Set<Integer> allUsedIds = | ||
| table.schemas().values().stream() | ||
| .map(currSchema -> TypeUtil.indexById(currSchema.asStruct()).keySet()) | ||
| .reduce(currentlyUsedIds, Sets::union); | ||
|
|
||
| // Reassign selected ids to deduplicate with used ids. | ||
| AtomicInteger nextId = new AtomicInteger(); | ||
| return new Schema( | ||
| metaColumnFields, | ||
| ImmutableSet.of(), | ||
| oldId -> { | ||
| if (!idsToReassign.contains(oldId)) { | ||
| return oldId; | ||
| } | ||
| int candidate = nextId.incrementAndGet(); | ||
| while (allUsedIds.contains(candidate)) { | ||
| candidate = nextId.incrementAndGet(); | ||
| } | ||
| return candidate; | ||
| }); | ||
| List<StructField> dataFields = Lists.newArrayList(); | ||
|
|
||
| for (StructField field : requestedSchema.fields()) { | ||
| if (MetadataColumns.isMetadataColumn(field.name())) { | ||
| metaFieldNames.add(field.name()); | ||
| } else { | ||
| dataFields.add(field); | ||
| } | ||
| } | ||
|
|
||
| StructType requestedProjection = SparkSchemaUtil.toStructType(dataFields); | ||
| this.projection = prune(projection, requestedProjection); | ||
| } | ||
|
|
||
| // the projection should include all columns that will be returned, | ||
| // including those only used in filters | ||
| private Schema prune(Schema schema, StructType requestedSchema) { | ||
| return SparkSchemaUtil.prune(schema, requestedSchema, filterExpression(), caseSensitive); | ||
| } | ||
|
|
||
| // schema of rows that must be returned by readers | ||
| protected Schema projectionWithMetadataColumns() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sorry, I was OOTO last week. Curious, why did we change visibility from private to protected? |
||
| return TypeUtil.join(projection, calculateMetadataSchema()); | ||
| } | ||
|
|
||
| // computes metadata schema avoiding conflicts between partition and data field IDs | ||
| private Schema calculateMetadataSchema() { | ||
| List<Types.NestedField> metaFields = metaFields(); | ||
| Optional<Types.NestedField> partitionField = findPartitionField(metaFields); | ||
|
|
||
| if (partitionField.isEmpty()) { | ||
| return new Schema(metaFields); | ||
| } | ||
|
|
||
| Types.StructType partitionType = partitionField.get().type().asStructType(); | ||
| Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType); | ||
| GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds()); | ||
| return new Schema(metaFields, getId); | ||
| } | ||
|
|
||
| private List<Types.NestedField> metaFields() { | ||
| return metaFieldNames.stream() | ||
| .map(name -> MetadataColumns.metadataColumn(table, name)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| private Optional<Types.NestedField> findPartitionField(List<Types.NestedField> fields) { | ||
| return fields.stream() | ||
| .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId()) | ||
| .findFirst(); | ||
| } | ||
|
|
||
| // collects used data field IDs across all known table schemas | ||
| private Set<Integer> allUsedFieldIds() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't need to track used metadata column IDs here. They start from the end of the INT range and can't conflict by definition. If they do, something is fundamentally wrong. |
||
| return table.schemas().values().stream() | ||
| .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream()) | ||
| .collect(Collectors.toSet()); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -401,7 +388,7 @@ public Scan build() { | |
| } | ||
|
|
||
| private Scan buildBatchScan() { | ||
| Schema expectedSchema = schemaWithMetadataColumns(); | ||
| Schema expectedSchema = projectionWithMetadataColumns(); | ||
| return new SparkBatchQueryScan( | ||
| spark, | ||
| table, | ||
|
|
@@ -573,7 +560,7 @@ public Scan buildChangelogScan() { | |
| } | ||
| } | ||
|
|
||
| Schema expectedSchema = schemaWithMetadataColumns(); | ||
| Schema expectedSchema = projectionWithMetadataColumns(); | ||
|
|
||
| IncrementalChangelogScan scan = | ||
| table | ||
|
|
@@ -642,7 +629,7 @@ public Scan buildMergeOnReadScan() { | |
| table, | ||
| null, | ||
| readConf, | ||
| schemaWithMetadataColumns(), | ||
| projectionWithMetadataColumns(), | ||
| filterExpressions, | ||
| metricsReporter::scanReport); | ||
| } | ||
|
|
@@ -655,7 +642,7 @@ public Scan buildMergeOnReadScan() { | |
| SparkReadConf adjustedReadConf = | ||
| new SparkReadConf(spark, table, readConf.branch(), adjustedOptions); | ||
|
|
||
| Schema expectedSchema = schemaWithMetadataColumns(); | ||
| Schema expectedSchema = projectionWithMetadataColumns(); | ||
|
|
||
| BatchScan scan = | ||
| newBatchScan() | ||
|
|
@@ -685,12 +672,12 @@ public Scan buildCopyOnWriteScan() { | |
| spark, | ||
| table, | ||
| readConf, | ||
| schemaWithMetadataColumns(), | ||
| projectionWithMetadataColumns(), | ||
| filterExpressions, | ||
| metricsReporter::scanReport); | ||
| } | ||
|
|
||
| Schema expectedSchema = schemaWithMetadataColumns(); | ||
| Schema expectedSchema = projectionWithMetadataColumns(); | ||
|
|
||
| BatchScan scan = | ||
| newBatchScan() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am later going to keep the entire `schema` here as well. This is a projection, defaulted to the schema initially.