@@ -398,7 +398,7 @@ private BatchScan newDistributedScan(PlanningMode dataMode, PlanningMode deleteM
         .set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
         .set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
         .commit();
-    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     return new SparkDistributedDataScan(spark, table, readConf);
   }

@@ -39,7 +39,6 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
 import org.apache.iceberg.util.TableScanUtil;
@@ -107,7 +106,7 @@ public void tearDownBenchmark() {
   @Benchmark
   @Threads(1)
   public void planTaskGroups(Blackhole blackhole) {
-    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);
     List<ScanTaskGroup<FileScanTask>> taskGroups =
         TableScanUtil.planTaskGroups(
             fileTasks,
@@ -137,7 +136,7 @@ public void planTaskGroups(Blackhole blackhole) {
   @Benchmark
   @Threads(1)
   public void planTaskGroupsWithGrouping(Blackhole blackhole) {
-    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+    SparkReadConf readConf = new SparkReadConf(spark, table);

     List<ScanTaskGroup<FileScanTask>> taskGroups =
         TableScanUtil.planTaskGroups(

@@ -1375,15 +1375,17 @@ public void testDeleteToWapBranchWithTableBranchIdentifier() throws NoSuchTableE
     append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
     createBranchIfNeeded();

+    // writing to explicit branch should succeed even with WAP branch set
     withSQLConf(
         ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
-        () ->
-            assertThatThrownBy(() -> sql("DELETE FROM %s t WHERE id=0", commitTarget()))
-                .isInstanceOf(ValidationException.class)
-                .hasMessage(
-                    String.format(
-                        "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
-                        branch)));
+        () -> {
+          sql("DELETE FROM %s t WHERE id=0", commitTarget());
+
+          assertEquals(
+              "Should have deleted row in explicit branch",
+              ImmutableList.of(row(1, "hr"), row(2, "hr")),
+              sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+        });
   }

   @TestTemplate
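Taken together, the updated DELETE, MERGE, and UPDATE tests (this file and the two below) pin down the new contract: a branch selected explicitly via the table identifier or a write option now takes priority over the session WAP branch instead of failing validation. A minimal end-to-end sketch, assuming a table db.tbl with WAP enabled and an existing audit branch (names illustrative, not from this PR):

```java
// Session-level WAP branch (SparkSQLProperties.WAP_BRANCH = "spark.wap.branch").
spark.conf().set("spark.wap.branch", "wap");

// Previously this threw ValidationException ("Cannot write to both branch and
// WAP branch ..."); now the identifier branch wins and the DELETE commits to audit.
spark.sql("DELETE FROM db.tbl.branch_audit WHERE id = 0");

// Reading through the same identifier sees the branch state, not the WAP branch.
spark.sql("SELECT * FROM db.tbl.branch_audit ORDER BY id").show();
```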

@@ -2983,21 +2983,21 @@ public void testMergeToWapBranchWithTableBranchIdentifier() {
     ImmutableList<Object[]> expectedRows =
         ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));

+    // writing to explicit branch should succeed even with WAP branch set
     withSQLConf(
         ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
-        () ->
-            assertThatThrownBy(
-                    () ->
-                        sql(
-                            "MERGE INTO %s t USING source s ON t.id = s.id "
-                                + "WHEN MATCHED THEN UPDATE SET *"
-                                + "WHEN NOT MATCHED THEN INSERT *",
-                            commitTarget()))
-                .isInstanceOf(ValidationException.class)
-                .hasMessage(
-                    String.format(
-                        "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
-                        branch)));
+        () -> {
+          sql(
+              "MERGE INTO %s t USING source s ON t.id = s.id "
+                  + "WHEN MATCHED THEN UPDATE SET *"
+                  + "WHEN NOT MATCHED THEN INSERT *",
+              commitTarget());
+
+          assertEquals(
+              "Should have expected rows in explicit branch",
+              expectedRows,
+              sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+        });
   }

   private void checkJoinAndFilterConditions(String query, String join, String icebergFilters) {

@@ -1500,15 +1500,17 @@ public void testUpdateToWapBranchWithTableBranchIdentifier() {
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);

// writing to explicit branch should succeed even with WAP branch set
withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
() ->
assertThatThrownBy(() -> sql("UPDATE %s SET dep='hr' WHERE dep='a'", commitTarget()))
.isInstanceOf(ValidationException.class)
.hasMessage(
String.format(
"Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [wap]",
branch)));
() -> {
sql("UPDATE %s SET dep='software' WHERE dep='hr'", commitTarget());

assertEquals(
"Should have updated row in explicit branch",
ImmutableList.of(row(1, "software")),
sql("SELECT * FROM %s ORDER BY id", commitTarget()));
});
}

private RowLevelOperationMode mode(Table table) {

@@ -20,15 +20,13 @@

 import static org.apache.iceberg.PlanningMode.LOCAL;

-import java.util.Map;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;

 /**
  * A class for common Iceberg configs for Spark reads.
@@ -57,18 +55,24 @@ public class SparkReadConf {
   private final SparkSession spark;
   private final Table table;
   private final String branch;
+  private final CaseInsensitiveStringMap options;
   private final SparkConfParser confParser;

-  public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
-    this(spark, table, null, readOptions);
+  public SparkReadConf(SparkSession spark, Table table) {
+    this(spark, table, CaseInsensitiveStringMap.empty());
   }

+  public SparkReadConf(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+    this(spark, table, null, options);
+  }
+
   public SparkReadConf(
-      SparkSession spark, Table table, String branch, Map<String, String> readOptions) {
+      SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
     this.branch = branch;
-    this.confParser = new SparkConfParser(spark, table, readOptions);
+    this.options = options;
+    this.confParser = new SparkConfParser(spark, table, options);
   }

   public boolean caseSensitive() {
@@ -102,29 +106,7 @@ public Long endSnapshotId() {
   }

   public String branch() {
-    String optionBranch = confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
[Review comment from @aokolnychyi, Feb 10, 2026: Moved into a utility call. I will remove this method in subsequent PRs and use the utility directly.]
-    ValidationException.check(
-        branch == null || optionBranch == null || optionBranch.equals(branch),
-        "Must not specify different branches in both table identifier and read option, "
-            + "got [%s] in identifier and [%s] in options",
-        branch,
-        optionBranch);
-    String inputBranch = branch != null ? branch : optionBranch;
-    if (inputBranch != null) {
-      return inputBranch;
-    }
-
-    boolean wapEnabled =
-        PropertyUtil.propertyAsBoolean(
-            table.properties(), TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, false);
-    if (wapEnabled) {
-      String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
-      if (wapBranch != null && table.refs().containsKey(wapBranch)) {
-        return wapBranch;
-      }
-    }
-
-    return null;
+    return SparkTableUtil.determineReadBranch(spark, table, branch, options);
   }

   public String tag() {
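On the caller side, per-scan options now travel as Spark's CaseInsensitiveStringMap instead of a plain Java map, with the new no-options overload replacing the old ImmutableMap.of() idiom (as the benchmark and test updates above show). A rough sketch of both call sites, assuming spark, table, and the relocated ImmutableMap import are already in scope:

```java
// Equivalent to the old new SparkReadConf(spark, table, ImmutableMap.of()).
SparkReadConf defaults = new SparkReadConf(spark, table);

// With read options, e.g. the map Spark hands to a DSv2 scan builder.
CaseInsensitiveStringMap options =
    new CaseInsensitiveStringMap(ImmutableMap.of("branch", "audit"));
SparkReadConf readConf = new SparkReadConf(spark, table, options);

// branch() now delegates to SparkTableUtil.determineReadBranch, so the "branch"
// read option resolves here unless it conflicts with an identifier-level branch.
String branch = readConf.branch(); // "audit"
```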

@@ -60,7 +60,6 @@
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.TableMigrationUtil;
-import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hadoop.Util;
@@ -1025,34 +1024,102 @@ private static DataSourceV2Relation createRelation(
         sparkTable, Option.empty(), Option.empty(), options, Option.empty());
   }

+  public static String determineWriteBranch(SparkSession spark, Table table, String branch) {
+    return determineWriteBranch(spark, table, branch, CaseInsensitiveStringMap.empty());
+  }
+
   /**
    * Determine the write branch.
    *
-   * <p>Validate wap config and determine the write branch.
+   * <p>The target branch can be specified via table identifier, write option, or in SQL:
    *
+   * <ul>
+   *   <li>The identifier and option branches can't conflict. If both are set, they must match.
+   *   <li>Identifier and option branches take priority over the session WAP branch.
+   *   <li>If neither the option nor the identifier branch is set and WAP is enabled for this table,
+   *       use the WAP branch from the session SQL config.
+   * </ul>
+   *
+   * <p>Note: WAP ID and WAP branch cannot be set at the same time.
+   *
+   * <p>Note: The target branch may be created during the write operation if it does not exist.
+   *
    * @param spark a Spark Session
-   * @param branch write branch if there is no WAP branch configured
-   * @return branch for write operation
+   * @param table the table being written to
+   * @param branch write branch configured via table identifier, or null
+   * @param options write options
+   * @return branch for write operation, or null for main branch
    */
-  public static String determineWriteBranch(SparkSession spark, String branch) {
+  public static String determineWriteBranch(
+      SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) {
+    String optionBranch = options.get(SparkWriteOptions.BRANCH);
+    if (optionBranch != null) {
+      Preconditions.checkArgument(
+          branch == null || optionBranch.equals(branch),
+          "Explicitly configured branch [%s] and write option [%s] are in conflict",
+          branch,
+          optionBranch);
+      return optionBranch;
+    }
+
+    if (branch == null && wapEnabled(table)) {
+      return wapSessionBranch(spark);
+    }
+
+    return branch;
+  }
+
+  /**
+   * Determine the read branch.
+   *
+   * <p>The target branch can be specified via table identifier, read option, or in SQL:
+   *
+   * <ul>
+   *   <li>The identifier and option branches can't conflict. If both are set, they must match.
+   *   <li>Identifier and option branches take priority over the session WAP branch.
+   *   <li>If neither the option nor the identifier branch is set and WAP is enabled for this table,
+   *       use the WAP branch from the session SQL config (only if the branch already exists).
+   * </ul>
+   *
+   * <p>Note: WAP ID and WAP branch cannot be set at the same time.
+   *
+   * @param spark a Spark Session
+   * @param table the table being read from
+   * @param branch read branch configured via table identifier, or null
+   * @param options read options
+   * @return branch for read operation, or null for main branch
+   */
+  public static String determineReadBranch(
+      SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) {
+    String optionBranch = options.get(SparkReadOptions.BRANCH);
+    if (optionBranch != null) {
+      Preconditions.checkArgument(
+          branch == null || optionBranch.equals(branch),
+          "Explicitly configured branch [%s] and read option [%s] are in conflict",
+          branch,
+          optionBranch);
+      return optionBranch;
+    }
+
+    if (branch == null && wapEnabled(table)) {
+      String wapBranch = wapSessionBranch(spark);
+      if (wapBranch != null && table.refs().containsKey(wapBranch)) {
+        return wapBranch;
+      }
+    }
+
+    return branch;
+  }
+
+  private static String wapSessionBranch(SparkSession spark) {
     String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null);
     String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
-    ValidationException.check(
+    Preconditions.checkArgument(
         wapId == null || wapBranch == null,
        "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
         wapId,
         wapBranch);

-    if (wapBranch != null) {
-      ValidationException.check(
-          branch == null,
-          "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
-          branch,
-          wapBranch);
-
-      return wapBranch;
-    }
-    return branch;
+    return wapBranch;
   }

   public static boolean wapEnabled(Table table) {
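Both utilities apply the same priority order: an explicit read/write option first, then the identifier branch, then the session WAP branch; the read path additionally requires that the WAP branch already exist as a ref. A hedged walkthrough under assumed session state (spark, table, and the branch names are stand-ins):

```java
CaseInsensitiveStringMap empty = CaseInsensitiveStringMap.empty();
CaseInsensitiveStringMap opts =
    new CaseInsensitiveStringMap(ImmutableMap.of("branch", "audit"));

// Option and identifier branch must agree; the option value is returned.
SparkTableUtil.determineWriteBranch(spark, table, "audit", opts); // -> "audit"

// Conflicting values fail fast with IllegalArgumentException (Preconditions),
// where the old code surfaced ValidationException:
// SparkTableUtil.determineWriteBranch(spark, table, "main", opts); // throws

// An explicit identifier branch beats the session WAP branch.
spark.conf().set("spark.wap.branch", "wap");
SparkTableUtil.determineWriteBranch(spark, table, "audit", empty); // -> "audit"

// Nothing explicit and WAP enabled on the table: writes get the WAP branch
// (which the write may create); reads return it only once table.refs() has it.
SparkTableUtil.determineWriteBranch(spark, table, null, empty); // -> "wap"
SparkTableUtil.determineReadBranch(spark, table, null, empty); // -> "wap" iff the ref exists
```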