diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java index 34ee28693b4fc..f0188bc037c52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java @@ -156,12 +156,17 @@ public void endTxn(OpRequestSend op) { } } else { Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.error("endTxn error topic: [{}]", op.topic, cause); - if (cause instanceof PulsarClientException.BrokerMetadataException) { - op.cb.complete(null); + if (cause instanceof PulsarClientException.ConnectFailedException) { + log.warn("Client connection already closed, topic: [{}]", op.topic); + op.cb.completeExceptionally(cause); } else { - op.cb.completeExceptionally( - new PulsarClientException.LookupException(cause.getMessage())); + log.error("endTxn error topic: [{}]", op.topic, cause); + if (cause instanceof PulsarClientException.BrokerMetadataException) { + op.cb.complete(null); + } else { + op.cb.completeExceptionally( + new PulsarClientException.LookupException(cause.getMessage())); + } } onResponse(op); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index c460fee11d0e6..e349943dfcd0e 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -175,6 +175,15 @@ public InvalidServiceURL(String msg, Throwable t) { } } + /** + * Clients are unable to connect to the broker. + */ + public static class ConnectFailedException extends PulsarClientException { + public ConnectFailedException(String msg) { + super(msg); + } + } + /** * Invalid Configuration exception thrown by Pulsar client. */ @@ -1127,6 +1136,8 @@ public static PulsarClientException unwrap(Throwable t) { newException = new NotFoundException(msg); } else if (cause instanceof TransactionHasOperationFailedException) { newException = new TransactionHasOperationFailedException(msg); + } else if (cause instanceof ConnectFailedException) { + newException = new ConnectFailedException(msg); } else { newException = new PulsarClientException(t); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 03e0f406dd2f2..d7c7790c527f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -324,7 +324,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { lastDisconnectedTimestamp = System.currentTimeMillis(); log.info("{} Disconnected", ctx.channel()); if (!connectionFuture.isDone()) { - connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed")); + connectionFuture.completeExceptionally( + new PulsarClientException.ConnectFailedException("Connection already closed")); } ConnectException e = new ConnectException(