diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java index a7f67e1bc3..08346e1a04 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java @@ -125,6 +125,12 @@ public class RssClientConf { .defaultValue(3) .withDescription("When we fail to send RPC calls, we will retry for maxAttempts times."); + public static final ConfigOption RPC_RETRY_BACKOFF_MS = + ConfigOptions.key("rss.client.rpc.retry.backoff.ms") + .longType() + .defaultValue(500L) + .withDescription("The backoff in milliseconds for RPC retries"); + public static final ConfigOption RPC_NETTY_PAGE_SIZE = ConfigOptions.key("rss.client.rpc.netty.pageSize") .intType() diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java index 1d5d167f10..631e547b11 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/GrpcClient.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.util.GrpcNettyUtils; public abstract class GrpcClient { @@ -34,10 +35,25 @@ public abstract class GrpcClient { protected int port; protected boolean usePlaintext; protected int maxRetryAttempts; + protected long rpcRetryBackoffMs; protected ManagedChannel channel; protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) { - this(host, port, maxRetryAttempts, usePlaintext, 0, 0, 0, -1); + this( + host, + port, + maxRetryAttempts, + usePlaintext, + RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue(), + 0, + 0, + 0, + -1); + } + + protected GrpcClient( + String host, int port, int maxRetryAttempts, boolean usePlaintext, long rpcRetryBackoffMs) { + this(host, port, maxRetryAttempts, usePlaintext, rpcRetryBackoffMs, 0, 0, 0, -1); } protected GrpcClient( @@ -45,6 +61,7 @@ protected GrpcClient( int port, int maxRetryAttempts, boolean usePlaintext, + long rpcRetryBackoffMs, int pageSize, int maxOrder, int smallCacheSize, @@ -53,6 +70,7 @@ protected GrpcClient( this.port = port; this.maxRetryAttempts = maxRetryAttempts; this.usePlaintext = usePlaintext; + this.rpcRetryBackoffMs = rpcRetryBackoffMs; if (nettyEventLoopThreads > 0) { System.setProperty( @@ -69,9 +87,6 @@ protected GrpcClient( channelBuilder.usePlaintext(); } - if (maxRetryAttempts > 0) { - channelBuilder.enableRetry().maxRetryAttempts(maxRetryAttempts); - } channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE); channel = channelBuilder.build(); diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java index 0040e56ef8..e32cc7e14d 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleManagerGrpcClient.java @@ -40,7 +40,9 @@ import org.apache.uniffle.client.response.RssReportShuffleResultResponse; import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse; import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse; +import org.apache.uniffle.common.config.RssClientConf; import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.util.RetryUtils; import org.apache.uniffle.proto.RssProtos; import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest; import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse; @@ -57,12 +59,23 @@ public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout) { } public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout, int maxRetryAttempts) { - this(host, port, rpcTimeout, maxRetryAttempts, true); + this( + host, + port, + rpcTimeout, + maxRetryAttempts, + true, + RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue()); } public ShuffleManagerGrpcClient( - String host, int port, long rpcTimeout, int maxRetryAttempts, boolean usePlaintext) { - super(host, port, maxRetryAttempts, usePlaintext); + String host, + int port, + long rpcTimeout, + int maxRetryAttempts, + boolean usePlaintext, + long rpcRetryBackoffMs) { + super(host, port, maxRetryAttempts, usePlaintext, rpcRetryBackoffMs); blockingStub = ShuffleManagerGrpc.newBlockingStub(channel); this.rpcTimeout = rpcTimeout; } @@ -81,9 +94,14 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure( ReportShuffleFetchFailureRequest protoRequest = request.toProto(); try { ReportShuffleFetchFailureResponse response = - getBlockingStub().reportShuffleFetchFailure(protoRequest); + RetryUtils.retryWithCondition( + () -> getBlockingStub().reportShuffleFetchFailure(protoRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); return RssReportShuffleFetchFailureResponse.fromProto(response); - } catch (Exception e) { + } catch (Throwable e) { String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed"; LOG.warn(msg, e); throw new RssException(msg, e); @@ -94,22 +112,50 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure( public RssReassignOnStageRetryResponse getPartitionToShufflerServerWithStageRetry( RssPartitionToShuffleServerRequest req) { RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto(); - RssProtos.ReassignOnStageRetryResponse partitionToShufflerServer = - getBlockingStub().getPartitionToShufflerServerWithStageRetry(protoRequest); - RssReassignOnStageRetryResponse rssReassignOnStageRetryResponse = - RssReassignOnStageRetryResponse.fromProto(partitionToShufflerServer); - return rssReassignOnStageRetryResponse; + try { + RssProtos.ReassignOnStageRetryResponse partitionToShufflerServer = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getPartitionToShufflerServerWithStageRetry(protoRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssReassignOnStageRetryResponse.fromProto(partitionToShufflerServer); + } catch (Throwable e) { + String msg = + "Get partition to shuffle server with stage retry from host:port[" + + host + + ":" + + port + + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override public RssReassignOnBlockSendFailureResponse getPartitionToShufflerServerWithBlockRetry( RssPartitionToShuffleServerRequest req) { RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto(); - RssProtos.ReassignOnBlockSendFailureResponse partitionToShufflerServer = - getBlockingStub().getPartitionToShufflerServerWithBlockRetry(protoRequest); - RssReassignOnBlockSendFailureResponse rssReassignOnBlockSendFailureResponse = - RssReassignOnBlockSendFailureResponse.fromProto(partitionToShufflerServer); - return rssReassignOnBlockSendFailureResponse; + try { + RssProtos.ReassignOnBlockSendFailureResponse partitionToShufflerServer = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getPartitionToShufflerServerWithBlockRetry(protoRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssReassignOnBlockSendFailureResponse.fromProto(partitionToShufflerServer); + } catch (Throwable e) { + String msg = + "Get partition to shuffle server with block retry from host:port[" + + host + + ":" + + port + + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override @@ -118,9 +164,14 @@ public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure( RssProtos.ReportShuffleWriteFailureRequest protoRequest = request.toProto(); try { RssProtos.ReportShuffleWriteFailureResponse response = - getBlockingStub().reportShuffleWriteFailure(protoRequest); + RetryUtils.retryWithCondition( + () -> getBlockingStub().reportShuffleWriteFailure(protoRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); return RssReportShuffleWriteFailureResponse.fromProto(response); - } catch (Exception e) { + } catch (Throwable e) { String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed"; LOG.warn(msg, e); throw new RssException(msg, e); @@ -132,47 +183,115 @@ public RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure( RssReassignOnBlockSendFailureRequest request) { RssProtos.RssReassignOnBlockSendFailureRequest protoReq = RssReassignOnBlockSendFailureRequest.toProto(request); - RssProtos.ReassignOnBlockSendFailureResponse response = - getBlockingStub().reassignOnBlockSendFailure(protoReq); - return RssReassignOnBlockSendFailureResponse.fromProto(response); + try { + RssProtos.ReassignOnBlockSendFailureResponse response = + RetryUtils.retryWithCondition( + () -> getBlockingStub().reassignOnBlockSendFailure(protoReq), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssReassignOnBlockSendFailureResponse.fromProto(response); + } catch (Throwable e) { + String msg = + "Reassign on block send failure from host:port[" + host + ":" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override public RssGetShuffleResultResponse getShuffleResult(RssGetShuffleResultRequest request) { - RssProtos.GetShuffleResultResponse response = - getBlockingStub().getShuffleResult(request.toProto()); - return RssGetShuffleResultResponse.fromProto(response); + try { + RssProtos.GetShuffleResultResponse response = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getShuffleResult(request.toProto()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssGetShuffleResultResponse.fromProto(response); + } catch (Throwable e) { + String msg = "Get shuffle result from host:port[" + host + ":" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override public RssGetShuffleResultResponse getShuffleResultForMultiPart( RssGetShuffleResultForMultiPartRequest request) { - RssProtos.GetShuffleResultForMultiPartResponse response = - getBlockingStub().getShuffleResultForMultiPart(request.toProto()); - return RssGetShuffleResultResponse.fromProto(response); + try { + RssProtos.GetShuffleResultForMultiPartResponse response = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getShuffleResultForMultiPart(request.toProto()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssGetShuffleResultResponse.fromProto(response); + } catch (Throwable e) { + String msg = + "Get shuffle result for multiport from host:port[" + host + ":" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override public RssReportShuffleResultResponse reportShuffleResult(RssReportShuffleResultRequest request) { - RssProtos.ReportShuffleResultResponse response = - getBlockingStub().reportShuffleResult(request.toProto()); - return RssReportShuffleResultResponse.fromProto(response); + try { + RssProtos.ReportShuffleResultResponse response = + RetryUtils.retryWithCondition( + () -> getBlockingStub().reportShuffleResult(request.toProto()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssReportShuffleResultResponse.fromProto(response); + } catch (Throwable e) { + String msg = "Report shuffle result to host:port[" + host + ":" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override public RssReportShuffleWriteMetricResponse reportShuffleWriteMetric( RssReportShuffleWriteMetricRequest request) { - RssProtos.ReportShuffleWriteMetricResponse response = - getBlockingStub().reportShuffleWriteMetric(request.toProto()); - return RssReportShuffleWriteMetricResponse.fromProto(response); + try { + RssProtos.ReportShuffleWriteMetricResponse response = + RetryUtils.retryWithCondition( + () -> getBlockingStub().reportShuffleWriteMetric(request.toProto()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssReportShuffleWriteMetricResponse.fromProto(response); + } catch (Throwable e) { + String msg = "Report shuffle write metric to host:port[" + host + ":" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override public RssReportShuffleReadMetricResponse reportShuffleReadMetric( RssReportShuffleReadMetricRequest request) { - RssProtos.ReportShuffleReadMetricResponse response = - getBlockingStub().reportShuffleReadMetric(request.toProto()); - return RssReportShuffleReadMetricResponse.fromProto(response); + try { + RssProtos.ReportShuffleReadMetricResponse response = + RetryUtils.retryWithCondition( + () -> getBlockingStub().reportShuffleReadMetric(request.toProto()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + return RssReportShuffleReadMetricResponse.fromProto(response); + } catch (Throwable e) { + String msg = "Report shuffle read metric to host:port[" + host + ":" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } @Override diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java index 0a5259284c..fe87b30e35 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java @@ -163,6 +163,7 @@ public ShuffleServerGrpcClient(String host, int port) { RssClientConf.RPC_MAX_ATTEMPTS.defaultValue(), RssClientConf.RPC_TIMEOUT_MS.defaultValue(), true, + RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue(), 0, 0, 0, @@ -185,6 +186,9 @@ public ShuffleServerGrpcClient(RssConf rssConf, String host, int port) { ? RssClientConf.RPC_TIMEOUT_MS.defaultValue() : rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS), true, + rssConf == null + ? RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue() + : rssConf.getLong(RssClientConf.RPC_RETRY_BACKOFF_MS), 0, 0, 0, @@ -197,6 +201,7 @@ public ShuffleServerGrpcClient( int maxRetryAttempts, long rpcTimeoutMs, boolean usePlaintext, + long rpcRetryBackoffMs, int pageSize, int maxOrder, int smallCacheSize, @@ -206,6 +211,7 @@ public ShuffleServerGrpcClient( port, maxRetryAttempts, usePlaintext, + rpcRetryBackoffMs, pageSize, maxOrder, smallCacheSize, @@ -257,30 +263,35 @@ private ShuffleRegisterResponse doRegisterShuffle( private ShuffleCommitResponse doSendCommit(String appId, int shuffleId) { ShuffleCommitRequest request = ShuffleCommitRequest.newBuilder().setAppId(appId).setShuffleId(shuffleId).build(); - int retryNum = 0; - while (retryNum <= maxRetryAttempts) { - try { - ShuffleCommitResponse response = getBlockingStub().commitShuffleTask(request); - return response; - } catch (Exception e) { - retryNum++; - LOG.warn( - "Send commit to host[" - + host - + "], port[" - + port - + "] failed, try again, retryNum[" - + retryNum - + "]", - e); - } + try { + return RetryUtils.retryWithCondition( + () -> getBlockingStub().commitShuffleTask(request), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "Send commit to host[" + host + "], port[" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); } - throw new RssException("Send commit to host[" + host + "], port[" + port + "] failed"); } private AppHeartBeatResponse doSendHeartBeat(String appId, long timeout) { AppHeartBeatRequest request = AppHeartBeatRequest.newBuilder().setAppId(appId).build(); - return blockingStub.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS).appHeartbeat(request); + try { + return RetryUtils.retryWithCondition( + () -> + blockingStub.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS).appHeartbeat(request), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "Send heartbeat to host[" + host + "], port[" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } } // Only for tests @@ -449,8 +460,21 @@ private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId( @Override public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId( RssUnregisterShuffleByAppIdRequest request) { - RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse = - doUnregisterShuffleByAppId(request.getAppId(), request.getTimeoutSec()); + RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse; + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> doUnregisterShuffleByAppId(request.getAppId(), request.getTimeoutSec()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = + "unregister shuffle by app id from host[" + host + "], port[" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } RssUnregisterShuffleByAppIdResponse response; RssProtos.StatusCode statusCode = rpcResponse.getStatus(); @@ -483,8 +507,22 @@ private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle( @Override public RssUnregisterShuffleResponse unregisterShuffle(RssUnregisterShuffleRequest request) { - RssProtos.ShuffleUnregisterResponse rpcResponse = - doUnregisterShuffle(request.getAppId(), request.getShuffleId(), request.getTimeoutSec()); + RssProtos.ShuffleUnregisterResponse rpcResponse; + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> + doUnregisterShuffle( + request.getAppId(), request.getShuffleId(), request.getTimeoutSec()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "unregister shuffle from host[" + host + "], port[" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } RssUnregisterShuffleResponse response; RssProtos.StatusCode statusCode = rpcResponse.getStatus(); @@ -512,17 +550,30 @@ public RssUnregisterShuffleResponse unregisterShuffle(RssUnregisterShuffleReques @Override public RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest request) { - ShuffleRegisterResponse rpcResponse = - doRegisterShuffle( - request.getAppId(), - request.getShuffleId(), - request.getPartitionRanges(), - request.getRemoteStorageInfo(), - request.getUser(), - request.getDataDistributionType(), - request.getMaxConcurrencyPerPartitionToWrite(), - request.getMergeContext(), - request.getProperties()); + ShuffleRegisterResponse rpcResponse; + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> + doRegisterShuffle( + request.getAppId(), + request.getShuffleId(), + request.getPartitionRanges(), + request.getRemoteStorageInfo(), + request.getUser(), + request.getDataDistributionType(), + request.getMaxConcurrencyPerPartitionToWrite(), + request.getMergeContext(), + request.getProperties()), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "register shuffle to host[" + host + "], port[" + port + "] failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } RssRegisterShuffleResponse response; RssProtos.StatusCode statusCode = rpcResponse.getStatus(); @@ -834,7 +885,7 @@ private ReportShuffleResultResponse doReportShuffleResult(ReportShuffleResultReq return RetryUtils.retryWithCondition( () -> getBlockingStub().reportShuffleResult(rpcRequest), null, // No specific callback to execute - 0, // No delay between retries, retry immediately + rpcRetryBackoffMs, maxRetryAttempts, // Maximum number of retry attempts t -> { // Define retry condition directly in the method call if (t instanceof StatusRuntimeException) { @@ -863,7 +914,22 @@ public RssGetShuffleResultResponse getShuffleResult(RssGetShuffleResultRequest r .setTaskAttemptIdBits(request.getBlockIdLayout().taskAttemptIdBits) .build()) .build(); - GetShuffleResultResponse rpcResponse = getBlockingStub().getShuffleResult(rpcRequest); + + GetShuffleResultResponse rpcResponse; + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getShuffleResult(rpcRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "get shuffle result from " + host + ":" + port + " failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } + RssProtos.StatusCode statusCode = rpcResponse.getStatus(); RssGetShuffleResultResponse response; @@ -911,8 +977,22 @@ public RssGetShuffleResultResponse getShuffleResultForMultiPart( .setTaskAttemptIdBits(request.getBlockIdLayout().taskAttemptIdBits) .build()) .build(); - GetShuffleResultForMultiPartResponse rpcResponse = - getBlockingStub().getShuffleResultForMultiPart(rpcRequest); + + GetShuffleResultForMultiPartResponse rpcResponse; + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getShuffleResultForMultiPart(rpcRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "get shuffle result for multi-part from " + host + ":" + port + " failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } + RssProtos.StatusCode statusCode = rpcResponse.getStatus(); RssGetShuffleResultResponse response; @@ -982,7 +1062,20 @@ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request int retry = 0; GetLocalShuffleDataResponse rpcResponse; while (true) { - rpcResponse = getBlockingStub().getLocalShuffleData(rpcRequest); + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getLocalShuffleData(rpcRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "get shuffle data from " + host + ":" + port + " failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } + if (rpcResponse.getStatus() != NO_BUFFER) { break; } @@ -1041,7 +1134,20 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ int retry = 0; GetLocalShuffleIndexResponse rpcResponse; while (true) { - rpcResponse = getBlockingStub().getLocalShuffleIndex(rpcRequest); + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getLocalShuffleIndex(rpcRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "get shuffle index from " + host + ":" + port + " failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } + if (rpcResponse.getStatus() != NO_BUFFER) { break; } @@ -1119,7 +1225,20 @@ public RssGetInMemoryShuffleDataResponse getInMemoryShuffleData( int retry = 0; GetMemoryShuffleDataResponse rpcResponse; while (true) { - rpcResponse = getBlockingStub().getMemoryShuffleData(rpcRequest); + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getMemoryShuffleData(rpcRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "get memory shuffle data from " + host + ":" + port + " failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } + if (rpcResponse.getStatus() != NO_BUFFER) { break; } @@ -1179,8 +1298,23 @@ public RssStartSortMergeResponse startSortMerge(RssStartSortMergeRequest request .setPartitionId(request.getPartitionId()) .setUniqueBlocksBitmap(serializedBlockIdsBytes) .build(); + long start = System.currentTimeMillis(); - RssProtos.StartSortMergeResponse rpcResponse = getBlockingStub().startSortMerge(rpcRequest); + RssProtos.StartSortMergeResponse rpcResponse; + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> getBlockingStub().startSortMerge(rpcRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "start sort merge to " + host + ":" + port + " failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } + String requestInfo = "appId[" + request.getAppId() @@ -1242,7 +1376,20 @@ public RssGetSortedShuffleDataResponse getSortedShuffleData( int retry = 0; RssProtos.GetSortedShuffleDataResponse rpcResponse; while (true) { - rpcResponse = getBlockingStub().getSortedShuffleData(rpcRequest); + try { + rpcResponse = + RetryUtils.retryWithCondition( + () -> getBlockingStub().getSortedShuffleData(rpcRequest), + null, + rpcRetryBackoffMs, + maxRetryAttempts, + e -> e instanceof Exception); + } catch (Throwable e) { + String msg = "get sorted shuffle data from " + host + ":" + port + " failed"; + LOG.warn(msg, e); + throw new RssException(msg, e); + } + if (rpcResponse.getStatus() != NO_BUFFER) { break; } diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java index 080fed297c..51e57b443d 100644 --- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java +++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java @@ -94,6 +94,9 @@ public ShuffleServerGrpcNettyClient(RssConf rssConf, String host, int grpcPort, rssConf == null ? RssClientConf.RPC_TIMEOUT_MS.defaultValue() : rssConf.getLong(RssClientConf.RPC_TIMEOUT_MS), + rssConf == null + ? RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue() + : rssConf.getLong(RssClientConf.RPC_RETRY_BACKOFF_MS), rssConf == null ? RssClientConf.RPC_NETTY_PAGE_SIZE.defaultValue() : rssConf.getInteger(RssClientConf.RPC_NETTY_PAGE_SIZE), @@ -112,6 +115,7 @@ public ShuffleServerGrpcNettyClient( int nettyPort, int maxRetryAttempts, long rpcTimeoutMs, + long rpcRetryBackoffMs, int pageSize, int maxOrder, int smallCacheSize) { @@ -121,6 +125,7 @@ public ShuffleServerGrpcNettyClient( maxRetryAttempts, rpcTimeoutMs, true, + rpcRetryBackoffMs, pageSize, maxOrder, smallCacheSize, @@ -389,7 +394,7 @@ public RssGetShuffleIndexResponse getShuffleIndex(RssGetShuffleIndexRequest requ public RssGetShuffleDataResponse getShuffleData(RssGetShuffleDataRequest request) { TransportClient transportClient = getTransportClient(); // Construct old version or v2 get shuffle data request to compatible with old server - GetLocalShuffleDataRequest getLocalShuffleDataRequest = null; + GetLocalShuffleDataRequest getLocalShuffleDataRequest; if (request.storageIdSpecified()) { if (request.isNextReadSegmentsReportEnabled()) { getLocalShuffleDataRequest =