From 44f60597d28e402aad2c088f64e5366e99c32f1d Mon Sep 17 00:00:00 2001 From: Xinyi Lu Date: Fri, 23 Jan 2026 12:38:11 -0800 Subject: [PATCH 1/6] Core: Make sequence number conflicts retryable in concurrent commits When multiple processes concurrently commit to different branches of the same table through the REST catalog, sequence number validation failures in TableMetadata.addSnapshot() were throwing non-retryable ValidationException instead of retryable CommitFailedException. This fix catches the sequence number validation error in CatalogHandlers.commit() and wraps it in ValidationFailureException(CommitFailedException) to: - Skip server-side retry (which won't help since sequence number is in the request) - Return CommitFailedException to the client so it can retry with refreshed metadata --- .../apache/iceberg/rest/CatalogHandlers.java | 16 +- .../rest/TestRestCatalogConcurrentWrites.java | 292 ++++++++++++++++++ 2 files changed, 307 insertions(+), 1 deletion(-) create mode 100644 core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 2e88d5264950..e9c156b48268 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -72,6 +72,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -634,7 +635,20 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { // apply changes TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); - request.updates().forEach(update -> update.applyTo(metadataBuilder)); + try { + request.updates().forEach(update -> update.applyTo(metadataBuilder)); + } catch (ValidationException e) { + // Sequence number conflicts from concurrent commits are retryable by the client, + // but server-side retry won't help since the sequence number is in the request. + // Wrap in ValidationFailureException to skip server retry, return to client as + // CommitFailedException so the client can retry with refreshed metadata. + if (e.getMessage() != null + && e.getMessage().contains("older than last sequence number")) { + throw new ValidationFailureException( + new CommitFailedException(e, "Commit conflict: %s", e.getMessage())); + } + throw e; + } TableMetadata updated = metadataBuilder.build(); if (updated.changes().isEmpty()) { diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java b/core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java new file mode 100644 index 000000000000..b727f1039b30 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.rest; + +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.BadRequestException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.gzip.GzipHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Test concurrent commit operations through the REST catalog to verify that sequence number + * conflicts are properly handled with retryable exceptions. + * + *

This test creates multiple branches and performs parallel commits on each branch to verify + * that the AssertLastSequenceNumber requirement properly catches sequence number conflicts and + * throws CommitFailedException (retryable) instead of ValidationException (non-retryable). + */ +public class TestRestCatalogConcurrentWrites { + private static final Namespace NS = Namespace.of("test"); + private static final TableIdentifier TABLE = TableIdentifier.of(NS, "test_table"); + + private static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), required(2, "data", Types.StringType.get())); + + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).bucket("id", 16).build(); + + @TempDir public Path temp; + + private RESTCatalog restCatalog; + private InMemoryCatalog backendCatalog; + private Server httpServer; + + @BeforeEach + public void createCatalog() throws Exception { + File warehouse = temp.toFile(); + + this.backendCatalog = new InMemoryCatalog(); + this.backendCatalog.initialize( + "in-memory", + ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath())); + + RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog); + + ServletContextHandler servletContext = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContext.addServlet(new ServletHolder(new RESTCatalogServlet(adaptor)), "/*"); + servletContext.setHandler(new GzipHandler()); + + this.httpServer = new Server(0); + httpServer.setHandler(servletContext); + httpServer.start(); + + Configuration conf = new Configuration(); + SessionCatalog.SessionContext context = + new SessionCatalog.SessionContext( + UUID.randomUUID().toString(), + "user", + ImmutableMap.of("credential", "user:12345"), + ImmutableMap.of()); + + this.restCatalog = + new RESTCatalog( + context, + (config) -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build()); + restCatalog.setConf(conf); + Map properties = + ImmutableMap.of( + CatalogProperties.URI, + httpServer.getURI().toString(), + CatalogProperties.FILE_IO_IMPL, + "org.apache.iceberg.inmemory.InMemoryFileIO", + "credential", + "catalog:12345"); + restCatalog.initialize("prod", properties); + } + + @AfterEach + public void closeCatalog() throws Exception { + if (restCatalog != null) { + restCatalog.close(); + } + + if (backendCatalog != null) { + backendCatalog.close(); + } + + if (httpServer != null) { + httpServer.stop(); + httpServer.join(); + } + } + + /** + * Test concurrent appends on multiple branches simultaneously to verify proper handling of + * sequence number conflicts. + * + *

Creates 5 different branches on the table, then performs 10 parallel append commits on each + * branch at the same time (50 total concurrent operations). This verifies that: 1. Sequence + * number conflicts are caught by AssertLastSequenceNumber requirement 2. Conflicts result in + * CommitFailedException (retryable) not ValidationException (non-retryable) 3. The REST catalog + * properly handles concurrent modifications across different branches + */ + @Test + public void testConcurrentAppendsOnMultipleBranches() throws Exception { + int numBranches = 5; + int commitsPerBranch = 10; + int totalConcurrentWrites = numBranches * commitsPerBranch; + + restCatalog.createNamespace(NS); + Table table = restCatalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create(); + + // Add initial data to the main branch + DataFile initialFile = + DataFiles.builder(SPEC) + .withPath("/path/to/initial-data.parquet") + .withFileSizeInBytes(10) + .withPartitionPath("id_bucket=0") + .withRecordCount(2) + .build(); + table.newFastAppend().appendFile(initialFile).commit(); + + // Create 5 branches from the main branch + String[] branchNames = new String[numBranches]; + for (int i = 0; i < numBranches; i++) { + branchNames[i] = "branch-" + i; + table.manageSnapshots().createBranch(branchNames[i]).commit(); + } + + // Refresh to get updated metadata with all branches + table = restCatalog.loadTable(TABLE); + + ExecutorService executor = Executors.newFixedThreadPool(totalConcurrentWrites); + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(totalConcurrentWrites); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger validationFailureCount = new AtomicInteger(0); + AtomicInteger commitFailureCount = new AtomicInteger(0); + AtomicInteger unexpectedFailureCount = new AtomicInteger(0); + + List> futures = Lists.newArrayList(); + + try { + // Launch concurrent appends - 10 per branch, all starting at the same time + for (int branchIdx = 0; branchIdx < numBranches; branchIdx++) { + final String branchName = branchNames[branchIdx]; + + for (int commitIdx = 0; commitIdx < commitsPerBranch; commitIdx++) { + final int finalBranchIdx = branchIdx; + final int finalCommitIdx = commitIdx; + + Future future = + executor.submit( + () -> { + try { + // Wait for signal to start all threads simultaneously + startLatch.await(); + + // Each thread loads the table independently + Table localTable = restCatalog.loadTable(TABLE); + + // Create a unique file for this commit + DataFile newFile = + DataFiles.builder(SPEC) + .withPath( + String.format( + "/path/to/branch-%d-commit-%d.parquet", + finalBranchIdx, finalCommitIdx)) + .withFileSizeInBytes(15) + .withPartitionPath(String.format("id_bucket=%d", finalBranchIdx % 16)) + .withRecordCount(3) + .build(); + + // Append to the specific branch + localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit(); + + successCount.incrementAndGet(); + } catch (BadRequestException e) { + // Sequence number validation errors wrapped in BadRequestException + // indicate the fix is not working properly + if (e.getMessage().contains("Cannot add snapshot with sequence number")) { + validationFailureCount.incrementAndGet(); + } else { + unexpectedFailureCount.incrementAndGet(); + } + } catch (ValidationException e) { + // ValidationException indicates the fix is not working properly + validationFailureCount.incrementAndGet(); + } catch (CommitFailedException e) { + // CommitFailedException is expected - this is the correct behavior + // These will be automatically retried by the client + commitFailureCount.incrementAndGet(); + } catch (Exception e) { + unexpectedFailureCount.incrementAndGet(); + } finally { + doneLatch.countDown(); + } + }); + futures.add(future); + } + } + + // Start all threads simultaneously + startLatch.countDown(); + + // Wait for all to complete + boolean finished = doneLatch.await(180, TimeUnit.SECONDS); + assertThat(finished).as("All processes should complete within timeout").isTrue(); + + // Wait for all futures to complete + for (Future future : futures) { + future.get(10, TimeUnit.SECONDS); + } + + // Verify the fix: with AssertLastSequenceNumber, there should be NO validation failures + // All concurrent conflicts should be caught as CommitFailedException (retryable) + assertThat(validationFailureCount.get()) + .as( + "With the fix, sequence number conflicts should be caught by AssertLastSequenceNumber " + + "and throw CommitFailedException (retryable), not ValidationException") + .isEqualTo(0); + + // At least some should succeed (commits that don't conflict or succeed after retry) + assertThat(successCount.get()).as("At least some appends should succeed").isGreaterThan(0); + + // CommitFailedExceptions are expected and will be retried by the client + // We don't assert on the exact count since it depends on timing + + // No unexpected failures + assertThat(unexpectedFailureCount.get()) + .as("Should have no unexpected failures") + .isEqualTo(0); + + } finally { + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + } + } +} From b14cbfec754a0bfa7de0d1240fbb6c32c20142e3 Mon Sep 17 00:00:00 2001 From: Xinyi Lu Date: Mon, 26 Jan 2026 14:10:43 -0800 Subject: [PATCH 2/6] move test to TestRestCatalog class --- .../apache/iceberg/rest/TestRESTCatalog.java | 104 +++++++ .../rest/TestRestCatalogConcurrentWrites.java | 292 ------------------ 2 files changed, 104 insertions(+), 292 deletions(-) delete mode 100644 core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 2d569ae8264b..4893914f5155 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -46,8 +46,12 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -77,12 +81,14 @@ import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.exceptions.ServiceFailureException; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.io.FileIO; @@ -90,6 +96,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; import org.apache.iceberg.rest.auth.AuthManager; @@ -109,6 +116,7 @@ import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.Tasks; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; import org.eclipse.jetty.server.Server; @@ -3606,6 +3614,102 @@ protected RESTSessionCatalog newSessionCatalog( return catalog; } + /** + * Test concurrent appends on multiple branches simultaneously to verify proper handling of + * sequence number conflicts. + * + *

Creates 5 different branches on the table, then performs 10 parallel append commits on each + * branch at the same time (50 total concurrent operations). This verifies that: 1. Sequence + * number conflicts are caught by AssertLastSequenceNumber requirement 2. Conflicts result in + * CommitFailedException (retryable) not ValidationException (non-retryable) 3. The REST catalog + * properly handles concurrent modifications across different branches + */ + @Test + public void testConcurrentAppendsOnMultipleBranches() { + int numBranches = 5; + int commitsPerBranch = 10; + int totalConcurrentWrites = numBranches * commitsPerBranch; + + RESTCatalog restCatalog = catalog(); + + Namespace ns = Namespace.of("concurrent_test"); + TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table"); + + restCatalog.createNamespace(ns); + Table table = restCatalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create(); + + // Add initial data to the main branch + table.newFastAppend().appendFile(FILE_A).commit(); + + // Create 5 branches from the main branch + String[] branchNames = new String[numBranches]; + for (int i = 0; i < numBranches; i++) { + branchNames[i] = "branch-" + i; + table.manageSnapshots().createBranch(branchNames[i]).commit(); + } + + // Refresh to get updated metadata with all branches + restCatalog.loadTable(tableIdent); + + AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger validationFailureCount = new AtomicInteger(0); + + ExecutorService executor = + MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) Executors.newFixedThreadPool(totalConcurrentWrites)); + + Tasks.range(totalConcurrentWrites) + .executeWith(executor) + .suppressFailureWhenFinished() + .onFailure( + (taskIndex, exception) -> { + // Check if sequence number validation error (indicates fix not working) + if (exception instanceof BadRequestException + && exception.getMessage().contains("Cannot add snapshot with sequence number")) { + validationFailureCount.incrementAndGet(); + } else if (exception instanceof ValidationException) { + validationFailureCount.incrementAndGet(); + } + // CommitFailedException is expected - this is the correct retryable behavior + }) + .run( + taskIndex -> { + int branchIdx = taskIndex / commitsPerBranch; + int commitIdx = taskIndex % commitsPerBranch; + String branchName = branchNames[branchIdx]; + + // Each thread loads the table independently + Table localTable = restCatalog.loadTable(tableIdent); + + // Create a unique file for this commit + DataFile newFile = + DataFiles.builder(SPEC) + .withPath( + String.format( + "/path/to/branch-%d-commit-%d.parquet", branchIdx, commitIdx)) + .withFileSizeInBytes(15) + .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16)) + .withRecordCount(3) + .build(); + + // Append to the specific branch + localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit(); + + successCount.incrementAndGet(); + }); + + // Verify the fix: with AssertLastSequenceNumber, there should be NO validation failures + // All concurrent conflicts should be caught as CommitFailedException (retryable) + assertThat(validationFailureCount.get()) + .as( + "With the fix, sequence number conflicts should be caught by AssertLastSequenceNumber " + + "and throw CommitFailedException (retryable), not ValidationException") + .isEqualTo(0); + + // At least some should succeed (commits that don't conflict or succeed after retry) + assertThat(successCount.get()).as("At least some appends should succeed").isGreaterThan(0); + } + private static List allRequests(RESTCatalogAdapter adapter) { ArgumentCaptor captor = ArgumentCaptor.forClass(HTTPRequest.class); verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any()); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java b/core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java deleted file mode 100644 index b727f1039b30..000000000000 --- a/core/src/test/java/org/apache/iceberg/rest/TestRestCatalogConcurrentWrites.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.rest; - -import static org.apache.iceberg.types.Types.NestedField.required; -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.File; -import java.nio.file.Path; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SessionCatalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.BadRequestException; -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.inmemory.InMemoryCatalog; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Types; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.handler.gzip.GzipHandler; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -/** - * Test concurrent commit operations through the REST catalog to verify that sequence number - * conflicts are properly handled with retryable exceptions. - * - *

This test creates multiple branches and performs parallel commits on each branch to verify - * that the AssertLastSequenceNumber requirement properly catches sequence number conflicts and - * throws CommitFailedException (retryable) instead of ValidationException (non-retryable). - */ -public class TestRestCatalogConcurrentWrites { - private static final Namespace NS = Namespace.of("test"); - private static final TableIdentifier TABLE = TableIdentifier.of(NS, "test_table"); - - private static final Schema SCHEMA = - new Schema( - required(1, "id", Types.LongType.get()), required(2, "data", Types.StringType.get())); - - private static final PartitionSpec SPEC = - PartitionSpec.builderFor(SCHEMA).bucket("id", 16).build(); - - @TempDir public Path temp; - - private RESTCatalog restCatalog; - private InMemoryCatalog backendCatalog; - private Server httpServer; - - @BeforeEach - public void createCatalog() throws Exception { - File warehouse = temp.toFile(); - - this.backendCatalog = new InMemoryCatalog(); - this.backendCatalog.initialize( - "in-memory", - ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath())); - - RESTCatalogAdapter adaptor = new RESTCatalogAdapter(backendCatalog); - - ServletContextHandler servletContext = - new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - servletContext.addServlet(new ServletHolder(new RESTCatalogServlet(adaptor)), "/*"); - servletContext.setHandler(new GzipHandler()); - - this.httpServer = new Server(0); - httpServer.setHandler(servletContext); - httpServer.start(); - - Configuration conf = new Configuration(); - SessionCatalog.SessionContext context = - new SessionCatalog.SessionContext( - UUID.randomUUID().toString(), - "user", - ImmutableMap.of("credential", "user:12345"), - ImmutableMap.of()); - - this.restCatalog = - new RESTCatalog( - context, - (config) -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build()); - restCatalog.setConf(conf); - Map properties = - ImmutableMap.of( - CatalogProperties.URI, - httpServer.getURI().toString(), - CatalogProperties.FILE_IO_IMPL, - "org.apache.iceberg.inmemory.InMemoryFileIO", - "credential", - "catalog:12345"); - restCatalog.initialize("prod", properties); - } - - @AfterEach - public void closeCatalog() throws Exception { - if (restCatalog != null) { - restCatalog.close(); - } - - if (backendCatalog != null) { - backendCatalog.close(); - } - - if (httpServer != null) { - httpServer.stop(); - httpServer.join(); - } - } - - /** - * Test concurrent appends on multiple branches simultaneously to verify proper handling of - * sequence number conflicts. - * - *

Creates 5 different branches on the table, then performs 10 parallel append commits on each - * branch at the same time (50 total concurrent operations). This verifies that: 1. Sequence - * number conflicts are caught by AssertLastSequenceNumber requirement 2. Conflicts result in - * CommitFailedException (retryable) not ValidationException (non-retryable) 3. The REST catalog - * properly handles concurrent modifications across different branches - */ - @Test - public void testConcurrentAppendsOnMultipleBranches() throws Exception { - int numBranches = 5; - int commitsPerBranch = 10; - int totalConcurrentWrites = numBranches * commitsPerBranch; - - restCatalog.createNamespace(NS); - Table table = restCatalog.buildTable(TABLE, SCHEMA).withPartitionSpec(SPEC).create(); - - // Add initial data to the main branch - DataFile initialFile = - DataFiles.builder(SPEC) - .withPath("/path/to/initial-data.parquet") - .withFileSizeInBytes(10) - .withPartitionPath("id_bucket=0") - .withRecordCount(2) - .build(); - table.newFastAppend().appendFile(initialFile).commit(); - - // Create 5 branches from the main branch - String[] branchNames = new String[numBranches]; - for (int i = 0; i < numBranches; i++) { - branchNames[i] = "branch-" + i; - table.manageSnapshots().createBranch(branchNames[i]).commit(); - } - - // Refresh to get updated metadata with all branches - table = restCatalog.loadTable(TABLE); - - ExecutorService executor = Executors.newFixedThreadPool(totalConcurrentWrites); - CountDownLatch startLatch = new CountDownLatch(1); - CountDownLatch doneLatch = new CountDownLatch(totalConcurrentWrites); - - AtomicInteger successCount = new AtomicInteger(0); - AtomicInteger validationFailureCount = new AtomicInteger(0); - AtomicInteger commitFailureCount = new AtomicInteger(0); - AtomicInteger unexpectedFailureCount = new AtomicInteger(0); - - List> futures = Lists.newArrayList(); - - try { - // Launch concurrent appends - 10 per branch, all starting at the same time - for (int branchIdx = 0; branchIdx < numBranches; branchIdx++) { - final String branchName = branchNames[branchIdx]; - - for (int commitIdx = 0; commitIdx < commitsPerBranch; commitIdx++) { - final int finalBranchIdx = branchIdx; - final int finalCommitIdx = commitIdx; - - Future future = - executor.submit( - () -> { - try { - // Wait for signal to start all threads simultaneously - startLatch.await(); - - // Each thread loads the table independently - Table localTable = restCatalog.loadTable(TABLE); - - // Create a unique file for this commit - DataFile newFile = - DataFiles.builder(SPEC) - .withPath( - String.format( - "/path/to/branch-%d-commit-%d.parquet", - finalBranchIdx, finalCommitIdx)) - .withFileSizeInBytes(15) - .withPartitionPath(String.format("id_bucket=%d", finalBranchIdx % 16)) - .withRecordCount(3) - .build(); - - // Append to the specific branch - localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit(); - - successCount.incrementAndGet(); - } catch (BadRequestException e) { - // Sequence number validation errors wrapped in BadRequestException - // indicate the fix is not working properly - if (e.getMessage().contains("Cannot add snapshot with sequence number")) { - validationFailureCount.incrementAndGet(); - } else { - unexpectedFailureCount.incrementAndGet(); - } - } catch (ValidationException e) { - // ValidationException indicates the fix is not working properly - validationFailureCount.incrementAndGet(); - } catch (CommitFailedException e) { - // CommitFailedException is expected - this is the correct behavior - // These will be automatically retried by the client - commitFailureCount.incrementAndGet(); - } catch (Exception e) { - unexpectedFailureCount.incrementAndGet(); - } finally { - doneLatch.countDown(); - } - }); - futures.add(future); - } - } - - // Start all threads simultaneously - startLatch.countDown(); - - // Wait for all to complete - boolean finished = doneLatch.await(180, TimeUnit.SECONDS); - assertThat(finished).as("All processes should complete within timeout").isTrue(); - - // Wait for all futures to complete - for (Future future : futures) { - future.get(10, TimeUnit.SECONDS); - } - - // Verify the fix: with AssertLastSequenceNumber, there should be NO validation failures - // All concurrent conflicts should be caught as CommitFailedException (retryable) - assertThat(validationFailureCount.get()) - .as( - "With the fix, sequence number conflicts should be caught by AssertLastSequenceNumber " - + "and throw CommitFailedException (retryable), not ValidationException") - .isEqualTo(0); - - // At least some should succeed (commits that don't conflict or succeed after retry) - assertThat(successCount.get()).as("At least some appends should succeed").isGreaterThan(0); - - // CommitFailedExceptions are expected and will be retried by the client - // We don't assert on the exact count since it depends on timing - - // No unexpected failures - assertThat(unexpectedFailureCount.get()) - .as("Should have no unexpected failures") - .isEqualTo(0); - - } finally { - executor.shutdown(); - executor.awaitTermination(30, TimeUnit.SECONDS); - } - } -} From 5ace567242a21e5dd96c276ce0aed1ea4dc459b4 Mon Sep 17 00:00:00 2001 From: Xinyi Lu Date: Fri, 30 Jan 2026 09:06:57 -0800 Subject: [PATCH 3/6] add RetryableValidationException --- .../RetryableValidationException.java | 47 +++++++ .../org/apache/iceberg/TableMetadata.java | 3 +- .../apache/iceberg/rest/CatalogHandlers.java | 12 +- .../apache/iceberg/rest/TestRESTCatalog.java | 127 ++++++++++-------- 4 files changed, 125 insertions(+), 64 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/exceptions/RetryableValidationException.java diff --git a/api/src/main/java/org/apache/iceberg/exceptions/RetryableValidationException.java b/api/src/main/java/org/apache/iceberg/exceptions/RetryableValidationException.java new file mode 100644 index 000000000000..22ee16ce68a7 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/exceptions/RetryableValidationException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.exceptions; + +import com.google.errorprone.annotations.FormatMethod; + +/** + * A {@link ValidationException} that indicates the client can retry with refreshed metadata. + * + *

This is used for validation failures caused by concurrent commits, such as sequence number + * conflicts. Server-side retry won't help since the conflict is in the request itself, but the + * client can retry after refreshing its metadata. + */ +public class RetryableValidationException extends ValidationException { + @FormatMethod + public RetryableValidationException(String message, Object... args) { + super(message, args); + } + + @FormatMethod + public RetryableValidationException(Throwable cause, String message, Object... args) { + super(cause, message, args); + } + + @FormatMethod + public static void check(boolean test, String message, Object... args) { + if (!test) { + throw new RetryableValidationException(message, args); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 5aa76d5680fa..31b8924ae98a 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -32,6 +32,7 @@ import java.util.stream.Stream; import org.apache.iceberg.encryption.EncryptedKey; import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.exceptions.RetryableValidationException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; @@ -1250,7 +1251,7 @@ public Builder addSnapshot(Snapshot snapshot) { "Snapshot already exists for id: %s", snapshot.snapshotId()); - ValidationException.check( + RetryableValidationException.check( formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber || snapshot.parentId() == null, diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index e9c156b48268..79bdc5c3420b 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -72,7 +72,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; -import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.exceptions.RetryableValidationException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -637,17 +637,13 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(base); try { request.updates().forEach(update -> update.applyTo(metadataBuilder)); - } catch (ValidationException e) { + } catch (RetryableValidationException e) { // Sequence number conflicts from concurrent commits are retryable by the client, // but server-side retry won't help since the sequence number is in the request. // Wrap in ValidationFailureException to skip server retry, return to client as // CommitFailedException so the client can retry with refreshed metadata. - if (e.getMessage() != null - && e.getMessage().contains("older than last sequence number")) { - throw new ValidationFailureException( - new CommitFailedException(e, "Commit conflict: %s", e.getMessage())); - } - throw e; + throw new ValidationFailureException( + new CommitFailedException(e, "Commit conflict: %s", e.getMessage())); } TableMetadata updated = metadataBuilder.build(); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 4893914f5155..5a3ac43b7a15 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -71,6 +71,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; @@ -81,7 +82,6 @@ import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.BadRequestException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NotAuthorizedException; @@ -3618,17 +3618,18 @@ protected RESTSessionCatalog newSessionCatalog( * Test concurrent appends on multiple branches simultaneously to verify proper handling of * sequence number conflicts. * - *

Creates 5 different branches on the table, then performs 10 parallel append commits on each - * branch at the same time (50 total concurrent operations). This verifies that: 1. Sequence - * number conflicts are caught by AssertLastSequenceNumber requirement 2. Conflicts result in - * CommitFailedException (retryable) not ValidationException (non-retryable) 3. The REST catalog - * properly handles concurrent modifications across different branches + *

Uses a barrier to synchronize threads so they all load the same table state and commit + * simultaneously, creating deterministic conflicts. With retries disabled, we can verify exact + * counts: one success per round, and (numBranches - 1) CommitFailedExceptions per round. + * + *

This verifies that: 1. Sequence number conflicts are caught by AssertLastSequenceNumber + * requirement 2. Conflicts result in CommitFailedException (retryable) not ValidationException + * (non-retryable) 3. The REST catalog properly handles concurrent modifications across branches */ @Test public void testConcurrentAppendsOnMultipleBranches() { int numBranches = 5; - int commitsPerBranch = 10; - int totalConcurrentWrites = numBranches * commitsPerBranch; + int numRounds = 10; RESTCatalog restCatalog = catalog(); @@ -3638,76 +3639,92 @@ public void testConcurrentAppendsOnMultipleBranches() { restCatalog.createNamespace(ns); Table table = restCatalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create(); - // Add initial data to the main branch - table.newFastAppend().appendFile(FILE_A).commit(); + // Disable retries so we can count exact conflicts + table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "-1").commit(); - // Create 5 branches from the main branch + // Create branches from the main branch String[] branchNames = new String[numBranches]; for (int i = 0; i < numBranches; i++) { branchNames[i] = "branch-" + i; table.manageSnapshots().createBranch(branchNames[i]).commit(); } - // Refresh to get updated metadata with all branches - restCatalog.loadTable(tableIdent); - + AtomicInteger barrier = new AtomicInteger(0); AtomicInteger successCount = new AtomicInteger(0); + AtomicInteger commitFailedCount = new AtomicInteger(0); AtomicInteger validationFailureCount = new AtomicInteger(0); ExecutorService executor = MoreExecutors.getExitingExecutorService( - (ThreadPoolExecutor) Executors.newFixedThreadPool(totalConcurrentWrites)); + (ThreadPoolExecutor) Executors.newFixedThreadPool(numBranches)); - Tasks.range(totalConcurrentWrites) + Tasks.range(numBranches) + .stopOnFailure() + .throwFailureWhenFinished() .executeWith(executor) - .suppressFailureWhenFinished() - .onFailure( - (taskIndex, exception) -> { - // Check if sequence number validation error (indicates fix not working) - if (exception instanceof BadRequestException - && exception.getMessage().contains("Cannot add snapshot with sequence number")) { - validationFailureCount.incrementAndGet(); - } else if (exception instanceof ValidationException) { - validationFailureCount.incrementAndGet(); - } - // CommitFailedException is expected - this is the correct retryable behavior - }) .run( - taskIndex -> { - int branchIdx = taskIndex / commitsPerBranch; - int commitIdx = taskIndex % commitsPerBranch; + branchIdx -> { String branchName = branchNames[branchIdx]; - // Each thread loads the table independently - Table localTable = restCatalog.loadTable(tableIdent); - - // Create a unique file for this commit - DataFile newFile = - DataFiles.builder(SPEC) - .withPath( - String.format( - "/path/to/branch-%d-commit-%d.parquet", branchIdx, commitIdx)) - .withFileSizeInBytes(15) - .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16)) - .withRecordCount(3) - .build(); - - // Append to the specific branch - localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit(); - - successCount.incrementAndGet(); + for (int round = 0; round < numRounds; round++) { + int currentRound = round; + + // Load table + Table localTable = restCatalog.loadTable(tableIdent); + + DataFile newFile = + DataFiles.builder(SPEC) + .withPath( + String.format("/path/to/branch-%d-round-%d.parquet", branchIdx, round)) + .withFileSizeInBytes(15) + .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16)) + .withRecordCount(3) + .build(); + + // Wait for all threads to be ready to commit + barrier.incrementAndGet(); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> barrier.get() >= (currentRound + 1) * numBranches); + + try { + // All threads commit simultaneously + localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit(); + successCount.incrementAndGet(); + } catch (CommitFailedException e) { + // Expected for conflicts - this is the correct behavior with the fix + commitFailedCount.incrementAndGet(); + } catch (ValidationException e) { + // This indicates the fix is not working + validationFailureCount.incrementAndGet(); + throw e; + } + } }); - // Verify the fix: with AssertLastSequenceNumber, there should be NO validation failures - // All concurrent conflicts should be caught as CommitFailedException (retryable) + int totalAttempts = numBranches * numRounds; + + // Verify: no ValidationException should have been thrown assertThat(validationFailureCount.get()) .as( - "With the fix, sequence number conflicts should be caught by AssertLastSequenceNumber " - + "and throw CommitFailedException (retryable), not ValidationException") + "Sequence number conflicts should throw CommitFailedException (retryable), " + + "not ValidationException") .isEqualTo(0); - // At least some should succeed (commits that don't conflict or succeed after retry) - assertThat(successCount.get()).as("At least some appends should succeed").isGreaterThan(0); + // Verify at least one commit succeeded per round + assertThat(successCount.get()) + .as("At least one commit should succeed per round") + .isGreaterThanOrEqualTo(numRounds); + + // Verify some conflicts happened (proves concurrent contention occurred) + assertThat(commitFailedCount.get()) + .as("Some commits should fail with CommitFailedException due to conflicts") + .isGreaterThan(0); + + // Verify no attempts were lost + assertThat(successCount.get() + commitFailedCount.get()) + .as("All commit attempts should either succeed or fail with CommitFailedException") + .isEqualTo(totalAttempts); } private static List allRequests(RESTCatalogAdapter adapter) { From 8732caf452cdaef8c8fe3a7a6486fbfb2b44fb7a Mon Sep 17 00:00:00 2001 From: Xinyi Lu Date: Fri, 30 Jan 2026 09:41:03 -0800 Subject: [PATCH 4/6] fix checkstyle error --- .../java/org/apache/iceberg/rest/TestRESTCatalog.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 5a3ac43b7a15..e53b75fb41e6 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -3631,13 +3631,13 @@ public void testConcurrentAppendsOnMultipleBranches() { int numBranches = 5; int numRounds = 10; - RESTCatalog restCatalog = catalog(); + RESTCatalog catalog = catalog(); Namespace ns = Namespace.of("concurrent_test"); TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table"); - restCatalog.createNamespace(ns); - Table table = restCatalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create(); + catalog.createNamespace(ns); + Table table = catalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create(); // Disable retries so we can count exact conflicts table.updateProperties().set(TableProperties.COMMIT_NUM_RETRIES, "-1").commit(); @@ -3670,7 +3670,7 @@ public void testConcurrentAppendsOnMultipleBranches() { int currentRound = round; // Load table - Table localTable = restCatalog.loadTable(tableIdent); + Table localTable = catalog.loadTable(tableIdent); DataFile newFile = DataFiles.builder(SPEC) From 0f65a6553516e4006abda004396d4d9d8f15dfc1 Mon Sep 17 00:00:00 2001 From: Xinyi Lu Date: Fri, 30 Jan 2026 11:26:01 -0800 Subject: [PATCH 5/6] move validation class to core --- .../java/org/apache/iceberg}/RetryableValidationException.java | 3 ++- core/src/main/java/org/apache/iceberg/TableMetadata.java | 1 - .../src/main/java/org/apache/iceberg/rest/CatalogHandlers.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename {api/src/main/java/org/apache/iceberg/exceptions => core/src/main/java/org/apache/iceberg}/RetryableValidationException.java (95%) diff --git a/api/src/main/java/org/apache/iceberg/exceptions/RetryableValidationException.java b/core/src/main/java/org/apache/iceberg/RetryableValidationException.java similarity index 95% rename from api/src/main/java/org/apache/iceberg/exceptions/RetryableValidationException.java rename to core/src/main/java/org/apache/iceberg/RetryableValidationException.java index 22ee16ce68a7..9aba79c1f9e6 100644 --- a/api/src/main/java/org/apache/iceberg/exceptions/RetryableValidationException.java +++ b/core/src/main/java/org/apache/iceberg/RetryableValidationException.java @@ -16,9 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iceberg.exceptions; +package org.apache.iceberg; import com.google.errorprone.annotations.FormatMethod; +import org.apache.iceberg.exceptions.ValidationException; /** * A {@link ValidationException} that indicates the client can retry with refreshed metadata. diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 31b8924ae98a..66d5a2bf84fb 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -32,7 +32,6 @@ import java.util.stream.Stream; import org.apache.iceberg.encryption.EncryptedKey; import org.apache.iceberg.encryption.EncryptionUtil; -import org.apache.iceberg.exceptions.RetryableValidationException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Objects; diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 79bdc5c3420b..1ee2df484015 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -53,6 +53,7 @@ import org.apache.iceberg.IncrementalAppendScan; import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RetryableValidationException; import org.apache.iceberg.Scan; import org.apache.iceberg.Schema; import org.apache.iceberg.SortOrder; @@ -72,7 +73,6 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; -import org.apache.iceberg.exceptions.RetryableValidationException; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; From 24c66b47cc40a3982e621ead7a27f112f2d9791d Mon Sep 17 00:00:00 2001 From: Xinyi Lu Date: Mon, 9 Feb 2026 21:55:44 -0800 Subject: [PATCH 6/6] update test according to comment --- .../apache/iceberg/rest/TestRESTCatalog.java | 71 ++++++++++++------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index e53b75fb41e6..02399956a4f5 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -46,12 +46,14 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -65,6 +67,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.ManageSnapshots; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -3644,14 +3647,17 @@ public void testConcurrentAppendsOnMultipleBranches() { // Create branches from the main branch String[] branchNames = new String[numBranches]; + ManageSnapshots manageSnapshots = table.manageSnapshots(); for (int i = 0; i < numBranches; i++) { branchNames[i] = "branch-" + i; - table.manageSnapshots().createBranch(branchNames[i]).commit(); + manageSnapshots = manageSnapshots.createBranch(branchNames[i]); } + manageSnapshots.commit(); - AtomicInteger barrier = new AtomicInteger(0); - AtomicInteger successCount = new AtomicInteger(0); - AtomicInteger commitFailedCount = new AtomicInteger(0); + CyclicBarrier startBarrier = new CyclicBarrier(numBranches); + CyclicBarrier endBarrier = new CyclicBarrier(numBranches); + AtomicIntegerArray successPerRound = new AtomicIntegerArray(numRounds); + AtomicIntegerArray failuresPerRound = new AtomicIntegerArray(numRounds); AtomicInteger validationFailureCount = new AtomicInteger(0); ExecutorService executor = @@ -3667,8 +3673,6 @@ public void testConcurrentAppendsOnMultipleBranches() { String branchName = branchNames[branchIdx]; for (int round = 0; round < numRounds; round++) { - int currentRound = round; - // Load table Table localTable = catalog.loadTable(tableIdent); @@ -3681,27 +3685,36 @@ public void testConcurrentAppendsOnMultipleBranches() { .withRecordCount(3) .build(); - // Wait for all threads to be ready to commit - barrier.incrementAndGet(); - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .until(() -> barrier.get() >= (currentRound + 1) * numBranches); - try { + // Wait for all threads to be ready to commit + startBarrier.await(); + // All threads commit simultaneously localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit(); - successCount.incrementAndGet(); + successPerRound.incrementAndGet(round); } catch (CommitFailedException e) { // Expected for conflicts - this is the correct behavior with the fix - commitFailedCount.incrementAndGet(); + failuresPerRound.incrementAndGet(round); } catch (ValidationException e) { // This indicates the fix is not working validationFailureCount.incrementAndGet(); throw e; + } catch (Exception e) { + // Handle barrier exceptions + throw new RuntimeException(e); + } + + try { + // Ensure all threads complete this round before starting the next + endBarrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); } } }); + executor.shutdown(); + int totalAttempts = numBranches * numRounds; // Verify: no ValidationException should have been thrown @@ -3711,18 +3724,28 @@ public void testConcurrentAppendsOnMultipleBranches() { + "not ValidationException") .isEqualTo(0); - // Verify at least one commit succeeded per round - assertThat(successCount.get()) - .as("At least one commit should succeed per round") - .isGreaterThanOrEqualTo(numRounds); - - // Verify some conflicts happened (proves concurrent contention occurred) - assertThat(commitFailedCount.get()) - .as("Some commits should fail with CommitFailedException due to conflicts") - .isGreaterThan(0); + // Verify per-round exactly one success and (numBranches - 1) failures per round + for (int round = 0; round < numRounds; round++) { + assertThat(successPerRound.get(round)) + .as("Exactly one commit should succeed in round " + round) + .isEqualTo(1); + assertThat(failuresPerRound.get(round)) + .as("Exactly (numBranches - 1) commits should fail in round " + round) + .isEqualTo(numBranches - 1); + } // Verify no attempts were lost - assertThat(successCount.get() + commitFailedCount.get()) + int totalSuccesses = 0; + int totalFailures = 0; + for (int round = 0; round < numRounds; round++) { + totalSuccesses += successPerRound.get(round); + totalFailures += failuresPerRound.get(round); + } + + assertThat(totalSuccesses) + .as("Total successes should equal number of rounds") + .isEqualTo(numRounds); + assertThat(totalSuccesses + totalFailures) .as("All commit attempts should either succeed or fail with CommitFailedException") .isEqualTo(totalAttempts); }