From 3d2c4df6a61a8eaa7663f4d10459c78a3f6926c6 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Sun, 8 Dec 2024 20:53:58 +0800 Subject: [PATCH 1/5] Introduce AccessSupportRssChecker to reject the un-support application earlier --- .../uniffle/coordinator/CoordinatorConf.java | 4 +- .../checker/AccessSupportRssChecker.java | 60 ++++++++++ .../metric/CoordinatorMetrics.java | 4 + .../checker/AccessSupportRssCheckerTest.java | 103 ++++++++++++++++++ 4 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java create mode 100644 coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessSupportRssCheckerTest.java diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 5b1f64ff16..d45b6cb949 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -23,6 +23,7 @@ import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker; import org.apache.uniffle.coordinator.conf.ClientConfParser; import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy; import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory; @@ -92,7 +93,8 @@ public class CoordinatorConf extends RssBaseConf { .asList() .defaultValues( "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker", - "org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker") + "org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker", + AccessSupportRssChecker.class.getCanonicalName()) .withDescription("Access checkers"); public static final ConfigOption COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = ConfigOptions.key("rss.coordinator.access.candidates.updateIntervalSec") diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java new file mode 100644 index 0000000000..add12488d1 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.access.checker; + +import java.io.IOException; + +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.access.AccessCheckResult; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +/** + * AccessSupportRssChecker checks whether the extra properties support rss, for example, the + * serializer is java, rss is not supported. + */ +public class AccessSupportRssChecker extends AbstractAccessChecker { + private static final Logger LOG = LoggerFactory.getLogger(AccessSupportRssChecker.class); + + public AccessSupportRssChecker(AccessManager accessManager) throws Exception { + super(accessManager); + } + + @Override + public AccessCheckResult check(AccessInfo accessInfo) { + String serializer = accessInfo.getExtraProperties().get("serializer"); + if (JavaSerialization.class.getName().equals(serializer)) { + String msg = String.format("Denied by AccessSupportRssChecker, accessInfo[%s].", accessInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("serializer is {}, {}", serializer, msg); + } + CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc(); + return new AccessCheckResult(false, msg); + } + + return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); + } + + @Override + public void close() throws IOException {} +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java index a97892526e..060f589cfc 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java @@ -43,6 +43,7 @@ public class CoordinatorMetrics { private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request"; private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request"; private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request"; + private static final String TOTAL_SUPPORT_RSS_DENIED_REQUEST = "total_support_rss_denied_request"; public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_"; public static final String APP_NUM_TO_USER = "app_num"; public static final String USER_LABEL = "user_name"; @@ -57,6 +58,7 @@ public class CoordinatorMetrics { public static Counter counterTotalCandidatesDeniedRequest; public static Counter counterTotalQuotaDeniedRequest; public static Counter counterTotalLoadDeniedRequest; + public static Counter counterTotalSupportRssDeniedRequest; public static final Map GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap(); private static MetricsManager metricsManager; @@ -118,5 +120,7 @@ private static void setUpMetrics() { metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST); counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST); counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST); + counterTotalSupportRssDeniedRequest = + metricsManager.addCounter(TOTAL_SUPPORT_RSS_DENIED_REQUEST); } } diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessSupportRssCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessSupportRssCheckerTest.java new file mode 100644 index 0000000000..0e2795e26e --- /dev/null +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessSupportRssCheckerTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.checker; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.serializer.JavaSerialization; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.ApplicationManager; +import org.apache.uniffle.coordinator.ClusterManager; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.SimpleClusterManager; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +public class AccessSupportRssCheckerTest { + + @BeforeEach + public void setUp() { + CoordinatorMetrics.register(); + } + + @AfterEach + public void clear() { + CoordinatorMetrics.clear(); + } + + @Test + public void test() throws Exception { + ClusterManager clusterManager = mock(SimpleClusterManager.class); + + CoordinatorConf conf = new CoordinatorConf(); + conf.set( + COORDINATOR_ACCESS_CHECKERS, + Collections.singletonList(AccessSupportRssChecker.class.getName())); + Map properties = new HashMap<>(); + + /** case1: check success when the serializer config is empty. */ + try (ApplicationManager applicationManager = new ApplicationManager(conf)) { + AccessManager accessManager = + new AccessManager( + conf, clusterManager, applicationManager.getQuotaManager(), new Configuration()); + AccessSupportRssChecker checker = + (AccessSupportRssChecker) accessManager.getAccessCheckers().get(0); + AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user"); + assertTrue(checker.check(accessInfo).isSuccess()); + } + + /** case2: check failed when the serializer config is JavaSerialization. */ + properties.put("serializer", JavaSerialization.class.getCanonicalName()); + try (ApplicationManager applicationManager = new ApplicationManager(conf)) { + AccessManager accessManager = + new AccessManager( + conf, clusterManager, applicationManager.getQuotaManager(), new Configuration()); + AccessSupportRssChecker checker = + (AccessSupportRssChecker) accessManager.getAccessCheckers().get(0); + AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user"); + assertFalse(checker.check(accessInfo).isSuccess()); + } + + /** case3: check success when the serializer config is other than JavaSerialization. */ + properties.put("serializer", WritableSerialization.class.getCanonicalName()); + try (ApplicationManager applicationManager = new ApplicationManager(conf)) { + AccessManager accessManager = + new AccessManager( + conf, clusterManager, applicationManager.getQuotaManager(), new Configuration()); + AccessSupportRssChecker checker = + (AccessSupportRssChecker) accessManager.getAccessCheckers().get(0); + AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user"); + assertTrue(checker.check(accessInfo).isSuccess()); + } + } +} From 39e01fab0147f5e49387fb47e2a2f6f188888902 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Fri, 3 Jan 2025 15:23:43 +0800 Subject: [PATCH 2/5] Add a new config "unsupportedConfigs" and refactor code --- .../uniffle/coordinator/CoordinatorConf.java | 9 ++++ .../access/checker/AbstractAccessChecker.java | 10 ++++- .../checker/AccessSupportRssChecker.java | 42 +++++++++++++++---- docs/coordinator_guide.md | 1 + 4 files changed, 53 insertions(+), 9 deletions(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index d45b6cb949..c7bd6f15c0 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -259,6 +259,15 @@ public class CoordinatorConf extends RssBaseConf { .defaultValues("appHeartbeat", "heartbeat") .withDescription("Exclude record rpc audit operation list, separated by ','"); + public static final ConfigOption> COORDINATOR_UNSUPPORTED_CONFIGS = + ConfigOptions.key("rss.coordinator.unsupportedConfigs") + .stringType() + .asList() + .defaultValues("serializer:org.apache.hadoop.io.serializer.JavaSerialization") + .withDescription( + "The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties " + + "and they are set to be denied access, the client's access will be rejected."); + public CoordinatorConf() {} public CoordinatorConf(String fileName) { diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java index d6110cce42..1833879b1e 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AbstractAccessChecker.java @@ -22,8 +22,16 @@ /** Abstract class for checking the access info from the client-side. */ public abstract class AbstractAccessChecker implements AccessChecker { - protected AbstractAccessChecker(AccessManager accessManager) throws Exception {} + private final AccessManager accessManager; + + protected AbstractAccessChecker(AccessManager accessManager) throws Exception { + this.accessManager = accessManager; + } @Override public void refreshAccessChecker() {} + + public AccessManager getAccessManager() { + return accessManager; + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java index add12488d1..09149ea8b2 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java @@ -18,13 +18,17 @@ package org.apache.uniffle.coordinator.access.checker; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; -import org.apache.hadoop.io.serializer.JavaSerialization; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.CoordinatorConf; import org.apache.uniffle.coordinator.access.AccessCheckResult; import org.apache.uniffle.coordinator.access.AccessInfo; import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; @@ -35,21 +39,43 @@ */ public class AccessSupportRssChecker extends AbstractAccessChecker { private static final Logger LOG = LoggerFactory.getLogger(AccessSupportRssChecker.class); + private final HashMap unsupportedConfigMap; public AccessSupportRssChecker(AccessManager accessManager) throws Exception { super(accessManager); + List unsupportedConfigs = + accessManager.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_UNSUPPORTED_CONFIGS); + unsupportedConfigMap = new HashMap<>(unsupportedConfigs.size()); + if (unsupportedConfigs != null && !unsupportedConfigs.isEmpty()) { + for (String keyValue : unsupportedConfigs) { + String[] pair = keyValue.split(":", 2); + if (pair.length == 2) { + unsupportedConfigMap.put(pair[0], pair[1]); + } else { + LOG.error("Unsupported config {} has wrong format, skip it.", keyValue); + } + } + } } @Override public AccessCheckResult check(AccessInfo accessInfo) { - String serializer = accessInfo.getExtraProperties().get("serializer"); - if (JavaSerialization.class.getName().equals(serializer)) { - String msg = String.format("Denied by AccessSupportRssChecker, accessInfo[%s].", accessInfo); - if (LOG.isDebugEnabled()) { - LOG.debug("serializer is {}, {}", serializer, msg); + if (unsupportedConfigMap == null || unsupportedConfigMap.isEmpty()) { + return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); + } + for (Map.Entry entry : unsupportedConfigMap.entrySet()) { + String unsupportedConfKey = entry.getKey(); + String unsupportedConfValue = entry.getValue(); + String actualConfValue = accessInfo.getExtraProperties().get(unsupportedConfKey); + if (Objects.equals(actualConfValue, unsupportedConfValue)) { + String msg = + String.format( + "Denied by AccessSupportRssChecker, %s is %s, AccessSupportRssChecker does not supported.", + unsupportedConfKey, actualConfValue); + LOG.debug(msg); + CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc(); + return new AccessCheckResult(false, msg); } - CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc(); - return new AccessCheckResult(false, msg); } return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index bb9fce77ed..21defe098d 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -111,6 +111,7 @@ This document will introduce how to deploy Uniffle coordinators. | rss.reconfigure.interval.sec | 5 | Reconfigure check interval. | | rss.coordinator.rpc.audit.log.enabled | true | When set to true, for auditing purposes, the coordinator will log audit records for every rpc request operation. | | rss.coordinator.rpc.audit.log.excludeList | appHeartbeat,heartbeat | Exclude record rpc audit operation list, separated by ','. | +| rss.coordinator.unsupportedConfigs | serializer:org.apache.hadoop.io.serializer.JavaSerialization | The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties and they are set to be denied access, the client's access will be rejected. | ### AccessClusterLoadChecker settings |Property Name|Default| Description| From b611bdf82c4dbf8754805da01ec09e8e3d69c08a Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Fri, 3 Jan 2025 15:25:54 +0800 Subject: [PATCH 3/5] Refactor CoordinatorConf to avoid class string --- .../org/apache/uniffle/coordinator/CoordinatorConf.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index c7bd6f15c0..8b16f87134 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -23,6 +23,8 @@ import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker; +import org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker; import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker; import org.apache.uniffle.coordinator.conf.ClientConfParser; import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy; @@ -92,8 +94,8 @@ public class CoordinatorConf extends RssBaseConf { .stringType() .asList() .defaultValues( - "org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker", - "org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker", + AccessClusterLoadChecker.class.getCanonicalName(), + AccessQuotaChecker.class.getCanonicalName(), AccessSupportRssChecker.class.getCanonicalName()) .withDescription("Access checkers"); public static final ConfigOption COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC = From aa2cbc1f829c4371a31e9d56881b400024b89906 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Fri, 3 Jan 2025 15:28:41 +0800 Subject: [PATCH 4/5] Update document of rss.coordinator.access.checkers --- docs/coordinator_guide.md | 60 +++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index 21defe098d..bd87734b6f 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -82,36 +82,36 @@ This document will introduce how to deploy Uniffle coordinators. ## Configuration ### Common settings -| Property Name | Default | Description | -|--------------------------------------------------------|------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| rss.coordinator.server.heartbeat.timeout | 30000 | Timeout if can't get heartbeat from shuffle server | -| rss.coordinator.server.periodic.output.interval.times | 30 | The periodic interval times of output alive nodes. The interval sec can be calculated by (rss.coordinator.server.heartbeat.timeout/3 * rss.coordinator.server.periodic.output.interval.times). Default output interval is 5min. | -| rss.coordinator.assignment.strategy | PARTITION_BALANCE | Strategy for assigning shuffle server, PARTITION_BALANCE should be used for workload balance | -| rss.coordinator.app.expired | 60000 | Application expired time (ms), the heartbeat interval should be less than it | -| rss.coordinator.shuffle.nodes.max | 9 | The max number of shuffle server when do the assignment | -| rss.coordinator.dynamicClientConf.path | - | The path of configuration file which have default conf for rss client | -| rss.coordinator.exclude.nodes.file.path | - | The path of configuration file which have exclude nodes | -| rss.coordinator.exclude.nodes.check.interval.ms | 60000 | Update interval (ms) for exclude nodes | -| rss.coordinator.access.checkers | org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker | The access checkers will be used when the spark client use the DelegationShuffleManager, which will decide whether to use rss according to the result of the specified access checkers | -| rss.coordinator.access.loadChecker.memory.percentage | 15.0 | The minimal percentage of available memory percentage of a server | -| rss.coordinator.dynamicClientConf.enabled | false | whether to enable dynamic client conf, which will be fetched by spark client | -| rss.coordinator.dynamicClientConf.path | - | The dynamic client conf of this cluster and can be stored in HADOOP FS or local | -| rss.coordinator.dynamicClientConf.updateIntervalSec | 120 | The dynamic client conf update interval in seconds | -| rss.coordinator.remote.storage.cluster.conf | - | Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';' | -| rss.rpc.server.port | - | RPC port for coordinator | -| rss.jetty.http.port | - | Http port for coordinator | -| rss.coordinator.remote.storage.select.strategy | APP_BALANCE | Strategy for selecting the remote path | -| rss.coordinator.remote.storage.io.sample.schedule.time | 60000 | The time of scheduling the read and write time of the paths to obtain different HADOOP FS | -| rss.coordinator.remote.storage.io.sample.file.size | 204800000 | The size of the file that the scheduled thread reads and writes | -| rss.coordinator.remote.storage.io.sample.access.times | 3 | The number of times to read and write HADOOP FS files | -| rss.coordinator.startup-silent-period.enabled | false | Enable the startup-silent-period to reject the assignment requests for avoiding partial assignments. To avoid service interruption, this mechanism is disabled by default. Especially it's recommended to use in coordinator HA mode when restarting single coordinator. | -| rss.coordinator.startup-silent-period.duration | 20000 | The waiting duration(ms) when conf of rss.coordinator.startup-silent-period.enabled is enabled. | -| rss.coordinator.select.partition.strategy | CONTINUOUS | There are two strategies for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to allocate partitions to ShuffleServer, and CONTINUOUS will try to allocate consecutive partitions to ShuffleServer, this feature can improve performance in AQE scenarios. | -| rss.metrics.reporter.class | - | The class of metrics reporter. | -| rss.reconfigure.interval.sec | 5 | Reconfigure check interval. | -| rss.coordinator.rpc.audit.log.enabled | true | When set to true, for auditing purposes, the coordinator will log audit records for every rpc request operation. | -| rss.coordinator.rpc.audit.log.excludeList | appHeartbeat,heartbeat | Exclude record rpc audit operation list, separated by ','. | -| rss.coordinator.unsupportedConfigs | serializer:org.apache.hadoop.io.serializer.JavaSerialization | The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties and they are set to be denied access, the client's access will be rejected. | +| Property Name | Default | Description | +|--------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| rss.coordinator.server.heartbeat.timeout | 30000 | Timeout if can't get heartbeat from shuffle server | +| rss.coordinator.server.periodic.output.interval.times | 30 | The periodic interval times of output alive nodes. The interval sec can be calculated by (rss.coordinator.server.heartbeat.timeout/3 * rss.coordinator.server.periodic.output.interval.times). Default output interval is 5min. | +| rss.coordinator.assignment.strategy | PARTITION_BALANCE | Strategy for assigning shuffle server, PARTITION_BALANCE should be used for workload balance | +| rss.coordinator.app.expired | 60000 | Application expired time (ms), the heartbeat interval should be less than it | +| rss.coordinator.shuffle.nodes.max | 9 | The max number of shuffle server when do the assignment | +| rss.coordinator.dynamicClientConf.path | - | The path of configuration file which have default conf for rss client | +| rss.coordinator.exclude.nodes.file.path | - | The path of configuration file which have exclude nodes | +| rss.coordinator.exclude.nodes.check.interval.ms | 60000 | Update interval (ms) for exclude nodes | +| rss.coordinator.access.checkers | org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker,org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker,org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker | The access checkers will be used when the spark client use the DelegationShuffleManager, which will decide whether to use rss according to the result of the specified access checkers | +| rss.coordinator.access.loadChecker.memory.percentage | 15.0 | The minimal percentage of available memory percentage of a server | +| rss.coordinator.dynamicClientConf.enabled | false | whether to enable dynamic client conf, which will be fetched by spark client | +| rss.coordinator.dynamicClientConf.path | - | The dynamic client conf of this cluster and can be stored in HADOOP FS or local | +| rss.coordinator.dynamicClientConf.updateIntervalSec | 120 | The dynamic client conf update interval in seconds | +| rss.coordinator.remote.storage.cluster.conf | - | Remote Storage Cluster related conf with format $clusterId,$key=$value, separated by ';' | +| rss.rpc.server.port | - | RPC port for coordinator | +| rss.jetty.http.port | - | Http port for coordinator | +| rss.coordinator.remote.storage.select.strategy | APP_BALANCE | Strategy for selecting the remote path | +| rss.coordinator.remote.storage.io.sample.schedule.time | 60000 | The time of scheduling the read and write time of the paths to obtain different HADOOP FS | +| rss.coordinator.remote.storage.io.sample.file.size | 204800000 | The size of the file that the scheduled thread reads and writes | +| rss.coordinator.remote.storage.io.sample.access.times | 3 | The number of times to read and write HADOOP FS files | +| rss.coordinator.startup-silent-period.enabled | false | Enable the startup-silent-period to reject the assignment requests for avoiding partial assignments. To avoid service interruption, this mechanism is disabled by default. Especially it's recommended to use in coordinator HA mode when restarting single coordinator. | +| rss.coordinator.startup-silent-period.duration | 20000 | The waiting duration(ms) when conf of rss.coordinator.startup-silent-period.enabled is enabled. | +| rss.coordinator.select.partition.strategy | CONTINUOUS | There are two strategies for selecting partitions: ROUND and CONTINUOUS. ROUND will poll to allocate partitions to ShuffleServer, and CONTINUOUS will try to allocate consecutive partitions to ShuffleServer, this feature can improve performance in AQE scenarios. | +| rss.metrics.reporter.class | - | The class of metrics reporter. | +| rss.reconfigure.interval.sec | 5 | Reconfigure check interval. | +| rss.coordinator.rpc.audit.log.enabled | true | When set to true, for auditing purposes, the coordinator will log audit records for every rpc request operation. | +| rss.coordinator.rpc.audit.log.excludeList | appHeartbeat,heartbeat | Exclude record rpc audit operation list, separated by ','. | +| rss.coordinator.unsupportedConfigs | serializer:org.apache.hadoop.io.serializer.JavaSerialization | The unsupported config list separated by ',', the key value separated by ':'. If the client configures these properties and they are set to be denied access, the client's access will be rejected. | ### AccessClusterLoadChecker settings |Property Name|Default| Description| From 66210146593e0adb5a07e50fed7f156edcad79ab Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Fri, 3 Jan 2025 15:38:12 +0800 Subject: [PATCH 5/5] Fix findbugs issue --- .../coordinator/access/checker/AccessSupportRssChecker.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java index 09149ea8b2..9c143cac80 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessSupportRssChecker.java @@ -45,7 +45,7 @@ public AccessSupportRssChecker(AccessManager accessManager) throws Exception { super(accessManager); List unsupportedConfigs = accessManager.getCoordinatorConf().get(CoordinatorConf.COORDINATOR_UNSUPPORTED_CONFIGS); - unsupportedConfigMap = new HashMap<>(unsupportedConfigs.size()); + unsupportedConfigMap = new HashMap<>(); if (unsupportedConfigs != null && !unsupportedConfigs.isEmpty()) { for (String keyValue : unsupportedConfigs) { String[] pair = keyValue.split(":", 2); @@ -60,9 +60,6 @@ public AccessSupportRssChecker(AccessManager accessManager) throws Exception { @Override public AccessCheckResult check(AccessInfo accessInfo) { - if (unsupportedConfigMap == null || unsupportedConfigMap.isEmpty()) { - return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); - } for (Map.Entry entry : unsupportedConfigMap.entrySet()) { String unsupportedConfKey = entry.getKey(); String unsupportedConfValue = entry.getValue();