From 3b163c360d6a4d926c78f11bea90c1f26713b5a4 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 10 Feb 2026 11:10:23 -0800 Subject: [PATCH 1/4] Spark 4.1: Align handling of branches in reads and writes --- .../iceberg/spark/extensions/TestDelete.java | 16 +-- .../iceberg/spark/extensions/TestMerge.java | 26 ++--- .../iceberg/spark/extensions/TestUpdate.java | 16 +-- .../apache/iceberg/spark/SparkReadConf.java | 42 +++----- .../apache/iceberg/spark/SparkTableUtil.java | 101 +++++++++++++++--- .../apache/iceberg/spark/SparkWriteConf.java | 46 +++----- .../iceberg/spark/SparkWriteOptions.java | 3 + .../iceberg/spark/source/SparkTable.java | 2 +- .../SparkDistributedDataScanTestBase.java | 3 +- .../TestSparkDistributedDataScanDeletes.java | 3 +- ...stSparkDistributedDataScanFilterFiles.java | 3 +- ...TestSparkDistributedDataScanReporting.java | 3 +- .../TestSparkDistributionAndOrderingUtil.java | 7 +- .../iceberg/spark/TestSparkExecutorCache.java | 6 +- .../iceberg/spark/TestSparkWriteConf.java | 43 ++++---- .../actions/TestRewriteDataFilesAction.java | 2 +- .../sql/TestPartitionedWritesToWapBranch.java | 26 +++-- 17 files changed, 195 insertions(+), 153 deletions(-) diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java index fbf6ce3559a7..7e0f6207edc9 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java @@ -1375,15 +1375,17 @@ public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableE append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr")); createBranchIfNeeded(); + // writing to explicit branch should succeed even with WAP branch set withSQLConf( ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"), - () -> - assertThatThrownBy(() -> sql("DELETE FROM %s t WHERE id=0", commitTarget())) - .isInstanceOf(ValidationException.class) - .hasMessage( - String.format( - "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]", - branch))); + () -> { + sql("DELETE FROM %s t WHERE id=0", commitTarget()); + + assertEquals( + "Should have deleted row in explicit branch", + ImmutableList.of(row(1, "hr"), row(2, "hr")), + sql("SELECT * FROM %s ORDER BY id", commitTarget())); + }); } @TestTemplate diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java index 3f584031d907..b21d1a4042bc 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java @@ -2983,21 +2983,21 @@ public void testMergeToWapBranchWithTableBranchIdentifier() { ImmutableList expectedRows = ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4)); + // writing to explicit branch should succeed even with WAP branch set withSQLConf( ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"), - () -> - assertThatThrownBy( - () -> - sql( - "MERGE INTO %s t USING source s ON t.id = s.id " - + "WHEN MATCHED THEN UPDATE SET *" - + "WHEN NOT MATCHED THEN INSERT *", - commitTarget())) - .isInstanceOf(ValidationException.class) - .hasMessage( - String.format( - "Cannot write 
to both branch and WAP branch, but got branch [%s] and WAP branch [wap]", - branch))); + () -> { + sql( + "MERGE INTO %s t USING source s ON t.id = s.id " + + "WHEN MATCHED THEN UPDATE SET *" + + "WHEN NOT MATCHED THEN INSERT *", + commitTarget()); + + assertEquals( + "Should have expected rows in explicit branch", + expectedRows, + sql("SELECT * FROM %s ORDER BY id", commitTarget())); + }); } private void checkJoinAndFilterConditions(String query, String join, String icebergFilters) { diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 2afbc697e178..79ea5c2138e4 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -1500,15 +1500,17 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() { "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')", tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED); + // writing to explicit branch should succeed even with WAP branch set withSQLConf( ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"), - () -> - assertThatThrownBy(() -> sql("UPDATE %s SET dep='hr' WHERE dep='a'", commitTarget())) - .isInstanceOf(ValidationException.class) - .hasMessage( - String.format( - "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]", - branch))); + () -> { + sql("UPDATE %s SET dep='software' WHERE dep='hr'", commitTarget()); + + assertEquals( + "Should have updated row in explicit branch", + ImmutableList.of(row(1, "software")), + sql("SELECT * FROM %s ORDER BY id", commitTarget())); + }); } private RowLevelOperationMode mode(Table table) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 2788e160d526..c1831be8bb2d 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -20,15 +20,13 @@ import static org.apache.iceberg.PlanningMode.LOCAL; -import java.util.Map; import org.apache.iceberg.PlanningMode; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.Util; -import org.apache.iceberg.util.PropertyUtil; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * A class for common Iceberg configs for Spark reads. 
@@ -57,18 +55,24 @@ public class SparkReadConf { private final SparkSession spark; private final Table table; private final String branch; + private final CaseInsensitiveStringMap options; private final SparkConfParser confParser; - public SparkReadConf(SparkSession spark, Table table, Map readOptions) { - this(spark, table, null, readOptions); + public SparkReadConf(SparkSession spark, Table table) { + this(spark, table, CaseInsensitiveStringMap.empty()); + } + + public SparkReadConf(SparkSession spark, Table table, CaseInsensitiveStringMap options) { + this(spark, table, null, options); } public SparkReadConf( - SparkSession spark, Table table, String branch, Map readOptions) { + SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) { this.spark = spark; this.table = table; this.branch = branch; - this.confParser = new SparkConfParser(spark, table, readOptions); + this.options = options; + this.confParser = new SparkConfParser(spark, table, options); } public boolean caseSensitive() { @@ -102,29 +106,7 @@ public Long endSnapshotId() { } public String branch() { - String optionBranch = confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional(); - ValidationException.check( - branch == null || optionBranch == null || optionBranch.equals(branch), - "Must not specify different branches in both table identifier and read option, " - + "got [%s] in identifier and [%s] in options", - branch, - optionBranch); - String inputBranch = branch != null ? branch : optionBranch; - if (inputBranch != null) { - return inputBranch; - } - - boolean wapEnabled = - PropertyUtil.propertyAsBoolean( - table.properties(), TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, false); - if (wapEnabled) { - String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null); - if (wapBranch != null && table.refs().containsKey(wapBranch)) { - return wapBranch; - } - } - - return null; + return SparkTableUtil.determineReadBranch(spark, table, branch, options); } public String tag() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java index 0b4578630a06..593154d72a9c 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java @@ -60,7 +60,6 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.TableMigrationUtil; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.hadoop.Util; @@ -1025,34 +1024,102 @@ private static DataSourceV2Relation createRelation( sparkTable, Option.empty(), Option.empty(), options, Option.empty()); } + public static String determineWriteBranch(SparkSession spark, Table table, String branch) { + return determineWriteBranch(spark, table, branch, CaseInsensitiveStringMap.empty()); + } + /** * Determine the write branch. * - *
<p>Validate wap config and determine the write branch.
+   * <p>The target branch can be specified via table identifier, write option, or in SQL:
+   *
+   * <ul>
+   *   <li>The identifier and option branches can't conflict. If both are set, they must match.
+   *   <li>Identifier and option branches take priority over the session WAP branch.
+   *   <li>If neither the option nor the identifier branch is set and WAP is enabled for this table,
+   *       use the WAP branch from the session SQL config.
+   * </ul>
+   *
+   * <p>Note: WAP ID and WAP branch cannot be set at the same time.
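+   *
+   * <p>For example, a sketch of the equivalent DataFrame calls (illustrative only, not part of
+   * this change; the dataset {@code df} and the table/branch names are assumptions):
+   *
+   * <pre>{@code
+   * // branch supplied through the table identifier
+   * df.writeTo("db.tbl.branch_audit").append();
+   * // branch supplied through the write option introduced in this change
+   * df.writeTo("db.tbl").option("branch", "audit").append();
+   * }</pre>
+   *
+   * <p>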
Note: The target branch may be created during the write operation if it does not exist. * * @param spark a Spark Session - * @param branch write branch if there is no WAP branch configured - * @return branch for write operation + * @param table the table being written to + * @param branch write branch configured via table identifier, or null + * @param options write options + * @return branch for write operation, or null for main branch */ - public static String determineWriteBranch(SparkSession spark, String branch) { + public static String determineWriteBranch( + SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) { + String optionBranch = options.get(SparkWriteOptions.BRANCH); + if (optionBranch != null) { + Preconditions.checkArgument( + branch == null || optionBranch.equals(branch), + "Explicitly configured branch [%s] and write option [%s] are in conflict", + branch, + optionBranch); + return optionBranch; + } + + if (branch == null && wapEnabled(table)) { + return wapSessionBranch(spark); + } + + return branch; + } + + /** + * Determine the read branch. + * + *
<p>The target branch can be specified via table identifier, read option, or in SQL:
+   *
+   * <ul>
+   *   <li>The identifier and option branches can't conflict. If both are set, they must match.
+   *   <li>Identifier and option branches take priority over the session WAP branch.
+   *   <li>If neither the option nor the identifier branch is set and WAP is enabled for this table,
+   *       use the WAP branch from the session SQL config (only if the branch already exists).
+   * </ul>
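+   *
+   * <p>For example, a sketch of equivalent reads (illustrative only, not part of this change;
+   * the table and branch names are assumptions):
+   *
+   * <pre>{@code
+   * // branch supplied through the table identifier
+   * spark.read().table("db.tbl.branch_audit");
+   * // branch supplied through the read option
+   * spark.read().option("branch", "audit").table("db.tbl");
+   * }</pre>
+   *
+   * <p>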
Note: WAP ID and WAP branch cannot be set at the same time. + * + * @param spark a Spark Session + * @param table the table being read from + * @param branch read branch configured via table identifier, or null + * @param options read options + * @return branch for read operation, or null for main branch + */ + public static String determineReadBranch( + SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) { + String optionBranch = options.get(SparkReadOptions.BRANCH); + if (optionBranch != null) { + Preconditions.checkArgument( + branch == null || optionBranch.equals(branch), + "Explicitly configured branch [%s] and read option [%s] are in conflict", + branch, + optionBranch); + return optionBranch; + } + + if (branch == null && wapEnabled(table)) { + String wapBranch = wapSessionBranch(spark); + if (wapBranch != null && table.refs().containsKey(wapBranch)) { + return wapBranch; + } + } + + return branch; + } + + private static String wapSessionBranch(SparkSession spark) { String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null); String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null); - ValidationException.check( + Preconditions.checkArgument( wapId == null || wapBranch == null, "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]", wapId, wapBranch); - - if (wapBranch != null) { - ValidationException.check( - branch == null, - "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]", - branch, - wapBranch); - - return wapBranch; - } - return branch; + return wapBranch; } public static boolean wapEnabled(Table table) { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java index 96131e0e56dd..d021085af6ab 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java @@ -46,7 +46,6 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableUtil; import org.apache.iceberg.deletes.DeleteGranularity; -import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -55,6 +54,7 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.write.RowLevelOperation.Command; import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; @@ -88,21 +88,25 @@ public class SparkWriteConf { private final Table table; private final String branch; private final RuntimeConfig sessionConf; - private final Map writeOptions; + private final CaseInsensitiveStringMap options; private final SparkConfParser confParser; - public SparkWriteConf(SparkSession spark, Table table, Map writeOptions) { - this(spark, table, null, writeOptions); + public SparkWriteConf(SparkSession spark, Table table) { + this(spark, table, null, CaseInsensitiveStringMap.empty()); + } + + public SparkWriteConf(SparkSession spark, Table table, CaseInsensitiveStringMap options) { + this(spark, table, null, options); } public SparkWriteConf( - SparkSession spark, Table table, String branch, Map writeOptions) { + SparkSession spark, 
Table table, String branch, CaseInsensitiveStringMap options) { this.spark = spark; this.table = table; this.branch = branch; this.sessionConf = spark.conf(); - this.writeOptions = writeOptions; - this.confParser = new SparkConfParser(spark, table, writeOptions); + this.options = options; + this.confParser = new SparkConfParser(spark, table, options); } public boolean checkNullability() { @@ -124,7 +128,7 @@ public boolean checkOrdering() { } public String overwriteMode() { - String overwriteMode = writeOptions.get(SparkWriteOptions.OVERWRITE_MODE); + String overwriteMode = options.get(SparkWriteOptions.OVERWRITE_MODE); return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null; } @@ -262,7 +266,7 @@ public Map extraSnapshotMetadata() { // Add write options, overriding session configuration if necessary extraSnapshotMetadata.putAll( - PropertyUtil.propertiesWithPrefix(writeOptions, SnapshotSummary.EXTRA_METADATA_PREFIX)); + PropertyUtil.propertiesWithPrefix(options, SnapshotSummary.EXTRA_METADATA_PREFIX)); return extraSnapshotMetadata; } @@ -466,29 +470,7 @@ public boolean caseSensitive() { } public String branch() { - if (wapEnabled()) { - String wapId = wapId(); - String wapBranch = - confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional(); - - ValidationException.check( - wapId == null || wapBranch == null, - "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]", - wapId, - wapBranch); - - if (wapBranch != null) { - ValidationException.check( - branch == null, - "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]", - branch, - wapBranch); - - return wapBranch; - } - } - - return branch; + return SparkTableUtil.determineWriteBranch(spark, table, branch, options); } public Map writeProperties() { diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java index 33db70bae587..b0e7242330f6 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java @@ -23,6 +23,9 @@ public class SparkWriteOptions { private SparkWriteOptions() {} + // Overrides the target branch for write operations + public static final String BRANCH = "branch"; + // Fileformat for write operations(default: Table write.format.default ) public static final String WRITE_FORMAT = "write-format"; diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java index 8261d0317804..009a6e0d6f3d 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java @@ -400,7 +400,7 @@ public void deleteWhere(Predicate[] predicates) { .deleteFromRowFilter(deleteExpr); if (SparkTableUtil.wapEnabled(table())) { - branch = SparkTableUtil.determineWriteBranch(sparkSession(), branch); + branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch); } if (branch != null) { diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java index 404ba7284606..aa4f3dc72416 100644 --- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.List; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkReadConf; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.internal.SQLConf; @@ -81,7 +80,7 @@ protected BatchScan asOfTime(BatchScan scan, long timestampMillis) { @Override protected BatchScan newScan() { - SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + SparkReadConf readConf = new SparkReadConf(spark, table); return new SparkDistributedDataScan(spark, table, readConf); } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java index 659507e4c5e3..6ffaede5b069 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.List; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkReadConf; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.internal.SQLConf; @@ -85,7 +84,7 @@ public static void stopSpark() { @Override protected BatchScan newScan(Table table) { - SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + SparkReadConf readConf = new SparkReadConf(spark, table); return new SparkDistributedDataScan(spark, table, readConf); } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java index a218f965ea65..1e680ace292f 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java @@ -21,7 +21,6 @@ import static org.apache.iceberg.PlanningMode.DISTRIBUTED; import static org.apache.iceberg.PlanningMode.LOCAL; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkReadConf; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.internal.SQLConf; @@ -79,7 +78,7 @@ protected BatchScan newScan(Table table) { .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName()) .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName()) .commit(); - SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + SparkReadConf readConf = new SparkReadConf(spark, table); return new SparkDistributedDataScan(spark, table, readConf); } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java index 2665d7ba8d3b..9b736004de57 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java @@ -23,7 +23,6 @@ import java.util.Arrays; import java.util.List; -import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.SparkReadConf; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.internal.SQLConf; @@ -80,7 +79,7 @@ protected BatchScan newScan(Table table) { .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName()) .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName()) .commit(); - SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + SparkReadConf readConf = new SparkReadConf(spark, table); return new SparkDistributedDataScan(spark, table, readConf); } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java index ca86350346cd..21b67b89a99b 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java @@ -34,7 +34,6 @@ import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.spark.sql.connector.distributions.Distribution; import org.apache.spark.sql.connector.distributions.Distributions; import org.apache.spark.sql.connector.expressions.Expression; @@ -2976,7 +2975,7 @@ public void testRangePositionDeltaMergePartitionedSortedTable() { private void checkWriteDistributionAndOrdering( Table table, Distribution expectedDistribution, SortOrder[] expectedOrdering) { - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); SparkWriteRequirements requirements = writeConf.writeRequirements(); @@ -2992,7 +2991,7 @@ private void checkCopyOnWriteDistributionAndOrdering( Command command, Distribution expectedDistribution, SortOrder[] expectedOrdering) { - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); SparkWriteRequirements requirements = writeConf.copyOnWriteRequirements(command); @@ -3008,7 +3007,7 @@ private void checkPositionDeltaDistributionAndOrdering( Command command, Distribution expectedDistribution, SortOrder[] expectedOrdering) { - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); SparkWriteRequirements requirements = writeConf.positionDeltaRequirements(command); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java index a411c3fc703e..607955971d4a 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java @@ -191,7 +191,7 @@ public void testDeleteFilesCacheDisabledConfig() throws Exception { SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true", SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "false"), () -> { - SparkReadConf readConf = new SparkReadConf(spark, table, Collections.emptyMap()); + SparkReadConf readConf = new SparkReadConf(spark, table); assertThat(readConf.cacheDeleteFilesOnExecutors()).isFalse(); }); @@ -200,7 +200,7 @@ public void 
testDeleteFilesCacheDisabledConfig() throws Exception { SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true", SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "true"), () -> { - SparkReadConf readConf = new SparkReadConf(spark, table, Collections.emptyMap()); + SparkReadConf readConf = new SparkReadConf(spark, table); assertThat(readConf.cacheDeleteFilesOnExecutors()).isTrue(); }); @@ -209,7 +209,7 @@ public void testDeleteFilesCacheDisabledConfig() throws Exception { SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "false", SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "true"), () -> { - SparkReadConf readConf = new SparkReadConf(spark, table, Collections.emptyMap()); + SparkReadConf readConf = new SparkReadConf(spark, table); assertThat(readConf.cacheDeleteFilesOnExecutors()).isFalse(); }); } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 61aacfa4589d..5487d1ef4a2a 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -60,6 +60,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; @@ -143,7 +144,7 @@ public void testDurationConf() { @TestTemplate public void testDeleteGranularityDefault() { Table table = validationCatalog.loadTable(tableIdent); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); DeleteGranularity value = writeConf.deleteGranularity(); assertThat(value).isEqualTo(DeleteGranularity.FILE); @@ -158,7 +159,7 @@ public void testDeleteGranularityTableProperty() { .set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString()) .commit(); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); DeleteGranularity value = writeConf.deleteGranularity(); assertThat(value).isEqualTo(DeleteGranularity.PARTITION); @@ -176,7 +177,7 @@ public void testDeleteGranularityWriteOption() { Map options = ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString()); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, options); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); DeleteGranularity value = writeConf.deleteGranularity(); assertThat(value).isEqualTo(DeleteGranularity.FILE); @@ -188,7 +189,7 @@ public void testDeleteGranularityInvalidValue() { table.updateProperties().set(TableProperties.DELETE_GRANULARITY, "invalid").commit(); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); assertThatThrownBy(writeConf::deleteGranularity) .isInstanceOf(IllegalArgumentException.class) @@ -199,7 +200,7 @@ public void testDeleteGranularityInvalidValue() { public void testAdvisoryPartitionSize() { Table table = validationCatalog.loadTable(tableIdent); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new 
SparkWriteConf(spark, table); long value1 = writeConf.writeRequirements().advisoryPartitionSize(); assertThat(value1).isGreaterThan(64L * 1024 * 1024).isLessThan(2L * 1024 * 1024 * 1024); @@ -217,7 +218,7 @@ public void testAdvisoryPartitionSize() { public void testSparkWriteConfDistributionDefault() { Table table = validationCatalog.loadTable(tableIdent); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); checkMode(DistributionMode.HASH, writeConf); } @@ -226,8 +227,8 @@ public void testSparkWriteConfDistributionDefault() { public void testSparkWriteConfDistributionModeWithWriteOption() { Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = - ImmutableMap.of(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); + writeOptions.put(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); checkMode(DistributionMode.NONE, writeConf); @@ -239,7 +240,7 @@ public void testSparkWriteConfDistributionModeWithSessionConfig() { ImmutableMap.of(SparkSQLProperties.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()), () -> { Table table = validationCatalog.loadTable(tableIdent); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); checkMode(DistributionMode.NONE, writeConf); }); } @@ -256,7 +257,7 @@ public void testSparkWriteConfDistributionModeWithTableProperties() { .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE) .commit(); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); checkMode(DistributionMode.NONE, writeConf); } @@ -275,7 +276,7 @@ public void testSparkWriteConfDistributionModeWithTblPropAndSessionConfig() { .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE) .commit(); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); // session config overwrite the table properties checkMode(DistributionMode.NONE, writeConf); }); @@ -288,9 +289,8 @@ public void testSparkWriteConfDistributionModeWithWriteOptionAndSessionConfig() () -> { Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = - ImmutableMap.of( - SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); + writeOptions.put(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); // write options overwrite the session config @@ -305,9 +305,8 @@ public void testSparkWriteConfDistributionModeWithEverything() { () -> { Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = - ImmutableMap.of( - SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); + writeOptions.put(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); table .updateProperties() @@ -402,7 +401,7 @@ public void testExtraSnapshotMetadataReflectsSessionConfig() { ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), () -> { Table table = 
validationCatalog.loadTable(tableIdent); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); Map metadata = writeConf.extraSnapshotMetadata(); @@ -416,8 +415,8 @@ public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), () -> { Table table = validationCatalog.loadTable(tableIdent); - Map writeOptions = - ImmutableMap.of("snapshot-property.test-key", "write-option-value"); + CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); + writeOptions.put("snapshot-property.test-key", "write-option-value"); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); Map metadata = writeConf.extraSnapshotMetadata(); @@ -596,7 +595,7 @@ public void testDeleteFileWriteConf() { public void testDVWriteConf() { Table table = validationCatalog.loadTable(tableIdent); table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN); } @@ -613,7 +612,7 @@ private void testWriteProperties(List> propertiesSuite) { updateProperties.commit(); - SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of()); + SparkWriteConf writeConf = new SparkWriteConf(spark, table); Map writeProperties = writeConf.writeProperties(); Map expectedProperties = propertiesSuite.get(2); assertThat(writeConf.writeProperties()).hasSameSizeAs(expectedProperties); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index 6d965f3dcc62..c76c75c9a3ce 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -2105,7 +2105,7 @@ public void testExecutorCacheForDeleteFilesDisabled() { RewriteDataFilesSparkAction action = SparkActions.get(spark).rewriteDataFiles(table); // The constructor should have set the configuration to false - SparkReadConf readConf = new SparkReadConf(action.spark(), table, Collections.emptyMap()); + SparkReadConf readConf = new SparkReadConf(action.spark(), table); assertThat(readConf.cacheDeleteFilesOnExecutors()) .as("Executor cache for delete files should be disabled in RewriteDataFilesSparkAction") .isFalse(); diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java index 45268b78f893..1b6334f23e50 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java @@ -25,7 +25,7 @@ import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; -import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.spark.SparkSQLProperties; import org.junit.jupiter.api.AfterEach; import 
org.junit.jupiter.api.BeforeEach; @@ -61,15 +61,25 @@ protected String selectTarget() { } @TestTemplate - public void testBranchAndWapBranchCannotBothBeSetForWrite() { + public void testWriteToBranchWithWapBranchSet() { Table table = validationCatalog.loadTable(tableIdent); table.manageSnapshots().createBranch("test2", table.refs().get(BRANCH).snapshotId()).commit(); sql("REFRESH TABLE " + tableName); - assertThatThrownBy(() -> sql("INSERT INTO %s.branch_test2 VALUES (4, 'd')", tableName)) - .isInstanceOf(ValidationException.class) - .hasMessage( - "Cannot write to both branch and WAP branch, but got branch [test2] and WAP branch [%s]", - BRANCH); + + // writing to explicit branch should succeed even with WAP branch set + sql("INSERT INTO TABLE %s.branch_test2 VALUES (4, 'd')", tableName); + + // verify write went to branch test2 + assertEquals( + "Data should be written to branch test2", + ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c"), row(4L, "d")), + sql("SELECT * FROM %s VERSION AS OF 'test2' ORDER BY id", tableName)); + + // verify current state is not affected + assertEquals( + "Data should be written to branch test2", + ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")), + sql("SELECT * FROM %s ORDER BY id", tableName)); } @TestTemplate @@ -77,7 +87,7 @@ public void testWapIdAndWapBranchCannotBothBeSetForWrite() { String wapId = UUID.randomUUID().toString(); spark.conf().set(SparkSQLProperties.WAP_ID, wapId); assertThatThrownBy(() -> sql("INSERT INTO %s VALUES (4, 'd')", tableName)) - .isInstanceOf(ValidationException.class) + .isInstanceOf(IllegalArgumentException.class) .hasMessage( "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]", wapId, BRANCH); } From 198b320feebb6f811d0aabd662b31e73b0346e4a Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 10 Feb 2026 13:27:24 -0800 Subject: [PATCH 2/4] Fix benchmarks --- .../java/org/apache/iceberg/spark/PlanningBenchmark.java | 2 +- .../apache/iceberg/spark/TaskGroupPlanningBenchmark.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java index 0eff3a847e41..c50a3fd406d7 100644 --- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java +++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java @@ -398,7 +398,7 @@ private BatchScan newDistributedScan(PlanningMode dataMode, PlanningMode deleteM .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName()) .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName()) .commit(); - SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + SparkReadConf readConf = new SparkReadConf(spark, table); return new SparkDistributedDataScan(spark, table, readConf); } diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java index 45c95bf99741..dc4b56855d99 100644 --- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java +++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java @@ -39,11 +39,11 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.TestHelpers; import org.apache.iceberg.io.CloseableIterable; 
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.parser.ParseException; import org.openjdk.jmh.annotations.Benchmark; @@ -107,7 +107,7 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void planTaskGroups(Blackhole blackhole) { - SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + SparkReadConf readConf = new SparkReadConf(spark, table); List> taskGroups = TableScanUtil.planTaskGroups( fileTasks, @@ -137,7 +137,7 @@ public void planTaskGroups(Blackhole blackhole) { @Benchmark @Threads(1) public void planTaskGroupsWithGrouping(Blackhole blackhole) { - SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of()); + SparkReadConf readConf = new SparkReadConf(spark, table); List> taskGroups = TableScanUtil.planTaskGroups( From cbcd94f7ee9a75441034716d7dd80678ae983f53 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 10 Feb 2026 13:35:29 -0800 Subject: [PATCH 3/4] Remove import --- .../org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java index dc4b56855d99..8a8097834ef8 100644 --- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java +++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java @@ -43,7 +43,6 @@ import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions; import org.apache.iceberg.util.TableScanUtil; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.parser.ParseException; import org.openjdk.jmh.annotations.Benchmark; From 100aa35577378ff3db9600f83187bc6a35d296ff Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 11 Feb 2026 12:17:11 -0800 Subject: [PATCH 4/4] Fix test usage --- .../iceberg/spark/TestSparkWriteConf.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java index 5487d1ef4a2a..227b93dfa478 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java @@ -177,7 +177,8 @@ public void testDeleteGranularityWriteOption() { Map options = ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString()); - SparkWriteConf writeConf = new SparkWriteConf(spark, table); + SparkWriteConf writeConf = + new SparkWriteConf(spark, table, new CaseInsensitiveStringMap(options)); DeleteGranularity value = writeConf.deleteGranularity(); assertThat(value).isEqualTo(DeleteGranularity.FILE); @@ -227,8 +228,9 @@ public void testSparkWriteConfDistributionDefault() { 
public void testSparkWriteConfDistributionModeWithWriteOption() { Table table = validationCatalog.loadTable(tableIdent); - CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); - writeOptions.put(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + CaseInsensitiveStringMap writeOptions = + new CaseInsensitiveStringMap( + ImmutableMap.of(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName())); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); checkMode(DistributionMode.NONE, writeConf); @@ -289,8 +291,10 @@ public void testSparkWriteConfDistributionModeWithWriteOptionAndSessionConfig() () -> { Table table = validationCatalog.loadTable(tableIdent); - CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); - writeOptions.put(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + CaseInsensitiveStringMap writeOptions = + new CaseInsensitiveStringMap( + ImmutableMap.of( + SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName())); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); // write options overwrite the session config @@ -305,8 +309,10 @@ public void testSparkWriteConfDistributionModeWithEverything() { () -> { Table table = validationCatalog.loadTable(tableIdent); - CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); - writeOptions.put(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + CaseInsensitiveStringMap writeOptions = + new CaseInsensitiveStringMap( + ImmutableMap.of( + SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName())); table .updateProperties() @@ -415,8 +421,9 @@ public void testExtraSnapshotMetadataWriteOptionsOverrideSessionConfig() { ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"), () -> { Table table = validationCatalog.loadTable(tableIdent); - CaseInsensitiveStringMap writeOptions = CaseInsensitiveStringMap.empty(); - writeOptions.put("snapshot-property.test-key", "write-option-value"); + CaseInsensitiveStringMap writeOptions = + new CaseInsensitiveStringMap( + ImmutableMap.of("snapshot-property.test-key", "write-option-value")); SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions); Map metadata = writeConf.extraSnapshotMetadata();