From 5c96bfc6755902961e9d0cb1c70f889585f9dde3 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 20 May 2024 01:16:05 +0800 Subject: [PATCH 1/3] Fix ClientCnx channel inactive exception --- .../buffer/impl/TransactionBufferHandlerImpl.java | 15 ++++++++++----- .../org/apache/pulsar/client/impl/ClientCnx.java | 6 +++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java index 34ee28693b4fc..235e133ef9d66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java @@ -156,12 +156,17 @@ public void endTxn(OpRequestSend op) { } } else { Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.error("endTxn error topic: [{}]", op.topic, cause); - if (cause instanceof PulsarClientException.BrokerMetadataException) { - op.cb.complete(null); + if (cause instanceof PulsarClientException.ConnectException) { + log.warn("Client connection already closed, topic: [{}]", op.topic); + op.cb.completeExceptionally(cause); } else { - op.cb.completeExceptionally( - new PulsarClientException.LookupException(cause.getMessage())); + log.error("endTxn error topic: [{}]", op.topic, cause); + if (cause instanceof PulsarClientException.BrokerMetadataException) { + op.cb.complete(null); + } else { + op.cb.completeExceptionally( + new PulsarClientException.LookupException(cause.getMessage())); + } } onResponse(op); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 03e0f406dd2f2..104da195ebab7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -323,12 +323,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { connectionsClosedCounter.increment(); lastDisconnectedTimestamp = System.currentTimeMillis(); log.info("{} Disconnected", ctx.channel()); - if (!connectionFuture.isDone()) { - connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed")); - } ConnectException e = new ConnectException( "Disconnected from server at " + ctx.channel().remoteAddress()); + if (!connectionFuture.isDone()) { + connectionFuture.completeExceptionally(e); + } // Fail out all the pending ops pendingRequests.forEach((key, future) -> { From 3dd4425174d0c8b6e6eb99ab9945f43558fce728 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 20 May 2024 21:24:02 +0800 Subject: [PATCH 2/3] Fix ClientCnx channel inactive exception --- .../buffer/impl/TransactionBufferHandlerImpl.java | 2 +- .../pulsar/client/api/PulsarClientException.java | 13 ++++++++++++- .../org/apache/pulsar/client/impl/ClientCnx.java | 7 ++++--- 3 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java index 235e133ef9d66..f0188bc037c52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.java @@ -156,7 +156,7 @@ public void endTxn(OpRequestSend op) { } } else { Throwable cause = FutureUtil.unwrapCompletionException(ex); - if (cause instanceof PulsarClientException.ConnectException) { + if (cause instanceof PulsarClientException.ConnectFailedException) { log.warn("Client connection already closed, topic: [{}]", op.topic); op.cb.completeExceptionally(cause); } else { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index c460fee11d0e6..ae7a1992401a9 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -175,6 +175,15 @@ public InvalidServiceURL(String msg, Throwable t) { } } + /** + * Clients are unable to connect to the broker. + */ + public static class ConnectFailedException extends PulsarClientException { + public ConnectFailedException(String msg) { + super(msg); + } + } + /** * Invalid Configuration exception thrown by Pulsar client. */ @@ -1127,8 +1136,10 @@ public static PulsarClientException unwrap(Throwable t) { newException = new NotFoundException(msg); } else if (cause instanceof TransactionHasOperationFailedException) { newException = new TransactionHasOperationFailedException(msg); + } else if (cause instanceof ConnectFailedException) { + newException = new ConnectFailedException(msg); } else { - newException = new PulsarClientException(t); + newException = new PulsarClientException(cause); } Collection previousExceptions = getPreviousExceptions(t); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 104da195ebab7..d7c7790c527f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -323,12 +323,13 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { connectionsClosedCounter.increment(); lastDisconnectedTimestamp = System.currentTimeMillis(); log.info("{} Disconnected", ctx.channel()); + if (!connectionFuture.isDone()) { + connectionFuture.completeExceptionally( + new PulsarClientException.ConnectFailedException("Connection already closed")); + } ConnectException e = new ConnectException( "Disconnected from server at " + ctx.channel().remoteAddress()); - if (!connectionFuture.isDone()) { - connectionFuture.completeExceptionally(e); - } // Fail out all the pending ops pendingRequests.forEach((key, future) -> { From b5a5610fa56f3885cb14891351ab852fcdef6aed Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 20 May 2024 21:31:49 +0800 Subject: [PATCH 3/3] Fix ClientCnx channel inactive exception --- .../org/apache/pulsar/client/api/PulsarClientException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index ae7a1992401a9..e349943dfcd0e 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -1139,7 +1139,7 @@ public static PulsarClientException unwrap(Throwable t) { } else if (cause instanceof ConnectFailedException) { newException = new ConnectFailedException(msg); } else { - newException = new PulsarClientException(cause); + newException = new PulsarClientException(t); } Collection previousExceptions = getPreviousExceptions(t);