diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java index 7281862481dd..70609afe9d20 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java @@ -40,9 +40,10 @@ private RESTCatalogProperties() {} public static final String NAMESPACE_SEPARATOR = "namespace-separator"; - // Enable planning on the REST server side - public static final String REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"; - public static final boolean REST_SCAN_PLANNING_ENABLED_DEFAULT = false; + // Configure scan planning mode + // Can be set by server in LoadTableResponse.config() for table-level override + public static final String SCAN_PLANNING_MODE = "scan-planning-mode"; + public static final String SCAN_PLANNING_MODE_DEFAULT = ScanPlanningMode.CLIENT.modeName(); public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id"; @@ -59,4 +60,38 @@ public enum SnapshotMode { ALL, REFS } + + /** + * Enum to represent scan planning mode. + * + * + */ + public enum ScanPlanningMode { + CLIENT("client"), + SERVER("server"); + + private final String modeName; + + ScanPlanningMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public static ScanPlanningMode fromString(String mode) { + for (ScanPlanningMode planningMode : values()) { + if (planningMode.modeName.equalsIgnoreCase(mode)) { + return planningMode; + } + } + + throw new IllegalArgumentException( + String.format("Invalid scan planning mode: %s. Valid values are: client, server", mode)); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index cda71fccda3a..f23d1ea72b9d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -166,7 +166,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private MetricsReporter reporter = null; private boolean reportingViaRestEnabled; private Integer pageSize = null; - private boolean restScanPlanningEnabled; private CloseableGroup closeables = null; private Set endpoints; private Supplier> mutationHeaders = Map::of; @@ -281,12 +280,6 @@ public void initialize(String name, Map unresolved) { RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTUtil.NAMESPACE_SEPARATOR_URLENCODED_UTF_8); - this.restScanPlanningEnabled = - PropertyUtil.propertyAsBoolean( - mergedProps, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT); - this.tableCache = createTableCache(mergedProps); this.closeables.addCloseable(this.tableCache); @@ -584,7 +577,7 @@ private Supplier createTableSupplier( trackFileIO(ops); - RESTTable table = restTableForScanPlanning(ops, identifier, tableClient); + RESTTable table = restTableForScanPlanning(ops, identifier, tableClient, tableConf); if (table != null) { return table; } @@ -595,9 +588,35 @@ private Supplier createTableSupplier( } private RESTTable restTableForScanPlanning( - TableOperations ops, TableIdentifier finalIdentifier, RESTClient restClient) { - // server supports remote planning endpoint and server / client wants to do server side planning - if (endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN) && restScanPlanningEnabled) { + TableOperations ops, + TableIdentifier finalIdentifier, + RESTClient restClient, + Map tableConf) { + // Get client-side and server-side scan planning modes + String planningModeClientConfig = properties().get(RESTCatalogProperties.SCAN_PLANNING_MODE); + String planningModeServerConfig = tableConf.get(RESTCatalogProperties.SCAN_PLANNING_MODE); + + // Validate that client and server configs don't conflict + // Only validate if BOTH are explicitly set (not null) + if (planningModeClientConfig != null && planningModeServerConfig != null) { + Preconditions.checkState( + planningModeClientConfig.equalsIgnoreCase(planningModeServerConfig), + "Scan planning mode mismatch for table %s: client config=%s, server config=%s", + finalIdentifier, + planningModeClientConfig, + planningModeServerConfig); + } + + // Determine effective mode: prefer server config if present, otherwise use client config + String effectiveModeConfig = + planningModeServerConfig != null ? planningModeServerConfig : planningModeClientConfig; + RESTCatalogProperties.ScanPlanningMode effectiveMode = + effectiveModeConfig != null + ? RESTCatalogProperties.ScanPlanningMode.fromString(effectiveModeConfig) + : RESTCatalogProperties.ScanPlanningMode.CLIENT; + + if (effectiveMode == RESTCatalogProperties.ScanPlanningMode.SERVER + && endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN)) { return new RESTTable( ops, fullTableName(finalIdentifier), @@ -610,6 +629,8 @@ private RESTTable restTableForScanPlanning( properties(), conf); } + + // Default to client-side planning return null; } @@ -683,7 +704,7 @@ public Table registerTable( trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } @@ -952,7 +973,7 @@ public Table create() { trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index ab0e1d9c56d0..d5cbf2e41377 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -40,6 +40,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.ContentFile; import org.apache.iceberg.ContentScanTask; @@ -64,6 +65,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -81,22 +84,19 @@ public T execute( Class responseType, Consumer errorHandler, Consumer> responseHeaders) { - if (ResourcePaths.config().equals(request.path())) { - return castResponse( - responseType, - ConfigResponse.builder() - .withEndpoints( - Arrays.stream(Route.values()) - .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) - .collect(Collectors.toList())) - .withOverrides( - ImmutableMap.of(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true")) - .build()); - } + // roundTripSerialize before intercepting so we modify the deserialized response Object body = roundTripSerialize(request.body(), "request"); HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); T response = super.execute(req, responseType, errorHandler, responseHeaders); - return roundTripSerialize(response, "response"); + response = roundTripSerialize(response, "response"); + + // Add scan planning mode to table config for LoadTableResponse + if (response instanceof LoadTableResponse) { + return castResponse( + responseType, withServerPlanningMode((LoadTableResponse) response)); + } + + return response; } }); } @@ -106,8 +106,44 @@ protected String catalogName() { return "prod-with-scan-planning"; } + @BeforeEach + public void setupCatalogWithScanPlanning() throws Exception { + // Reinitialize the catalog with scan planning mode set on client side + // This matches what the server returns in LoadTableResponse + restCatalog.close(); + restCatalog = + new RESTCatalog( + SessionCatalog.SessionContext.createEmpty(), + (config) -> + HTTPClient.builder(config) + .uri(config.get(CatalogProperties.URI)) + .withHeaders(RESTUtil.configHeaders(config)) + .build()); + restCatalog.setConf(new org.apache.hadoop.conf.Configuration()); + restCatalog.initialize( + catalogName(), + ImmutableMap.builder() + .put(CatalogProperties.URI, httpServer.getURI().toString()) + .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) + .build()); + } + // ==================== Helper Methods ==================== + private static LoadTableResponse withServerPlanningMode(LoadTableResponse response) { + return LoadTableResponse.builder() + .withTableMetadata(response.tableMetadata()) + .addAllConfig(response.config()) + .addConfig( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) + .addAllCredentials(response.credentials()) + .build(); + } + @Override @SuppressWarnings("unchecked") protected T roundTripSerialize(T payload, String description) { @@ -805,6 +841,71 @@ private static class CatalogWithAdapter { } } + /** + * Helper method to create a catalog with custom scan planning mode configuration for testing + * client-server mode validation. + * + * @param clientMode The scan planning mode the client requests (null for default) + * @param serverMode The scan planning mode the server returns (null for not set) + * @return CatalogWithAdapter for testing + */ + private CatalogWithAdapter catalogWithScanPlanningModes(String clientMode, String serverMode) { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (ResourcePaths.config().equals(request.path())) { + return castResponse( + responseType, + ConfigResponse.builder() + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .build()); + } + Object body = roundTripSerialize(request.body(), "request"); + HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); + T response = super.execute(req, responseType, errorHandler, responseHeaders); + response = roundTripSerialize(response, "response"); + + // Add server's scan planning mode to LoadTableResponse if specified + if (response instanceof LoadTableResponse && serverMode != null) { + LoadTableResponse loadResponse = (LoadTableResponse) response; + return castResponse( + responseType, + LoadTableResponse.builder() + .withTableMetadata(loadResponse.tableMetadata()) + .addAllConfig(loadResponse.config()) + .addConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, serverMode) + .addAllCredentials(loadResponse.credentials()) + .build()); + } + + return response; + } + }); + + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + + ImmutableMap.Builder configBuilder = + ImmutableMap.builder() + .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"); + + if (clientMode != null) { + configBuilder.put(RESTCatalogProperties.SCAN_PLANNING_MODE, clientMode); + } + + catalog.initialize("test-scan-planning-modes", configBuilder.build()); + return new CatalogWithAdapter(catalog, adapter); + } + // Helper: Create base catalog endpoints (namespace and table operations) private List baseCatalogEndpoints() { return ImmutableList.of( @@ -840,7 +941,15 @@ public T execute( return castResponse( responseType, ConfigResponse.builder().withEndpoints(endpoints).build()); } - return super.execute(request, responseType, errorHandler, responseHeaders); + T response = super.execute(request, responseType, errorHandler, responseHeaders); + + // Add scan planning mode to table config for LoadTableResponse + if (response instanceof LoadTableResponse) { + return castResponse( + responseType, withServerPlanningMode((LoadTableResponse) response)); + } + + return response; } }); @@ -855,8 +964,8 @@ public T execute( ImmutableMap.of( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO", - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - "true")); + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())); return new CatalogWithAdapter(catalog, adapter); } @@ -953,4 +1062,79 @@ public void serverSupportsPlanningButNotCancellation() throws IOException { // Verify no exception was thrown - cancelPlan returns false when endpoint not supported assertThat(cancelled).isFalse(); } + + @Test + public void catalogAndTableConfigMismatch() { + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName(), + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName()); + catalogWithAdapter.catalog.createNamespace(NS); + + assertThatThrownBy( + () -> + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "mismatch_test"), SCHEMA) + .create()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Scan planning mode mismatch") + .hasMessageContaining("client config=server") + .hasMessageContaining("server config=client"); + } + + @Test + public void clientExplicitlyRequestsClientSidePlanning() { + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName()); + catalogWithAdapter.catalog.createNamespace(NS); + + Table table = + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_explicit_test"), SCHEMA) + .create(); + + assertThat(table).isNotInstanceOf(RESTTable.class); + assertThat(table).isInstanceOf(BaseTable.class); + } + + @Test + public void clientRequestsClientButServerReturnsCatalog() { + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()); + catalogWithAdapter.catalog.createNamespace(NS); + + assertThatThrownBy( + () -> + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_override_rejected_test"), SCHEMA) + .create()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Scan planning mode mismatch") + .hasMessageContaining("client config=client") + .hasMessageContaining("server config=server"); + } + + @Test + public void clientRequestsClientAndServerReturnsNothing() { + CatalogWithAdapter catalogWithAdapter = + catalogWithScanPlanningModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), null); + catalogWithAdapter.catalog.createNamespace(NS); + + Table table = + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_server_null_test"), SCHEMA) + .create(); + + assertThat(table).isNotInstanceOf(RESTTable.class); + assertThat(table).isInstanceOf(BaseTable.class); + } } diff --git a/open-api/rest-catalog-open-api.py b/open-api/rest-catalog-open-api.py index 411881cb31ab..bc8a027b28ab 100644 --- a/open-api/rest-catalog-open-api.py +++ b/open-api/rest-catalog-open-api.py @@ -1468,6 +1468,9 @@ class LoadTableResult(BaseModel): ## General Configurations - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Controls scan planning behavior for table operations. Valid values: + - `client` (default): Clients MUST use client-side scan planning + - `server`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning ## AWS Configurations diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml index fff71128e5e5..20e15fc425e8 100644 --- a/open-api/rest-catalog-open-api.yaml +++ b/open-api/rest-catalog-open-api.yaml @@ -3468,6 +3468,9 @@ components: ## General Configurations - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Controls scan planning behavior for table operations. Valid values: + - `client` (default): Clients MUST use client-side scan planning + - `server`: Clients MUST use server-side scan planning if the server supports it, otherwise MUST fall back to client-side planning ## AWS Configurations diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" }