diff --git a/core/src/main/java/org/apache/iceberg/RetryableValidationException.java b/core/src/main/java/org/apache/iceberg/RetryableValidationException.java new file mode 100644 index 000000000000..9aba79c1f9e6 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/RetryableValidationException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import com.google.errorprone.annotations.FormatMethod; +import org.apache.iceberg.exceptions.ValidationException; + +/** + * A {@link ValidationException} that indicates the client can retry with refreshed metadata. + * + *
<p>
This is used for validation failures caused by concurrent commits, such as sequence number + * conflicts. Server-side retry won't help since the conflict is in the request itself, but the + * client can retry after refreshing its metadata. + */ +public class RetryableValidationException extends ValidationException { + @FormatMethod + public RetryableValidationException(String message, Object... args) { + super(message, args); + } + + @FormatMethod + public RetryableValidationException(Throwable cause, String message, Object... args) { + super(cause, message, args); + } + + @FormatMethod + public static void check(boolean test, String message, Object... args) { + if (!test) { + throw new RetryableValidationException(message, args); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 5aa76d5680fa..66d5a2bf84fb 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1250,7 +1250,7 @@ public Builder addSnapshot(Snapshot snapshot) { "Snapshot already exists for id: %s", snapshot.snapshotId()); - ValidationException.check( + RetryableValidationException.check( formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber || snapshot.parentId() == null, diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 2e88d5264950..1ee2df484015 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -53,6 +53,7 @@ import org.apache.iceberg.IncrementalAppendScan; import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.Scan; import org.apache.iceberg.Schema; import 
org.apache.iceberg.SortOrder; @@ -634,7 +635,16 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { // apply changes TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); - request.updates().forEach(update -> update.applyTo(metadataBuilder)); + try { + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + } catch (RetryableValidationException e) { + // Sequence number conflicts from concurrent commits are retryable by the client, + // but a server-side retry cannot help because the request fixes the sequence + // number. Wrap in ValidationFailureException to skip the server-side retry; the + // nested CommitFailedException lets the client refresh its metadata and retry. + throw new ValidationFailureException( + new CommitFailedException(e, "Commit conflict: %s", e.getMessage())); + } TableMetadata updated = metadataBuilder.build(); if (updated.changes().isEmpty()) { diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 2d569ae8264b..02399956a4f5 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -46,8 +46,14 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -61,12 +67,14 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.HasTableOperations; 
+import org.apache.iceberg.ManageSnapshots; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -83,6 +91,7 @@ import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.exceptions.ServiceFailureException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.io.FileIO; @@ -90,6 +99,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; import org.apache.iceberg.rest.auth.AuthManager; @@ -109,6 +119,7 @@ import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; import org.eclipse.jetty.server.Server; @@ -3606,6 +3617,139 @@ protected RESTSessionCatalog newSessionCatalog( return catalog; } + /** + * Test concurrent appends on multiple branches simultaneously to verify proper handling of + * sequence number conflicts. + * + *
<p>
Uses a barrier to synchronize threads so they all load the same table state and commit + * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact + * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round. + * + *
<p>
This verifies that: 1. Sequence number conflicts are detected while the snapshot update is
+   * applied (TableMetadata.Builder#addSnapshot) 2. Conflicts result in CommitFailedException
+   * (retryable) not ValidationException (non-retryable) 3. Concurrent commits across branches work
+   */
+  @Test
+  public void testConcurrentAppendsOnMultipleBranches() {
+    int numBranches = 5;
+    int numRounds = 10;
+
+    RESTCatalog catalog = catalog();
+
+    Namespace ns = Namespace.of("concurrent_test");
+    TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");
+
+    catalog.createNamespace(ns);
+    Table table = catalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();
+
+    // Disable retries so we can count exact conflicts
+    table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "-1").commit();
+
+    // Create branches from the main branch
+    String[] branchNames = new String[numBranches];
+    ManageSnapshots manageSnapshots = table.manageSnapshots();
+    for (int i = 0; i < numBranches; i++) {
+      branchNames[i] = "branch-" + i;
+      manageSnapshots = manageSnapshots.createBranch(branchNames[i]);
+    }
+    manageSnapshots.commit();
+
+    CyclicBarrier startBarrier = new CyclicBarrier(numBranches);
+    CyclicBarrier endBarrier = new CyclicBarrier(numBranches);
+    AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds);
+    AtomicIntegerArray failuresPerRound = new AtomicIntegerArray(numRounds);
+    AtomicInteger validationFailureCount = new AtomicInteger(0);
+
+    ExecutorService executor =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(numBranches));
+
+    Tasks.range(numBranches)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(executor)
+        .run(
+            branchIdx -> {
+              String branchName = branchNames[branchIdx];
+
+              for (int round = 0; round < numRounds; round++) {
+                // Load table
+                Table localTable = catalog.loadTable(tableIdent);
+
+                DataFile newFile =
+                    DataFiles.builder(SPEC)
+                        .withPath(
+                            String.format("/path/to/branch-%d-round-%d.parquet", branchIdx, round))
+                        .withFileSizeInBytes(15)
+                        .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
+                        .withRecordCount(3)
+                        .build();
+
+                try {
+                  // Wait for all threads; the timeout keeps a failed peer from hanging us
+                  startBarrier.await(30, TimeUnit.SECONDS);
+
+                  // All threads commit simultaneously
+                  localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
+                  successPerRound.incrementAndGet(round);
+                } catch (CommitFailedException e) {
+                  // Expected for conflicts - this is the correct behavior with the fix
+                  failuresPerRound.incrementAndGet(round);
+                } catch (ValidationException e) {
+                  // This indicates the fix is not working
+                  validationFailureCount.incrementAndGet();
+                  throw e;
+                } catch (Exception e) {
+                  // Barrier timeout, broken barrier or interruption: fail this worker
+                  throw new RuntimeException(e);
+                }
+
+                try {
+                  // Finish the round together; bounded so an aborted peer fails the test fast
+                  endBarrier.await(30, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                  throw new RuntimeException(e);
+                }
+              }
+            });
+
+    executor.shutdown();
+
+    int totalAttempts = numBranches * numRounds;
+
+    // Verify: no ValidationException should have been thrown
+    assertThat(validationFailureCount.get())
+        .as(
+            "Sequence number conflicts should throw CommitFailedException (retryable), "
+                + "not ValidationException")
+        .isEqualTo(0);
+
+    // Verify per-round exactly one success and (numBranches - 1) failures per round
+    for (int round = 0; round < numRounds; round++) {
+      assertThat(successPerRound.get(round))
+          .as("Exactly one commit should succeed in round " + round)
+          .isEqualTo(1);
+      assertThat(failuresPerRound.get(round))
+          .as("Exactly (numBranches - 1) commits should fail in round " + round)
+          .isEqualTo(numBranches - 1);
+    }
+
+    // Verify no attempts were lost
+    int totalSuccesses = 0;
+    int totalFailures = 0;
+    for (int round = 0; round < numRounds; round++) {
+      totalSuccesses += successPerRound.get(round);
+      totalFailures += failuresPerRound.get(round);
+    }
+
+    assertThat(totalSuccesses)
+        .as("Total successes should equal number of rounds")
+        .isEqualTo(numRounds);
+    assertThat(totalSuccesses + totalFailures)
+        .as("All commit attempts should either succeed or fail with CommitFailedException")
+        .isEqualTo(totalAttempts);
+  }
+
   private static List allRequests(RESTCatalogAdapter adapter) {
     ArgumentCaptor captor = ArgumentCaptor.forClass(HTTPRequest.class);
     verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any());