Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ public class RssClientConf {
.defaultValue(3)
.withDescription("When we fail to send RPC calls, we will retry for maxAttempts times.");

/**
 * Backoff, in milliseconds, applied to RPC retries.
 *
 * <p>Key: {@code rss.client.rpc.retry.backoff.ms}; value type: long; default: 500.
 */
public static final ConfigOption<Long> RPC_RETRY_BACKOFF_MS =
ConfigOptions.key("rss.client.rpc.retry.backoff.ms")
.longType()
.defaultValue(500L)
.withDescription("The backoff in milliseconds for RPC retries");

public static final ConfigOption<Integer> RPC_NETTY_PAGE_SIZE =
ConfigOptions.key("rss.client.rpc.netty.pageSize")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.util.GrpcNettyUtils;

public abstract class GrpcClient {
Expand All @@ -34,17 +35,33 @@ public abstract class GrpcClient {
protected int port;
protected boolean usePlaintext;
protected int maxRetryAttempts;
protected long rpcRetryBackoffMs;
protected ManagedChannel channel;

/**
 * Creates a client that uses the default RPC retry backoff.
 *
 * <p>The backoff is the default value of {@link RssClientConf#RPC_RETRY_BACKOFF_MS}. Every other
 * setting is whatever the five-argument constructor chooses, so the defaults live in one place.
 *
 * @param host remote host, forwarded unchanged
 * @param port remote port, forwarded unchanged
 * @param maxRetryAttempts retry count, forwarded unchanged
 * @param usePlaintext plaintext flag, forwarded unchanged
 */
protected GrpcClient(String host, int port, int maxRetryAttempts, boolean usePlaintext) {
  // A constructor may contain only one explicit this(...) call; the superseded pre-backoff
  // delegation has been dropped in favour of this single one.
  this(
      host,
      port,
      maxRetryAttempts,
      usePlaintext,
      RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue());
}

/**
 * Creates a client with an explicit RPC retry backoff.
 *
 * <p>Delegates to the full constructor with zero for the page size, max order and small cache
 * size, and {@code -1} for the last argument.
 *
 * @param host remote host, forwarded unchanged
 * @param port remote port, forwarded unchanged
 * @param maxRetryAttempts retry count, forwarded unchanged
 * @param usePlaintext plaintext flag, forwarded unchanged
 * @param rpcRetryBackoffMs backoff in milliseconds, forwarded unchanged
 */
protected GrpcClient(
    String host, int port, int maxRetryAttempts, boolean usePlaintext, long rpcRetryBackoffMs) {
  this(
      host,
      port,
      maxRetryAttempts,
      usePlaintext,
      rpcRetryBackoffMs,
      /* pageSize= */ 0,
      /* maxOrder= */ 0,
      /* smallCacheSize= */ 0,
      -1);
}

protected GrpcClient(
String host,
int port,
int maxRetryAttempts,
boolean usePlaintext,
long rpcRetryBackoffMs,
int pageSize,
int maxOrder,
int smallCacheSize,
Expand All @@ -53,6 +70,7 @@ protected GrpcClient(
this.port = port;
this.maxRetryAttempts = maxRetryAttempts;
this.usePlaintext = usePlaintext;
this.rpcRetryBackoffMs = rpcRetryBackoffMs;

if (nettyEventLoopThreads > 0) {
System.setProperty(
Expand All @@ -69,9 +87,6 @@ protected GrpcClient(
channelBuilder.usePlaintext();
}

if (maxRetryAttempts > 0) {
channelBuilder.enableRetry().maxRetryAttempts(maxRetryAttempts);
}
channelBuilder.maxInboundMessageSize(Integer.MAX_VALUE);

channel = channelBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteFailureResponse;
import org.apache.uniffle.client.response.RssReportShuffleWriteMetricResponse;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureRequest;
import org.apache.uniffle.proto.RssProtos.ReportShuffleFetchFailureResponse;
Expand All @@ -57,12 +59,23 @@ public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout) {
}

/**
 * Creates a plaintext client that uses the default RPC retry backoff.
 *
 * <p>Delegates to the six-argument constructor with {@code usePlaintext = true} and the default
 * value of {@link RssClientConf#RPC_RETRY_BACKOFF_MS}.
 *
 * @param host remote host, forwarded unchanged
 * @param port remote port, forwarded unchanged
 * @param rpcTimeout RPC timeout, forwarded unchanged
 * @param maxRetryAttempts retry count, forwarded unchanged
 */
public ShuffleManagerGrpcClient(String host, int port, long rpcTimeout, int maxRetryAttempts) {
  // Only one explicit this(...) call is allowed; the former five-argument delegation targeted
  // an overload that no longer exists and has been removed.
  this(
      host,
      port,
      rpcTimeout,
      maxRetryAttempts,
      true,
      RssClientConf.RPC_RETRY_BACKOFF_MS.defaultValue());
}

/**
 * Creates a client with every setting supplied by the caller.
 *
 * <p>Connection settings and the retry backoff are passed to the superclass constructor; the
 * blocking stub is then created on the resulting {@code channel}.
 *
 * @param host remote host, passed to the superclass
 * @param port remote port, passed to the superclass
 * @param rpcTimeout RPC timeout, stored on this instance
 * @param maxRetryAttempts retry count, passed to the superclass
 * @param usePlaintext plaintext flag, passed to the superclass
 * @param rpcRetryBackoffMs backoff in milliseconds between retries, passed to the superclass
 */
public ShuffleManagerGrpcClient(
    String host,
    int port,
    long rpcTimeout,
    int maxRetryAttempts,
    boolean usePlaintext,
    long rpcRetryBackoffMs) {
  // Single super(...) call carrying the backoff; the earlier four-argument call is superseded.
  super(host, port, maxRetryAttempts, usePlaintext, rpcRetryBackoffMs);
  blockingStub = ShuffleManagerGrpc.newBlockingStub(channel);
  this.rpcTimeout = rpcTimeout;
}
Expand All @@ -81,9 +94,14 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
ReportShuffleFetchFailureRequest protoRequest = request.toProto();
try {
ReportShuffleFetchFailureResponse response =
getBlockingStub().reportShuffleFetchFailure(protoRequest);
RetryUtils.retryWithCondition(
() -> getBlockingStub().reportShuffleFetchFailure(protoRequest),
null,
rpcRetryBackoffMs,
maxRetryAttempts,
e -> e instanceof Exception);
return RssReportShuffleFetchFailureResponse.fromProto(response);
} catch (Exception e) {
} catch (Throwable e) {
String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed";
LOG.warn(msg, e);
throw new RssException(msg, e);
Expand All @@ -94,22 +112,50 @@ public RssReportShuffleFetchFailureResponse reportShuffleFetchFailure(
/**
 * Fetches the partition-to-shuffle-server assignment for a stage retry.
 *
 * <p>The RPC is issued through {@code RetryUtils.retryWithCondition} using
 * {@code rpcRetryBackoffMs} and {@code maxRetryAttempts}; the retry predicate accepts any
 * {@link Exception}.
 *
 * @param req request converted to its proto form before the call
 * @return the response converted from the proto reply
 * @throws RssException if the call fails; the failure is logged at WARN and kept as the cause
 */
public RssReassignOnStageRetryResponse getPartitionToShufflerServerWithStageRetry(
    RssPartitionToShuffleServerRequest req) {
  RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto();
  try {
    // The former un-retried direct stub call (and its early return) is superseded by this path.
    RssProtos.ReassignOnStageRetryResponse partitionToShufflerServer =
        RetryUtils.retryWithCondition(
            () -> getBlockingStub().getPartitionToShufflerServerWithStageRetry(protoRequest),
            null,
            rpcRetryBackoffMs,
            maxRetryAttempts,
            e -> e instanceof Exception);
    return RssReassignOnStageRetryResponse.fromProto(partitionToShufflerServer);
  } catch (Throwable e) {
    String msg =
        "Get partition to shuffle server with stage retry from host:port["
            + host
            + ":"
            + port
            + "] failed";
    LOG.warn(msg, e);
    throw new RssException(msg, e);
  }
}

/**
 * Fetches the partition-to-shuffle-server assignment for a block retry.
 *
 * <p>The RPC is issued through {@code RetryUtils.retryWithCondition} using
 * {@code rpcRetryBackoffMs} and {@code maxRetryAttempts}; the retry predicate accepts any
 * {@link Exception}.
 *
 * @param req request converted to its proto form before the call
 * @return the response converted from the proto reply
 * @throws RssException if the call fails; the failure is logged at WARN and kept as the cause
 */
@Override
public RssReassignOnBlockSendFailureResponse getPartitionToShufflerServerWithBlockRetry(
    RssPartitionToShuffleServerRequest req) {
  RssProtos.PartitionToShuffleServerRequest protoRequest = req.toProto();
  try {
    // The former un-retried direct stub call (and its early return) is superseded by this path.
    RssProtos.ReassignOnBlockSendFailureResponse partitionToShufflerServer =
        RetryUtils.retryWithCondition(
            () -> getBlockingStub().getPartitionToShufflerServerWithBlockRetry(protoRequest),
            null,
            rpcRetryBackoffMs,
            maxRetryAttempts,
            e -> e instanceof Exception);
    return RssReassignOnBlockSendFailureResponse.fromProto(partitionToShufflerServer);
  } catch (Throwable e) {
    String msg =
        "Get partition to shuffle server with block retry from host:port["
            + host
            + ":"
            + port
            + "] failed";
    LOG.warn(msg, e);
    throw new RssException(msg, e);
  }
}

@Override
Expand All @@ -118,9 +164,14 @@ public RssReportShuffleWriteFailureResponse reportShuffleWriteFailure(
RssProtos.ReportShuffleWriteFailureRequest protoRequest = request.toProto();
try {
RssProtos.ReportShuffleWriteFailureResponse response =
getBlockingStub().reportShuffleWriteFailure(protoRequest);
RetryUtils.retryWithCondition(
() -> getBlockingStub().reportShuffleWriteFailure(protoRequest),
null,
rpcRetryBackoffMs,
maxRetryAttempts,
e -> e instanceof Exception);
return RssReportShuffleWriteFailureResponse.fromProto(response);
} catch (Exception e) {
} catch (Throwable e) {
String msg = "Report shuffle fetch failure to host:port[" + host + ":" + port + "] failed";
LOG.warn(msg, e);
throw new RssException(msg, e);
Expand All @@ -132,47 +183,115 @@ public RssReassignOnBlockSendFailureResponse reassignOnBlockSendFailure(
RssReassignOnBlockSendFailureRequest request) {
RssProtos.RssReassignOnBlockSendFailureRequest protoReq =
RssReassignOnBlockSendFailureRequest.toProto(request);
RssProtos.ReassignOnBlockSendFailureResponse response =
getBlockingStub().reassignOnBlockSendFailure(protoReq);
return RssReassignOnBlockSendFailureResponse.fromProto(response);
try {
RssProtos.ReassignOnBlockSendFailureResponse response =
RetryUtils.retryWithCondition(
() -> getBlockingStub().reassignOnBlockSendFailure(protoReq),
null,
rpcRetryBackoffMs,
maxRetryAttempts,
e -> e instanceof Exception);
return RssReassignOnBlockSendFailureResponse.fromProto(response);
} catch (Throwable e) {
String msg =
"Reassign on block send failure from host:port[" + host + ":" + port + "] failed";
LOG.warn(msg, e);
throw new RssException(msg, e);
}
}

/**
 * Fetches a shuffle result.
 *
 * <p>The RPC is issued through {@code RetryUtils.retryWithCondition} using
 * {@code rpcRetryBackoffMs} and {@code maxRetryAttempts}; the retry predicate accepts any
 * {@link Exception}.
 *
 * @param request request converted to its proto form for the call
 * @return the response converted from the proto reply
 * @throws RssException if the call fails; the failure is logged at WARN and kept as the cause
 */
@Override
public RssGetShuffleResultResponse getShuffleResult(RssGetShuffleResultRequest request) {
  try {
    // The former un-retried direct stub call (and its early return) is superseded by this path.
    RssProtos.GetShuffleResultResponse response =
        RetryUtils.retryWithCondition(
            () -> getBlockingStub().getShuffleResult(request.toProto()),
            null,
            rpcRetryBackoffMs,
            maxRetryAttempts,
            e -> e instanceof Exception);
    return RssGetShuffleResultResponse.fromProto(response);
  } catch (Throwable e) {
    String msg = "Get shuffle result from host:port[" + host + ":" + port + "] failed";
    LOG.warn(msg, e);
    throw new RssException(msg, e);
  }
}

/**
 * Fetches a shuffle result using the multi-part request form.
 *
 * <p>The RPC is issued through {@code RetryUtils.retryWithCondition} using
 * {@code rpcRetryBackoffMs} and {@code maxRetryAttempts}; the retry predicate accepts any
 * {@link Exception}.
 *
 * @param request request converted to its proto form for the call
 * @return the response converted from the proto reply
 * @throws RssException if the call fails; the failure is logged at WARN and kept as the cause
 */
@Override
public RssGetShuffleResultResponse getShuffleResultForMultiPart(
    RssGetShuffleResultForMultiPartRequest request) {
  try {
    // The former un-retried direct stub call (and its early return) is superseded by this path.
    RssProtos.GetShuffleResultForMultiPartResponse response =
        RetryUtils.retryWithCondition(
            () -> getBlockingStub().getShuffleResultForMultiPart(request.toProto()),
            null,
            rpcRetryBackoffMs,
            maxRetryAttempts,
            e -> e instanceof Exception);
    return RssGetShuffleResultResponse.fromProto(response);
  } catch (Throwable e) {
    // Message previously read "multiport", a typo for the multi-part call being made.
    String msg =
        "Get shuffle result for multi-part from host:port[" + host + ":" + port + "] failed";
    LOG.warn(msg, e);
    throw new RssException(msg, e);
  }
}

/**
 * Reports a shuffle result.
 *
 * <p>The RPC is issued through {@code RetryUtils.retryWithCondition} using
 * {@code rpcRetryBackoffMs} and {@code maxRetryAttempts}; the retry predicate accepts any
 * {@link Exception}.
 *
 * @param request request converted to its proto form for the call
 * @return the response converted from the proto reply
 * @throws RssException if the call fails; the failure is logged at WARN and kept as the cause
 */
@Override
public RssReportShuffleResultResponse reportShuffleResult(RssReportShuffleResultRequest request) {
  try {
    // The former un-retried direct stub call (and its early return) is superseded by this path.
    RssProtos.ReportShuffleResultResponse response =
        RetryUtils.retryWithCondition(
            () -> getBlockingStub().reportShuffleResult(request.toProto()),
            null,
            rpcRetryBackoffMs,
            maxRetryAttempts,
            e -> e instanceof Exception);
    return RssReportShuffleResultResponse.fromProto(response);
  } catch (Throwable e) {
    String msg = "Report shuffle result to host:port[" + host + ":" + port + "] failed";
    LOG.warn(msg, e);
    throw new RssException(msg, e);
  }
}

/**
 * Reports shuffle write metrics.
 *
 * <p>The RPC is issued through {@code RetryUtils.retryWithCondition} using
 * {@code rpcRetryBackoffMs} and {@code maxRetryAttempts}; the retry predicate accepts any
 * {@link Exception}.
 *
 * @param request request converted to its proto form for the call
 * @return the response converted from the proto reply
 * @throws RssException if the call fails; the failure is logged at WARN and kept as the cause
 */
@Override
public RssReportShuffleWriteMetricResponse reportShuffleWriteMetric(
    RssReportShuffleWriteMetricRequest request) {
  try {
    // The former un-retried direct stub call (and its early return) is superseded by this path.
    RssProtos.ReportShuffleWriteMetricResponse response =
        RetryUtils.retryWithCondition(
            () -> getBlockingStub().reportShuffleWriteMetric(request.toProto()),
            null,
            rpcRetryBackoffMs,
            maxRetryAttempts,
            e -> e instanceof Exception);
    return RssReportShuffleWriteMetricResponse.fromProto(response);
  } catch (Throwable e) {
    String msg = "Report shuffle write metric to host:port[" + host + ":" + port + "] failed";
    LOG.warn(msg, e);
    throw new RssException(msg, e);
  }
}

/**
 * Reports shuffle read metrics.
 *
 * <p>The RPC is issued through {@code RetryUtils.retryWithCondition} using
 * {@code rpcRetryBackoffMs} and {@code maxRetryAttempts}; the retry predicate accepts any
 * {@link Exception}.
 *
 * @param request request converted to its proto form for the call
 * @return the response converted from the proto reply
 * @throws RssException if the call fails; the failure is logged at WARN and kept as the cause
 */
@Override
public RssReportShuffleReadMetricResponse reportShuffleReadMetric(
    RssReportShuffleReadMetricRequest request) {
  try {
    // The former un-retried direct stub call (and its early return) is superseded by this path.
    RssProtos.ReportShuffleReadMetricResponse response =
        RetryUtils.retryWithCondition(
            () -> getBlockingStub().reportShuffleReadMetric(request.toProto()),
            null,
            rpcRetryBackoffMs,
            maxRetryAttempts,
            e -> e instanceof Exception);
    return RssReportShuffleReadMetricResponse.fromProto(response);
  } catch (Throwable e) {
    String msg = "Report shuffle read metric to host:port[" + host + ":" + port + "] failed";
    LOG.warn(msg, e);
    throw new RssException(msg, e);
  }
}

@Override
Expand Down
Loading
Loading