From 5cc5fafa11b6d4190abf25052877e5f3ade4d50d Mon Sep 17 00:00:00 2001 From: hanmz Date: Fri, 23 May 2025 15:10:30 +0800 Subject: [PATCH 1/2] [fix][test] Fix flaky NegativeAcksTest.testNegativeAcksWithBackoff --- .../java/org/apache/pulsar/client/impl/NegativeAcksTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 10f4f666607af..80e5bd6b76389 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -240,6 +240,7 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, Producer producer = pulsarClient.newProducer(Schema.STRING) .topic(topic) .enableBatching(batching) + .batchingMaxPublishDelay(30, TimeUnit.SECONDS) .create(); Set sentMessages = new HashSet<>(); @@ -259,7 +260,7 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, Message msg = null; for (int j = 0; j < N; j++) { msg = consumer.receive(); - log.info("Received message {}", msg.getValue()); + log.info("Received msgId: {}, message {}", msg.getMessageId(), msg.getValue()); if (!batching) { consumer.negativeAcknowledge(msg); } From d7eb50044ba10d736dafb95789837f733972593f Mon Sep 17 00:00:00 2001 From: hanmz Date: Mon, 26 May 2025 13:18:11 +0800 Subject: [PATCH 2/2] [fix][test] Fix flaky NegativeAcksTest.testNegativeAcksWithBackoff --- .../org/apache/pulsar/client/impl/NegativeAcksTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 80e5bd6b76389..e41c43daf00c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -259,7 +259,8 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, for (int i = 0; i < redeliverCount; i++) { Message msg = null; for (int j = 0; j < N; j++) { - msg = consumer.receive(); + msg = consumer.receive(10, TimeUnit.SECONDS); + assertNotNull(msg); log.info("Received msgId: {}, message {}", msg.getMessageId(), msg.getValue()); if (!batching) { consumer.negativeAcknowledge(msg); @@ -276,7 +277,8 @@ public void testNegativeAcksWithBackoff(boolean batching, boolean usePartitions, // All the messages should be received again for (int i = 0; i < N; i++) { - Message msg = consumer.receive(); + Message msg = consumer.receive(10, TimeUnit.SECONDS); + assertNotNull(msg); receivedMessages.add(msg.getValue()); consumer.acknowledge(msg); }