From e2bacf768c72a39b62aaa88440d61b0ca321c8f8 Mon Sep 17 00:00:00 2001 From: Aleksei Ieshin Date: Mon, 22 Dec 2025 18:21:59 +1100 Subject: [PATCH] HDDS-14212 Change failoverCount to be per request --- .../src/main/resources/ozone-default.xml | 6 +- .../ozone/om/protocolPB/GrpcOmTransport.java | 20 +- ...TestGrpcOmTransportConcurrentFailover.java | 305 ++++++++++++++++++ .../om/protocolPB/TestS3GrpcOmTransport.java | 42 ++- 4 files changed, 358 insertions(+), 15 deletions(-) create mode 100644 hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 7a67f27cc49e..274ab009b5ab 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -3091,9 +3091,9 @@ Expert only. Ozone RpcClient attempts talking to each OzoneManager ipc.client.connect.max.retries (default = 10) number of times before failing over to another OzoneManager, if available. This parameter - represents the number of times the client will failover before giving - up. This value is kept high so that client does not give up trying to - connect to OMs easily. + represents the number of times per request the client will fail over + before giving up. This value is kept high so that the client does not + give up trying to connect to OMs easily. 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java index 75ea99f77a8e..ef4ec32ddc26 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java @@ -81,12 +81,11 @@ public class GrpcOmTransport implements OmTransport { private ConfigurationSource conf; private AtomicReference host; - private AtomicInteger syncFailoverCount; + private AtomicInteger globalFailoverCount; private final int maxSize; private SecurityConfig secConfig; private RetryPolicy retryPolicy; - private int failoverCount = 0; private GrpcOMFailoverProxyProvider omFailoverProxyProvider; @@ -102,9 +101,7 @@ public GrpcOmTransport(ConfigurationSource conf, this.clients = new HashMap<>(); this.conf = conf; this.host = new AtomicReference(); - this.failoverCount = 0; - this.syncFailoverCount = new AtomicInteger(); - + this.globalFailoverCount = new AtomicInteger(); secConfig = new SecurityConfig(conf); maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH, @@ -175,12 +172,13 @@ public void start() throws IOException { @Override public OMResponse submitRequest(OMRequest payload) throws IOException { AtomicReference resp = new AtomicReference<>(); + int requestFailoverCount = 0; boolean tryOtherHost = true; int expectedFailoverCount = 0; ResultCodes resultCode = ResultCodes.INTERNAL_ERROR; while (tryOtherHost) { tryOtherHost = false; - expectedFailoverCount = syncFailoverCount.get(); + expectedFailoverCount = globalFailoverCount.get(); try { InetAddress inetAddress = InetAddress.getLocalHost(); Context.current() @@ -201,7 +199,7 @@ public OMResponse submitRequest(OMRequest payload) throws IOException { } Exception exp = new Exception(e); tryOtherHost = shouldRetry(unwrapException(exp), - expectedFailoverCount); 
+ expectedFailoverCount, ++requestFailoverCount); if (!tryOtherHost) { throw new OMException(resultCode); } @@ -251,11 +249,11 @@ private Exception unwrapException(Exception ex) { return grpcException; } - private boolean shouldRetry(Exception ex, int expectedFailoverCount) { + private boolean shouldRetry(Exception ex, int expectedFailoverCount, int requestFailoverCount) { boolean retry = false; RetryPolicy.RetryAction action = null; try { - action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true); + action = retryPolicy.shouldRetry(ex, 0, requestFailoverCount, true); LOG.debug("grpc failover retry action {}", action.action); if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { retry = false; @@ -273,9 +271,9 @@ private boolean shouldRetry(Exception ex, int expectedFailoverCount) { } } // switch om host to current proxy OMNodeId - if (syncFailoverCount.get() == expectedFailoverCount) { + if (globalFailoverCount.get() == expectedFailoverCount) { omFailoverProxyProvider.performFailover(null); - syncFailoverCount.getAndIncrement(); + globalFailoverCount.getAndIncrement(); } else { LOG.warn("A failover has occurred since the start of current" + " thread retry, NOT failover using current proxy"); diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java new file mode 100644 index 000000000000..c8890eaa49c8 --- /dev/null +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestGrpcOmTransportConcurrentFailover.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.protocolPB; + +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_PORT_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_NODES_KEY; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.mock; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.ha.ConfUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; 
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Concurrent failover test for the {@link GrpcOmTransport} client.
+ *
+ * <p>The test starts {@link #NUM_OMS} mock OM gRPC servers on fixed localhost
+ * ports. Exactly one of them is flagged as leader and answers requests with
+ * an OK response; the others reject every request with a gRPC INTERNAL
+ * status whose description names OMNotLeaderException. Many client threads
+ * then share a single transport while the leader is switched.
+ */
+public class TestGrpcOmTransportConcurrentFailover {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestGrpcOmTransportConcurrentFailover.class);
+
+  private static final String OM_SERVICE_ID = "om-service-test";
+  private static final String NODE_ID_BASE = "om";
+  private static final int NUM_OMS = 3;
+  // Mock OM number i listens on BASE_PORT + i.
+  // NOTE(review): fixed ports can collide with other processes on shared CI hosts.
+  private static final int BASE_PORT = 19860;
+
+  // Node id ("om0", "om1", ...) to the mock server that plays that OM.
+  private Map<String, MockOMServer> mockServers;
+  private GrpcOmTransport transport;
+
+  /**
+   * Starts the mock OM servers, registers their addresses and gRPC ports in
+   * the configuration, makes om0 the initial leader and creates the transport
+   * under test.
+   */
+  @BeforeEach
+  public void setUp() throws Exception {
+    mockServers = new HashMap<>();
+    OzoneConfiguration conf = new OzoneConfiguration();
+
+    conf.setLong(OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 250);
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
+    conf.set(OZONE_OM_SERVICE_IDS_KEY, OM_SERVICE_ID);
+
+    StringJoiner omNodes = new StringJoiner(",");
+
+    for (int i = 0; i < NUM_OMS; i++) {
+      String nodeId = NODE_ID_BASE + i;
+      omNodes.add(nodeId);
+
+      int port = BASE_PORT + i;
+      MockOMServer server = new MockOMServer(nodeId, port);
+      server.start();
+      mockServers.put(nodeId, server);
+
+      conf.set(ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, OM_SERVICE_ID, nodeId),
+          "localhost");
+      conf.setInt(ConfUtils.addKeySuffixes(OZONE_OM_GRPC_PORT_KEY, OM_SERVICE_ID, nodeId),
+          port);
+    }
+
+    conf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
+        omNodes.toString());
+
+    failover("om0", "om1", "om2");
+
+    transport = new GrpcOmTransport(conf, UserGroupInformation.getCurrentUser(), OM_SERVICE_ID);
+  }
+
+  /**
+   * Closes the transport and stops every mock server. A failure in one step
+   * does not prevent the remaining servers from being stopped, so their ports
+   * are released; the first failure is rethrown at the end with later ones
+   * attached as suppressed exceptions.
+   */
+  @AfterEach
+  public void tearDown() throws Exception {
+    Exception failure = null;
+    try {
+      if (transport != null) {
+        transport.close();
+      }
+    } catch (Exception e) {
+      failure = e;
+    }
+    for (MockOMServer server : mockServers.values()) {
+      try {
+        server.stop();
+      } catch (Exception e) {
+        if (failure == null) {
+          failure = e;
+        } else {
+          failure.addSuppressed(e);
+        }
+      }
+    }
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  /**
+   * Sends a few requests to the initial leader (om0), moves leadership to om2
+   * and then issues requests from many threads at once. Every concurrent
+   * request must end up being served by om2, and om0 must have rejected at
+   * least one request along the way.
+   */
+  @Test
+  public void testConcurrentFailoverTriesAllOMs() throws Exception {
+    final int numThreads = 500;
+    final int requestsPerThread = 10;
+
+    sendInitialOmRequestsBeforeFailover();
+    failover("om2", "om0", "om1");
+    runConcurrentClientRequests(numThreads, requestsPerThread);
+
+    int omsWithFailuresCount = omRequestFailoverDistributionReport();
+    int om0FailuresCount = mockServers.get("om0").getFailureCount();
+    int om2SuccessesCount = mockServers.get("om2").getSuccessCount();
+
+    assertTrue(omsWithFailuresCount >= 1,
+        "At least 1 OMs should receive failed requests during failover. Got: " + omsWithFailuresCount);
+    assertTrue(om0FailuresCount > 0, "om0 should receive failed requests");
+    assertEquals(numThreads * requestsPerThread, om2SuccessesCount,
+        "All requests should eventually succeed on om2 (leader)");
+  }
+
+  /**
+   * Logs how rejected and successful requests were distributed over the mock
+   * OMs.
+   *
+   * @return the number of mock OMs that rejected at least one request
+   */
+  private int omRequestFailoverDistributionReport() {
+    int totalRequests = 0;
+    int totalFailures = 0;
+    int totalSuccesses = 0;
+    int omsWithFailures = 0;
+
+    for (int i = 0; i < NUM_OMS; i++) {
+      String omId = NODE_ID_BASE + i;
+      MockOMServer server = mockServers.get(omId);
+      totalRequests += server.getRequestCount();
+      totalFailures += server.getFailureCount();
+      totalSuccesses += server.getSuccessCount();
+      if (server.getFailureCount() > 0) {
+        omsWithFailures++;
+      }
+    }
+
+    LOG.info("Total requests: {} (failures: {}, successes: {})", totalRequests, totalFailures, totalSuccesses);
+    LOG.info("OMs that received failed requests: {}/{}", omsWithFailures, NUM_OMS);
+
+    LOG.info("--- Failed Requests (Failover Attempts) ---");
+    for (int i = 0; i < NUM_OMS; i++) {
+      String omId = NODE_ID_BASE + i;
+      int failures = mockServers.get(omId).getFailureCount();
+      // Guard against division by zero when nothing failed at all.
+      double percentage = totalFailures > 0 ? (failures * 100.0 / totalFailures) : 0;
+      String status = failures == 0 ? " NEVER TRIED!" : "";
+      LOG.info(" {}: {} failures ({} %){}", omId, failures, String.format("%.1f", percentage), status);
+    }
+
+    LOG.info("--- Successful Requests ---");
+    for (int i = 0; i < NUM_OMS; i++) {
+      String omId = NODE_ID_BASE + i;
+      int successes = mockServers.get(omId).getSuccessCount();
+      double percentage = totalSuccesses > 0 ? (successes * 100.0 / totalSuccesses) : 0;
+      String status = successes > 0 ? " LEADER" : "";
+      LOG.info(" {}: {} successes ({} %){}", omId, successes, String.format("%.1f", percentage), status);
+    }
+    return omsWithFailures;
+  }
+
+  /**
+   * Marks {@code leader} as the only mock OM that accepts requests.
+   *
+   * @param leader node id of the new leader
+   * @param follower1 node id of a mock OM that must reject requests
+   * @param follower2 node id of the other mock OM that must reject requests
+   */
+  private void failover(String leader, String follower1, String follower2) {
+    mockServers.get(leader).setAsLeader(true);
+    mockServers.get(follower1).setAsLeader(false);
+    mockServers.get(follower2).setAsLeader(false);
+  }
+
+  /**
+   * Runs {@code numThreads} workers that start together and each submit
+   * {@code requestsPerThread} ListVolume requests through the shared
+   * transport. Individual request failures are only logged.
+   *
+   * <p>The call fails with an assertion error if the workers do not finish
+   * within 30 seconds, so that callers never inspect counters that are still
+   * being updated. The pool is always terminated, interrupting workers that
+   * are still running.
+   *
+   * @param numThreads number of concurrent workers (also the pool size)
+   * @param requestsPerThread number of requests each worker submits
+   * @throws InterruptedException if the calling thread is interrupted while
+   *     waiting for the workers
+   */
+  private void runConcurrentClientRequests(int numThreads, int requestsPerThread) throws InterruptedException {
+    ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    // All workers wait on the barrier so that their first requests overlap.
+    CyclicBarrier startBarrier = new CyclicBarrier(numThreads);
+    CountDownLatch completionLatch = new CountDownLatch(numThreads);
+
+    try {
+      for (int threadId = 0; threadId < numThreads; threadId++) {
+        final int id = threadId;
+        executor.submit(() -> {
+          try {
+            startBarrier.await();
+
+            for (int attempt = 0; attempt < requestsPerThread; attempt++) {
+              OMRequest request = OMRequest.newBuilder()
+                  .setCmdType(Type.ListVolume)
+                  .setClientId("test-client")
+                  .build();
+
+              try {
+                transport.submitRequest(request);
+              } catch (Exception e) {
+                LOG.error("Thread: {}, Request {} failed: {}", id, attempt + 1, e.getMessage());
+              }
+
+              Thread.sleep(1);
+            }
+          } catch (InterruptedException e) {
+            // Keep the interrupt visible to the pool instead of swallowing it.
+            Thread.currentThread().interrupt();
+            LOG.error("Thread: {}, Failed: {}", id, e.getMessage());
+          } catch (Exception e) {
+            LOG.error("Thread: {}, Failed: {}", id, e.getMessage());
+          } finally {
+            completionLatch.countDown();
+          }
+        });
+      }
+
+      assertTrue(completionLatch.await(30, TimeUnit.SECONDS),
+          "Latch didn't count down before timeout");
+    } finally {
+      // Equivalent to shutdown() when all workers are done; otherwise stops stragglers.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Submits five sequential ListVolume requests before any leader change.
+   *
+   * @throws IOException if the transport fails to deliver a request
+   */
+  private void sendInitialOmRequestsBeforeFailover() throws IOException {
+    for (int i = 0; i < 5; i++) {
+      OMRequest request = OMRequest.newBuilder()
+          .setCmdType(Type.ListVolume)
+          .setClientId("test-client")
+          .build();
+      transport.submitRequest(request);
+    }
+  }
+
+  /**
+   * Minimal gRPC OM endpoint. It counts every request and either answers OK
+   * (when flagged as leader) or rejects the call with an INTERNAL status that
+   * carries a not-leader message.
+   */
+  private static class MockOMServer {
+    private final String nodeId;
+    private final int port;
+    private final AtomicInteger requestCount = new AtomicInteger(0);
+    private final AtomicInteger successCount = new AtomicInteger(0);
+    private final AtomicInteger failureCount = new AtomicInteger(0);
+    private final AtomicBoolean isLeader = new AtomicBoolean(false);
+    private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase serviceImpl =
+        mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
+            delegatesTo(new OzoneManagerServiceGrpc.OzoneManagerServiceImplBase() {
+              @Override
+              public void submitRequest(OMRequest request, StreamObserver<OMResponse> responseObserver) {
+                requestCount.incrementAndGet();
+
+                if (!isLeader.get()) {
+                  failureCount.incrementAndGet();
+                  // NOTE(review): the suggested leader is always reported as om2,
+                  // even while another node is the leader - confirm this is intended.
+                  String errorMsg = "org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException: " +
+                      "OM:" + nodeId + " is not the leader. Suggested leader: om2";
+
+                  responseObserver.onError(new StatusRuntimeException(
+                      Status.INTERNAL.withDescription(errorMsg)));
+                } else {
+                  successCount.incrementAndGet();
+                  OMResponse response = OMResponse.newBuilder()
+                      .setCmdType(request.getCmdType())
+                      .setStatus(org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK)
+                      .setMessage("Success from " + nodeId)
+                      .build();
+
+                  responseObserver.onNext(response);
+                  responseObserver.onCompleted();
+                }
+              }
+            }));
+    private Server server;
+
+    MockOMServer(String nodeId, int port) {
+      this.nodeId = nodeId;
+      this.port = port;
+    }
+
+    /** Binds the service to the configured port and starts serving. */
+    public void start() throws Exception {
+      server = ServerBuilder.forPort(port)
+          .addService(serviceImpl)
+          .build()
+          .start();
+    }
+
+    /**
+     * Shuts the server down, waiting up to five seconds for in-flight calls
+     * and forcing termination if they have not drained by then.
+     */
+    public void stop() throws Exception {
+      if (server != null) {
+        server.shutdown();
+        if (!server.awaitTermination(5, TimeUnit.SECONDS)) {
+          server.shutdownNow();
+        }
+      }
+    }
+
+    public void setAsLeader(boolean leader) {
+      this.isLeader.set(leader);
+    }
+
+    public int getRequestCount() {
+      return requestCount.get();
+    }
+
+    public int getSuccessCount() {
+      return successCount.get();
+    }
+
+    public int getFailureCount() {
+      return failureCount.get();
+    }
+  }
+}
+
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index d5b8723e0eff..176d9b6d03bd 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -66,6 +66,8 @@ public class TestS3GrpcOmTransport {
       .build();
   private boolean doFailover = false;
+  private boolean completeFailover = true;
+  private int failoverCount;
   private OzoneConfiguration conf;
@@ -91,7 +93,10 @@ public void submitRequest(org.apache.hadoop.ozone.protocol.proto
               responseObserver) {
             try {
               if (doFailover) {
-                doFailover = false;
+                if 
(completeFailover) {
+                  doFailover = false;
+                }
+                failoverCount++;
                 throw createNotLeaderException();
               } else {
                 responseObserver.onNext(omResponse);
@@ -122,6 +127,7 @@ private ServiceException createNotLeaderException() {
   @BeforeEach
   public void setUp() throws Exception {
+    failoverCount = 0;
     // Generate a unique in-process server name.
     serverName = InProcessServerBuilder.generateName();
@@ -190,6 +196,7 @@ public void testGrpcFailoverProxy() throws Exception {
   @Test
   public void testGrpcFailoverProxyExhaustRetry() throws Exception {
+    final int expectedFailoverCount = 1;
     ServiceListRequest req = ServiceListRequest.newBuilder().build();
     final OMRequest omRequest = OMRequest.newBuilder()
@@ -210,6 +217,29 @@ public void testGrpcFailoverProxyExhaustRetry() throws Exception {
     // max failovers
     assertThrows(Exception.class, () -> client.submitRequest(omRequest));
+    assertEquals(expectedFailoverCount, failoverCount);
+  }
+
+  /**
+   * Checks that the failover budget is tracked per request rather than
+   * accumulated across requests on the same transport.
+   *
+   * <p>Here {@code failoverCount} is the number of not-leader errors the mock
+   * service has thrown. The first request meets a service that keeps throwing
+   * (completeFailover is false), so it is expected to fail after
+   * maxFailoverAttempts errors. The second request meets a service that
+   * throws once and then answers, and is expected to succeed after exactly
+   * one error even though the first request already used up its attempts.
+   */
+  @Test
+  public void testGrpcFailoverProxyCalculatesFailoverCountPerRequest() throws Exception {
+    final int maxFailoverAttempts = 2;
+    final int expectedRequest2FailoverAttemptsCount = 1;
+    doFailover = true;
+    completeFailover = false;
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, maxFailoverAttempts);
+    conf.setLong(OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 50);
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+
+    assertThrows(Exception.class, () -> client.submitRequest(arbitraryOmRequest()));
+    assertEquals(maxFailoverAttempts, failoverCount);
+
+    // Reset the server-side counter and let the next not-leader error be the last one.
+    failoverCount = 0;
+    completeFailover = true;
+    //No exception this time
+    client.submitRequest(arbitraryOmRequest());
+
+    assertEquals(expectedRequest2FailoverAttemptsCount, failoverCount);
   }
   @Test
@@ -241,4 +271,14 @@ public void testGrpcFailoverExceedMaxMesgLen() throws Exception {
     // rather to fail.
assertThrows(Exception.class, () -> client.submitRequest(omRequest));
   }
+
+  /**
+   * Builds a ServiceList request with the current client version, client id
+   * "test" and an empty ServiceListRequest payload, for tests that only need
+   * some valid request to submit.
+   *
+   * @return a freshly built OM request
+   */
+  private static OMRequest arbitraryOmRequest() {
+    final OMRequest.Builder builder = OMRequest.newBuilder();
+    builder.setCmdType(Type.ServiceList);
+    builder.setVersion(CURRENT_VERSION);
+    builder.setClientId("test");
+    builder.setServiceListRequest(ServiceListRequest.newBuilder().build());
+    return builder.build();
+  }
 }