From 1a25199d18964721a1e924b60f344f08785fbf9c Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Fri, 6 Feb 2026 00:14:44 -0800 Subject: [PATCH 1/2] Core: Simplify REST scan planning to 2 modes (client/catalog) Simplified scan planning configuration from boolean to 2-mode enum: - client (default): Use client-side scan planning - catalog: Use server-side scan planning if supported Changes: - RESTCatalogProperties: Added ScanPlanningMode enum with CLIENT and CATALOG - RESTSessionCatalog: Updated to check table config for scan planning mode - Updated OpenAPI specs with simplified documentation The mode can be configured per-table via LoadTableResponse.config() to allow fine-grained control over which tables use server-side planning. --- .../iceberg/rest/RESTCatalogProperties.java | 41 +++- .../iceberg/rest/RESTSessionCatalog.java | 52 +++-- .../iceberg/rest/TestRESTScanPlanning.java | 203 ++++++++++++++++-- open-api/rest-catalog-open-api.py | 3 + open-api/rest-catalog-open-api.yaml | 3 + .../extensions/TestRemoteScanPlanning.java | 2 +- .../extensions/TestRemoteScanPlanning.java | 2 +- .../extensions/TestRemoteScanPlanning.java | 2 +- .../extensions/TestRemoteScanPlanning.java | 2 +- 9 files changed, 275 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java index 7281862481dd..3ec9cc30018b 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java @@ -40,9 +40,10 @@ private RESTCatalogProperties() {} public static final String NAMESPACE_SEPARATOR = "namespace-separator"; - // Enable planning on the REST server side - public static final String REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"; - public static final boolean REST_SCAN_PLANNING_ENABLED_DEFAULT = false; + // Configure scan planning mode + // Can be set by server in LoadTableResponse.config() for table-level override + public static final String SCAN_PLANNING_MODE = "scan-planning-mode"; + public static final String SCAN_PLANNING_MODE_DEFAULT = ScanPlanningMode.CLIENT.modeName(); public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id"; @@ -59,4 +60,38 @@ public enum SnapshotMode { ALL, REFS } + + /** + * Enum to represent scan planning mode. + * + * + */ + public enum ScanPlanningMode { + CLIENT("client"), + CATALOG("catalog"); + + private final String modeName; + + ScanPlanningMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public static ScanPlanningMode fromString(String mode) { + for (ScanPlanningMode planningMode : values()) { + if (planningMode.modeName.equalsIgnoreCase(mode)) { + return planningMode; + } + } + + throw new IllegalArgumentException( + String.format("Invalid scan planning mode: %s. Valid values are: client, catalog", mode)); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index cda71fccda3a..8e2a38b8c8b5 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -166,7 +166,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private MetricsReporter reporter = null; private boolean reportingViaRestEnabled; private Integer pageSize = null; - private boolean restScanPlanningEnabled; private CloseableGroup closeables = null; private Set endpoints; private Supplier> mutationHeaders = Map::of; @@ -281,12 +280,6 @@ public void initialize(String name, Map unresolved) { RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTUtil.NAMESPACE_SEPARATOR_URLENCODED_UTF_8); - this.restScanPlanningEnabled = - PropertyUtil.propertyAsBoolean( - mergedProps, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT); - this.tableCache = createTableCache(mergedProps); this.closeables.addCloseable(this.tableCache); @@ -584,7 +577,7 @@ private Supplier createTableSupplier( trackFileIO(ops); - RESTTable table = restTableForScanPlanning(ops, identifier, tableClient); + RESTTable table = restTableForScanPlanning(ops, identifier, tableClient, tableConf); if (table != null) { return table; } @@ -595,9 +588,40 @@ private Supplier createTableSupplier( } private RESTTable restTableForScanPlanning( - TableOperations ops, TableIdentifier finalIdentifier, RESTClient restClient) { - // server supports remote planning endpoint and server / client wants to do server side planning - if (endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN) && restScanPlanningEnabled) { + TableOperations ops, + TableIdentifier finalIdentifier, + RESTClient restClient, + Map tableConf) { + // Get client-side and server-side scan planning modes + String clientModeConfig = properties().get(RESTCatalogProperties.SCAN_PLANNING_MODE); + String serverModeConfig = tableConf.get(RESTCatalogProperties.SCAN_PLANNING_MODE); + + // Validate that client and server configs don't conflict + // Only validate if BOTH are explicitly set (not null) + if (clientModeConfig != null && serverModeConfig != null) { + RESTCatalogProperties.ScanPlanningMode clientMode = + RESTCatalogProperties.ScanPlanningMode.fromString(clientModeConfig); + RESTCatalogProperties.ScanPlanningMode serverMode = + RESTCatalogProperties.ScanPlanningMode.fromString(serverModeConfig); + + if (clientMode != serverMode) { + throw new IllegalStateException( + String.format( + "Scan planning mode mismatch for table %s: client config specifies '%s' but server config specifies '%s'. " + + "These must be consistent.", + finalIdentifier, clientMode, serverMode)); + } + } + + // Determine effective mode: prefer server config if present, otherwise use client config + String effectiveModeConfig = serverModeConfig != null ? serverModeConfig : clientModeConfig; + RESTCatalogProperties.ScanPlanningMode effectiveMode = + effectiveModeConfig != null + ? RESTCatalogProperties.ScanPlanningMode.fromString(effectiveModeConfig) + : RESTCatalogProperties.ScanPlanningMode.CLIENT; + + if (effectiveMode == RESTCatalogProperties.ScanPlanningMode.CATALOG + && endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN)) { return new RESTTable( ops, fullTableName(finalIdentifier), @@ -610,6 +634,8 @@ private RESTTable restTableForScanPlanning( properties(), conf); } + + // Default to client-side planning return null; } @@ -683,7 +709,7 @@ public Table registerTable( trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } @@ -952,7 +978,7 @@ public Table create() { trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index ab0e1d9c56d0..bbd179958f59 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -40,6 +40,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ContentScanTask; @@ -64,6 +65,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -81,22 +84,25 @@ public T execute( Class responseType, Consumer errorHandler, Consumer> responseHeaders) { - if (ResourcePaths.config().equals(request.path())) { + Object body = roundTripSerialize(request.body(), "request"); + HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); + T response = super.execute(req, responseType, errorHandler, responseHeaders); + response = roundTripSerialize(response, "response"); + + // Add scan planning mode to table config for LoadTableResponse + if (response instanceof LoadTableResponse) { + LoadTableResponse loadResponse = (LoadTableResponse) response; return castResponse( responseType, - ConfigResponse.builder() - .withEndpoints( - Arrays.stream(Route.values()) - .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) - .collect(Collectors.toList())) - .withOverrides( - ImmutableMap.of(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true")) + LoadTableResponse.builder() + .withTableMetadata(loadResponse.tableMetadata()) + .addAllConfig(loadResponse.config()) + .addConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .addAllCredentials(loadResponse.credentials()) .build()); } - Object body = roundTripSerialize(request.body(), "request"); - HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); - T response = super.execute(req, responseType, errorHandler, responseHeaders); - return roundTripSerialize(response, "response"); + + return response; } }); } @@ -106,6 +112,29 @@ protected String catalogName() { return "prod-with-scan-planning"; } + @BeforeEach + public void setupCatalogWithScanPlanning() throws Exception { + // Reinitialize the catalog with scan planning mode set on client side + // This matches what the server returns in LoadTableResponse + restCatalog.close(); + restCatalog = + new RESTCatalog( + SessionCatalog.SessionContext.createEmpty(), + (config) -> + HTTPClient.builder(config) + .uri(config.get(CatalogProperties.URI)) + .withHeaders(RESTUtil.configHeaders(config)) + .build()); + restCatalog.setConf(new org.apache.hadoop.conf.Configuration()); + restCatalog.initialize( + catalogName(), + ImmutableMap.builder() + .put(CatalogProperties.URI, httpServer.getURI().toString()) + .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") + .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .build()); + } + // ==================== Helper Methods ==================== @Override @@ -805,6 +834,71 @@ private static class CatalogWithAdapter { } } + /** + * Helper method to create a catalog with custom scan planning mode configuration for testing + * client-server mode validation. + * + * @param clientMode The scan planning mode the client requests (null for default) + * @param serverMode The scan planning mode the server returns (null for not set) + * @return CatalogWithAdapter for testing + */ + private CatalogWithAdapter catalogWithScanPlanningModes(String clientMode, String serverMode) { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (ResourcePaths.config().equals(request.path())) { + return castResponse( + responseType, + ConfigResponse.builder() + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .build()); + } + Object body = roundTripSerialize(request.body(), "request"); + HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); + T response = super.execute(req, responseType, errorHandler, responseHeaders); + response = roundTripSerialize(response, "response"); + + // Add server's scan planning mode to LoadTableResponse if specified + if (response instanceof LoadTableResponse && serverMode != null) { + LoadTableResponse loadResponse = (LoadTableResponse) response; + return castResponse( + responseType, + LoadTableResponse.builder() + .withTableMetadata(loadResponse.tableMetadata()) + .addAllConfig(loadResponse.config()) + .addConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, serverMode) + .addAllCredentials(loadResponse.credentials()) + .build()); + } + + return response; + } + }); + + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + + ImmutableMap.Builder configBuilder = + ImmutableMap.builder() + .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"); + + if (clientMode != null) { + configBuilder.put(RESTCatalogProperties.SCAN_PLANNING_MODE, clientMode); + } + + catalog.initialize("test-scan-planning-modes", configBuilder.build()); + return new CatalogWithAdapter(catalog, adapter); + } + // Helper: Create base catalog endpoints (namespace and table operations) private List baseCatalogEndpoints() { return ImmutableList.of( @@ -840,7 +934,22 @@ public T execute( return castResponse( responseType, ConfigResponse.builder().withEndpoints(endpoints).build()); } - return super.execute(request, responseType, errorHandler, responseHeaders); + T response = super.execute(request, responseType, errorHandler, responseHeaders); + + // Add scan planning mode to table config for LoadTableResponse + if (response instanceof LoadTableResponse) { + LoadTableResponse loadResponse = (LoadTableResponse) response; + return castResponse( + responseType, + LoadTableResponse.builder() + .withTableMetadata(loadResponse.tableMetadata()) + .addAllConfig(loadResponse.config()) + .addConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .addAllCredentials(loadResponse.credentials()) + .build()); + } + + return response; } }); @@ -855,8 +964,8 @@ public T execute( ImmutableMap.of( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO", - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - "true")); + RESTCatalogProperties.SCAN_PLANNING_MODE, + "catalog")); return new CatalogWithAdapter(catalog, adapter); } @@ -953,4 +1062,68 @@ public void serverSupportsPlanningButNotCancellation() throws IOException { // Verify no exception was thrown - cancelPlan returns false when endpoint not supported assertThat(cancelled).isFalse(); } + + @Test + public void catalogAndTableConfigMismatch() { + CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("catalog", "client"); + catalogWithAdapter.catalog.createNamespace(NS); + + assertThatThrownBy( + () -> + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "mismatch_test"), SCHEMA) + .create()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Scan planning mode mismatch") + .hasMessageContaining("client config specifies 'CATALOG'") + .hasMessageContaining("server config specifies 'CLIENT'"); + } + + @Test + public void clientExplicitlyRequestsClientSidePlanning() { + CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("client", "client"); + catalogWithAdapter.catalog.createNamespace(NS); + + Table table = + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_explicit_test"), SCHEMA) + .create(); + + assertThat(table).isNotInstanceOf(RESTTable.class); + assertThat(table).isInstanceOf(BaseTable.class); + } + + @Test + public void clientRequestsClientButServerReturnsCatalog() { + CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("client", "catalog"); + catalogWithAdapter.catalog.createNamespace(NS); + + assertThatThrownBy( + () -> + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_override_rejected_test"), SCHEMA) + .create()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Scan planning mode mismatch") + .hasMessageContaining("client config specifies 'CLIENT'") + .hasMessageContaining("server config specifies 'CATALOG'"); + } + + @Test + public void clientRequestsClientAndServerReturnsNothing() { + CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("client", null); + catalogWithAdapter.catalog.createNamespace(NS); + + Table table = + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_server_null_test"), SCHEMA) + .create(); + + assertThat(table).isNotInstanceOf(RESTTable.class); + assertThat(table).isInstanceOf(BaseTable.class); + } } diff --git a/open-api/rest-catalog-open-api.py b/open-api/rest-catalog-open-api.py index 411881cb31ab..8ce173a6bb4a 100644 --- a/open-api/rest-catalog-open-api.py +++ b/open-api/rest-catalog-open-api.py @@ -1468,6 +1468,9 @@ class LoadTableResult(BaseModel): ## General Configurations - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Controls scan planning behavior for table operations. Valid values: + - `client` (default): Clients MUST use client-side scan planning + - `catalog`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning ## AWS Configurations diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml index fff71128e5e5..2cdd0e92541a 100644 --- a/open-api/rest-catalog-open-api.yaml +++ b/open-api/rest-catalog-open-api.yaml @@ -3468,6 +3468,9 @@ components: ## General Configurations - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Controls scan planning behavior for table operations. Valid values: + - `client` (default): Clients MUST use client-side scan planning + - `catalog`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning ## AWS Configurations diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..3c3d94f36999 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,7 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..3c3d94f36999 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,7 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..3c3d94f36999 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,7 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..3c3d94f36999 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,7 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } From e886045639a88a8d3bf243f9cd426534f9edd8d4 Mon Sep 17 00:00:00 2001 From: Prashant Singh Date: Tue, 17 Feb 2026 20:12:37 -0800 Subject: [PATCH 2/2] address steven feedback --- .../iceberg/rest/RESTCatalogProperties.java | 6 +- .../iceberg/rest/RESTSessionCatalog.java | 29 ++++----- .../iceberg/rest/TestRESTScanPlanning.java | 63 +++++++++++-------- open-api/rest-catalog-open-api.py | 2 +- open-api/rest-catalog-open-api.yaml | 2 +- .../extensions/TestRemoteScanPlanning.java | 4 +- .../extensions/TestRemoteScanPlanning.java | 4 +- .../extensions/TestRemoteScanPlanning.java | 4 +- .../extensions/TestRemoteScanPlanning.java | 4 +- 9 files changed, 66 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java index 3ec9cc30018b..70609afe9d20 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java @@ -66,12 +66,12 @@ public enum SnapshotMode { * *
    *
  • CLIENT - Use client-side scan planning - *
  • CATALOG - Use server-side scan planning + *
  • SERVER - Use server-side scan planning *
*/ public enum ScanPlanningMode { CLIENT("client"), - CATALOG("catalog"); + SERVER("server"); private final String modeName; @@ -91,7 +91,7 @@ public static ScanPlanningMode fromString(String mode) { } throw new IllegalArgumentException( - String.format("Invalid scan planning mode: %s. Valid values are: client, catalog", mode)); + String.format("Invalid scan planning mode: %s. Valid values are: client, server", mode)); } } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 8e2a38b8c8b5..f23d1ea72b9d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -593,34 +593,29 @@ private RESTTable restTableForScanPlanning( RESTClient restClient, Map tableConf) { // Get client-side and server-side scan planning modes - String clientModeConfig = properties().get(RESTCatalogProperties.SCAN_PLANNING_MODE); - String serverModeConfig = tableConf.get(RESTCatalogProperties.SCAN_PLANNING_MODE); + String planningModeClientConfig = properties().get(RESTCatalogProperties.SCAN_PLANNING_MODE); + String planningModeServerConfig = tableConf.get(RESTCatalogProperties.SCAN_PLANNING_MODE); // Validate that client and server configs don't conflict // Only validate if BOTH are explicitly set (not null) - if (clientModeConfig != null && serverModeConfig != null) { - RESTCatalogProperties.ScanPlanningMode clientMode = - RESTCatalogProperties.ScanPlanningMode.fromString(clientModeConfig); - RESTCatalogProperties.ScanPlanningMode serverMode = - RESTCatalogProperties.ScanPlanningMode.fromString(serverModeConfig); - - if (clientMode != serverMode) { - throw new IllegalStateException( - String.format( - "Scan planning mode mismatch for table %s: client config specifies '%s' but server config specifies '%s'. " - + "These must be consistent.", - finalIdentifier, clientMode, serverMode)); - } + if (planningModeClientConfig != null && planningModeServerConfig != null) { + Preconditions.checkState( + planningModeClientConfig.equalsIgnoreCase(planningModeServerConfig), + "Scan planning mode mismatch for table %s: client config=%s, server config=%s", + finalIdentifier, + planningModeClientConfig, + planningModeServerConfig); } // Determine effective mode: prefer server config if present, otherwise use client config - String effectiveModeConfig = serverModeConfig != null ? serverModeConfig : clientModeConfig; + String effectiveModeConfig = + planningModeServerConfig != null ? planningModeServerConfig : planningModeClientConfig; RESTCatalogProperties.ScanPlanningMode effectiveMode = effectiveModeConfig != null ? RESTCatalogProperties.ScanPlanningMode.fromString(effectiveModeConfig) : RESTCatalogProperties.ScanPlanningMode.CLIENT; - if (effectiveMode == RESTCatalogProperties.ScanPlanningMode.CATALOG + if (effectiveMode == RESTCatalogProperties.ScanPlanningMode.SERVER && endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN)) { return new RESTTable( ops, diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index bbd179958f59..d5cbf2e41377 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -84,6 +84,7 @@ public T execute( Class responseType, Consumer errorHandler, Consumer> responseHeaders) { + // roundTripSerialize before intercepting so we modify the deserialized response Object body = roundTripSerialize(request.body(), "request"); HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); T response = super.execute(req, responseType, errorHandler, responseHeaders); @@ -91,15 +92,8 @@ public T execute( // Add scan planning mode to table config for LoadTableResponse if (response instanceof LoadTableResponse) { - LoadTableResponse loadResponse = (LoadTableResponse) response; return castResponse( - responseType, - LoadTableResponse.builder() - .withTableMetadata(loadResponse.tableMetadata()) - .addAllConfig(loadResponse.config()) - .addConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") - .addAllCredentials(loadResponse.credentials()) - .build()); + responseType, withServerPlanningMode((LoadTableResponse) response)); } return response; @@ -131,12 +125,25 @@ public void setupCatalogWithScanPlanning() throws Exception { ImmutableMap.builder() .put(CatalogProperties.URI, httpServer.getURI().toString()) .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") - .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build()); } // ==================== Helper Methods ==================== + private static LoadTableResponse withServerPlanningMode(LoadTableResponse response) { + return LoadTableResponse.builder() + .withTableMetadata(response.tableMetadata()) + .addAllConfig(response.config()) + .addConfig( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) + .addAllCredentials(response.credentials()) + .build(); + } + @Override @SuppressWarnings("unchecked") protected T roundTripSerialize(T payload, String description) { @@ -938,15 +945,8 @@ public T execute( // Add scan planning mode to table config for LoadTableResponse if (response instanceof LoadTableResponse) { - LoadTableResponse loadResponse = (LoadTableResponse) response; return castResponse( - responseType, - LoadTableResponse.builder() - .withTableMetadata(loadResponse.tableMetadata()) - .addAllConfig(loadResponse.config()) - .addConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") - .addAllCredentials(loadResponse.credentials()) - .build()); + responseType, withServerPlanningMode((LoadTableResponse) response)); } return response; @@ -965,7 +965,7 @@ public T execute( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO", RESTCatalogProperties.SCAN_PLANNING_MODE, - "catalog")); + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())); return new CatalogWithAdapter(catalog, adapter); } @@ -1065,7 +1065,10 @@ public void serverSupportsPlanningButNotCancellation() throws IOException { @Test public void catalogAndTableConfigMismatch() { - CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("catalog", "client"); + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName(), + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName()); catalogWithAdapter.catalog.createNamespace(NS); assertThatThrownBy( @@ -1076,13 +1079,16 @@ public void catalogAndTableConfigMismatch() { .create()) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Scan planning mode mismatch") - .hasMessageContaining("client config specifies 'CATALOG'") - .hasMessageContaining("server config specifies 'CLIENT'"); + .hasMessageContaining("client config=server") + .hasMessageContaining("server config=client"); } @Test public void clientExplicitlyRequestsClientSidePlanning() { - CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("client", "client"); + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName()); catalogWithAdapter.catalog.createNamespace(NS); Table table = @@ -1097,7 +1103,10 @@ public void clientExplicitlyRequestsClientSidePlanning() { @Test public void clientRequestsClientButServerReturnsCatalog() { - CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("client", "catalog"); + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()); catalogWithAdapter.catalog.createNamespace(NS); assertThatThrownBy( @@ -1108,13 +1117,15 @@ public void clientRequestsClientButServerReturnsCatalog() { .create()) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Scan planning mode mismatch") - .hasMessageContaining("client config specifies 'CLIENT'") - .hasMessageContaining("server config specifies 'CATALOG'"); + .hasMessageContaining("client config=client") + .hasMessageContaining("server config=server"); } @Test public void clientRequestsClientAndServerReturnsNothing() { - CatalogWithAdapter catalogWithAdapter = catalogWithScanPlanningModes("client", null); + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), null); catalogWithAdapter.catalog.createNamespace(NS); Table table = diff --git a/open-api/rest-catalog-open-api.py b/open-api/rest-catalog-open-api.py index 8ce173a6bb4a..bc8a027b28ab 100644 --- a/open-api/rest-catalog-open-api.py +++ b/open-api/rest-catalog-open-api.py @@ -1470,7 +1470,7 @@ class LoadTableResult(BaseModel): - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled - `scan-planning-mode`: Controls scan planning behavior for table operations. Valid values: - `client` (default): Clients MUST use client-side scan planning - - `catalog`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning + - `server`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning ## AWS Configurations diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml index 2cdd0e92541a..20e15fc425e8 100644 --- a/open-api/rest-catalog-open-api.yaml +++ b/open-api/rest-catalog-open-api.yaml @@ -3470,7 +3470,7 @@ components: - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled - `scan-planning-mode`: Controls scan planning behavior for table operations. Valid values: - `client` (default): Clients MUST use client-side scan planning - - `catalog`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning + - `server`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning ## AWS Configurations diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 3c3d94f36999..9c31eb970b56 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 3c3d94f36999..9c31eb970b56 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 3c3d94f36999..9c31eb970b56 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index 3c3d94f36999..9c31eb970b56 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.SCAN_PLANNING_MODE, "catalog") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" }