diff --git a/core/src/main/java/org/apache/iceberg/RetryableValidationException.java b/core/src/main/java/org/apache/iceberg/RetryableValidationException.java new file mode 100644 index 000000000000..9aba79c1f9e6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RetryableValidationException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.google.errorprone.annotations.FormatMethod; +import org.apache.iceberg.exceptions.ValidationException; + +/** + * A {@link ValidationException} that indicates the client can retry with refreshed metadata. + * + *
This is used for validation failures caused by concurrent commits, such as sequence number + * conflicts. Server-side retry won't help since the conflict is in the request itself, but the + * client can retry after refreshing its metadata. + */ +public class RetryableValidationException extends ValidationException { + @FormatMethod + public RetryableValidationException(String message, Object... args) { + super(message, args); + } + + @FormatMethod + public RetryableValidationException(Throwable cause, String message, Object... args) { + super(cause, message, args); + } + + @FormatMethod + public static void check(boolean test, String message, Object... args) { + if (!test) { + throw new RetryableValidationException(message, args); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 5aa76d5680fa..66d5a2bf84fb 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1250,7 +1250,7 @@ public Builder addSnapshot(Snapshot snapshot) { "Snapshot already exists for id: %s", snapshot.snapshotId()); - ValidationException.check( + RetryableValidationException.check( formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber || snapshot.parentId() == null, diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 2e88d5264950..1ee2df484015 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -53,6 +53,7 @@ import org.apache.iceberg.IncrementalAppendScan; import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.Scan; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; @@ -634,7 +635,16 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { // apply changes TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); - request.updates().forEach(update -> update.applyTo(metadataBuilder)); + try { + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + } catch (RetryableValidationException e) { + // Sequence number conflicts from concurrent commits are retryable by the client, + // but server-side retry won't help since the sequence number is in the request. + // Wrap in ValidationFailureException to skip server retry, return to client as + // CommitFailedException so the client can retry with refreshed metadata. + throw new ValidationFailureException( + new CommitFailedException(e, "Commit conflict: %s", e.getMessage())); + } TableMetadata updated = metadataBuilder.build(); if (updated.changes().isEmpty()) { diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 2d569ae8264b..02399956a4f5 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -46,8 +46,14 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -61,12 +67,14 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManageSnapshots; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -83,6 +91,7 @@ import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.exceptions.ServiceFailureException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.io.FileIO; @@ -90,6 +99,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; import org.apache.iceberg.rest.auth.AuthManager; @@ -109,6 +119,7 @@ import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; import org.eclipse.jetty.server.Server; @@ -3606,6 +3617,139 @@ protected RESTSessionCatalog newSessionCatalog( return catalog; } + /** + * Test concurrent appends on multiple branches simultaneously to verify proper handling of + * sequence number conflicts. + * + *
Uses a barrier to synchronize threads so they all load the same table state and commit + * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact + * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round. + * + *
This verifies that: 1. Sequence number conflicts are caught by AssertLastSequenceNumber
+ * requirement 2. Conflicts result in CommitFailedException (retryable) not ValidationException
+ * (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches
+ */
+ @Test
+ public void testConcurrentAppendsOnMultipleBranches() {
+ int numBranches = 5;
+ int numRounds = 10;
+
+ RESTCatalog catalog = catalog();
+
+ Namespace ns = Namespace.of("concurrent_test");
+ TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");
+
+ catalog.createNamespace(ns);
+ Table table = catalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();
+
+ // Disable retries so we can count exact conflicts
+ table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "-1").commit();
+
+ // Create branches from the main branch
+ String[] branchNames = new String[numBranches];
+ ManageSnapshots manageSnapshots = table.manageSnapshots();
+ for (int i = 0; i < numBranches; i++) {
+ branchNames[i] = "branch-" + i;
+ manageSnapshots = manageSnapshots.createBranch(branchNames[i]);
+ }
+ manageSnapshots.commit();
+
+ CyclicBarrier startBarrier = new CyclicBarrier(numBranches);
+ CyclicBarrier endBarrier = new CyclicBarrier(numBranches);
+ AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds);
+ AtomicIntegerArray failuresPerRound = new AtomicIntegerArray(numRounds);
+ AtomicInteger validationFailureCount = new AtomicInteger(0);
+
+ ExecutorService executor =
+ MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(numBranches));
+
+ Tasks.range(numBranches)
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(executor)
+ .run(
+ branchIdx -> {
+ String branchName = branchNames[branchIdx];
+
+ for (int round = 0; round < numRounds; round++) {
+ // Load table
+ Table localTable = catalog.loadTable(tableIdent);
+
+ DataFile newFile =
+ DataFiles.builder(SPEC)
+ .withPath(
+ String.format("/path/to/branch-%d-round-%d.parquet", branchIdx, round))
+ .withFileSizeInBytes(15)
+ .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
+ .withRecordCount(3)
+ .build();
+
+ try {
+ // Wait for all threads to be ready to commit
+ startBarrier.await();
+
+ // All threads commit simultaneously
+ localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
+ successPerRound.incrementAndGet(round);
+ } catch (CommitFailedException e) {
+ // Expected for conflicts - this is the correct behavior with the fix
+ failuresPerRound.incrementAndGet(round);
+ } catch (ValidationException e) {
+ // This indicates the fix is not working
+ validationFailureCount.incrementAndGet();
+ throw e;
+ } catch (Exception e) {
+ // Handle barrier exceptions
+ throw new RuntimeException(e);
+ }
+
+ try {
+ // Ensure all threads complete this round before starting the next
+ endBarrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ executor.shutdown();
+
+ int totalAttempts = numBranches * numRounds;
+
+ // Verify: no ValidationException should have been thrown
+ assertThat(validationFailureCount.get())
+ .as(
+ "Sequence number conflicts should throw CommitFailedException (retryable), "
+ + "not ValidationException")
+ .isEqualTo(0);
+
+ // Verify per-round exactly one success and (numBranches - 1) failures per round
+ for (int round = 0; round < numRounds; round++) {
+ assertThat(successPerRound.get(round))
+ .as("Exactly one commit should succeed in round " + round)
+ .isEqualTo(1);
+ assertThat(failuresPerRound.get(round))
+ .as("Exactly (numBranches - 1) commits should fail in round " + round)
+ .isEqualTo(numBranches - 1);
+ }
+
+ // Verify no attempts were lost
+ int totalSuccesses = 0;
+ int totalFailures = 0;
+ for (int round = 0; round < numRounds; round++) {
+ totalSuccesses += successPerRound.get(round);
+ totalFailures += failuresPerRound.get(round);
+ }
+
+ assertThat(totalSuccesses)
+ .as("Total successes should equal number of rounds")
+ .isEqualTo(numRounds);
+ assertThat(totalSuccesses + totalFailures)
+ .as("All commit attempts should either succeed or fail with CommitFailedException")
+ .isEqualTo(totalAttempts);
+ }
+
private static List