diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 5b1f64ff16..8b16f87134 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -23,6 +23,9 @@ import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker; +import org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker; +import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker; import org.apache.uniffle.coordinator.conf.ClientConfParser; import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy; import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory; @@ -91,8 +94,9 @@ public class CoordinatorConf extends RssBaseConf { .stringType() .asList() .defaultValues( - "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker", - "org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker") + AccessClusterLoadChecker.class.getCanonicalName(), + AccessQuotaChecker.class.getCanonicalName(), + AccessSupportRssChecker.class.getCanonicalName()) .withDescription("Access checkers"); public static final ConfigOption COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = ConfigOptions.key("rss.coordinator.access.candidates.updateIntervalSec") @@ -257,6 +261,15 @@ public class CoordinatorConf extends RssBaseConf { .defaultValues("appHeartbeat", "heartbeat") .withDescription("Exclude record rpc audit operation list, separated by ','"); + public static final ConfigOption> COORDINATOR_UNSUPPORTED_CONFIGS = + ConfigOptions.key("rss.coordinator.unsupportedConfigs") + .stringType() + .asList() + 
.defaultValues("serializer:org.apache.hadoop.io.serializer.JavaSerialization") + .withDescription( + "The unsupported config list separated by ',', each entry in 'key:value' format. If the client sets any of these " + + "configs to the listed value, the client's access will be rejected."); + + public CoordinatorConf() {} public CoordinatorConf(String fileName) { diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java index d6110cce42..1833879b1e 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java @@ -22,8 +22,16 @@ /** Abstract class for checking the access info from the client-side. */ public abstract class AbstractAccessChecker implements AccessChecker { - protected AbstractAccessChecker(AccessManager accessManager) throws Exception {} + private final AccessManager accessManager; + + protected AbstractAccessChecker(AccessManager accessManager) throws Exception { + this.accessManager = accessManager; + } @Override public void refreshAccessChecker() {} + + public AccessManager getAccessManager() { + return accessManager; + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java new file mode 100644 index 0000000000..9c143cac80 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.access.checker; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.access.AccessCheckResult; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +/** + * AccessSupportRssChecker checks whether the extra properties support rss, for example, the + * serializer is java, rss is not supported. 
+ */ +public class AccessSupportRssChecker extends AbstractAccessChecker { + private static final Logger LOG = LoggerFactory.getLogger(AccessSupportRssChecker.class); + private final HashMap<String, String> unsupportedConfigMap; + + public AccessSupportRssChecker(AccessManager accessManager) throws Exception { + super(accessManager); + List<String> unsupportedConfigs = + accessManager.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_UNSUPPORTED_CONFIGS); + unsupportedConfigMap = new HashMap<>(); + if (unsupportedConfigs != null && !unsupportedConfigs.isEmpty()) { + for (String keyValue : unsupportedConfigs) { + String[] pair = keyValue.split(":", 2); + if (pair.length == 2) { + unsupportedConfigMap.put(pair[0], pair[1]); + } else { + LOG.error("Unsupported config {} has wrong format, skip it.", keyValue); + } + } + } + } + + @Override + public AccessCheckResult check(AccessInfo accessInfo) { + for (Map.Entry<String, String> entry : unsupportedConfigMap.entrySet()) { + String unsupportedConfKey = entry.getKey(); + String unsupportedConfValue = entry.getValue(); + String actualConfValue = accessInfo.getExtraProperties().get(unsupportedConfKey); + if (Objects.equals(actualConfValue, unsupportedConfValue)) { + String msg = + String.format( + "Denied by AccessSupportRssChecker, %s is %s, which is not supported by rss.", + unsupportedConfKey, actualConfValue); + LOG.debug(msg); + CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc(); + return new AccessCheckResult(false, msg); + } + } + + return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); + } + + @Override + public void close() throws IOException {} +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java index a97892526e..060f589cfc 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java +++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java @@ -43,6 +43,7 @@ public class CoordinatorMetrics { private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request"; private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request"; private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request"; + private static final String TOTAL_SUPPORT_RSS_DENIED_REQUEST = "total_support_rss_denied_request"; public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_"; public static final String APP_NUM_TO_USER = "app_num"; public static final String USER_LABEL = "user_name"; @@ -57,6 +58,7 @@ public class CoordinatorMetrics { public static Counter counterTotalCandidatesDeniedRequest; public static Counter counterTotalQuotaDeniedRequest; public static Counter counterTotalLoadDeniedRequest; + public static Counter counterTotalSupportRssDeniedRequest; public static final Map GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap(); private static MetricsManager metricsManager; @@ -118,5 +120,7 @@ private static void setUpMetrics() { metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST); counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST); counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST); + counterTotalSupportRssDeniedRequest = + metricsManager.addCounter(TOTAL_SUPPORT_RSS_DENIED_REQUEST); } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessSupportRssCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessSupportRssCheckerTest.java new file mode 100644 index 0000000000..0e2795e26e --- /dev/null +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessSupportRssCheckerTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.checker; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.ApplicationManager; +import org.apache.uniffle.coordinator.ClusterManager; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.SimpleClusterManager; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class AccessSupportRssCheckerTest { + + @BeforeEach + public void setUp() 
{ + CoordinatorMetrics.register(); + } + + @AfterEach + public void clear() { + CoordinatorMetrics.clear(); + } + + @Test + public void test() throws Exception { + ClusterManager clusterManager = mock(SimpleClusterManager.class); + + CoordinatorConf conf = new CoordinatorConf(); + conf.set( + COORDINATOR_ACCESS_CHECKERS, + Collections.singletonList(AccessSupportRssChecker.class.getName())); + Map properties = new HashMap<>(); + + /** case1: check success when the serializer config is empty. */ + try (ApplicationManager applicationManager = new ApplicationManager(conf)) { + AccessManager accessManager = + new AccessManager( + conf, clusterManager, applicationManager.getQuotaManager(), new Configuration()); + AccessSupportRssChecker checker = + (AccessSupportRssChecker) accessManager.getAccessCheckers().get(0); + AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user"); + assertTrue(checker.check(accessInfo).isSuccess()); + } + + /** case2: check failed when the serializer config is JavaSerialization. */ + properties.put("serializer", JavaSerialization.class.getCanonicalName()); + try (ApplicationManager applicationManager = new ApplicationManager(conf)) { + AccessManager accessManager = + new AccessManager( + conf, clusterManager, applicationManager.getQuotaManager(), new Configuration()); + AccessSupportRssChecker checker = + (AccessSupportRssChecker) accessManager.getAccessCheckers().get(0); + AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user"); + assertFalse(checker.check(accessInfo).isSuccess()); + } + + /** case3: check success when the serializer config is other than JavaSerialization. 
*/ + properties.put("serializer", WritableSerialization.class.getCanonicalName()); + try (ApplicationManager applicationManager = new ApplicationManager(conf)) { + AccessManager accessManager = + new AccessManager( + conf, clusterManager, applicationManager.getQuotaManager(), new Configuration()); + AccessSupportRssChecker checker = + (AccessSupportRssChecker) accessManager.getAccessCheckers().get(0); + AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user"); + assertTrue(checker.check(accessInfo).isSuccess()); + } + } +} diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index bb9fce77ed..bd87734b6f 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -82,35 +82,36 @@ This document will introduce how to deploy Uniffle coordinators. ## Configuration ### Common settings -| Property Name | Default | Description | -|--------------------------------------------------------|------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| rss.coordinator.server.heartbeat.timeout | 30000 | Timeout if can't get heartbeat from shuffle server | -| rss.coordinator.server.periodic.output.interval.times | 30 | The periodic interval times of output alive nodes. The interval sec can be calculated by (rss.coordinator.server.heartbeat.timeout/3 * rss.coordinator.server.periodic.output.interval.times). Default output interval is 5min. 
| -| rss.coordinator.assignment.strategy | PARTITION_BALANCE | Strategy for assigning shuffle server, PARTITION_BALANCE should be used for workload balance | -| rss.coordinator.app.expired | 60000 | Application expired time (ms), the heartbeat interval should be less than it | -| rss.coordinator.shuffle.nodes.max | 9 | The max number of shuffle server when do the assignment | -| rss.coordinator.dynamicClientConf.path | - | The path of configuration file which have default conf for rss client | -| rss.coordinator.exclude.nodes.file.path | - | The path of configuration file which have exclude nodes | -| rss.coordinator.exclude.nodes.check.interval.ms | 60000 | Update interval (ms) for exclude nodes | -| rss.coordinator.access.checkers | org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker | The access checkers will be used when the spark client use the DelegationShuffleManager, which will decide whether to use rss according to the result of the specified access checkers | -| rss.coordinator.access.loadChecker.memory.percentage | 15.0 | The minimal percentage of available memory percentage of a server | -| rss.coordinator.dynamicClientConf.enabled | false | whether to enable dynamic client conf, which will be fetched by spark client | -| rss.coordinator.dynamicClientConf.path | - | The dynamic client conf of this cluster and can be stored in HADOOP FS or local | -| rss.coordinator.dynamicClientConf.updateIntervalSec | 120 | The dynamic client conf update interval in seconds | -| rss.coordinator.remote.storage.cluster.conf | - | Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';' | -| rss.rpc.server.port | - | RPC port for coordinator | -| rss.jetty.http.port | - | Http port for coordinator | -| rss.coordinator.remote.storage.select.strategy | APP_BALANCE | Strategy for selecting the remote path | -| rss.coordinator.remote.storage.io.sample.schedule.time | 60000 | The time of scheduling the read and write time of 
the paths to obtain different HADOOP FS | -| rss.coordinator.remote.storage.io.sample.file.size | 204800000 | The size of the file that the scheduled thread reads and writes | -| rss.coordinator.remote.storage.io.sample.access.times | 3 | The number of times to read and write HADOOP FS files | -| rss.coordinator.startup-silent-period.enabled | false | Enable the startup-silent-period to reject the assignment requests for avoiding partial assignments. To avoid service interruption, this mechanism is disabled by default. Especially it's recommended to use in coordinator HA mode when restarting single coordinator. | -| rss.coordinator.startup-silent-period.duration | 20000 | The waiting duration(ms) when conf of rss.coordinator.startup-silent-period.enabled is enabled. | -| rss.coordinator.select.partition.strategy | CONTINUOUS | There are two strategies for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to allocate partitions to ShuffleServer, and CONTINUOUS will try to allocate consecutive partitions to ShuffleServer, this feature can improve performance in AQE scenarios. | -| rss.metrics.reporter.class | - | The class of metrics reporter. | -| rss.reconfigure.interval.sec | 5 | Reconfigure check interval. | -| rss.coordinator.rpc.audit.log.enabled | true | When set to true, for auditing purposes, the coordinator will log audit records for every rpc request operation. | -| rss.coordinator.rpc.audit.log.excludeList | appHeartbeat,heartbeat | Exclude record rpc audit operation list, separated by ','. 
| +| Property Name | Default | Description | +|--------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| rss.coordinator.server.heartbeat.timeout | 30000 | Timeout if can't get heartbeat from shuffle server | +| rss.coordinator.server.periodic.output.interval.times | 30 | The periodic interval times of output alive nodes. The interval sec can be calculated by (rss.coordinator.server.heartbeat.timeout/3 * rss.coordinator.server.periodic.output.interval.times). Default output interval is 5min. | +| rss.coordinator.assignment.strategy | PARTITION_BALANCE | Strategy for assigning shuffle server, PARTITION_BALANCE should be used for workload balance | +| rss.coordinator.app.expired | 60000 | Application expired time (ms), the heartbeat interval should be less than it | +| rss.coordinator.shuffle.nodes.max | 9 | The max number of shuffle server when do the assignment | +| rss.coordinator.dynamicClientConf.path | - | The path of configuration file which have default conf for rss client | +| rss.coordinator.exclude.nodes.file.path | - | The path of configuration file which have exclude nodes | +| rss.coordinator.exclude.nodes.check.interval.ms | 60000 | Update interval (ms) for exclude nodes | +| rss.coordinator.access.checkers | org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker,org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker,org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker | The access checkers will be used when the spark client use the 
DelegationShuffleManager, which will decide whether to use rss according to the result of the specified access checkers | +| rss.coordinator.access.loadChecker.memory.percentage | 15.0 | The minimal percentage of available memory percentage of a server | +| rss.coordinator.dynamicClientConf.enabled | false | whether to enable dynamic client conf, which will be fetched by spark client | +| rss.coordinator.dynamicClientConf.path | - | The dynamic client conf of this cluster and can be stored in HADOOP FS or local | +| rss.coordinator.dynamicClientConf.updateIntervalSec | 120 | The dynamic client conf update interval in seconds | +| rss.coordinator.remote.storage.cluster.conf | - | Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';' | +| rss.rpc.server.port | - | RPC port for coordinator | +| rss.jetty.http.port | - | Http port for coordinator | +| rss.coordinator.remote.storage.select.strategy | APP_BALANCE | Strategy for selecting the remote path | +| rss.coordinator.remote.storage.io.sample.schedule.time | 60000 | The time of scheduling the read and write time of the paths to obtain different HADOOP FS | +| rss.coordinator.remote.storage.io.sample.file.size | 204800000 | The size of the file that the scheduled thread reads and writes | +| rss.coordinator.remote.storage.io.sample.access.times | 3 | The number of times to read and write HADOOP FS files | +| rss.coordinator.startup-silent-period.enabled | false | Enable the startup-silent-period to reject the assignment requests for avoiding partial assignments. To avoid service interruption, this mechanism is disabled by default. Especially it's recommended to use in coordinator HA mode when restarting single coordinator. | +| rss.coordinator.startup-silent-period.duration | 20000 | The waiting duration(ms) when conf of rss.coordinator.startup-silent-period.enabled is enabled. 
| +| rss.coordinator.select.partition.strategy | CONTINUOUS | There are two strategies for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to allocate partitions to ShuffleServer, and CONTINUOUS will try to allocate consecutive partitions to ShuffleServer, this feature can improve performance in AQE scenarios. | +| rss.metrics.reporter.class | - | The class of metrics reporter. | +| rss.reconfigure.interval.sec | 5 | Reconfigure check interval. | +| rss.coordinator.rpc.audit.log.enabled | true | When set to true, for auditing purposes, the coordinator will log audit records for every rpc request operation. | +| rss.coordinator.rpc.audit.log.excludeList | appHeartbeat,heartbeat | Exclude record rpc audit operation list, separated by ','. | +| rss.coordinator.unsupportedConfigs | serializer:org.apache.hadoop.io.serializer.JavaSerialization | The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties and they are set to be denied access, the client's access will be rejected. | ### AccessClusterLoadChecker settings |Property Name|Default| Description|