Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import com.google.errorprone.annotations.FormatMethod;
import org.apache.iceberg.exceptions.ValidationException;

/**
* A {@link ValidationException} that indicates the client can retry with refreshed metadata.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit too specific to the REST context because it mentions the client.

I think that this should focus on what the problem is: a validation failed but the commit can be fixed and retried. This is specifically not a conflict.

*
* <p>This is used for validation failures caused by concurrent commits, such as sequence number
* conflicts. Server-side retry won't help since the conflict is in the request itself, but the
* client can retry after refreshing its metadata.
*/
public class RetryableValidationException extends ValidationException {
@FormatMethod
public RetryableValidationException(String message, Object... args) {
super(message, args);
}

@FormatMethod
public RetryableValidationException(Throwable cause, String message, Object... args) {
super(cause, message, args);
}

@FormatMethod
public static void check(boolean test, String message, Object... args) {
if (!test) {
throw new RetryableValidationException(message, args);
}
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ public Builder addSnapshot(Snapshot snapshot) {
"Snapshot already exists for id: %s",
snapshot.snapshotId());

ValidationException.check(
RetryableValidationException.check(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first row ID check also needs to be updated.

formatVersion == 1
|| snapshot.sequenceNumber() > lastSequenceNumber
|| snapshot.parentId() == null,
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RetryableValidationException;
import org.apache.iceberg.Scan;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
Expand Down Expand Up @@ -634,7 +635,16 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) {

// apply changes
TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base);
request.updates().forEach(update -> update.applyTo(metadataBuilder));
try {
request.updates().forEach(update -> update.applyTo(metadataBuilder));
} catch (RetryableValidationException e) {
// Sequence number conflicts from concurrent commits are retryable by the client,
// but server-side retry won't help since the sequence number is in the request.
// Wrap in ValidationFailureException to skip server retry, return to client as
// CommitFailedException so the client can retry with refreshed metadata.
throw new ValidationFailureException(
new CommitFailedException(e, "Commit conflict: %s", e.getMessage()));
Copy link
Contributor

@rdblue rdblue Feb 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not rethrow a different exception. Instead, add RetryableValidationException to exceptions passed to Tasks.onlyRetryOn.

Also, this commit message is incorrect. We are not throwing CommitFailedException because there was not a commit conflict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue this exception is only retryable on the client side, server side retries will not help because snapshot is provided by the client side.
If we want to change SnapshotProducer to retry on this RetryableValidationException, then should we create a different status code in the SPEC other than 409 which is CommitFailedException? Or should we keep the wrapping of CommitFailedException(but update message to something more specific like Retryable validation failure) and avoid the spec change?

}

TableMetadata updated = metadataBuilder.build();
if (updated.changes().isEmpty()) {
Expand Down
144 changes: 144 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -61,12 +67,14 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
Expand All @@ -83,13 +91,15 @@
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.exceptions.ServiceFailureException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.rest.HTTPRequest.HTTPMethod;
import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode;
import org.apache.iceberg.rest.auth.AuthManager;
Expand All @@ -109,6 +119,7 @@
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.Tasks;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -3606,6 +3617,139 @@ protected RESTSessionCatalog newSessionCatalog(
return catalog;
}

/**
* Test concurrent appends on multiple branches simultaneously to verify proper handling of
* sequence number conflicts.
*
* <p>Uses a barrier to synchronize threads so they all load the same table state and commit
* simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact
* counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round.
*
* <p>This verifies that: 1. Sequence number conflicts are caught by AssertLastSequenceNumber
* requirement 2. Conflicts result in CommitFailedException (retryable) not ValidationException
* (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches
*/
@Test
public void testConcurrentAppendsOnMultipleBranches() {
int numBranches = 5;
int numRounds = 10;

RESTCatalog catalog = catalog();

Namespace ns = Namespace.of("concurrent_test");
TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");

catalog.createNamespace(ns);
Table table = catalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();

// Disable retries so we can count exact conflicts
table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "-1").commit();

// Create branches from the main branch
String[] branchNames = new String[numBranches];
ManageSnapshots manageSnapshots = table.manageSnapshots();
for (int i = 0; i < numBranches; i++) {
branchNames[i] = "branch-" + i;
manageSnapshots = manageSnapshots.createBranch(branchNames[i]);
}
manageSnapshots.commit();

CyclicBarrier startBarrier = new CyclicBarrier(numBranches);
CyclicBarrier endBarrier = new CyclicBarrier(numBranches);
AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds);
AtomicIntegerArray failuresPerRound = new AtomicIntegerArray(numRounds);
AtomicInteger validationFailureCount = new AtomicInteger(0);

ExecutorService executor =
MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(numBranches));

Tasks.range(numBranches)
.stopOnFailure()
.throwFailureWhenFinished()
.executeWith(executor)
.run(
branchIdx -> {
String branchName = branchNames[branchIdx];

for (int round = 0; round < numRounds; round++) {
// Load table
Table localTable = catalog.loadTable(tableIdent);

DataFile newFile =
DataFiles.builder(SPEC)
.withPath(
String.format("/path/to/branch-%d-round-%d.parquet", branchIdx, round))
.withFileSizeInBytes(15)
.withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
.withRecordCount(3)
.build();

try {
// Wait for all threads to be ready to commit
startBarrier.await();

// All threads commit simultaneously
localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
successPerRound.incrementAndGet(round);
} catch (CommitFailedException e) {
// Expected for conflicts - this is the correct behavior with the fix
failuresPerRound.incrementAndGet(round);
} catch (ValidationException e) {
// This indicates the fix is not working
validationFailureCount.incrementAndGet();
throw e;
} catch (Exception e) {
// Handle barrier exceptions
throw new RuntimeException(e);
}

try {
// Ensure all threads complete this round before starting the next
endBarrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});

executor.shutdown();

int totalAttempts = numBranches * numRounds;

// Verify: no ValidationException should have been thrown
assertThat(validationFailureCount.get())
.as(
"Sequence number conflicts should throw CommitFailedException (retryable), "
+ "not ValidationException")
.isEqualTo(0);

// Verify per-round exactly one success and (numBranches - 1) failures per round
for (int round = 0; round < numRounds; round++) {
assertThat(successPerRound.get(round))
.as("Exactly one commit should succeed in round " + round)
.isEqualTo(1);
assertThat(failuresPerRound.get(round))
.as("Exactly (numBranches - 1) commits should fail in round " + round)
.isEqualTo(numBranches - 1);
}

// Verify no attempts were lost
int totalSuccesses = 0;
int totalFailures = 0;
for (int round = 0; round < numRounds; round++) {
totalSuccesses += successPerRound.get(round);
totalFailures += failuresPerRound.get(round);
}

assertThat(totalSuccesses)
.as("Total successes should equal number of rounds")
.isEqualTo(numRounds);
assertThat(totalSuccesses + totalFailures)
.as("All commit attempts should either succeed or fail with CommitFailedException")
.isEqualTo(totalAttempts);
}

private static List<HTTPRequest> allRequests(RESTCatalogAdapter adapter) {
ArgumentCaptor<HTTPRequest> captor = ArgumentCaptor.forClass(HTTPRequest.class);
verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any());
Expand Down