diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java new file mode 100644 index 000000000000..a7bf4bd1879d --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; + +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Scan; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Binder; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionUtil; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkV2Filters; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.TypeUtil.GetID; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.expressions.filter.Predicate; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A base Spark scan builder with common functionality like projection and predicate pushdown. + * + *
<p>
Note that this class intentionally doesn't implement any optional mix-in Spark interfaces even + * if it contains necessary logic, allowing each concrete scan implementation to select what + * functionality is applicable to that scan. + */ +abstract class BaseSparkScanBuilder implements ScanBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(BaseSparkScanBuilder.class); + private static final Predicate[] NO_PREDICATES = new Predicate[0]; + + private final SparkSession spark; + private final Table table; + private final Schema schema; + private final SparkReadConf readConf; + private final boolean caseSensitive; + private final Set<String> metaFieldNames = Sets.newLinkedHashSet(); + private final InMemoryMetricsReporter metricsReporter = new InMemoryMetricsReporter(); + + private Schema projection; + private List<Expression> filters = Lists.newArrayList(); + private Predicate[] pushedPredicates = NO_PREDICATES; + private Integer limit = null; + + protected BaseSparkScanBuilder( + SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) { + this(spark, table, schema, null, options); + } + + protected BaseSparkScanBuilder( + SparkSession spark, + Table table, + Schema schema, + String branch, + CaseInsensitiveStringMap options) { + this.spark = spark; + this.table = table; + this.schema = schema; + this.readConf = new SparkReadConf(spark, table, branch, options); + this.caseSensitive = readConf.caseSensitive(); + this.projection = schema; + } + + protected SparkSession spark() { + return spark; + } + + protected Table table() { + return table; + } + + protected Schema schema() { + return schema; + } + + protected Schema projection() { + return projection; + } + + protected SparkReadConf readConf() { + return readConf; + } + + protected boolean caseSensitive() { + return caseSensitive; + } + + protected List<Expression> filters() { + return filters; + } + + protected Expression filter() { + return filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and); + } + + protected InMemoryMetricsReporter metricsReporter() { + return metricsReporter; + } + + // logic necessary for SupportsPushDownRequiredColumns + public void pruneColumns(StructType requestedType) { + List<StructField> dataFields = Lists.newArrayList(); + + for (StructField field : requestedType.fields()) { + if (MetadataColumns.isMetadataColumn(field.name())) { + metaFieldNames.add(field.name()); + } else { + dataFields.add(field); + } + } + + StructType requestedDataType = SparkSchemaUtil.toStructType(dataFields); + this.projection = SparkSchemaUtil.prune(projection, requestedDataType, filter(), caseSensitive); + } + + // logic necessary for SupportsPushDownV2Filters + public Predicate[] pushPredicates(Predicate[] predicates) { + // there are 3 kinds of filters: + // (1) filters that can be pushed down completely and don't have to be evaluated by Spark + // (e.g. filters that select entire partitions) + // (2) filters that can be pushed down partially and require record-level filtering in Spark + // (e.g. filters that may select some but not necessarily all rows in a file) + // (3) filters that can't be pushed down at all and have to be evaluated by Spark + // (e.g.
unsupported filters) + // filters (1) and (2) are used to prune files during job planning in Iceberg + // filters (2) and (3) form a set of post scan filters and must be evaluated by Spark + + List<Expression> expressions = Lists.newArrayListWithExpectedSize(predicates.length); + List<Predicate> pushablePredicates = Lists.newArrayListWithExpectedSize(predicates.length); + List<Predicate> postScanPredicates = Lists.newArrayListWithExpectedSize(predicates.length); + + for (Predicate predicate : predicates) { + try { + Expression expr = SparkV2Filters.convert(predicate); + + if (expr != null) { + // try binding the expression to ensure it can be pushed down + Binder.bind(projection.asStruct(), expr, caseSensitive); + expressions.add(expr); + pushablePredicates.add(predicate); + } + + if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, caseSensitive)) { + postScanPredicates.add(predicate); + } else { + LOG.info("Evaluating completely on Iceberg side: {}", predicate); + } + + } catch (Exception e) { + LOG.warn("Failed to check if {} can be pushed down: {}", predicate, e.getMessage()); + postScanPredicates.add(predicate); + } + } + + this.filters = expressions; + this.pushedPredicates = pushablePredicates.toArray(new Predicate[0]); + + return postScanPredicates.toArray(new Predicate[0]); + } + + // logic necessary for SupportsPushDownV2Filters + public Predicate[] pushedPredicates() { + return pushedPredicates; + } + + // logic necessary for SupportsPushDownLimit + public boolean pushLimit(int newLimit) { + this.limit = newLimit; + return true; + } + + // schema of rows that must be returned by readers + protected Schema projectionWithMetadataColumns() { + return TypeUtil.join(projection, calculateMetadataSchema()); + } + + // computes metadata schema avoiding conflicts between partition and data field IDs + private Schema calculateMetadataSchema() { + List<Types.NestedField> metaFields = metaFields(); + Optional<Types.NestedField> partitionField = findPartitionField(metaFields); + + if (partitionField.isEmpty()) { + return new Schema(metaFields); + } + + Types.StructType partitionType = partitionField.get().type().asStructType(); + Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType); + GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds()); + return new Schema(metaFields, getId); + } + + private List<Types.NestedField> metaFields() { + return metaFieldNames.stream() + .map(name -> MetadataColumns.metadataColumn(table, name)) + .collect(Collectors.toList()); + } + + private Optional<Types.NestedField> findPartitionField(List<Types.NestedField> fields) { + return fields.stream() + .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId()) + .findFirst(); + } + + // collects used data field IDs across all known table schemas + private Set<Integer> allUsedFieldIds() { + return table.schemas().values().stream() + .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream()) + .collect(Collectors.toSet()); + } + + protected <T extends Scan<T, ?, ?>> T configureSplitPlanning(T scan) { + T newScan = scan; + + Long splitSize = readConf.splitSizeOption(); + if (splitSize != null) { + newScan = newScan.option(SPLIT_SIZE, String.valueOf(splitSize)); + } + + Integer splitLookback = readConf.splitLookbackOption(); + if (splitLookback != null) { + newScan = newScan.option(SPLIT_LOOKBACK, String.valueOf(splitLookback)); + } + + Long splitOpenFileCost = readConf.splitOpenFileCostOption(); + if (splitOpenFileCost != null) { + newScan = newScan.option(SPLIT_OPEN_FILE_COST, String.valueOf(splitOpenFileCost)); + } + + if (limit != null) { + newScan =
newScan.minRowsRequested(limit.longValue()); + } + + return newScan; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 2fb188d83d43..8495ae2a47cc 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -20,15 +20,11 @@ import java.io.IOException; import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.BaseTable; import org.apache.iceberg.BatchScan; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IncrementalAppendScan; import org.apache.iceberg.IncrementalChangelogScan; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.MetricsModes; import org.apache.iceberg.Schema; @@ -36,72 +32,47 @@ import org.apache.iceberg.SparkDistributedDataScan; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; import org.apache.iceberg.expressions.AggregateEvaluator; import org.apache.iceberg.expressions.Binder; import org.apache.iceberg.expressions.BoundAggregate; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.expressions.ExpressionUtil; -import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.metrics.InMemoryMetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkAggregates; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkV2Filters; import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.TypeUtil.GetID; -import org.apache.iceberg.types.Types; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc; import org.apache.spark.sql.connector.expressions.aggregate.Aggregation; -import org.apache.spark.sql.connector.expressions.filter.Predicate; import org.apache.spark.sql.connector.read.Scan; -import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.connector.read.Statistics; import org.apache.spark.sql.connector.read.SupportsPushDownAggregates; import org.apache.spark.sql.connector.read.SupportsPushDownLimit; import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns; import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters; import org.apache.spark.sql.connector.read.SupportsReportStatistics; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SparkScanBuilder - implements ScanBuilder, - SupportsPushDownAggregates, - SupportsPushDownV2Filters, +public class SparkScanBuilder extends BaseSparkScanBuilder + 
implements SupportsPushDownV2Filters, SupportsPushDownRequiredColumns, SupportsReportStatistics, - SupportsPushDownLimit { + SupportsPushDownLimit, + SupportsPushDownAggregates { private static final Logger LOG = LoggerFactory.getLogger(SparkScanBuilder.class); - private static final Predicate[] NO_PREDICATES = new Predicate[0]; - private Scan localScan; - private final SparkSession spark; - private final Table table; private final CaseInsensitiveStringMap options; - private final SparkReadConf readConf; - private final Set metaFieldNames = Sets.newLinkedHashSet(); - private final InMemoryMetricsReporter metricsReporter; - - private Schema projection; - private boolean caseSensitive; - private List filterExpressions = null; - private Predicate[] pushedPredicates = NO_PREDICATES; - private Integer limit = null; + private Scan localScan; SparkScanBuilder( SparkSession spark, @@ -109,13 +80,8 @@ public class SparkScanBuilder String branch, Schema schema, CaseInsensitiveStringMap options) { - this.spark = spark; - this.table = table; - this.projection = schema; + super(spark, table, schema, branch, options); this.options = options; - this.readConf = new SparkReadConf(spark, table, branch, options); - this.caseSensitive = readConf.caseSensitive(); - this.metricsReporter = new InMemoryMetricsReporter(); } SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) { @@ -132,68 +98,6 @@ public class SparkScanBuilder this(spark, table, null, schema, options); } - private Expression filterExpression() { - if (filterExpressions != null) { - return filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and); - } - return Expressions.alwaysTrue(); - } - - public SparkScanBuilder caseSensitive(boolean isCaseSensitive) { - this.caseSensitive = isCaseSensitive; - return this; - } - - @Override - public Predicate[] pushPredicates(Predicate[] predicates) { - // there are 3 kinds of filters: - // (1) filters that can be pushed down completely and don't have to evaluated by Spark - // (e.g. filters that select entire partitions) - // (2) filters that can be pushed down partially and require record-level filtering in Spark - // (e.g. filters that may select some but not necessarily all rows in a file) - // (3) filters that can't be pushed down at all and have to be evaluated by Spark - // (e.g. 
unsupported filters) - // filters (1) and (2) are used prune files during job planning in Iceberg - // filters (2) and (3) form a set of post scan filters and must be evaluated by Spark - - List expressions = Lists.newArrayListWithExpectedSize(predicates.length); - List pushableFilters = Lists.newArrayListWithExpectedSize(predicates.length); - List postScanFilters = Lists.newArrayListWithExpectedSize(predicates.length); - - for (Predicate predicate : predicates) { - try { - Expression expr = SparkV2Filters.convert(predicate); - - if (expr != null) { - // try binding the expression to ensure it can be pushed down - Binder.bind(projection.asStruct(), expr, caseSensitive); - expressions.add(expr); - pushableFilters.add(predicate); - } - - if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, caseSensitive)) { - postScanFilters.add(predicate); - } else { - LOG.info("Evaluating completely on Iceberg side: {}", predicate); - } - - } catch (Exception e) { - LOG.warn("Failed to check if {} can be pushed down: {}", predicate, e.getMessage()); - postScanFilters.add(predicate); - } - } - - this.filterExpressions = expressions; - this.pushedPredicates = pushableFilters.toArray(new Predicate[0]); - - return postScanFilters.toArray(new Predicate[0]); - } - - @Override - public Predicate[] pushedPredicates() { - return pushedPredicates; - } - @Override public boolean pushAggregation(Aggregation aggregation) { if (!canPushDownAggregation(aggregation)) { @@ -208,7 +112,7 @@ public boolean pushAggregation(Aggregation aggregation) { try { Expression expr = SparkAggregates.convert(aggregateFunc); if (expr != null) { - Expression bound = Binder.bind(projection.asStruct(), expr, caseSensitive); + Expression bound = Binder.bind(projection().asStruct(), expr, caseSensitive()); expressions.add((BoundAggregate) bound); } else { LOG.info( @@ -255,18 +159,17 @@ public boolean pushAggregation(Aggregation aggregation) { StructLike structLike = aggregateEvaluator.result(); pushedAggregateRows[0] = new StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike); - localScan = - new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows, filterExpressions); + localScan = new SparkLocalScan(table(), pushedAggregateSchema, pushedAggregateRows, filters()); return true; } private boolean canPushDownAggregation(Aggregation aggregation) { - if (!(table instanceof BaseTable)) { + if (!(table() instanceof BaseTable)) { return false; } - if (!readConf.aggregatePushDownEnabled()) { + if (!readConf().aggregatePushDownEnabled()) { return false; } @@ -282,7 +185,7 @@ private boolean canPushDownAggregation(Aggregation aggregation) { } private boolean metricsModeSupportsAggregatePushDown(List> aggregates) { - MetricsConfig config = MetricsConfig.forTable(table); + MetricsConfig config = MetricsConfig.forTable(table()); for (BoundAggregate aggregate : aggregates) { String colName = aggregate.columnName(); if (!colName.equals("*")) { @@ -316,67 +219,6 @@ private boolean metricsModeSupportsAggregatePushDown(List> return true; } - @Override - public void pruneColumns(StructType requestedSchema) { - List dataFields = Lists.newArrayList(); - - for (StructField field : requestedSchema.fields()) { - if (MetadataColumns.isMetadataColumn(field.name())) { - metaFieldNames.add(field.name()); - } else { - dataFields.add(field); - } - } - - StructType requestedProjection = SparkSchemaUtil.toStructType(dataFields); - this.projection = prune(projection, requestedProjection); - } - - // the projection should include 
all columns that will be returned, - // including those only used in filters - private Schema prune(Schema schema, StructType requestedSchema) { - return SparkSchemaUtil.prune(schema, requestedSchema, filterExpression(), caseSensitive); - } - - // schema of rows that must be returned by readers - protected Schema projectionWithMetadataColumns() { - return TypeUtil.join(projection, calculateMetadataSchema()); - } - - // computes metadata schema avoiding conflicts between partition and data field IDs - private Schema calculateMetadataSchema() { - List metaFields = metaFields(); - Optional partitionField = findPartitionField(metaFields); - - if (partitionField.isEmpty()) { - return new Schema(metaFields); - } - - Types.StructType partitionType = partitionField.get().type().asStructType(); - Set partitionFieldIds = TypeUtil.getProjectedIds(partitionType); - GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds()); - return new Schema(metaFields, getId); - } - - private List metaFields() { - return metaFieldNames.stream() - .map(name -> MetadataColumns.metadataColumn(table, name)) - .collect(Collectors.toList()); - } - - private Optional findPartitionField(List fields) { - return fields.stream() - .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId()) - .findFirst(); - } - - // collects used data field IDs across all known table schemas - private Set allUsedFieldIds() { - return table.schemas().values().stream() - .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream()) - .collect(Collectors.toSet()); - } - @Override public Scan build() { if (localScan != null) { @@ -389,20 +231,20 @@ public Scan build() { private Scan buildBatchScan() { Schema expectedSchema = projectionWithMetadataColumns(); return new SparkBatchQueryScan( - spark, - table, + spark(), + table(), buildIcebergBatchScan(false /* not include Column Stats */, expectedSchema), - readConf, + readConf(), expectedSchema, - filterExpressions, - metricsReporter::scanReport); + filters(), + metricsReporter()::scanReport); } private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema expectedSchema) { - Long snapshotId = readConf.snapshotId(); - Long asOfTimestamp = readConf.asOfTimestamp(); - String branch = readConf.branch(); - String tag = readConf.tag(); + Long snapshotId = readConf().snapshotId(); + Long asOfTimestamp = readConf().asOfTimestamp(); + String branch = readConf().branch(); + String tag = readConf().tag(); Preconditions.checkArgument( snapshotId == null || asOfTimestamp == null, @@ -410,8 +252,8 @@ private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema SparkReadOptions.SNAPSHOT_ID, SparkReadOptions.AS_OF_TIMESTAMP); - Long startSnapshotId = readConf.startSnapshotId(); - Long endSnapshotId = readConf.endSnapshotId(); + Long startSnapshotId = readConf().startSnapshotId(); + Long endSnapshotId = readConf().endSnapshotId(); if (snapshotId != null || asOfTimestamp != null) { Preconditions.checkArgument( @@ -429,8 +271,8 @@ private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema SparkReadOptions.END_SNAPSHOT_ID, SparkReadOptions.START_SNAPSHOT_ID); - Long startTimestamp = readConf.startTimestamp(); - Long endTimestamp = readConf.endTimestamp(); + Long startTimestamp = readConf().startTimestamp(); + Long endTimestamp = readConf().endTimestamp(); Preconditions.checkArgument( startTimestamp == null && endTimestamp == null, "Cannot set %s or %s for incremental scans and batch scan. 
They are only valid for " @@ -454,10 +296,10 @@ private org.apache.iceberg.Scan buildBatchScan( Schema expectedSchema) { BatchScan scan = newBatchScan() - .caseSensitive(caseSensitive) - .filter(filterExpression()) + .caseSensitive(caseSensitive()) + .filter(filter()) .project(expectedSchema) - .metricsReporter(metricsReporter); + .metricsReporter(metricsReporter()); if (withStats) { scan = scan.includeColumnStats(); @@ -485,13 +327,13 @@ private org.apache.iceberg.Scan buildBatchScan( private org.apache.iceberg.Scan buildIncrementalAppendScan( long startSnapshotId, Long endSnapshotId, boolean withStats, Schema expectedSchema) { IncrementalAppendScan scan = - table + table() .newIncrementalAppendScan() .fromSnapshotExclusive(startSnapshotId) - .caseSensitive(caseSensitive) - .filter(filterExpression()) + .caseSensitive(caseSensitive()) + .filter(filter()) .project(expectedSchema) - .metricsReporter(metricsReporter); + .metricsReporter(metricsReporter()); if (withStats) { scan = scan.includeColumnStats(); @@ -507,20 +349,20 @@ private org.apache.iceberg.Scan buildIncrementalAppendScan( @SuppressWarnings("CyclomaticComplexity") public Scan buildChangelogScan() { Preconditions.checkArgument( - readConf.snapshotId() == null - && readConf.asOfTimestamp() == null - && readConf.branch() == null - && readConf.tag() == null, + readConf().snapshotId() == null + && readConf().asOfTimestamp() == null + && readConf().branch() == null + && readConf().tag() == null, "Cannot set neither %s, %s, %s and %s for changelogs", SparkReadOptions.SNAPSHOT_ID, SparkReadOptions.AS_OF_TIMESTAMP, SparkReadOptions.BRANCH, SparkReadOptions.TAG); - Long startSnapshotId = readConf.startSnapshotId(); - Long endSnapshotId = readConf.endSnapshotId(); - Long startTimestamp = readConf.startTimestamp(); - Long endTimestamp = readConf.endTimestamp(); + Long startSnapshotId = readConf().startSnapshotId(); + Long endSnapshotId = readConf().endSnapshotId(); + Long startTimestamp = readConf().startTimestamp(); + Long endTimestamp = readConf().endTimestamp(); Preconditions.checkArgument( !(startSnapshotId != null && startTimestamp != null), @@ -544,8 +386,8 @@ public Scan buildChangelogScan() { boolean emptyScan = false; if (startTimestamp != null) { - if (table.currentSnapshot() == null - || startTimestamp > table.currentSnapshot().timestampMillis()) { + if (table().currentSnapshot() == null + || startTimestamp > table().currentSnapshot().timestampMillis()) { emptyScan = true; } startSnapshotId = getStartSnapshotId(startTimestamp); @@ -562,12 +404,12 @@ public Scan buildChangelogScan() { Schema expectedSchema = projectionWithMetadataColumns(); IncrementalChangelogScan scan = - table + table() .newIncrementalChangelogScan() - .caseSensitive(caseSensitive) - .filter(filterExpression()) + .caseSensitive(caseSensitive()) + .filter(filter()) .project(expectedSchema) - .metricsReporter(metricsReporter); + .metricsReporter(metricsReporter()); if (startSnapshotId != null) { scan = scan.fromSnapshotExclusive(startSnapshotId); @@ -580,11 +422,11 @@ public Scan buildChangelogScan() { scan = configureSplitPlanning(scan); return new SparkChangelogScan( - spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan); + spark(), table(), scan, readConf(), expectedSchema, filters(), emptyScan); } private Long getStartSnapshotId(Long startTimestamp) { - Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp); + Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table(), startTimestamp); if 
(oldestSnapshotAfter == null) { return null; @@ -597,7 +439,7 @@ private Long getStartSnapshotId(Long startTimestamp) { private Long getEndSnapshotId(Long endTimestamp) { Long endSnapshotId = null; - for (Snapshot snapshot : SnapshotUtil.currentAncestors(table)) { + for (Snapshot snapshot : SnapshotUtil.currentAncestors(table())) { if (snapshot.timestampMillis() <= endTimestamp) { endSnapshotId = snapshot.snapshotId(); break; @@ -608,29 +450,31 @@ private Long getEndSnapshotId(Long endTimestamp) { public Scan buildMergeOnReadScan() { Preconditions.checkArgument( - readConf.snapshotId() == null && readConf.asOfTimestamp() == null && readConf.tag() == null, + readConf().snapshotId() == null + && readConf().asOfTimestamp() == null + && readConf().tag() == null, "Cannot set time travel options %s, %s, %s for row-level command scans", SparkReadOptions.SNAPSHOT_ID, SparkReadOptions.AS_OF_TIMESTAMP, SparkReadOptions.TAG); Preconditions.checkArgument( - readConf.startSnapshotId() == null && readConf.endSnapshotId() == null, + readConf().startSnapshotId() == null && readConf().endSnapshotId() == null, "Cannot set incremental scan options %s and %s for row-level command scans", SparkReadOptions.START_SNAPSHOT_ID, SparkReadOptions.END_SNAPSHOT_ID); - Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch()); + Snapshot snapshot = SnapshotUtil.latestSnapshot(table(), readConf().branch()); if (snapshot == null) { return new SparkBatchQueryScan( - spark, - table, + spark(), + table(), null, - readConf, + readConf(), projectionWithMetadataColumns(), - filterExpressions, - metricsReporter::scanReport); + filters(), + metricsReporter()::scanReport); } // remember the current snapshot ID for commit validation @@ -639,41 +483,41 @@ public Scan buildMergeOnReadScan() { CaseInsensitiveStringMap adjustedOptions = Spark3Util.setOption(SparkReadOptions.SNAPSHOT_ID, Long.toString(snapshotId), options); SparkReadConf adjustedReadConf = - new SparkReadConf(spark, table, readConf.branch(), adjustedOptions); + new SparkReadConf(spark(), table(), readConf().branch(), adjustedOptions); Schema expectedSchema = projectionWithMetadataColumns(); BatchScan scan = newBatchScan() .useSnapshot(snapshotId) - .caseSensitive(caseSensitive) - .filter(filterExpression()) + .caseSensitive(caseSensitive()) + .filter(filter()) .project(expectedSchema) - .metricsReporter(metricsReporter); + .metricsReporter(metricsReporter()); scan = configureSplitPlanning(scan); return new SparkBatchQueryScan( - spark, - table, + spark(), + table(), scan, adjustedReadConf, expectedSchema, - filterExpressions, - metricsReporter::scanReport); + filters(), + metricsReporter()::scanReport); } public Scan buildCopyOnWriteScan() { - Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch()); + Snapshot snapshot = SnapshotUtil.latestSnapshot(table(), readConf().branch()); if (snapshot == null) { return new SparkCopyOnWriteScan( - spark, - table, - readConf, + spark(), + table(), + readConf(), projectionWithMetadataColumns(), - filterExpressions, - metricsReporter::scanReport); + filters(), + metricsReporter()::scanReport); } Schema expectedSchema = projectionWithMetadataColumns(); @@ -682,50 +526,22 @@ public Scan buildCopyOnWriteScan() { newBatchScan() .useSnapshot(snapshot.snapshotId()) .ignoreResiduals() - .caseSensitive(caseSensitive) - .filter(filterExpression()) + .caseSensitive(caseSensitive()) + .filter(filter()) .project(expectedSchema) - .metricsReporter(metricsReporter); + .metricsReporter(metricsReporter()); scan = 
configureSplitPlanning(scan); return new SparkCopyOnWriteScan( - spark, - table, + spark(), + table(), scan, snapshot, - readConf, + readConf(), expectedSchema, - filterExpressions, - metricsReporter::scanReport); - } - - private > T configureSplitPlanning(T scan) { - T configuredScan = scan; - - Long splitSize = readConf.splitSizeOption(); - if (splitSize != null) { - configuredScan = configuredScan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize)); - } - - Integer splitLookback = readConf.splitLookbackOption(); - if (splitLookback != null) { - configuredScan = - configuredScan.option(TableProperties.SPLIT_LOOKBACK, String.valueOf(splitLookback)); - } - - Long splitOpenFileCost = readConf.splitOpenFileCostOption(); - if (splitOpenFileCost != null) { - configuredScan = - configuredScan.option( - TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(splitOpenFileCost)); - } - - if (null != limit) { - configuredScan = configuredScan.minRowsRequested(limit.longValue()); - } - - return configuredScan; + filters(), + metricsReporter()::scanReport); } @Override @@ -739,16 +555,10 @@ public StructType readSchema() { } private BatchScan newBatchScan() { - if (readConf.distributedPlanningEnabled()) { - return new SparkDistributedDataScan(spark, table, readConf); + if (readConf().distributedPlanningEnabled()) { + return new SparkDistributedDataScan(spark(), table(), readConf()); } else { - return table.newBatchScan(); + return table().newBatchScan(); } } - - @Override - public boolean pushLimit(int pushedLimit) { - this.limit = pushedLimit; - return true; - } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java index 7164c53a3d98..bf7d79ac3491 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java @@ -18,78 +18,27 @@ */ package org.apache.iceberg.spark.source; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.SparkReadConf; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.read.Scan; -import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; -class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns { +class SparkStagedScanBuilder extends BaseSparkScanBuilder + implements SupportsPushDownRequiredColumns { - private final SparkSession spark; - private final Table table; private final String taskSetId; - private final SparkReadConf readConf; - private final List metaColumns = Lists.newArrayList(); - - private Schema schema; SparkStagedScanBuilder( SparkSession spark, Table table, String taskSetId, CaseInsensitiveStringMap options) { - this.spark = spark; - this.table = table; + super(spark, table, table.schema(), options); this.taskSetId = 
taskSetId; - this.readConf = new SparkReadConf(spark, table, options); - this.schema = table.schema(); } @Override public Scan build() { - return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), taskSetId, readConf); - } - - @Override - public void pruneColumns(StructType requestedSchema) { - StructType requestedProjection = removeMetaColumns(requestedSchema); - this.schema = SparkSchemaUtil.prune(schema, requestedProjection); - - Stream.of(requestedSchema.fields()) - .map(StructField::name) - .filter(MetadataColumns::isMetadataColumn) - .distinct() - .forEach(metaColumns::add); - } - - private StructType removeMetaColumns(StructType structType) { - return new StructType( - Stream.of(structType.fields()) - .filter(field -> MetadataColumns.nonMetadataColumn(field.name())) - .toArray(StructField[]::new)); - } - - private Schema schemaWithMetadataColumns() { - // metadata columns - List fields = - metaColumns.stream() - .distinct() - .map(name -> MetadataColumns.metadataColumn(table, name)) - .collect(Collectors.toList()); - Schema meta = new Schema(fields); - - // schema of rows returned by readers - return TypeUtil.join(schema, meta); + Schema projection = projectionWithMetadataColumns(); + return new SparkStagedScan(spark(), table(), projection, taskSetId, readConf()); } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java index f2d7a7f7d590..b6c4a1998085 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java @@ -223,8 +223,7 @@ public void testUnpartitionedCaseInsensitiveIDFilters() { for (int i = 0; i < 10; i += 1) { SparkScanBuilder builder = - new SparkScanBuilder(spark, TABLES.load(options.get("path")), options) - .caseSensitive(false); + new SparkScanBuilder(spark, TABLES.load(options.get("path")), options); pushFilters( builder,