From d2c7cd5ce052879fc73a9e795959285f13ac8219 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 11 Feb 2026 15:07:50 -0800 Subject: [PATCH 1/4] Spark 4.1: Separate compaction and main operations --- ...TestMetaColumnProjectionWithStageScan.java | 10 +- .../spark/SparkCachedTableCatalog.java | 257 ------------------ .../apache/iceberg/spark/SparkReadConf.java | 4 - .../iceberg/spark/SparkReadOptions.java | 3 - .../spark/SparkRewriteTableCatalog.java | 120 ++++++++ .../apache/iceberg/spark/SparkWriteConf.java | 7 - .../iceberg/spark/SparkWriteOptions.java | 3 - .../SparkBinPackFileRewriteRunner.java | 2 - .../SparkRewritePositionDeleteRunner.java | 2 - .../SparkShufflingFileRewriteRunner.java | 9 +- .../iceberg/spark/source/BaseSparkTable.java | 165 +++++++++++ .../iceberg/spark/source/IcebergSource.java | 32 +-- .../source/SparkPositionDeletesRewrite.java | 6 +- .../SparkPositionDeletesRewriteBuilder.java | 21 +- .../spark/source/SparkRewriteTable.java | 74 +++++ .../source/SparkRewriteWriteBuilder.java | 91 +++++++ .../iceberg/spark/source/SparkStagedScan.java | 21 +- .../spark/source/SparkStagedScanBuilder.java | 7 +- .../iceberg/spark/source/SparkTable.java | 12 +- .../spark/source/SparkWriteBuilder.java | 17 +- .../spark/TestFileRewriteCoordinator.java | 71 ++--- .../spark/TestSparkCachedTableCatalog.java | 105 ------- .../source/TestPositionDeletesTable.java | 91 +++---- .../spark/source/TestSparkStagedScan.java | 26 +- 24 files changed, 579 insertions(+), 577 deletions(-) delete mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java create mode 100644 spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java delete mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java index b783a006ef73..56191b38ef14 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetaColumnProjectionWithStageScan.java @@ -33,6 +33,7 @@ import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkTableCache; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -88,32 +89,31 @@ public void testReadStageTableMeta() throws Exception { Table table = Spark3Util.loadIcebergTable(spark, tableName); table.refresh(); - String tableLocation = table.location(); try (CloseableIterable tasks = table.newBatchScan().planFiles()) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, table); stageTask(table, fileSetID, tasks); Dataset scanDF2 = spark .read() .format("iceberg") .option(SparkReadOptions.FILE_OPEN_COST, "0") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) - .load(tableLocation); + .load(fileSetID); assertThat(scanDF2.columns()).hasSize(2); } try (CloseableIterable tasks = table.newBatchScan().planFiles()) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, table); stageTask(table, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") .option(SparkReadOptions.FILE_OPEN_COST, "0") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) - .load(tableLocation) + .load(fileSetID) .select("*", "_pos"); List rows = scanDF.collectAsList(); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java deleted file mode 100644 index 28427f597b06..000000000000 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Stream; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.base.Splitter; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.spark.source.SparkTable; -import org.apache.iceberg.util.Pair; -import org.apache.iceberg.util.SnapshotUtil; -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; -import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.connector.catalog.TableChange; -import org.apache.spark.sql.connector.expressions.Transform; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; - -/** An internal table catalog that is capable of loading tables from a cache. */ -public class SparkCachedTableCatalog implements TableCatalog, SupportsFunctions { - - private static final String CLASS_NAME = SparkCachedTableCatalog.class.getName(); - private static final Splitter COMMA = Splitter.on(","); - private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)"); - private static final Pattern SNAPSHOT_ID = Pattern.compile("snapshot_id_(\\d+)"); - private static final Pattern BRANCH = Pattern.compile("branch_(.*)"); - private static final Pattern TAG = Pattern.compile("tag_(.*)"); - private static final String REWRITE = "rewrite"; - - private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); - - private String name = null; - - @Override - public Identifier[] listTables(String[] namespace) { - throw new UnsupportedOperationException(CLASS_NAME + " does not support listing tables"); - } - - @Override - public SparkTable loadTable(Identifier ident) throws NoSuchTableException { - return load(ident); - } - - @Override - public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException { - SparkTable table = load(ident); - Preconditions.checkArgument( - table.snapshotId() == null, "Cannot time travel based on both table identifier and AS OF"); - return table.copyWithSnapshotId(Long.parseLong(version)); - } - - @Override - public SparkTable loadTable(Identifier ident, long timestampMicros) throws NoSuchTableException { - SparkTable table = load(ident); - Preconditions.checkArgument( - table.snapshotId() == null, "Cannot time travel based on both table identifier and AS OF"); - // Spark passes microseconds but Iceberg uses milliseconds for snapshots - long timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestampMicros); - long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table.table(), timestampMillis); - return table.copyWithSnapshotId(snapshotId); - } - - @Override - public void invalidateTable(Identifier ident) { - throw new UnsupportedOperationException(CLASS_NAME + " does not support table invalidation"); - } - - @Override - public SparkTable createTable( - Identifier ident, StructType schema, Transform[] partitions, Map properties) - throws TableAlreadyExistsException { - throw new UnsupportedOperationException(CLASS_NAME + " does not support creating tables"); - } - - @Override - public SparkTable alterTable(Identifier ident, TableChange... changes) { - throw new UnsupportedOperationException(CLASS_NAME + " does not support altering tables"); - } - - @Override - public boolean dropTable(Identifier ident) { - throw new UnsupportedOperationException(CLASS_NAME + " does not support dropping tables"); - } - - @Override - public boolean purgeTable(Identifier ident) throws UnsupportedOperationException { - throw new UnsupportedOperationException(CLASS_NAME + " does not support purging tables"); - } - - @Override - public void renameTable(Identifier oldIdent, Identifier newIdent) { - throw new UnsupportedOperationException(CLASS_NAME + " does not support renaming tables"); - } - - @Override - public void initialize(String catalogName, CaseInsensitiveStringMap options) { - this.name = catalogName; - } - - @Override - public String name() { - return name; - } - - private SparkTable load(Identifier ident) throws NoSuchTableException { - Preconditions.checkArgument( - ident.namespace().length == 0, CLASS_NAME + " does not support namespaces"); - - Pair> parsedIdent = parseIdent(ident); - String key = parsedIdent.first(); - TableLoadOptions options = parseLoadOptions(parsedIdent.second()); - - Table table = TABLE_CACHE.get(key); - - if (table == null) { - throw new NoSuchTableException(ident); - } - - if (options.isTableRewrite()) { - return new SparkTable(table, null, false, true); - } - - if (options.snapshotId() != null) { - return new SparkTable(table, options.snapshotId(), false); - } else if (options.asOfTimestamp() != null) { - return new SparkTable( - table, SnapshotUtil.snapshotIdAsOfTime(table, options.asOfTimestamp()), false); - } else if (options.branch() != null) { - Snapshot branchSnapshot = table.snapshot(options.branch()); - Preconditions.checkArgument( - branchSnapshot != null, - "Cannot find snapshot associated with branch name: %s", - options.branch()); - return new SparkTable(table, branchSnapshot.snapshotId(), false); - } else if (options.tag() != null) { - Snapshot tagSnapshot = table.snapshot(options.tag()); - Preconditions.checkArgument( - tagSnapshot != null, "Cannot find snapshot associated with tag name: %s", options.tag()); - return new SparkTable(table, tagSnapshot.snapshotId(), false); - } else { - return new SparkTable(table, false); - } - } - - private static class TableLoadOptions { - private Long asOfTimestamp; - private Long snapshotId; - private String branch; - private String tag; - private Boolean isTableRewrite; - - Long asOfTimestamp() { - return asOfTimestamp; - } - - Long snapshotId() { - return snapshotId; - } - - String branch() { - return branch; - } - - String tag() { - return tag; - } - - boolean isTableRewrite() { - return Boolean.TRUE.equals(isTableRewrite); - } - } - - /** Extracts table load options from metadata. */ - private TableLoadOptions parseLoadOptions(List metadata) { - TableLoadOptions opts = new TableLoadOptions(); - for (String meta : metadata) { - Matcher timeBasedMatcher = AT_TIMESTAMP.matcher(meta); - if (timeBasedMatcher.matches()) { - opts.asOfTimestamp = Long.parseLong(timeBasedMatcher.group(1)); - continue; - } - - Matcher snapshotBasedMatcher = SNAPSHOT_ID.matcher(meta); - if (snapshotBasedMatcher.matches()) { - opts.snapshotId = Long.parseLong(snapshotBasedMatcher.group(1)); - continue; - } - - Matcher branchBasedMatcher = BRANCH.matcher(meta); - if (branchBasedMatcher.matches()) { - opts.branch = branchBasedMatcher.group(1); - continue; - } - - Matcher tagBasedMatcher = TAG.matcher(meta); - if (tagBasedMatcher.matches()) { - opts.tag = tagBasedMatcher.group(1); - } - - if (meta.equalsIgnoreCase(REWRITE)) { - opts.isTableRewrite = true; - } - } - - long numberOptions = - Stream.of(opts.snapshotId, opts.asOfTimestamp, opts.branch, opts.tag, opts.isTableRewrite) - .filter(Objects::nonNull) - .count(); - Preconditions.checkArgument( - numberOptions <= 1, - "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s), is-table-rewrite (%s)", - opts.snapshotId, - opts.asOfTimestamp, - opts.branch, - opts.tag, - opts.isTableRewrite); - - return opts; - } - - private Pair> parseIdent(Identifier ident) { - int hashIndex = ident.name().lastIndexOf('#'); - if (hashIndex != -1 && !ident.name().endsWith("#")) { - String key = ident.name().substring(0, hashIndex); - List metadata = COMMA.splitToList(ident.name().substring(hashIndex + 1)); - return Pair.of(key, metadata); - } else { - return Pair.of(ident.name(), ImmutableList.of()); - } - } -} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 2788e160d526..16bf76ce30d6 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -131,10 +131,6 @@ public String tag() { return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional(); } - public String scanTaskSetId() { - return confParser.stringConf().option(SparkReadOptions.SCAN_TASK_SET_ID).parseOptional(); - } - public boolean streamingSkipDeleteSnapshots() { return confParser .booleanConf() diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java index 17f2bfee69b8..8071b1db5b92 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java @@ -62,9 +62,6 @@ private SparkReadOptions() {} // Overrides the table's read.parquet.vectorization.batch-size public static final String VECTORIZATION_BATCH_SIZE = "batch-size"; - // Set ID that is used to fetch scan tasks - public static final String SCAN_TASK_SET_ID = "scan-task-set-id"; - // skip snapshots of type delete while reading stream out of iceberg table public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots"; public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java new file mode 100644 index 000000000000..a1016beb1886 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkRewriteTableCatalog.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.util.Map; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.source.SparkRewriteTable; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class SparkRewriteTableCatalog implements TableCatalog, SupportsFunctions { + + private static final String CLASS_NAME = SparkRewriteTableCatalog.class.getName(); + private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); + + private String name = null; + + @Override + public Identifier[] listTables(String[] namespace) { + throw new UnsupportedOperationException(CLASS_NAME + " does not support listing tables"); + } + + @Override + public SparkRewriteTable loadTable(Identifier ident) throws NoSuchTableException { + validateNoNamespace(ident); + + String groupId = ident.name(); + Table table = TABLE_CACHE.get(groupId); + + if (table == null) { + throw new NoSuchTableException(ident); + } + + return new SparkRewriteTable(table, groupId); + } + + @Override + public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException { + throw new UnsupportedOperationException(CLASS_NAME + " does not support time travel"); + } + + @Override + public SparkTable loadTable(Identifier ident, long timestampMicros) throws NoSuchTableException { + throw new UnsupportedOperationException(CLASS_NAME + " does not support time travel"); + } + + @Override + public void invalidateTable(Identifier ident) { + throw new UnsupportedOperationException(CLASS_NAME + " does not support table invalidation"); + } + + @Override + public SparkTable createTable( + Identifier ident, StructType schema, Transform[] partitions, Map properties) + throws TableAlreadyExistsException { + throw new UnsupportedOperationException(CLASS_NAME + " does not support creating tables"); + } + + @Override + public SparkTable alterTable(Identifier ident, TableChange... changes) { + throw new UnsupportedOperationException(CLASS_NAME + " does not support altering tables"); + } + + @Override + public boolean dropTable(Identifier ident) { + throw new UnsupportedOperationException(CLASS_NAME + " does not support dropping tables"); + } + + @Override + public boolean purgeTable(Identifier ident) throws UnsupportedOperationException { + throw new UnsupportedOperationException(CLASS_NAME + " does not support purging tables"); + } + + @Override + public void renameTable(Identifier oldIdent, Identifier newIdent) { + throw new UnsupportedOperationException(CLASS_NAME + " does not support renaming tables"); + } + + @Override + public void initialize(String catalogName, CaseInsensitiveStringMap options) { + this.name = catalogName; + } + + @Override + public String name() { + return name; + } + + private void validateNoNamespace(Identifier ident) { + Preconditions.checkArgument( + ident.namespace().length == 0, + "%s does not support namespaces, but got: %s", + CLASS_NAME, + String.join(".", ident.namespace())); + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 96131e0e56dd..6648d7ea389f 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -267,13 +267,6 @@ public Map extraSnapshotMetadata() { return extraSnapshotMetadata; } - public String rewrittenFileSetId() { - return confParser - .stringConf() - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID) - .parseOptional(); - } - public SparkWriteRequirements writeRequirements() { if (ignoreTableDistributionAndOrdering()) { LOG.info("Skipping distribution/ordering: disabled per job configuration"); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 33db70bae587..40816eef2ffa 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -50,9 +50,6 @@ private SparkWriteOptions() {} // Checks if input schema and table schema are same(default: true) public static final String CHECK_ORDERING = "check-ordering"; - // File scan task set ID that indicates which files must be replaced - public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID = "rewritten-file-scan-task-set-id"; - public static final String OUTPUT_SPEC_ID = "output-spec-id"; public static final String OVERWRITE_MODE = "overwrite-mode"; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java index 6d2ef585b1f5..084e21b1bd21 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackFileRewriteRunner.java @@ -45,7 +45,6 @@ protected void doRewrite(String groupId, RewriteFileGroup group) { spark() .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId) .option(SparkReadOptions.SPLIT_SIZE, group.inputSplitSize()) .option(SparkReadOptions.FILE_OPEN_COST, "0") .load(groupId); @@ -54,7 +53,6 @@ protected void doRewrite(String groupId, RewriteFileGroup group) { scanDF .write() .format("iceberg") - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, group.maxOutputFileSize()) .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName()) .option(SparkWriteOptions.OUTPUT_SPEC_ID, group.outputSpecId()) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java index 4bbd2280565a..4bcf20811014 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkRewritePositionDeleteRunner.java @@ -105,7 +105,6 @@ protected void doRewrite(String groupId, RewritePositionDeletesGroup group) { spark() .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId) .option(SparkReadOptions.SPLIT_SIZE, group.inputSplitSize()) .option(SparkReadOptions.FILE_OPEN_COST, "0") .load(groupId); @@ -120,7 +119,6 @@ protected void doRewrite(String groupId, RewritePositionDeletesGroup group) { .sortWithinPartitions("file_path", "pos") .write() .format("iceberg") - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) .option(SparkWriteOptions.TARGET_DELETE_FILE_SIZE_BYTES, group.maxOutputFileSize()) .mode("append") .save(groupId); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java index 569eb252cba5..bc1665dc32c2 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingFileRewriteRunner.java @@ -31,7 +31,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkFunctionCatalog; -import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SortOrderUtil; @@ -104,12 +103,7 @@ public void init(Map options) { @Override public void doRewrite(String groupId, RewriteFileGroup fileGroup) { - Dataset scanDF = - spark() - .read() - .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId) - .load(groupId); + Dataset scanDF = spark().read().format("iceberg").load(groupId); Dataset sortedDF = sortedDF( @@ -122,7 +116,6 @@ public void doRewrite(String groupId, RewriteFileGroup fileGroup) { sortedDF .write() .format("iceberg") - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, fileGroup.maxOutputFileSize()) .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") .option(SparkWriteOptions.OUTPUT_SPEC_ID, fileGroup.outputSpecId()) diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java new file mode 100644 index 000000000000..a5d9293a9fc1 --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkTable.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.apache.iceberg.TableProperties.CURRENT_SNAPSHOT_ID; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableUtil; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.MetadataColumn; +import org.apache.spark.sql.connector.catalog.SupportsMetadataColumns; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; + +abstract class BaseSparkTable + implements org.apache.spark.sql.connector.catalog.Table, SupportsMetadataColumns { + + private static final String PROVIDER = "provider"; + private static final String FORMAT = "format"; + private static final String LOCATION = "location"; + private static final String SORT_ORDER = "sort-order"; + private static final String IDENTIFIER_FIELDS = "identifier-fields"; + private static final Set RESERVED_PROPERTIES = + ImmutableSet.of( + PROVIDER, + FORMAT, + CURRENT_SNAPSHOT_ID, + LOCATION, + FORMAT_VERSION, + SORT_ORDER, + IDENTIFIER_FIELDS); + + private final Table table; + private final Schema schema; + + private SparkSession lazySpark = null; + private StructType lazySparkSchema = null; + + protected BaseSparkTable(Table table, Schema schema) { + this.table = table; + this.schema = schema; + } + + protected SparkSession spark() { + if (lazySpark == null) { + this.lazySpark = SparkSession.active(); + } + return lazySpark; + } + + public Table table() { + return table; + } + + @Override + public String name() { + return table.toString(); + } + + @Override + public StructType schema() { + if (lazySparkSchema == null) { + this.lazySparkSchema = SparkSchemaUtil.convert(schema); + } + return lazySparkSchema; + } + + @Override + public Transform[] partitioning() { + return Spark3Util.toTransforms(table.spec()); + } + + @Override + public Map properties() { + ImmutableMap.Builder propsBuilder = ImmutableMap.builder(); + + propsBuilder.put(FORMAT, "iceberg/" + fileFormat()); + propsBuilder.put(PROVIDER, "iceberg"); + propsBuilder.put(LOCATION, table.location()); + propsBuilder.put(CURRENT_SNAPSHOT_ID, currentSnapshotId()); + + if (table instanceof BaseTable) { + TableOperations ops = ((BaseTable) table).operations(); + propsBuilder.put(FORMAT_VERSION, String.valueOf(ops.current().formatVersion())); + } + + if (table.sortOrder().isSorted()) { + propsBuilder.put(SORT_ORDER, Spark3Util.describe(table.sortOrder())); + } + + Set identifierFields = table.schema().identifierFieldNames(); + if (!identifierFields.isEmpty()) { + propsBuilder.put(IDENTIFIER_FIELDS, "[" + String.join(",", identifierFields) + "]"); + } + + table.properties().entrySet().stream() + .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey())) + .forEach(propsBuilder::put); + + return propsBuilder.build(); + } + + @Override + public MetadataColumn[] metadataColumns() { + List cols = Lists.newArrayList(); + + cols.add(SparkMetadataColumns.SPEC_ID); + cols.add(SparkMetadataColumns.partition(table)); + cols.add(SparkMetadataColumns.FILE_PATH); + cols.add(SparkMetadataColumns.ROW_POSITION); + cols.add(SparkMetadataColumns.IS_DELETED); + + if (TableUtil.supportsRowLineage(table)) { + cols.add(SparkMetadataColumns.ROW_ID); + cols.add(SparkMetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER); + } + + return cols.toArray(SparkMetadataColumn[]::new); + } + + private String fileFormat() { + return table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + } + + private String currentSnapshotId() { + Snapshot currentSnapshot = table.currentSnapshot(); + return currentSnapshot != null ? String.valueOf(currentSnapshot.snapshotId()) : "none"; + } + + @Override + public String toString() { + return table.toString(); + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index a9df99461558..a0462e8f8982 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -27,12 +27,11 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.PathIdentifier; import org.apache.iceberg.spark.Spark3Util; -import org.apache.iceberg.spark.SparkCachedTableCatalog; import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkRewriteTableCatalog; import org.apache.iceberg.spark.SparkSessionCatalog; import org.apache.iceberg.spark.SparkTableCache; -import org.apache.iceberg.spark.SparkWriteOptions; import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -66,15 +65,14 @@ public class IcebergSource implements DataSourceRegister, SupportsCatalogOptions, SessionConfigSupport { private static final String DEFAULT_CATALOG_NAME = "default_iceberg"; - private static final String DEFAULT_CACHE_CATALOG_NAME = "default_cache_iceberg"; - private static final String DEFAULT_CATALOG = "spark.sql.catalog." + DEFAULT_CATALOG_NAME; - private static final String DEFAULT_CACHE_CATALOG = - "spark.sql.catalog." + DEFAULT_CACHE_CATALOG_NAME; + private static final String REWRITE_CATALOG_NAME = "default_rewrite_catalog"; + private static final String CATALOG_PREFIX = "spark.sql.catalog."; + private static final String DEFAULT_CATALOG = CATALOG_PREFIX + DEFAULT_CATALOG_NAME; + private static final String REWRITE_CATALOG = CATALOG_PREFIX + REWRITE_CATALOG_NAME; private static final String AT_TIMESTAMP = "at_timestamp_"; private static final String SNAPSHOT_ID = "snapshot_id_"; private static final String BRANCH_PREFIX = "branch_"; private static final String TAG_PREFIX = "tag_"; - private static final String REWRITE_SELECTOR = "rewrite"; private static final String[] EMPTY_NAMESPACE = new String[0]; private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); @@ -165,21 +163,15 @@ private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStri selector = TAG_PREFIX + tag; } - String groupId = - options.getOrDefault( - SparkReadOptions.SCAN_TASK_SET_ID, - options.get(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID)); - if (groupId != null) { - selector = REWRITE_SELECTOR; - } - CatalogManager catalogManager = spark.sessionState().catalogManager(); + // return rewrite catalog with path as group ID if table is staged for rewrite if (TABLE_CACHE.contains(path)) { return new Spark3Util.CatalogAndIdentifier( - catalogManager.catalog(DEFAULT_CACHE_CATALOG_NAME), - Identifier.of(EMPTY_NAMESPACE, pathWithSelector(path, selector))); - } else if (path.contains("/")) { + catalogManager.catalog(REWRITE_CATALOG_NAME), Identifier.of(EMPTY_NAMESPACE, path)); + } + + if (path.contains("/")) { // contains a path. Return iceberg default catalog and a PathIdentifier return new Spark3Util.CatalogAndIdentifier( catalogManager.catalog(DEFAULT_CATALOG_NAME), @@ -258,8 +250,8 @@ private static void setupDefaultSparkCatalogs(SparkSession spark) { config.forEach((key, value) -> spark.conf().set(DEFAULT_CATALOG + "." + key, value)); } - if (spark.conf().getOption(DEFAULT_CACHE_CATALOG).isEmpty()) { - spark.conf().set(DEFAULT_CACHE_CATALOG, SparkCachedTableCatalog.class.getName()); + if (spark.conf().getOption(REWRITE_CATALOG).isEmpty()) { + spark.conf().set(REWRITE_CATALOG, SparkRewriteTableCatalog.class.getName()); } } } diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 0ec7084bfd1b..7fc535ffa53d 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -70,13 +70,13 @@ public class SparkPositionDeletesRewrite implements Write { private final JavaSparkContext sparkContext; private final Table table; + private final String fileSetId; private final String queryId; private final FileFormat format; private final long targetFileSize; private final DeleteGranularity deleteGranularity; private final Schema writeSchema; private final StructType dsSchema; - private final String fileSetId; private final int specId; private final StructLike partition; private final Map writeProperties; @@ -86,6 +86,7 @@ public class SparkPositionDeletesRewrite implements Write { * * @param spark Spark session * @param table instance of {@link PositionDeletesTable} + * @param fileSetId file set ID * @param writeConf Spark write config * @param writeInfo Spark write info * @param writeSchema Iceberg output schema @@ -96,6 +97,7 @@ public class SparkPositionDeletesRewrite implements Write { SparkPositionDeletesRewrite( SparkSession spark, Table table, + String fileSetId, SparkWriteConf writeConf, LogicalWriteInfo writeInfo, Schema writeSchema, @@ -104,13 +106,13 @@ public class SparkPositionDeletesRewrite implements Write { StructLike partition) { this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); this.table = table; + this.fileSetId = fileSetId; this.queryId = writeInfo.queryId(); this.format = writeConf.deleteFileFormat(); this.targetFileSize = writeConf.targetDeleteFileSize(); this.deleteGranularity = writeConf.deleteGranularity(); this.writeSchema = writeSchema; this.dsSchema = dsSchema; - this.fileSetId = writeConf.rewrittenFileSetId(); this.specId = specId; this.partition = partition; this.writeProperties = writeConf.writeProperties(); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java index 9fccc05ea25c..5e5d268ab942 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewriteBuilder.java @@ -50,28 +50,25 @@ public class SparkPositionDeletesRewriteBuilder implements WriteBuilder { private final SparkSession spark; private final Table table; + private final String fileSetId; private final SparkWriteConf writeConf; - private final LogicalWriteInfo writeInfo; + private final LogicalWriteInfo info; private final StructType dsSchema; private final Schema writeSchema; SparkPositionDeletesRewriteBuilder( - SparkSession spark, Table table, String branch, LogicalWriteInfo info) { + SparkSession spark, Table table, String fileSetId, LogicalWriteInfo info) { this.spark = spark; this.table = table; - this.writeConf = new SparkWriteConf(spark, table, branch, info.options()); - this.writeInfo = info; + this.fileSetId = fileSetId; + this.writeConf = new SparkWriteConf(spark, table, info.options()); + this.info = info; this.dsSchema = info.schema(); this.writeSchema = SparkSchemaUtil.convert(table.schema(), dsSchema, writeConf.caseSensitive()); } @Override public Write build() { - String fileSetId = writeConf.rewrittenFileSetId(); - - Preconditions.checkArgument( - fileSetId != null, "Can only write to %s via actions", table.name()); - // all files of rewrite group have same partition and spec id ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); List tasks = taskSetManager.fetchTasks(table, fileSetId); @@ -82,10 +79,10 @@ public Write build() { StructLike partition = partition(fileSetId, tasks); return new SparkPositionDeletesRewrite( - spark, table, writeConf, writeInfo, writeSchema, dsSchema, specId, partition); + spark, table, fileSetId, writeConf, info, writeSchema, dsSchema, specId, partition); } - private int specId(String fileSetId, List tasks) { + private static int specId(String fileSetId, List tasks) { Set specIds = tasks.stream().map(t -> t.spec().specId()).collect(Collectors.toSet()); Preconditions.checkArgument( specIds.size() == 1, @@ -95,7 +92,7 @@ private int specId(String fileSetId, List tasks) { return tasks.get(0).spec().specId(); } - private StructLike partition(String fileSetId, List tasks) { + private static StructLike partition(String fileSetId, List tasks) { StructLikeSet partitions = StructLikeSet.create(tasks.get(0).spec().partitionType()); tasks.stream().map(ContentScanTask::partition).forEach(partitions::add); Preconditions.checkArgument( diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java new file mode 100644 index 000000000000..73d5b34f1c2d --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteTable.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Set; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PositionDeletesTable; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableUtil; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.spark.sql.connector.catalog.SupportsRead; +import org.apache.spark.sql.connector.catalog.SupportsWrite; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.read.ScanBuilder; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class SparkRewriteTable extends BaseSparkTable implements SupportsRead, SupportsWrite { + + private static final Set CAPABILITIES = + ImmutableSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE); + + private final String groupId; + + public SparkRewriteTable(Table table, String groupId) { + super(table, rewriteSchema(table)); + this.groupId = groupId; + } + + @Override + public Set capabilities() { + return CAPABILITIES; + } + + @Override + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { + return new SparkStagedScanBuilder(spark(), table(), groupId, options); + } + + @Override + public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { + if (table() instanceof PositionDeletesTable) { + return new SparkPositionDeletesRewriteBuilder(spark(), table(), groupId, info); + } else { + return new SparkRewriteWriteBuilder(spark(), table(), rewriteSchema(table()), groupId, info); + } + } + + private static Schema rewriteSchema(Table table) { + if (TableUtil.supportsRowLineage(table)) { + return MetadataColumns.schemaWithRowLineage(table.schema()); + } else { + return table.schema(); + } + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java new file mode 100644 index 000000000000..714ac8e4853e --- /dev/null +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkRewriteWriteBuilder.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.types.TypeUtil; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.write.BatchWrite; +import org.apache.spark.sql.connector.write.LogicalWriteInfo; +import org.apache.spark.sql.connector.write.Write; +import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.connector.write.streaming.StreamingWrite; + +class SparkRewriteWriteBuilder implements WriteBuilder { + + private final SparkSession spark; + private final Table table; + private final Schema schema; + private final String groupId; + private final SparkWriteConf writeConf; + private final LogicalWriteInfo info; + private final boolean caseSensitive; + private final boolean checkNullability; + private final boolean checkOrdering; + + SparkRewriteWriteBuilder( + SparkSession spark, Table table, Schema schema, String groupId, LogicalWriteInfo info) { + this.spark = spark; + this.table = table; + this.schema = schema; + this.groupId = groupId; + this.writeConf = new SparkWriteConf(spark, table, info.options()); + this.info = info; + this.caseSensitive = writeConf.caseSensitive(); + this.checkNullability = writeConf.checkNullability(); + this.checkOrdering = writeConf.checkOrdering(); + } + + @Override + public Write build() { + Schema writeSchema = validateWriteSchema(); + SparkUtil.validatePartitionTransforms(table.spec()); + String appId = spark.sparkContext().applicationId(); + return new SparkWrite( + spark, + table, + writeConf, + info, + appId, + writeSchema, + info.schema(), + writeConf.writeRequirements()) { + + @Override + public BatchWrite toBatch() { + return asRewrite(groupId); + } + + @Override + public StreamingWrite toStreaming() { + throw new UnsupportedOperationException("Streaming writes are not supported for rewrites"); + } + }; + } + + private Schema validateWriteSchema() { + Schema writeSchema = SparkSchemaUtil.convert(schema, info.schema(), caseSensitive); + TypeUtil.validateWriteSchema(schema, writeSchema, checkNullability, checkOrdering); + return writeSchema; + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java index 394c92273673..435c2cbd15b5 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark.source; +import java.util.Collections; import java.util.List; import java.util.Objects; import org.apache.iceberg.ScanTask; @@ -28,8 +29,10 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkReadConf; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.read.Statistics; class SparkStagedScan extends SparkScan { @@ -40,14 +43,26 @@ class SparkStagedScan extends SparkScan { private List> taskGroups = null; // lazy cache of tasks - SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) { - super(spark, table, readConf, expectedSchema, ImmutableList.of(), null); - this.taskSetId = readConf.scanTaskSetId(); + SparkStagedScan( + SparkSession spark, + Table table, + Schema projection, + String taskSetId, + SparkReadConf readConf) { + super(spark, table, readConf, projection, ImmutableList.of(), null); + this.taskSetId = taskSetId; this.splitSize = readConf.splitSize(); this.splitLookback = readConf.splitLookback(); this.openFileCost = readConf.splitOpenFileCost(); } + @Override + public Statistics estimateStatistics() { + long rowsCount = taskGroups().stream().mapToLong(ScanTaskGroup::estimatedRowsCount).sum(); + long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), rowsCount); + return new Stats(sizeInBytes, rowsCount, Collections.emptyMap()); + } + @Override protected List> taskGroups() { if (taskGroups == null) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java index c5c86c3ebf28..7164c53a3d98 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScanBuilder.java @@ -41,21 +41,24 @@ class SparkStagedScanBuilder implements ScanBuilder, SupportsPushDownRequiredCol private final SparkSession spark; private final Table table; + private final String taskSetId; private final SparkReadConf readConf; private final List metaColumns = Lists.newArrayList(); private Schema schema; - SparkStagedScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) { + SparkStagedScanBuilder( + SparkSession spark, Table table, String taskSetId, CaseInsensitiveStringMap options) { this.spark = spark; this.table = table; + this.taskSetId = taskSetId; this.readConf = new SparkReadConf(spark, table, options); this.schema = table.schema(); } @Override public Scan build() { - return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), readConf); + return new SparkStagedScan(spark, table, schemaWithMetadataColumns(), taskSetId, readConf); } @Override diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 8261d0317804..335d0f72fd45 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -32,7 +32,6 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.PositionDeletesTable; import org.apache.iceberg.Schema; import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; @@ -289,10 +288,6 @@ public MetadataColumn[] metadataColumns() { @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { - if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) { - return new SparkStagedScanBuilder(sparkSession(), icebergTable, options); - } - if (refreshEagerly) { icebergTable.refresh(); } @@ -307,12 +302,7 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { public WriteBuilder newWriteBuilder(LogicalWriteInfo info) { Preconditions.checkArgument( snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId); - - if (icebergTable instanceof PositionDeletesTable) { - return new SparkPositionDeletesRewriteBuilder(sparkSession(), icebergTable, branch, info); - } else { - return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info); - } + return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info); } @Override diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java index 89af7740d988..182e56a861ce 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java @@ -54,7 +54,6 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo private final LogicalWriteInfo writeInfo; private final StructType dsSchema; private final String overwriteMode; - private final String rewrittenFileSetId; private boolean overwriteDynamic = false; private boolean overwriteByFilter = false; private Expression overwriteExpr = null; @@ -70,15 +69,12 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo this.writeInfo = info; this.dsSchema = info.schema(); this.overwriteMode = writeConf.overwriteMode(); - this.rewrittenFileSetId = writeConf.rewrittenFileSetId(); } public WriteBuilder overwriteFiles(Scan scan, Command command, IsolationLevel isolationLevel) { Preconditions.checkState(!overwriteByFilter, "Cannot overwrite individual files and by filter"); Preconditions.checkState( !overwriteDynamic, "Cannot overwrite individual files and dynamically"); - Preconditions.checkState( - rewrittenFileSetId == null, "Cannot overwrite individual files and rewrite"); this.overwriteFiles = true; this.copyOnWriteScan = (SparkCopyOnWriteScan) scan; @@ -92,8 +88,6 @@ public WriteBuilder overwriteDynamicPartitions() { Preconditions.checkState( !overwriteByFilter, "Cannot overwrite dynamically and by filter: %s", overwriteExpr); Preconditions.checkState(!overwriteFiles, "Cannot overwrite individual files and dynamically"); - Preconditions.checkState( - rewrittenFileSetId == null, "Cannot overwrite dynamically and rewrite"); this.overwriteDynamic = true; return this; @@ -103,7 +97,6 @@ public WriteBuilder overwriteDynamicPartitions() { public WriteBuilder overwrite(Filter[] filters) { Preconditions.checkState( !overwriteFiles, "Cannot overwrite individual files and using filters"); - Preconditions.checkState(rewrittenFileSetId == null, "Cannot overwrite and rewrite"); this.overwriteExpr = SparkFilters.convert(filters); if (overwriteExpr == Expressions.alwaysTrue() && "dynamic".equals(overwriteMode)) { @@ -123,9 +116,7 @@ public Write build() { // operation or if it's a compaction. // In any other case, only null row IDs and sequence numbers would be produced which // means the row lineage columns can be excluded from the output files - boolean writeRequiresRowLineage = - TableUtil.supportsRowLineage(table) - && (overwriteFiles || writeConf.rewrittenFileSetId() != null); + boolean writeRequiresRowLineage = TableUtil.supportsRowLineage(table) && overwriteFiles; boolean writeAlreadyIncludesLineage = dsSchema.exists(field -> field.name().equals(MetadataColumns.ROW_ID.name())); StructType sparkWriteSchema = dsSchema; @@ -156,9 +147,7 @@ public Write build() { @Override public BatchWrite toBatch() { - if (rewrittenFileSetId != null) { - return asRewrite(rewrittenFileSetId); - } else if (overwriteByFilter) { + if (overwriteByFilter) { return asOverwriteByFilter(overwriteExpr); } else if (overwriteDynamic) { return asDynamicOverwrite(); @@ -177,8 +166,6 @@ public StreamingWrite toStreaming() { !overwriteByFilter || overwriteExpr == Expressions.alwaysTrue(), "Unsupported streaming operation: overwrite by filter: %s", overwriteExpr); - Preconditions.checkState( - rewrittenFileSetId == null, "Unsupported streaming operation: rewrite"); if (overwriteByFilter) { return asStreamingOverwrite(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java index 085eedf45d1d..664c2019151f 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java @@ -71,34 +71,31 @@ public void testBinPackRewrite() throws NoSuchTableException, IOException { long avgFileSize = fileSizes.stream().mapToLong(i -> i).sum() / fileSizes.size(); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { - String fileSetID = UUID.randomUUID().toString(); + String groupId = UUID.randomUUID().toString(); ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); - taskSetManager.stageTasks(table, fileSetID, Lists.newArrayList(fileScanTasks)); + taskSetManager.stageTasks(table, groupId, Lists.newArrayList(fileScanTasks)); + SparkTableCache.get().add(groupId, table); // read and pack original 4 files into 2 splits Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.SPLIT_SIZE, Long.toString(avgFileSize * 2)) .option(SparkReadOptions.FILE_OPEN_COST, "0") - .load(tableName); + .load(groupId); // write the packed data into new files where each split becomes a new file - scanDF - .writeTo(tableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(groupId); // commit the rewrite FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); Set rewrittenFiles = - taskSetManager.fetchTasks(table, fileSetID).stream() + taskSetManager.fetchTasks(table, groupId).stream() .map(t -> t.asFileScanTask().file()) .collect(Collectors.toCollection(DataFileSet::create)); - Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); + Set addedFiles = rewriteCoordinator.fetchNewFiles(table, groupId); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -127,20 +124,20 @@ public void testSortRewrite() throws NoSuchTableException, IOException { assertThat(table.snapshots()).as("Should produce 4 snapshots").hasSize(4); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { - String fileSetID = UUID.randomUUID().toString(); + String groupId = UUID.randomUUID().toString(); ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); - taskSetManager.stageTasks(table, fileSetID, Lists.newArrayList(fileScanTasks)); + taskSetManager.stageTasks(table, groupId, Lists.newArrayList(fileScanTasks)); + SparkTableCache.get().add(groupId, table); // read original 4 files as 4 splits Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.SPLIT_SIZE, "134217728") .option(SparkReadOptions.FILE_OPEN_COST, "134217728") - .load(tableName); + .load(groupId); // make sure we disable AQE and set the number of shuffle partitions as the target num files ImmutableMap sqlConf = @@ -151,25 +148,17 @@ public void testSortRewrite() throws NoSuchTableException, IOException { withSQLConf( sqlConf, () -> { - try { - // write new files with sorted records - scanDF - .sort("id") - .writeTo(tableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); - } catch (NoSuchTableException e) { - throw new RuntimeException("Could not replace files", e); - } + // write new files with sorted records + scanDF.sort("id").write().format("iceberg").mode("append").save(groupId); }); // commit the rewrite FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); Set rewrittenFiles = - taskSetManager.fetchTasks(table, fileSetID).stream() + taskSetManager.fetchTasks(table, groupId).stream() .map(t -> t.asFileScanTask().file()) .collect(Collectors.toCollection(DataFileSet::create)); - Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); + Set addedFiles = rewriteCoordinator.fetchNewFiles(table, groupId); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -196,14 +185,15 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOExceptio Table table = validationCatalog.loadTable(tableIdent); - String firstFileSetID = UUID.randomUUID().toString(); + String firstGroupId = UUID.randomUUID().toString(); long firstFileSetSnapshotId = table.currentSnapshot().snapshotId(); ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); try (CloseableIterable tasks = table.newScan().planFiles()) { // stage first 2 files for compaction - taskSetManager.stageTasks(table, firstFileSetID, Lists.newArrayList(tasks)); + taskSetManager.stageTasks(table, firstGroupId, Lists.newArrayList(tasks)); + SparkTableCache.get().add(firstGroupId, table); } // add two more files @@ -212,43 +202,40 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOExceptio table.refresh(); - String secondFileSetID = UUID.randomUUID().toString(); + String secondGroupId = UUID.randomUUID().toString(); try (CloseableIterable tasks = table.newScan().appendsAfter(firstFileSetSnapshotId).planFiles()) { // stage 2 more files for compaction - taskSetManager.stageTasks(table, secondFileSetID, Lists.newArrayList(tasks)); + taskSetManager.stageTasks(table, secondGroupId, Lists.newArrayList(tasks)); + SparkTableCache.get().add(secondGroupId, table); } - ImmutableSet fileSetIDs = ImmutableSet.of(firstFileSetID, secondFileSetID); + ImmutableSet groupIds = ImmutableSet.of(firstGroupId, secondGroupId); - for (String fileSetID : fileSetIDs) { + for (String groupId : groupIds) { // read and pack 2 files into 1 split Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE) - .load(tableName); + .load(groupId); // write the combined data as one file - scanDF - .writeTo(tableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(groupId); } // commit both rewrites at the same time FileRewriteCoordinator rewriteCoordinator = FileRewriteCoordinator.get(); Set rewrittenFiles = - fileSetIDs.stream() - .flatMap(fileSetID -> taskSetManager.fetchTasks(table, fileSetID).stream()) + groupIds.stream() + .flatMap(groupId -> taskSetManager.fetchTasks(table, groupId).stream()) .map(t -> t.asFileScanTask().file()) .collect(Collectors.toSet()); Set addedFiles = - fileSetIDs.stream() - .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream()) + groupIds.stream() + .flatMap(groupId -> rewriteCoordinator.fetchNewFiles(table, groupId).stream()) .collect(Collectors.toCollection(DataFileSet::create)); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java deleted file mode 100644 index 228bf43b89b1..000000000000 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkCachedTableCatalog.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark; - -import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.Parameters; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -@ExtendWith(ParameterizedTestExtension.class) -public class TestSparkCachedTableCatalog extends TestBaseWithCatalog { - - private static final SparkTableCache TABLE_CACHE = SparkTableCache.get(); - - @BeforeAll - public static void setupCachedTableCatalog() { - spark.conf().set("spark.sql.catalog.testcache", SparkCachedTableCatalog.class.getName()); - } - - @AfterAll - public static void unsetCachedTableCatalog() { - spark.conf().unset("spark.sql.catalog.testcache"); - } - - @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") - protected static Object[][] parameters() { - return new Object[][] { - { - SparkCatalogConfig.HIVE.catalogName(), - SparkCatalogConfig.HIVE.implementation(), - SparkCatalogConfig.HIVE.properties() - }, - }; - } - - @TestTemplate - public void testTimeTravel() { - sql("CREATE TABLE %s (id INT, dep STRING) USING iceberg", tableName); - - Table table = validationCatalog.loadTable(tableIdent); - - sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName); - - table.refresh(); - Snapshot firstSnapshot = table.currentSnapshot(); - waitUntilAfter(firstSnapshot.timestampMillis()); - - sql("INSERT INTO TABLE %s VALUES (2, 'hr')", tableName); - - table.refresh(); - Snapshot secondSnapshot = table.currentSnapshot(); - waitUntilAfter(secondSnapshot.timestampMillis()); - - sql("INSERT INTO TABLE %s VALUES (3, 'hr')", tableName); - - table.refresh(); - - try { - TABLE_CACHE.add("key", table); - - assertEquals( - "Should have expected rows in 3rd snapshot", - ImmutableList.of(row(1, "hr"), row(2, "hr"), row(3, "hr")), - sql("SELECT * FROM testcache.key ORDER BY id")); - - assertEquals( - "Should have expected rows in 2nd snapshot", - ImmutableList.of(row(1, "hr"), row(2, "hr")), - sql( - "SELECT * FROM testcache.`key#at_timestamp_%s` ORDER BY id", - secondSnapshot.timestampMillis())); - - assertEquals( - "Should have expected rows in 1st snapshot", - ImmutableList.of(row(1, "hr")), - sql( - "SELECT * FROM testcache.`key#snapshot_id_%d` ORDER BY id", - firstSnapshot.snapshotId())); - - } finally { - TABLE_CACHE.remove("key"); - } - } -} diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 7892fd65b405..194b7921892a 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -71,7 +71,7 @@ import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.SparkReadOptions; import org.apache.iceberg.spark.SparkStructLike; -import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.spark.SparkTableCache; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.CharSequenceSet; @@ -846,21 +846,18 @@ public void testWrite() throws IOException, NoSuchTableException { for (String partValue : ImmutableList.of("a", "b")) { try (CloseableIterable tasks = tasks(posDeletesTable, "data", partValue)) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); commit(tab, posDeletesTable, fileSetID, 1); } @@ -914,20 +911,17 @@ public void testWriteUnpartitionedNullRows() throws Exception { String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes"; try (CloseableIterable tasks = posDeletesTable.newBatchScan().planFiles()) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); commit(tab, posDeletesTable, fileSetID, 1); } @@ -990,19 +984,17 @@ public void testWriteMixedRows() throws Exception { for (String partValue : ImmutableList.of("a", "b")) { try (CloseableIterable tasks = tasks(posDeletesTable, "data", partValue)) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) - .load(posDeletesTableName); + .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); commit(tab, posDeletesTable, fileSetID, 1); } @@ -1073,20 +1065,17 @@ public void testWritePartitionEvolutionAdd() throws Exception { try (CloseableIterable tasks = posDeletesTable.newBatchScan().filter(Expressions.isNull("partition.data")).planFiles()) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); commit(tab, posDeletesTable, fileSetID, 1); } @@ -1117,20 +1106,17 @@ public void testWritePartitionEvolutionAdd() throws Exception { for (String partValue : ImmutableList.of("a", "b")) { try (CloseableIterable tasks = tasks(posDeletesTable, "data", partValue)) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); // commit the rewrite commit(tab, posDeletesTable, fileSetID, 1); @@ -1181,33 +1167,29 @@ public void testWritePartitionEvolutionDisallowed() throws Exception { Dataset scanDF; String fileSetID = UUID.randomUUID().toString(); try (CloseableIterable tasks = posDeletesTable.newBatchScan().planFiles()) { + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); // Add partition field to render the original un-partitioned dataset un-commitable tab.updateSpec().addField("data").commit(); } - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(Arrays.asList(scanDF.columns()).contains("partition")); dropTable(tableName); @@ -1252,21 +1234,18 @@ public void testWriteSchemaEvolutionAdd() throws Exception { // rewrite files of old schema try (CloseableIterable tasks = tasks(posDeletesTable, "data", "a")) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); commit(tab, posDeletesTable, fileSetID, 1); } @@ -1300,21 +1279,18 @@ public void testWriteSchemaEvolutionAdd() throws Exception { // rewrite files of new schema try (CloseableIterable tasks = tasks(posDeletesTable, "data", "c")) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); commit(tab, posDeletesTable, fileSetID, 1); } @@ -1377,20 +1353,17 @@ public void testWriteSchemaEvolutionRemove() throws Exception { for (String partValue : ImmutableList.of("a", "b", "c", "d")) { try (CloseableIterable tasks = tasks(posDeletesTable, "data", partValue)) { String fileSetID = UUID.randomUUID().toString(); + SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID) .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(posDeletesTableName); + .load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); - scanDF - .writeTo(posDeletesTableName) - .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, fileSetID) - .append(); + scanDF.write().format("iceberg").mode("append").save(fileSetID); commit(tab, posDeletesTable, fileSetID, 1); } @@ -1453,8 +1426,8 @@ public void testNormalWritesNotAllowed() throws IOException { Dataset scanDF = spark.read().format("iceberg").load(posDeletesTableName); assertThatThrownBy(() -> scanDF.writeTo(posDeletesTableName).append()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Can only write to " + posDeletesTableName + " via actions"); + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Cannot append to a metadata table"); dropTable(tableName); } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java index e444b7cb1f7c..de07b7471f6e 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkStagedScan.java @@ -31,6 +31,7 @@ import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkTableCache; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -60,16 +61,12 @@ public void testTaskSetLoading() throws NoSuchTableException, IOException { try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); - String setID = UUID.randomUUID().toString(); - taskSetManager.stageTasks(table, setID, ImmutableList.copyOf(fileScanTasks)); + String groupId = UUID.randomUUID().toString(); + taskSetManager.stageTasks(table, groupId, ImmutableList.copyOf(fileScanTasks)); + SparkTableCache.get().add(groupId, table); - // load the staged file set - Dataset scanDF = - spark - .read() - .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, setID) - .load(tableName); + // load the staged file set via the rewrite catalog + Dataset scanDF = spark.read().format("iceberg").load(groupId); // write the records back essentially duplicating data scanDF.writeTo(tableName).append(); @@ -96,18 +93,18 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException { try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); - String setID = UUID.randomUUID().toString(); + String groupId = UUID.randomUUID().toString(); List tasks = ImmutableList.copyOf(fileScanTasks); - taskSetManager.stageTasks(table, setID, tasks); + taskSetManager.stageTasks(table, groupId, tasks); + SparkTableCache.get().add(groupId, table); // load the staged file set and make sure each file is in a separate split Dataset scanDF = spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, setID) .option(SparkReadOptions.SPLIT_SIZE, tasks.get(0).file().fileSizeInBytes()) - .load(tableName); + .load(groupId); assertThat(scanDF.javaRDD().getNumPartitions()) .as("Num partitions should match") .isEqualTo(2); @@ -117,9 +114,8 @@ public void testTaskSetPlanning() throws NoSuchTableException, IOException { spark .read() .format("iceberg") - .option(SparkReadOptions.SCAN_TASK_SET_ID, setID) .option(SparkReadOptions.SPLIT_SIZE, Long.MAX_VALUE) - .load(tableName); + .load(groupId); assertThat(scanDF.javaRDD().getNumPartitions()) .as("Num partitions should match") .isEqualTo(1); From f73b71c2e0373f9351b488569a93453d4d806f6f Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Thu, 12 Feb 2026 10:46:40 -0800 Subject: [PATCH 2/4] Rework checks for table cache --- .../extensions/TestRewriteDataFilesProcedure.java | 15 ++++++++++++--- .../org/apache/iceberg/spark/SparkTableCache.java | 5 +++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index c652b011ba3a..b37422beacf4 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -830,7 +830,10 @@ public void testBinPackTableWithSpecialChars() { List actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); - assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero(); + Table table = validationCatalog.loadTable(identifier); + assertThat(SparkTableCache.get().tables()) + .as("Table cache must not contain the test table") + .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid())); } @TestTemplate @@ -870,7 +873,10 @@ public void testSortTableWithSpecialChars() { List actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); - assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero(); + Table table = validationCatalog.loadTable(identifier); + assertThat(SparkTableCache.get().tables()) + .as("Table cache must not contain the test table") + .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid())); } @TestTemplate @@ -910,7 +916,10 @@ public void testZOrderTableWithSpecialChars() { List actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME)); assertEquals("Data after compaction should not change", expectedRecords, actualRecords); - assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero(); + Table table = validationCatalog.loadTable(identifier); + assertThat(SparkTableCache.get().tables()) + .as("Table cache must not contain the test table") + .noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid())); } @TestTemplate diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java index 6218423db491..83c6303d0fa4 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableCache.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.spark; +import java.util.Collection; import java.util.Map; import org.apache.iceberg.Table; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -51,4 +52,8 @@ public Table get(String key) { public Table remove(String key) { return cache.remove(key); } + + public Collection tables() { + return cache.values(); + } } From d5991fe743b24962b78e875f79e44d402aaf044b Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 16 Feb 2026 13:00:13 -0800 Subject: [PATCH 3/4] Remove unnecessary change --- .../iceberg/spark/source/TestPositionDeletesTable.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 194b7921892a..25341c4ea998 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -987,12 +987,7 @@ public void testWriteMixedRows() throws Exception { SparkTableCache.get().add(fileSetID, posDeletesTable); stageTask(tab, fileSetID, tasks); - Dataset scanDF = - spark - .read() - .format("iceberg") - .option(SparkReadOptions.FILE_OPEN_COST, Integer.MAX_VALUE) - .load(fileSetID); + Dataset scanDF = spark.read().format("iceberg").load(fileSetID); assertThat(scanDF.javaRDD().getNumPartitions()).isEqualTo(1); scanDF.write().format("iceberg").mode("append").save(fileSetID); From 11639f54db8b2c1d2b3e0d9757968510d04369f7 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Mon, 16 Feb 2026 13:22:02 -0800 Subject: [PATCH 4/4] Remove no longer used local vars --- .../iceberg/spark/source/TestPositionDeletesTable.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 25341c4ea998..5641c7b2a03b 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -842,7 +842,6 @@ public void testWrite() throws IOException, NoSuchTableException { Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES); - String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes"; for (String partValue : ImmutableList.of("a", "b")) { try (CloseableIterable tasks = tasks(posDeletesTable, "data", partValue)) { String fileSetID = UUID.randomUUID().toString(); @@ -908,7 +907,6 @@ public void testWriteUnpartitionedNullRows() throws Exception { Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES); - String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes"; try (CloseableIterable tasks = posDeletesTable.newBatchScan().planFiles()) { String fileSetID = UUID.randomUUID().toString(); SparkTableCache.get().add(fileSetID, posDeletesTable); @@ -980,7 +978,6 @@ public void testWriteMixedRows() throws Exception { // rewrite delete files Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES); - String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes"; for (String partValue : ImmutableList.of("a", "b")) { try (CloseableIterable tasks = tasks(posDeletesTable, "data", partValue)) { String fileSetID = UUID.randomUUID().toString(); @@ -1054,7 +1051,6 @@ public void testWritePartitionEvolutionAdd() throws Exception { Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES); - String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes"; // Read/write back unpartitioned data try (CloseableIterable tasks = @@ -1224,7 +1220,6 @@ public void testWriteSchemaEvolutionAdd() throws Exception { Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES); - String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes"; // rewrite files of old schema try (CloseableIterable tasks = tasks(posDeletesTable, "data", "a")) { @@ -1342,7 +1337,6 @@ public void testWriteSchemaEvolutionRemove() throws Exception { Table posDeletesTable = MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES); - String posDeletesTableName = catalogName + ".default." + tableName + ".position_deletes"; // rewrite files for (String partValue : ImmutableList.of("a", "b", "c", "d")) {