diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
new file mode 100644
index 000000000000..a7bf4bd1879d
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkScanBuilder.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.metrics.InMemoryMetricsReporter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkV2Filters;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.TypeUtil.GetID;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.filter.Predicate;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A base Spark scan builder with common functionality like projection and predicate pushdown.
+ *
+ * <p>Note that this class intentionally doesn't implement any optional mix-in Spark interfaces even
+ * if it contains necessary logic, allowing each concrete scan implementation to select what
+ * functionality is applicable to that scan.
+ */
+abstract class BaseSparkScanBuilder implements ScanBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BaseSparkScanBuilder.class);
+ private static final Predicate[] NO_PREDICATES = new Predicate[0];
+
+ private final SparkSession spark;
+ private final Table table;
+ private final Schema schema;
+ private final SparkReadConf readConf;
+ private final boolean caseSensitive;
+ private final Set<String> metaFieldNames = Sets.newLinkedHashSet();
+ private final InMemoryMetricsReporter metricsReporter = new InMemoryMetricsReporter();
+
+ private Schema projection;
+ private List<Expression> filters = Lists.newArrayList();
+ private Predicate[] pushedPredicates = NO_PREDICATES;
+ private Integer limit = null;
+
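+ // convenience constructor for scans that are not pinned to a specific branch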
+ protected BaseSparkScanBuilder(
+ SparkSession spark, Table table, Schema schema, CaseInsensitiveStringMap options) {
+ this(spark, table, schema, null, options);
+ }
+
+ protected BaseSparkScanBuilder(
+ SparkSession spark,
+ Table table,
+ Schema schema,
+ String branch,
+ CaseInsensitiveStringMap options) {
+ this.spark = spark;
+ this.table = table;
+ this.schema = schema;
+ this.readConf = new SparkReadConf(spark, table, branch, options);
+ this.caseSensitive = readConf.caseSensitive();
+ this.projection = schema;
+ }
+
+ protected SparkSession spark() {
+ return spark;
+ }
+
+ protected Table table() {
+ return table;
+ }
+
+ protected Schema schema() {
+ return schema;
+ }
+
+ protected Schema projection() {
+ return projection;
+ }
+
+ protected SparkReadConf readConf() {
+ return readConf;
+ }
+
+ protected boolean caseSensitive() {
+ return caseSensitive;
+ }
+
+ protected List<Expression> filters() {
+ return filters;
+ }
+
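+ // combines all pushed filter expressions into a single AND-ed expression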
+ protected Expression filter() {
+ return filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
+ }
+
+ protected InMemoryMetricsReporter metricsReporter() {
+ return metricsReporter;
+ }
+
+ // logic necessary for SupportsPushDownRequiredColumns
+ public void pruneColumns(StructType requestedType) {
+ List<StructField> dataFields = Lists.newArrayList();
+
+ for (StructField field : requestedType.fields()) {
+ if (MetadataColumns.isMetadataColumn(field.name())) {
+ metaFieldNames.add(field.name());
+ } else {
+ dataFields.add(field);
+ }
+ }
+
+ StructType requestedDataType = SparkSchemaUtil.toStructType(dataFields);
+ this.projection = SparkSchemaUtil.prune(projection, requestedDataType, filter(), caseSensitive);
+ }
+
+ // logic necessary for SupportsPushDownV2Filters
+ public Predicate[] pushPredicates(Predicate[] predicates) {
+ // there are 3 kinds of filters:
+ // (1) filters that can be pushed down completely and don't have to be evaluated by Spark
+ // (e.g. filters that select entire partitions)
+ // (2) filters that can be pushed down partially and require record-level filtering in Spark
+ // (e.g. filters that may select some but not necessarily all rows in a file)
+ // (3) filters that can't be pushed down at all and have to be evaluated by Spark
+ // (e.g. unsupported filters)
+ // filters (1) and (2) are used to prune files during job planning in Iceberg
+ // filters (2) and (3) form a set of post scan filters and must be evaluated by Spark
+
+ List<Expression> expressions = Lists.newArrayListWithExpectedSize(predicates.length);
+ List<Predicate> pushablePredicates = Lists.newArrayListWithExpectedSize(predicates.length);
+ List<Predicate> postScanPredicates = Lists.newArrayListWithExpectedSize(predicates.length);
+
+ for (Predicate predicate : predicates) {
+ try {
+ Expression expr = SparkV2Filters.convert(predicate);
+
+ if (expr != null) {
+ // try binding the expression to ensure it can be pushed down
+ Binder.bind(projection.asStruct(), expr, caseSensitive);
+ expressions.add(expr);
+ pushablePredicates.add(predicate);
+ }
+
+ if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, caseSensitive)) {
+ postScanPredicates.add(predicate);
+ } else {
+ LOG.info("Evaluating completely on Iceberg side: {}", predicate);
+ }
+
+ } catch (Exception e) {
+ LOG.warn("Failed to check if {} can be pushed down: {}", predicate, e.getMessage());
+ postScanPredicates.add(predicate);
+ }
+ }
+
+ this.filters = expressions;
+ this.pushedPredicates = pushablePredicates.toArray(new Predicate[0]);
+
+ return postScanPredicates.toArray(new Predicate[0]);
+ }
+
+ // logic necessary for SupportsPushDownV2Filters
+ public Predicate[] pushedPredicates() {
+ return pushedPredicates;
+ }
+
+ // logic necessary for SupportsPushDownLimit
+ public boolean pushLimit(int newLimit) {
+ this.limit = newLimit;
+ return true;
+ }
+
+ // schema of rows that must be returned by readers
+ protected Schema projectionWithMetadataColumns() {
+ return TypeUtil.join(projection, calculateMetadataSchema());
+ }
+
+ // computes metadata schema avoiding conflicts between partition and data field IDs
+ private Schema calculateMetadataSchema() {
+ List<Types.NestedField> metaFields = metaFields();
+ Optional<Types.NestedField> partitionField = findPartitionField(metaFields);
+
+ if (partitionField.isEmpty()) {
+ return new Schema(metaFields);
+ }
+
+ Types.StructType partitionType = partitionField.get().type().asStructType();
+ Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType);
+ GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds());
+ return new Schema(metaFields, getId);
+ }
+
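+ // resolves the requested metadata column names into field definitions for this table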
+ private List<Types.NestedField> metaFields() {
+ return metaFieldNames.stream()
+ .map(name -> MetadataColumns.metadataColumn(table, name))
+ .collect(Collectors.toList());
+ }
+
+ private Optional<Types.NestedField> findPartitionField(List<Types.NestedField> fields) {
+ return fields.stream()
+ .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId())
+ .findFirst();
+ }
+
+ // collects used data field IDs across all known table schemas
+ private Set<Integer> allUsedFieldIds() {
+ return table.schemas().values().stream()
+ .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
+ .collect(Collectors.toSet());
+ }
+
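+ // applies split planning options from the read conf and the pushed limit, if any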
+ protected <T extends Scan<T, ?, ?>> T configureSplitPlanning(T scan) {
+ T newScan = scan;
+
+ Long splitSize = readConf.splitSizeOption();
+ if (splitSize != null) {
+ newScan = newScan.option(SPLIT_SIZE, String.valueOf(splitSize));
+ }
+
+ Integer splitLookback = readConf.splitLookbackOption();
+ if (splitLookback != null) {
+ newScan = newScan.option(SPLIT_LOOKBACK, String.valueOf(splitLookback));
+ }
+
+ Long splitOpenFileCost = readConf.splitOpenFileCostOption();
+ if (splitOpenFileCost != null) {
+ newScan = newScan.option(SPLIT_OPEN_FILE_COST, String.valueOf(splitOpenFileCost));
+ }
+
+ if (limit != null) {
+ newScan = newScan.minRowsRequested(limit.longValue());
+ }
+
+ return newScan;
+ }
+}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 2fb188d83d43..8495ae2a47cc 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -20,15 +20,11 @@
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.IncrementalChangelogScan;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.Schema;
@@ -36,72 +32,47 @@
import org.apache.iceberg.SparkDistributedDataScan;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.AggregateEvaluator;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundAggregate;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.ExpressionUtil;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.metrics.InMemoryMetricsReporter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkAggregates;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.spark.SparkV2Filters;
import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.TypeUtil.GetID;
-import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc;
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation;
-import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsPushDownAggregates;
import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
-import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SparkScanBuilder
- implements ScanBuilder,
- SupportsPushDownAggregates,
- SupportsPushDownV2Filters,
+public class SparkScanBuilder extends BaseSparkScanBuilder
+ implements SupportsPushDownV2Filters,
SupportsPushDownRequiredColumns,
SupportsReportStatistics,
- SupportsPushDownLimit {
+ SupportsPushDownLimit,
+ SupportsPushDownAggregates {
private static final Logger LOG = LoggerFactory.getLogger(SparkScanBuilder.class);
- private static final Predicate[] NO_PREDICATES = new Predicate[0];
- private Scan localScan;
- private final SparkSession spark;
- private final Table table;
private final CaseInsensitiveStringMap options;
- private final SparkReadConf readConf;
- private final Set<String> metaFieldNames = Sets.newLinkedHashSet();
- private final InMemoryMetricsReporter metricsReporter;
-
- private Schema projection;
- private boolean caseSensitive;
- private List<Expression> filterExpressions = null;
- private Predicate[] pushedPredicates = NO_PREDICATES;
- private Integer limit = null;
+ private Scan localScan;
SparkScanBuilder(
SparkSession spark,
@@ -109,13 +80,8 @@ public class SparkScanBuilder
String branch,
Schema schema,
CaseInsensitiveStringMap options) {
- this.spark = spark;
- this.table = table;
- this.projection = schema;
+ super(spark, table, schema, branch, options);
this.options = options;
- this.readConf = new SparkReadConf(spark, table, branch, options);
- this.caseSensitive = readConf.caseSensitive();
- this.metricsReporter = new InMemoryMetricsReporter();
}
SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
@@ -132,68 +98,6 @@ public class SparkScanBuilder
this(spark, table, null, schema, options);
}
- private Expression filterExpression() {
- if (filterExpressions != null) {
- return filterExpressions.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
- }
- return Expressions.alwaysTrue();
- }
-
- public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
- this.caseSensitive = isCaseSensitive;
- return this;
- }
-
- @Override
- public Predicate[] pushPredicates(Predicate[] predicates) {
- // there are 3 kinds of filters:
- // (1) filters that can be pushed down completely and don't have to evaluated by Spark
- // (e.g. filters that select entire partitions)
- // (2) filters that can be pushed down partially and require record-level filtering in Spark
- // (e.g. filters that may select some but not necessarily all rows in a file)
- // (3) filters that can't be pushed down at all and have to be evaluated by Spark
- // (e.g. unsupported filters)
- // filters (1) and (2) are used prune files during job planning in Iceberg
- // filters (2) and (3) form a set of post scan filters and must be evaluated by Spark
-
- List<Expression> expressions = Lists.newArrayListWithExpectedSize(predicates.length);
- List<Predicate> pushableFilters = Lists.newArrayListWithExpectedSize(predicates.length);
- List<Predicate> postScanFilters = Lists.newArrayListWithExpectedSize(predicates.length);
-
- for (Predicate predicate : predicates) {
- try {
- Expression expr = SparkV2Filters.convert(predicate);
-
- if (expr != null) {
- // try binding the expression to ensure it can be pushed down
- Binder.bind(projection.asStruct(), expr, caseSensitive);
- expressions.add(expr);
- pushableFilters.add(predicate);
- }
-
- if (expr == null || !ExpressionUtil.selectsPartitions(expr, table, caseSensitive)) {
- postScanFilters.add(predicate);
- } else {
- LOG.info("Evaluating completely on Iceberg side: {}", predicate);
- }
-
- } catch (Exception e) {
- LOG.warn("Failed to check if {} can be pushed down: {}", predicate, e.getMessage());
- postScanFilters.add(predicate);
- }
- }
-
- this.filterExpressions = expressions;
- this.pushedPredicates = pushableFilters.toArray(new Predicate[0]);
-
- return postScanFilters.toArray(new Predicate[0]);
- }
-
- @Override
- public Predicate[] pushedPredicates() {
- return pushedPredicates;
- }
-
@Override
public boolean pushAggregation(Aggregation aggregation) {
if (!canPushDownAggregation(aggregation)) {
@@ -208,7 +112,7 @@ public boolean pushAggregation(Aggregation aggregation) {
try {
Expression expr = SparkAggregates.convert(aggregateFunc);
if (expr != null) {
- Expression bound = Binder.bind(projection.asStruct(), expr, caseSensitive);
+ Expression bound = Binder.bind(projection().asStruct(), expr, caseSensitive());
expressions.add((BoundAggregate<?, ?>) bound);
} else {
LOG.info(
@@ -255,18 +159,17 @@ public boolean pushAggregation(Aggregation aggregation) {
StructLike structLike = aggregateEvaluator.result();
pushedAggregateRows[0] =
new StructInternalRow(aggregateEvaluator.resultType()).setStruct(structLike);
- localScan =
- new SparkLocalScan(table, pushedAggregateSchema, pushedAggregateRows, filterExpressions);
+ localScan = new SparkLocalScan(table(), pushedAggregateSchema, pushedAggregateRows, filters());
return true;
}
private boolean canPushDownAggregation(Aggregation aggregation) {
- if (!(table instanceof BaseTable)) {
+ if (!(table() instanceof BaseTable)) {
return false;
}
- if (!readConf.aggregatePushDownEnabled()) {
+ if (!readConf().aggregatePushDownEnabled()) {
return false;
}
@@ -282,7 +185,7 @@ private boolean canPushDownAggregation(Aggregation aggregation) {
}
private boolean metricsModeSupportsAggregatePushDown(List<BoundAggregate<?, ?>> aggregates) {
- MetricsConfig config = MetricsConfig.forTable(table);
+ MetricsConfig config = MetricsConfig.forTable(table());
for (BoundAggregate<?, ?> aggregate : aggregates) {
String colName = aggregate.columnName();
if (!colName.equals("*")) {
@@ -316,67 +219,6 @@ private boolean metricsModeSupportsAggregatePushDown(List<BoundAggregate<?, ?>>
return true;
}
- @Override
- public void pruneColumns(StructType requestedSchema) {
- List<StructField> dataFields = Lists.newArrayList();
-
- for (StructField field : requestedSchema.fields()) {
- if (MetadataColumns.isMetadataColumn(field.name())) {
- metaFieldNames.add(field.name());
- } else {
- dataFields.add(field);
- }
- }
-
- StructType requestedProjection = SparkSchemaUtil.toStructType(dataFields);
- this.projection = prune(projection, requestedProjection);
- }
-
- // the projection should include all columns that will be returned,
- // including those only used in filters
- private Schema prune(Schema schema, StructType requestedSchema) {
- return SparkSchemaUtil.prune(schema, requestedSchema, filterExpression(), caseSensitive);
- }
-
- // schema of rows that must be returned by readers
- protected Schema projectionWithMetadataColumns() {
- return TypeUtil.join(projection, calculateMetadataSchema());
- }
-
- // computes metadata schema avoiding conflicts between partition and data field IDs
- private Schema calculateMetadataSchema() {
- List<Types.NestedField> metaFields = metaFields();
- Optional<Types.NestedField> partitionField = findPartitionField(metaFields);
-
- if (partitionField.isEmpty()) {
- return new Schema(metaFields);
- }
-
- Types.StructType partitionType = partitionField.get().type().asStructType();
- Set<Integer> partitionFieldIds = TypeUtil.getProjectedIds(partitionType);
- GetID getId = TypeUtil.reassignConflictingIds(partitionFieldIds, allUsedFieldIds());
- return new Schema(metaFields, getId);
- }
-
- private List<Types.NestedField> metaFields() {
- return metaFieldNames.stream()
- .map(name -> MetadataColumns.metadataColumn(table, name))
- .collect(Collectors.toList());
- }
-
- private Optional<Types.NestedField> findPartitionField(List<Types.NestedField> fields) {
- return fields.stream()
- .filter(field -> MetadataColumns.PARTITION_COLUMN_ID == field.fieldId())
- .findFirst();
- }
-
- // collects used data field IDs across all known table schemas
- private Set<Integer> allUsedFieldIds() {
- return table.schemas().values().stream()
- .flatMap(tableSchema -> TypeUtil.getProjectedIds(tableSchema.asStruct()).stream())
- .collect(Collectors.toSet());
- }
-
@Override
public Scan build() {
if (localScan != null) {
@@ -389,20 +231,20 @@ public Scan build() {
private Scan buildBatchScan() {
Schema expectedSchema = projectionWithMetadataColumns();
return new SparkBatchQueryScan(
- spark,
- table,
+ spark(),
+ table(),
buildIcebergBatchScan(false /* not include Column Stats */, expectedSchema),
- readConf,
+ readConf(),
expectedSchema,
- filterExpressions,
- metricsReporter::scanReport);
+ filters(),
+ metricsReporter()::scanReport);
}
private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema expectedSchema) {
- Long snapshotId = readConf.snapshotId();
- Long asOfTimestamp = readConf.asOfTimestamp();
- String branch = readConf.branch();
- String tag = readConf.tag();
+ Long snapshotId = readConf().snapshotId();
+ Long asOfTimestamp = readConf().asOfTimestamp();
+ String branch = readConf().branch();
+ String tag = readConf().tag();
Preconditions.checkArgument(
snapshotId == null || asOfTimestamp == null,
@@ -410,8 +252,8 @@ private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema
SparkReadOptions.SNAPSHOT_ID,
SparkReadOptions.AS_OF_TIMESTAMP);
- Long startSnapshotId = readConf.startSnapshotId();
- Long endSnapshotId = readConf.endSnapshotId();
+ Long startSnapshotId = readConf().startSnapshotId();
+ Long endSnapshotId = readConf().endSnapshotId();
if (snapshotId != null || asOfTimestamp != null) {
Preconditions.checkArgument(
@@ -429,8 +271,8 @@ private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema
SparkReadOptions.END_SNAPSHOT_ID,
SparkReadOptions.START_SNAPSHOT_ID);
- Long startTimestamp = readConf.startTimestamp();
- Long endTimestamp = readConf.endTimestamp();
+ Long startTimestamp = readConf().startTimestamp();
+ Long endTimestamp = readConf().endTimestamp();
Preconditions.checkArgument(
startTimestamp == null && endTimestamp == null,
"Cannot set %s or %s for incremental scans and batch scan. They are only valid for "
@@ -454,10 +296,10 @@ private org.apache.iceberg.Scan buildBatchScan(
Schema expectedSchema) {
BatchScan scan =
newBatchScan()
- .caseSensitive(caseSensitive)
- .filter(filterExpression())
+ .caseSensitive(caseSensitive())
+ .filter(filter())
.project(expectedSchema)
- .metricsReporter(metricsReporter);
+ .metricsReporter(metricsReporter());
if (withStats) {
scan = scan.includeColumnStats();
@@ -485,13 +327,13 @@ private org.apache.iceberg.Scan buildBatchScan(
private org.apache.iceberg.Scan buildIncrementalAppendScan(
long startSnapshotId, Long endSnapshotId, boolean withStats, Schema expectedSchema) {
IncrementalAppendScan scan =
- table
+ table()
.newIncrementalAppendScan()
.fromSnapshotExclusive(startSnapshotId)
- .caseSensitive(caseSensitive)
- .filter(filterExpression())
+ .caseSensitive(caseSensitive())
+ .filter(filter())
.project(expectedSchema)
- .metricsReporter(metricsReporter);
+ .metricsReporter(metricsReporter());
if (withStats) {
scan = scan.includeColumnStats();
@@ -507,20 +349,20 @@ private org.apache.iceberg.Scan buildIncrementalAppendScan(
@SuppressWarnings("CyclomaticComplexity")
public Scan buildChangelogScan() {
Preconditions.checkArgument(
- readConf.snapshotId() == null
- && readConf.asOfTimestamp() == null
- && readConf.branch() == null
- && readConf.tag() == null,
+ readConf().snapshotId() == null
+ && readConf().asOfTimestamp() == null
+ && readConf().branch() == null
+ && readConf().tag() == null,
"Cannot set neither %s, %s, %s and %s for changelogs",
SparkReadOptions.SNAPSHOT_ID,
SparkReadOptions.AS_OF_TIMESTAMP,
SparkReadOptions.BRANCH,
SparkReadOptions.TAG);
- Long startSnapshotId = readConf.startSnapshotId();
- Long endSnapshotId = readConf.endSnapshotId();
- Long startTimestamp = readConf.startTimestamp();
- Long endTimestamp = readConf.endTimestamp();
+ Long startSnapshotId = readConf().startSnapshotId();
+ Long endSnapshotId = readConf().endSnapshotId();
+ Long startTimestamp = readConf().startTimestamp();
+ Long endTimestamp = readConf().endTimestamp();
Preconditions.checkArgument(
!(startSnapshotId != null && startTimestamp != null),
@@ -544,8 +386,8 @@ public Scan buildChangelogScan() {
boolean emptyScan = false;
if (startTimestamp != null) {
- if (table.currentSnapshot() == null
- || startTimestamp > table.currentSnapshot().timestampMillis()) {
+ if (table().currentSnapshot() == null
+ || startTimestamp > table().currentSnapshot().timestampMillis()) {
emptyScan = true;
}
startSnapshotId = getStartSnapshotId(startTimestamp);
@@ -562,12 +404,12 @@ public Scan buildChangelogScan() {
Schema expectedSchema = projectionWithMetadataColumns();
IncrementalChangelogScan scan =
- table
+ table()
.newIncrementalChangelogScan()
- .caseSensitive(caseSensitive)
- .filter(filterExpression())
+ .caseSensitive(caseSensitive())
+ .filter(filter())
.project(expectedSchema)
- .metricsReporter(metricsReporter);
+ .metricsReporter(metricsReporter());
if (startSnapshotId != null) {
scan = scan.fromSnapshotExclusive(startSnapshotId);
@@ -580,11 +422,11 @@ public Scan buildChangelogScan() {
scan = configureSplitPlanning(scan);
return new SparkChangelogScan(
- spark, table, scan, readConf, expectedSchema, filterExpressions, emptyScan);
+ spark(), table(), scan, readConf(), expectedSchema, filters(), emptyScan);
}
private Long getStartSnapshotId(Long startTimestamp) {
- Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table, startTimestamp);
+ Snapshot oldestSnapshotAfter = SnapshotUtil.oldestAncestorAfter(table(), startTimestamp);
if (oldestSnapshotAfter == null) {
return null;
@@ -597,7 +439,7 @@ private Long getStartSnapshotId(Long startTimestamp) {
private Long getEndSnapshotId(Long endTimestamp) {
Long endSnapshotId = null;
- for (Snapshot snapshot : SnapshotUtil.currentAncestors(table)) {
+ for (Snapshot snapshot : SnapshotUtil.currentAncestors(table())) {
if (snapshot.timestampMillis() <= endTimestamp) {
endSnapshotId = snapshot.snapshotId();
break;
@@ -608,29 +450,31 @@ private Long getEndSnapshotId(Long endTimestamp) {
public Scan buildMergeOnReadScan() {
Preconditions.checkArgument(
- readConf.snapshotId() == null && readConf.asOfTimestamp() == null && readConf.tag() == null,
+ readConf().snapshotId() == null
+ && readConf().asOfTimestamp() == null
+ && readConf().tag() == null,
"Cannot set time travel options %s, %s, %s for row-level command scans",
SparkReadOptions.SNAPSHOT_ID,
SparkReadOptions.AS_OF_TIMESTAMP,
SparkReadOptions.TAG);
Preconditions.checkArgument(
- readConf.startSnapshotId() == null && readConf.endSnapshotId() == null,
+ readConf().startSnapshotId() == null && readConf().endSnapshotId() == null,
"Cannot set incremental scan options %s and %s for row-level command scans",
SparkReadOptions.START_SNAPSHOT_ID,
SparkReadOptions.END_SNAPSHOT_ID);
- Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch());
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table(), readConf().branch());
if (snapshot == null) {
return new SparkBatchQueryScan(
- spark,
- table,
+ spark(),
+ table(),
null,
- readConf,
+ readConf(),
projectionWithMetadataColumns(),
- filterExpressions,
- metricsReporter::scanReport);
+ filters(),
+ metricsReporter()::scanReport);
}
// remember the current snapshot ID for commit validation
@@ -639,41 +483,41 @@ public Scan buildMergeOnReadScan() {
CaseInsensitiveStringMap adjustedOptions =
Spark3Util.setOption(SparkReadOptions.SNAPSHOT_ID, Long.toString(snapshotId), options);
SparkReadConf adjustedReadConf =
- new SparkReadConf(spark, table, readConf.branch(), adjustedOptions);
+ new SparkReadConf(spark(), table(), readConf().branch(), adjustedOptions);
Schema expectedSchema = projectionWithMetadataColumns();
BatchScan scan =
newBatchScan()
.useSnapshot(snapshotId)
- .caseSensitive(caseSensitive)
- .filter(filterExpression())
+ .caseSensitive(caseSensitive())
+ .filter(filter())
.project(expectedSchema)
- .metricsReporter(metricsReporter);
+ .metricsReporter(metricsReporter());
scan = configureSplitPlanning(scan);
return new SparkBatchQueryScan(
- spark,
- table,
+ spark(),
+ table(),
scan,
adjustedReadConf,
expectedSchema,
- filterExpressions,
- metricsReporter::scanReport);
+ filters(),
+ metricsReporter()::scanReport);
}
public Scan buildCopyOnWriteScan() {
- Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch());
+ Snapshot snapshot = SnapshotUtil.latestSnapshot(table(), readConf().branch());
if (snapshot == null) {
return new SparkCopyOnWriteScan(
- spark,
- table,
- readConf,
+ spark(),
+ table(),
+ readConf(),
projectionWithMetadataColumns(),
- filterExpressions,
- metricsReporter::scanReport);
+ filters(),
+ metricsReporter()::scanReport);
}
Schema expectedSchema = projectionWithMetadataColumns();
@@ -682,50 +526,22 @@ public Scan buildCopyOnWriteScan() {
newBatchScan()
.useSnapshot(snapshot.snapshotId())
.ignoreResiduals()
- .caseSensitive(caseSensitive)
- .filter(filterExpression())
+ .caseSensitive(caseSensitive())
+ .filter(filter())
.project(expectedSchema)
- .metricsReporter(metricsReporter);
+ .metricsReporter(metricsReporter());
scan = configureSplitPlanning(scan);
return new SparkCopyOnWriteScan(
- spark,
- table,
+ spark(),
+ table(),
scan,
snapshot,
- readConf,
+ readConf(),
expectedSchema,
- filterExpressions,
- metricsReporter::scanReport);
- }
-
- private <T extends org.apache.iceberg.Scan<T, ?, ?>> T configureSplitPlanning(T scan) {
- T configuredScan = scan;
-
- Long splitSize = readConf.splitSizeOption();
- if (splitSize != null) {
- configuredScan = configuredScan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
- }
-
- Integer splitLookback = readConf.splitLookbackOption();
- if (splitLookback != null) {
- configuredScan =
- configuredScan.option(TableProperties.SPLIT_LOOKBACK, String.valueOf(splitLookback));
- }
-
- Long splitOpenFileCost = readConf.splitOpenFileCostOption();
- if (splitOpenFileCost != null) {
- configuredScan =
- configuredScan.option(
- TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(splitOpenFileCost));
- }
-
- if (null != limit) {
- configuredScan = configuredScan.minRowsRequested(limit.longValue());
- }
-
- return configuredScan;
+ filters(),
+ metricsReporter()::scanReport);
}
@Override
@@ -739,16 +555,10 @@ public StructType readSchema() {
}
private BatchScan newBatchScan() {
- if (readConf.distributedPlanningEnabled()) {
- return new SparkDistributedDataScan(spark, table, readConf);
+ if (readConf().distributedPlanningEnabled()) {
+ return new SparkDistributedDataScan(spark(), table(), readConf());
} else {
- return table.newBatchScan();
+ return table().newBatchScan();
}
}
-
- @Override
- public boolean pushLimit(int pushedLimit) {
- this.limit = pushedLimit;
- return true;
- }
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
index 7164c53a3d98..bf7d79ac3491 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java
@@ -18,78 +18,27 @@
*/
package org.apache.iceberg.spark.source;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkSchemaUtil;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
-class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns {
+class SparkStagedScanBuilder extends BaseSparkScanBuilder
+ implements SupportsPushDownRequiredColumns {
- private final SparkSession spark;
- private final Table table;
private final String taskSetId;
- private final SparkReadConf readConf;
- private final List<String> metaColumns = Lists.newArrayList();
-
- private Schema schema;
SparkStagedScanBuilder(
SparkSession spark, Table table, String taskSetId, CaseInsensitiveStringMap options) {
- this.spark = spark;
- this.table = table;
+ super(spark, table, table.schema(), options);
this.taskSetId = taskSetId;
- this.readConf = new SparkReadConf(spark, table, options);
- this.schema = table.schema();
}
@Override
public Scan build() {
- return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), taskSetId, readConf);
- }
-
- @Override
- public void pruneColumns(StructType requestedSchema) {
- StructType requestedProjection = removeMetaColumns(requestedSchema);
- this.schema = SparkSchemaUtil.prune(schema, requestedProjection);
-
- Stream.of(requestedSchema.fields())
- .map(StructField::name)
- .filter(MetadataColumns::isMetadataColumn)
- .distinct()
- .forEach(metaColumns::add);
- }
-
- private StructType removeMetaColumns(StructType structType) {
- return new StructType(
- Stream.of(structType.fields())
- .filter(field -> MetadataColumns.nonMetadataColumn(field.name()))
- .toArray(StructField[]::new));
- }
-
- private Schema schemaWithMetadataColumns() {
- // metadata columns
- List<Types.NestedField> fields =
- metaColumns.stream()
- .distinct()
- .map(name -> MetadataColumns.metadataColumn(table, name))
- .collect(Collectors.toList());
- Schema meta = new Schema(fields);
-
- // schema of rows returned by readers
- return TypeUtil.join(schema, meta);
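+    // join requested metadata columns onto the pruned projection before building the scan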
+ Schema projection = projectionWithMetadataColumns();
+ return new SparkStagedScan(spark(), table(), projection, taskSetId, readConf());
}
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index f2d7a7f7d590..b6c4a1998085 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -223,8 +223,7 @@ public void testUnpartitionedCaseInsensitiveIDFilters() {
for (int i = 0; i < 10; i += 1) {
SparkScanBuilder builder =
- new SparkScanBuilder(spark, TABLES.load(options.get("path")), options)
- .caseSensitive(false);
+ new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
pushFilters(
builder,