From 0054f093552ff2b0509995814eaedededf52f2f4 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 14:58:31 -0800 Subject: [PATCH 1/9] Add graceful shutdown drain to ServiceBusProcessorClient (#45716) When ServiceBusProcessorClient.close() is called while message handlers are still executing, the receiver was disposed immediately, causing in-flight handlers to fail with IllegalStateException. Add drain-before-dispose logic using an AtomicInteger handler counter and Object monitor wait/notify to all processor shutdown paths: - MessagePump (V2 non-session) - ServiceBusProcessor.RollingMessagePump (V2 non-session lifecycle) - SessionsMessagePump.RollingSessionReceiver (V2 session) - ServiceBusProcessorClient V1 close path The drain executes before subscription cancellation/disposal, with a configurable timeout (default 30s) to prevent indefinite blocking. Includes 3 regression tests in ServiceBusProcessorGracefulShutdownTest. --- .../messaging/servicebus/MessagePump.java | 77 +++- .../servicebus/ServiceBusProcessor.java | 12 + .../servicebus/ServiceBusProcessorClient.java | 100 ++++-- .../servicebus/SessionsMessagePump.java | 79 +++- ...rviceBusProcessorGracefulShutdownTest.java | 338 ++++++++++++++++++ 5 files changed, 552 insertions(+), 54 deletions(-) create mode 100644 sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java index 8d0ebeaa85cc..a980bc1e36b1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java @@ -17,6 +17,8 @@ import java.time.Duration; import 
java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Function; @@ -56,6 +58,8 @@ final class MessagePump { private final boolean enableAutoLockRenew; private final Scheduler workerScheduler; private final ServiceBusReceiverInstrumentation instrumentation; + private final AtomicInteger activeHandlerCount = new AtomicInteger(0); + private final Object drainLock = new Object(); /** * Instantiate {@link MessagePump} that pumps messages emitted by the given {@code client}. The messages @@ -138,24 +142,33 @@ private Mono pollConnectionState() { } private void handleMessage(ServiceBusReceivedMessage message) { - instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { - final Disposable lockRenewDisposable; - if (enableAutoLockRenew) { - lockRenewDisposable = client.beginLockRenewal(message); - } else { - lockRenewDisposable = Disposables.disposed(); - } - final Throwable error = notifyMessage(message); - if (enableAutoDisposition) { - if (error == null) { - complete(message); + activeHandlerCount.incrementAndGet(); + try { + instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { + final Disposable lockRenewDisposable; + if (enableAutoLockRenew) { + lockRenewDisposable = client.beginLockRenewal(message); } else { - abandon(message); + lockRenewDisposable = Disposables.disposed(); + } + final Throwable error = notifyMessage(message); + if (enableAutoDisposition) { + if (error == null) { + complete(message); + } else { + abandon(message); + } + } + lockRenewDisposable.dispose(); + return error; + }); + } finally { + if (activeHandlerCount.decrementAndGet() == 0) { + synchronized (drainLock) { + drainLock.notifyAll(); } } - lockRenewDisposable.dispose(); - return error; - }); + } } private Throwable notifyMessage(ServiceBusReceivedMessage message) { @@ 
-193,6 +206,40 @@ private void abandon(ServiceBusReceivedMessage message) { } } + /** + * Wait for all in-flight message handlers to complete, up to the specified timeout. + * This is called during processor close to ensure graceful shutdown — all messages currently + * being processed are allowed to complete (including settlement) before the underlying client + * is disposed. + * + * @param timeout the maximum time to wait for in-flight handlers to complete. + * @return true if all handlers completed within the timeout, false otherwise. + */ + boolean drainHandlers(Duration timeout) { + final long deadline = System.nanoTime() + timeout.toNanos(); + synchronized (drainLock) { + while (activeHandlerCount.get() > 0) { + final long remainingNanos = deadline - System.nanoTime(); + if (remainingNanos <= 0) { + logger.atWarning() + .addKeyValue("activeHandlers", activeHandlerCount.get()) + .log("Drain timeout expired with active handlers still running."); + return false; + } + try { + final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + final int nanos = (int) (remainingNanos % 1_000_000); + drainLock.wait(millis, nanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.atWarning().log("Drain interrupted while waiting for in-flight handlers."); + return false; + } + } + } + return true; + } + private void logCPUResourcesConcurrencyMismatch() { final int cores = Runtime.getRuntime().availableProcessors(); final int poolSize = DEFAULT_BOUNDED_ELASTIC_SIZE; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java index 96abd6baa79c..63425b25dc1d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java +++ 
b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java @@ -141,6 +141,7 @@ static final class RollingMessagePump extends AtomicBoolean { private static final RuntimeException DISPOSED_ERROR = new RuntimeException("The Processor closure disposed the RollingMessagePump."); private static final Duration NEXT_PUMP_BACKOFF = Duration.ofSeconds(5); + private static final Duration DRAIN_TIMEOUT = Duration.ofSeconds(30); private final ClientLogger logger; private final Kind kind; private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder nonSessionBuilder; @@ -151,6 +152,7 @@ static final class RollingMessagePump extends AtomicBoolean { private final Boolean enableAutoDisposition; private final Disposable.Composite disposable = Disposables.composite(); private final AtomicReference clientIdentifier = new AtomicReference<>(); + private volatile MessagePump currentPump; /** * Instantiate {@link RollingMessagePump} that stream messages using {@link MessagePump}. @@ -228,6 +230,7 @@ Mono beginIntern() { clientIdentifier.set(client.getIdentifier()); final MessagePump pump = new MessagePump(client, processMessage, processError, concurrency, enableAutoDisposition); + currentPump = pump; return pump.begin(); }, client -> { client.close(); @@ -256,6 +259,15 @@ String getClientIdentifier() { } void dispose() { + // Drain in-flight message handlers BEFORE disposing the subscription. + // Disposing cancels the reactive chain, which interrupts handler threads (via Reactor's + // publishOn worker disposal). Draining first ensures handlers can complete message + // settlement before the client is closed. 
+ // See https://github.com/Azure/azure-sdk-for-java/issues/45716 + final MessagePump pump = currentPump; + if (pump != null) { + pump.drainHandlers(DRAIN_TIMEOUT); + } disposable.dispose(); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 508b0e2c6b2e..86f44ad8e49a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -14,11 +14,13 @@ import reactor.core.Disposable; import reactor.core.scheduler.Schedulers; +import java.time.Duration; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -186,6 +188,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private static final int SCHEDULER_INTERVAL_IN_SECONDS = 10; + private static final Duration V1_DRAIN_TIMEOUT = Duration.ofSeconds(30); private static final ClientLogger LOGGER = new ClientLogger(ServiceBusProcessorClient.class); private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder; private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder; @@ -203,6 +206,9 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private Disposable monitorDisposable; private boolean wasStopped = false; private final ServiceBusProcessor processorV2; + // V1 handler tracking for graceful shutdown (not used in V2 path). 
+ private final AtomicInteger activeV1HandlerCount = new AtomicInteger(0); + private final Object v1DrainLock = new Object(); /** * Constructor to create a sessions-enabled processor. @@ -352,6 +358,12 @@ public synchronized void close() { return; } isRunning.set(false); + // Drain in-flight V1 message handlers BEFORE cancelling subscriptions. + // Cancelling subscriptions triggers Reactor's publishOn worker disposal, which interrupts + // handler threads. Draining first ensures handlers can complete message settlement + // before the underlying client is closed. + // See https://github.com/Azure/azure-sdk-for-java/issues/45716 + drainV1Handlers(V1_DRAIN_TIMEOUT); receiverSubscriptions.keySet().forEach(Subscription::cancel); receiverSubscriptions.clear(); if (monitorDisposable != null) { @@ -449,36 +461,45 @@ public void onSubscribe(Subscription subscription) { @SuppressWarnings("try") @Override public void onNext(ServiceBusMessageContext serviceBusMessageContext) { - Context span = serviceBusMessageContext.getMessage() != null - ? serviceBusMessageContext.getMessage().getContext() - : Context.NONE; - Exception exception = null; - AutoCloseable scope = tracer.makeSpanCurrent(span); + activeV1HandlerCount.incrementAndGet(); try { - if (serviceBusMessageContext.hasError()) { - handleError(serviceBusMessageContext.getThrowable()); - } else { - ServiceBusReceivedMessageContext serviceBusReceivedMessageContext - = new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext); - - try { - processMessage.accept(serviceBusReceivedMessageContext); - } catch (Exception ex) { - handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); - - if (!processorOptions.isDisableAutoComplete()) { - LOGGER.warning("Error when processing message. Abandoning message.", ex); - abandonMessage(serviceBusMessageContext, receiverClient); + Context span = serviceBusMessageContext.getMessage() != null + ? 
serviceBusMessageContext.getMessage().getContext() + : Context.NONE; + Exception exception = null; + AutoCloseable scope = tracer.makeSpanCurrent(span); + try { + if (serviceBusMessageContext.hasError()) { + handleError(serviceBusMessageContext.getThrowable()); + } else { + ServiceBusReceivedMessageContext serviceBusReceivedMessageContext + = new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext); + + try { + processMessage.accept(serviceBusReceivedMessageContext); + } catch (Exception ex) { + handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK)); + + if (!processorOptions.isDisableAutoComplete()) { + LOGGER.warning("Error when processing message. Abandoning message.", ex); + abandonMessage(serviceBusMessageContext, receiverClient); + } + exception = ex; } - exception = ex; } - } - if (isRunning.get()) { - LOGGER.verbose("Requesting 1 more message from upstream"); - subscription.request(1); + if (isRunning.get()) { + LOGGER.verbose("Requesting 1 more message from upstream"); + subscription.request(1); + } + } finally { + tracer.endSpan(exception, span, scope); } } finally { - tracer.endSpan(exception, span, scope); + if (activeV1HandlerCount.decrementAndGet() == 0) { + synchronized (v1DrainLock) { + v1DrainLock.notifyAll(); + } + } } } @@ -555,4 +576,33 @@ private ServiceBusReceiverAsyncClient createNewReceiver() { ? this.sessionReceiverBuilder.buildAsyncClientForProcessor() : this.receiverBuilder.buildAsyncClientForProcessor(); } + + /** + * Wait for all in-flight V1 message handlers to complete, up to the specified timeout. + * Called during V1 close to ensure graceful shutdown before disposing the underlying client. + * + * @param timeout the maximum time to wait for handlers to complete. 
+ */ + private void drainV1Handlers(Duration timeout) { + final long deadline = System.nanoTime() + timeout.toNanos(); + synchronized (v1DrainLock) { + while (activeV1HandlerCount.get() > 0) { + final long remainingNanos = deadline - System.nanoTime(); + if (remainingNanos <= 0) { + LOGGER.warning("V1 drain timeout expired with {} active handlers still running.", + activeV1HandlerCount.get()); + return; + } + try { + final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + final int nanos = (int) (remainingNanos % 1_000_000); + v1DrainLock.wait(millis, nanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warning("V1 drain interrupted while waiting for in-flight handlers."); + return; + } + } + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index a21da9a86324..f4503ef82493 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -34,7 +34,9 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -361,12 +363,15 @@ private static final class RollingSessionReceiver extends AtomicReference INIT = State.init(); private static final State TERMINATED = State.terminated(); + private static final Duration DRAIN_TIMEOUT = Duration.ofSeconds(30); private final ClientLogger logger; private final long pumpId; private final int 
rollerId; private final String fullyQualifiedNamespace; private final String entityPath; private final int concurrency; + private final AtomicInteger activeHandlerCount = new AtomicInteger(0); + private final Object drainLock = new Object(); private final Consumer processMessage; private final Consumer processError; private final boolean enableAutoDisposition; @@ -445,10 +450,47 @@ private Mono terminate(TerminalSignalType signalType, Scheduler workerSche // by the ServiceBusSessionReactorReceiver. logger.atInfo().log("Roller terminated. rollerId:" + rollerId + " signal:" + signalType); nextSessionStream.close(); + // Drain in-flight message handlers BEFORE disposing the worker scheduler. + // Disposing the scheduler interrupts handler threads (via ScheduledExecutorService.shutdownNow()). + // Draining first ensures handlers can complete message settlement before threads are interrupted. + // See https://github.com/Azure/azure-sdk-for-java/issues/45716 + drainHandlers(DRAIN_TIMEOUT); workerScheduler.dispose(); return Mono.empty(); } + /** + * Wait for all in-flight session message handlers to complete, up to the specified timeout. + * Called during session receiver termination to ensure graceful shutdown — all messages currently + * being processed are allowed to complete (including settlement) before the worker scheduler + * is disposed. + * + * @param timeout the maximum time to wait for in-flight handlers to complete. 
+ */ + private void drainHandlers(Duration timeout) { + final long deadline = System.nanoTime() + timeout.toNanos(); + synchronized (drainLock) { + while (activeHandlerCount.get() > 0) { + final long remainingNanos = deadline - System.nanoTime(); + if (remainingNanos <= 0) { + logger.atWarning() + .addKeyValue("activeHandlers", activeHandlerCount.get()) + .log("Session drain timeout expired with active handlers still running."); + return; + } + try { + final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos); + final int nanos = (int) (remainingNanos % 1_000_000); + drainLock.wait(millis, nanos); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.atWarning().log("Session drain interrupted while waiting for in-flight handlers."); + return; + } + } + } + } + private ServiceBusSessionReactorReceiver nextSessionReceiver(ServiceBusSessionAcquirer.Session nextSession) { final State lastState = super.get(); if (lastState == TERMINATED) { @@ -485,22 +527,31 @@ private void handleMessage(Message qpidMessage) { final ServiceBusReceivedMessage message = serializer.deserialize(qpidMessage, ServiceBusReceivedMessage.class); - instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { - logger.atVerbose() - .addKeyValue(SESSION_ID_KEY, message.getSessionId()) - .addKeyValue(MESSAGE_ID_LOGGING_KEY, message.getMessageId()) - .log("Received message."); - - final Throwable error = notifyMessage(msg); - if (enableAutoDisposition) { - if (error == null) { - complete(msg); - } else { - abandon(msg); + activeHandlerCount.incrementAndGet(); + try { + instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { + logger.atVerbose() + .addKeyValue(SESSION_ID_KEY, message.getSessionId()) + .addKeyValue(MESSAGE_ID_LOGGING_KEY, message.getMessageId()) + .log("Received message."); + + final Throwable error = notifyMessage(msg); + if (enableAutoDisposition) { + if (error == null) { + complete(msg); + } else { + 
abandon(msg); + } + } + return error; + }); + } finally { + if (activeHandlerCount.decrementAndGet() == 0) { + synchronized (drainLock) { + drainLock.notifyAll(); } } - return error; - }); + } } private Throwable notifyMessage(ServiceBusReceivedMessage message) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java new file mode 100644 index 000000000000..5628cf9e5b6e --- /dev/null +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -0,0 +1,338 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.servicebus; + +import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; +import com.azure.messaging.servicebus.ServiceBusProcessor.RollingMessagePump; +import com.azure.messaging.servicebus.implementation.ServiceBusProcessorClientOptions; +import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind; +import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.junit.jupiter.api.parallel.Isolated; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; 
+import java.util.function.Consumer; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for graceful shutdown behavior during processor close. + *

+ * Validates that when a processor is closed, in-flight message handlers are allowed to complete + * (including message settlement) before the underlying client is disposed. This prevents + * {@link IllegalStateException} when handlers attempt to settle messages on a disposed receiver. + *

+ * + *

Coverage Matrix

+ *
    + *
  • V2 Non-Session — {@link #v2CloseShouldWaitForInFlightHandlerBeforeClosingClient()}: + * Tests drain in {@code RollingMessagePump.dispose()} → {@code MessagePump.drainHandlers()}
  • + *
  • V1 Non-Session — {@link #v1CloseShouldWaitForInFlightHandlerBeforeClosingClient()}: + * Tests drain in {@code ServiceBusProcessorClient.close()} → {@code drainV1Handlers()}
  • + *
  • Drain Timeout — {@link #v2DrainShouldRespectTimeout()}: + * Tests {@code MessagePump.drainHandlers()} timeout behavior directly
  • + *
  • V2 Session — Not directly unit-testable. The drain in + * {@code SessionsMessagePump.RollingSessionReceiver.terminate()} uses the identical + * {@code AtomicInteger} + {@code Object} monitor wait/notifyAll pattern as {@code MessagePump}. + * {@code SessionsMessagePump} requires a {@code ServiceBusSessionAcquirer} (AMQP connections) + * and {@code RollingSessionReceiver} is a private inner class, making unit testing infeasible. + * The session drain behavior should be verified via live/integration tests.
  • + *
+ * + * @see Issue #45716 + */ +@Execution(ExecutionMode.SAME_THREAD) +@Isolated +public class ServiceBusProcessorGracefulShutdownTest { + private static final ServiceBusReceiverInstrumentation INSTRUMENTATION + = new ServiceBusReceiverInstrumentation(null, null, "FQDN", "entityPath", null, ReceiverKind.PROCESSOR); + + private AutoCloseable mocksCloseable; + + @BeforeEach + public void setup() { + mocksCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + public void teardown() throws Exception { + Mockito.framework().clearInlineMock(this); + if (mocksCloseable != null) { + mocksCloseable.close(); + } + } + + /** + * Verifies that when the V2 processor pump is disposed, in-flight message handlers + * are allowed to complete before the underlying client is closed. + *

+ * Regression test for #45716. + * Before the fix, disposing the pump would immediately cancel the reactive chain (interrupting + * handler threads via Reactor's publishOn worker disposal), then close the client. Handlers + * that called {@code client.complete(message).block()} would fail with + * {@link IllegalStateException}: "Cannot perform operation on a disposed receiver". + *

+ *

+ * The fix drains in-flight handlers in {@code RollingMessagePump.dispose()} BEFORE disposing + * the subscription, ensuring handlers complete message settlement first. + *

+ */ + @Test + public void v2CloseShouldWaitForInFlightHandlerBeforeClosingClient() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverClientBuilder builder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(builder.buildAsyncClientForProcessor()).thenReturn(client); + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + // Emit one message on boundedElastic then hang. publishOn ensures the handler doesn't block + // the subscription thread when concurrency=1 (which uses Schedulers.immediate() for the worker). + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat(Flux.just(message), Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + // Latches to coordinate between the handler thread and the test thread. + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + final AtomicBoolean handlerCompleted = new AtomicBoolean(false); + + // The handler signals when it starts, then waits for the test to allow it to proceed. + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + handlerCompleted.set(true); + }; + + final RollingMessagePump pump = new RollingMessagePump(builder, messageConsumer, e -> { + }, 1, true); + + // Start the pump. 
+ pump.begin(); + + // Wait for the handler to start processing the message. + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + + // Dispose the pump while the handler is still in-flight. + // dispose() now drains FIRST (before cancelling the subscription), so it blocks until + // the handler completes. Run on a separate thread to avoid blocking the test. + final CountDownLatch disposeDone = new CountDownLatch(1); + final Thread disposeThread = new Thread(() -> { + pump.dispose(); + disposeDone.countDown(); + }); + disposeThread.start(); + + // Give dispose a moment to start; it should be blocked in drainHandlers(). + Thread.sleep(200); + + // Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose). + verify(client, never()).close(); + assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); + + // Now let the handler complete. + handlerCanProceed.countDown(); + + // Wait for dispose to finish. + assertTrue(disposeDone.await(5, TimeUnit.SECONDS), "Dispose should complete after handler finishes"); + assertTrue(handlerCompleted.get(), "Handler should have completed"); + + // Verify the client was closed (after the handler completed and drain returned). + verify(client, timeout(2000)).close(); + // Verify complete was called (auto-disposition is enabled). + verify(client).complete(any()); + } + + /** + * Verifies that when the V1 processor is closed, in-flight message handlers are allowed to + * complete before the underlying client is closed. + *

+ * Regression test for #45716. + * Before the fix, the V1 path would cancel subscriptions (which interrupts handler threads + * via Reactor's publishOn worker disposal) and then immediately close the async client. + *

+ *

+ * The fix drains in-flight handlers BEFORE cancelling subscriptions. Setting + * {@code isRunning = false} prevents new message requests while the drain waits for + * in-flight handlers to complete. + *

+ */ + @Test + public void v1CloseShouldWaitForInFlightHandlerBeforeClosingClient() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final Flux messageFlux = Flux.concat(Flux.just(message), Flux.never()); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + final ServiceBusReceiverInstrumentation instrumentation + = new ServiceBusReceiverInstrumentation(null, null, "FQDN", "entityPath", null, ReceiverKind.PROCESSOR); + when(asyncClient.getInstrumentation()).thenReturn(instrumentation); + // V1 path uses receiveMessagesWithContext, publishOn(boundedElastic) matches real behavior + // and ensures the handler runs on a separate thread (needed for drain testing). + when(asyncClient.receiveMessagesWithContext()).thenReturn(messageFlux.map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + // Latches to coordinate between the handler thread and the test thread. 
+ final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerCanProceed = new CountDownLatch(1); + final AtomicBoolean handlerCompleted = new AtomicBoolean(false); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + handlerCanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + handlerCompleted.set(true); + }; + + // Build V1 processor (isV2 = false by NOT setting options.setV2(true)) + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + // V1 path: do not set V2 + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + // Start the processor (V1 path). + processorClient.start(); + + // Wait for the handler to start processing the message. + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + + // Close the processor while the handler is still in-flight. + // close() now drains FIRST (before cancelling subscriptions), so it blocks until + // the handler completes. Run on a separate thread to avoid blocking the test. + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + // Give close a moment to start; it should be blocked in drainV1Handlers(). + Thread.sleep(200); + + // Verify: client has NOT been closed yet (handler is still running, drain is blocking close). + verify(asyncClient, never()).close(); + assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); + + // Now let the handler complete. + handlerCanProceed.countDown(); + + // Wait for close to finish. 
+ assertTrue(closeDone.await(5, TimeUnit.SECONDS), "Close should complete after handler finishes"); + assertTrue(handlerCompleted.get(), "Handler should have completed"); + + // Verify the client was closed (after the handler completed). + verify(asyncClient, timeout(2000)).close(); + } + + /** + * Verifies that the V2 drain mechanism respects the timeout. If a handler takes longer than + * the drain timeout, {@code drainHandlers} returns false and the processor doesn't hang + * indefinitely. + *
+ * <p>
+ * This tests the drain mechanism directly on a {@link MessagePump} without going through
+ * the full RollingMessagePump dispose path. The handler blocks forever, and we verify
+ * that {@code drainHandlers()} with a short timeout returns false after approximately
+ * the timeout duration.
+ */ + @Test + public void v2DrainShouldRespectTimeout() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverClientBuilder builder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(builder.buildAsyncClientForProcessor()).thenReturn(client); + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat(Flux.just(message), Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + // Handler blocks forever (never releases the latch). + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch neverReleasedLatch = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + try { + neverReleasedLatch.await(); // Block forever + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }; + + final MessagePump pump = new MessagePump(client, messageConsumer, e -> { + }, 1, false); + + // Subscribe to start pumping. + final AtomicReference subscription = new AtomicReference<>(); + subscription.set(pump.begin().subscribe()); + + // Wait for the handler to start. + assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + + // Call drainHandlers with a very short timeout while the handler is still running. 
+ // DO NOT dispose the subscription first — disposing cancels the reactive chain, which + // interrupts the handler's thread via Reactor's publishOn worker disposal. The drain must + // be called while the subscription (and handler) is still active. + final long startTime = System.nanoTime(); + final boolean drained = pump.drainHandlers(Duration.ofMillis(500)); + final long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); + + // Drain should return false (timed out) and should take close to 500ms. + assertFalse(drained, "Drain should return false when timeout expires with active handlers"); + assertTrue(elapsed >= 400, + "Drain should wait at least close to the timeout duration, but took " + elapsed + "ms"); + assertTrue(elapsed < 3000, "Drain should not take excessively long, but took " + elapsed + "ms"); + + // Clean up: release the blocked handler and dispose the subscription. + neverReleasedLatch.countDown(); + subscription.get().dispose(); + } +} From 26baa9ae536d73e4e3092bbcbbb2b8b5cfdf771f Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 15:27:09 -0800 Subject: [PATCH 2/9] Add re-entrancy guards to drain methods to prevent self-deadlock - Add ThreadLocal flag to detect when drainHandlers is called from within a message handler (e.g., user calls close() inside processMessage callback) - Guard all three drain paths: MessagePump.drainHandlers(), ServiceBusProcessorClient.drainV1Handlers(), and SessionsMessagePump.RollingSessionReceiver.drainHandlers() - When re-entrant call detected, skip drain with warning log and return immediately to avoid self-deadlock - Add v2DrainFromWithinHandlerShouldNotDeadlock test verifying the guard prevents deadlock when drain is called from handler thread --- .../messaging/servicebus/MessagePump.java | 13 ++++ .../servicebus/ServiceBusProcessorClient.java | 12 ++++ .../servicebus/SessionsMessagePump.java | 13 ++++ ...rviceBusProcessorGracefulShutdownTest.java | 67 
+++++++++++++++++++ 4 files changed, 105 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java index a980bc1e36b1..47c8cb9b2c50 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java @@ -60,6 +60,7 @@ final class MessagePump { private final ServiceBusReceiverInstrumentation instrumentation; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); private final Object drainLock = new Object(); + private final ThreadLocal isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); /** * Instantiate {@link MessagePump} that pumps messages emitted by the given {@code client}. The messages @@ -143,6 +144,7 @@ private Mono pollConnectionState() { private void handleMessage(ServiceBusReceivedMessage message) { activeHandlerCount.incrementAndGet(); + isHandlerThread.set(Boolean.TRUE); try { instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { final Disposable lockRenewDisposable; @@ -163,6 +165,7 @@ private void handleMessage(ServiceBusReceivedMessage message) { return error; }); } finally { + isHandlerThread.set(Boolean.FALSE); if (activeHandlerCount.decrementAndGet() == 0) { synchronized (drainLock) { drainLock.notifyAll(); @@ -216,6 +219,16 @@ private void abandon(ServiceBusReceivedMessage message) { * @return true if all handlers completed within the timeout, false otherwise. */ boolean drainHandlers(Duration timeout) { + if (isHandlerThread.get()) { + // Re-entrant call from within a message handler (e.g., user called close() inside processMessage). 
+ // Waiting here would self-deadlock because this thread's handler incremented the counter and + // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will + // complete naturally after this handler returns. + logger.atWarning() + .log("drainHandlers called from within a message handler (re-entrant). " + + "Skipping drain to avoid self-deadlock."); + return false; + } final long deadline = System.nanoTime() + timeout.toNanos(); synchronized (drainLock) { while (activeHandlerCount.get() > 0) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 86f44ad8e49a..8fe4ad2a94b1 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -209,6 +209,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { // V1 handler tracking for graceful shutdown (not used in V2 path). private final AtomicInteger activeV1HandlerCount = new AtomicInteger(0); private final Object v1DrainLock = new Object(); + private final ThreadLocal isV1HandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); /** * Constructor to create a sessions-enabled processor. @@ -462,6 +463,7 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(ServiceBusMessageContext serviceBusMessageContext) { activeV1HandlerCount.incrementAndGet(); + isV1HandlerThread.set(Boolean.TRUE); try { Context span = serviceBusMessageContext.getMessage() != null ? 
serviceBusMessageContext.getMessage().getContext() @@ -495,6 +497,7 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) { tracer.endSpan(exception, span, scope); } } finally { + isV1HandlerThread.set(Boolean.FALSE); if (activeV1HandlerCount.decrementAndGet() == 0) { synchronized (v1DrainLock) { v1DrainLock.notifyAll(); @@ -584,6 +587,15 @@ private ServiceBusReceiverAsyncClient createNewReceiver() { * @param timeout the maximum time to wait for handlers to complete. */ private void drainV1Handlers(Duration timeout) { + if (isV1HandlerThread.get()) { + // Re-entrant call from within a V1 message handler (e.g., user called close() inside processMessage). + // Waiting here would self-deadlock because this thread's handler incremented the counter and + // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will + // complete naturally after this handler returns. + LOGGER.warning("drainV1Handlers called from within a V1 message handler (re-entrant). 
" + + "Skipping drain to avoid self-deadlock."); + return; + } final long deadline = System.nanoTime() + timeout.toNanos(); synchronized (v1DrainLock) { while (activeV1HandlerCount.get() > 0) { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index f4503ef82493..3174aa24ae08 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -372,6 +372,7 @@ private static final class RollingSessionReceiver extends AtomicReference isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); private final Consumer processMessage; private final Consumer processError; private final boolean enableAutoDisposition; @@ -468,6 +469,16 @@ private Mono terminate(TerminalSignalType signalType, Scheduler workerSche * @param timeout the maximum time to wait for in-flight handlers to complete. */ private void drainHandlers(Duration timeout) { + if (isHandlerThread.get()) { + // Re-entrant call from within a session message handler (e.g., user called close() inside processMessage). + // Waiting here would self-deadlock because this thread's handler incremented the counter and + // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will + // complete naturally after this handler returns. + logger.atWarning() + .log("drainHandlers called from within a session message handler (re-entrant). 
" + + "Skipping drain to avoid self-deadlock."); + return; + } final long deadline = System.nanoTime() + timeout.toNanos(); synchronized (drainLock) { while (activeHandlerCount.get() > 0) { @@ -528,6 +539,7 @@ private void handleMessage(Message qpidMessage) { = serializer.deserialize(qpidMessage, ServiceBusReceivedMessage.class); activeHandlerCount.incrementAndGet(); + isHandlerThread.set(Boolean.TRUE); try { instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { logger.atVerbose() @@ -546,6 +558,7 @@ private void handleMessage(Message qpidMessage) { return error; }); } finally { + isHandlerThread.set(Boolean.FALSE); if (activeHandlerCount.decrementAndGet() == 0) { synchronized (drainLock) { drainLock.notifyAll(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java index 5628cf9e5b6e..382749cb4364 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -335,4 +335,71 @@ public void v2DrainShouldRespectTimeout() throws InterruptedException { neverReleasedLatch.countDown(); subscription.get().dispose(); } + + /** + * Verifies that calling {@code drainHandlers()} from within a message handler (re-entrant) + * does not deadlock. This simulates a user calling {@code processor.close()} from inside + * their {@code processMessage} callback. + *
+ * <p>
+ * Without the re-entrancy guard, the handler thread would enter {@code drainHandlers()},
+ * which waits for {@code activeHandlerCount} to reach 0. But the handler itself has
+ * incremented the counter and won't decrement it until it returns — classic self-deadlock.
+ * <p>
+ * The fix detects the re-entrant call via a {@link ThreadLocal} flag and skips the drain,
+ * returning {@code false} with a warning log.
+ */ + @Test + public void v2DrainFromWithinHandlerShouldNotDeadlock() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat(Flux.just(message), Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + final CountDownLatch handlerStarted = new CountDownLatch(1); + final CountDownLatch handlerDone = new CountDownLatch(1); + final AtomicBoolean drainReturnedFalse = new AtomicBoolean(false); + + // Create the pump first, then reference it inside the handler via AtomicReference. + final AtomicReference pumpRef = new AtomicReference<>(); + + final Consumer messageConsumer = (messageContext) -> { + handlerStarted.countDown(); + // Simulate user calling close() from within processMessage, which triggers drainHandlers(). + // With the re-entrancy guard, this should return false immediately instead of deadlocking. + boolean result = pumpRef.get().drainHandlers(Duration.ofSeconds(5)); + drainReturnedFalse.set(!result); + handlerDone.countDown(); + }; + + final MessagePump pump = new MessagePump(client, messageConsumer, e -> { + }, 1, false); + pumpRef.set(pump); + + // Subscribe to start pumping. + final AtomicReference subscription = new AtomicReference<>(); + subscription.set(pump.begin().subscribe()); + + // Wait for the handler to start and complete (should NOT deadlock). 
+ assertTrue(handlerStarted.await(5, TimeUnit.SECONDS), "Handler should have started processing"); + assertTrue(handlerDone.await(5, TimeUnit.SECONDS), + "Handler should have completed without deadlocking on re-entrant drainHandlers()"); + + // The re-entrant drain should have returned false (skipped). + assertTrue(drainReturnedFalse.get(), + "Re-entrant drainHandlers() should return false (skip drain to avoid self-deadlock)"); + + // Clean up. + subscription.get().dispose(); + } } From 924859547474f7c85323ce8fb27e93444bf2cbd0 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 16:54:36 -0800 Subject: [PATCH 3/9] Add closing flag to prevent handler dispatch after drain starts After drainHandlers() returns but before the Flux subscription is disposed, flatMap can dispatch a new handler that attempts settlement on a closing client. Add a volatile boolean closing flag to MessagePump and SessionsMessagePump.RollingSessionReceiver, set at the start of drainHandlers() and checked at the top of handleMessage(). Handlers that see the flag skip processing and return immediately. V1 path is unaffected (isRunning already gates subscription.request). New test: v2ClosingFlagPreventsNewHandlersAfterDrainStarts. 
--- .../messaging/servicebus/MessagePump.java | 6 ++ .../servicebus/SessionsMessagePump.java | 6 ++ ...rviceBusProcessorGracefulShutdownTest.java | 92 +++++++++++++++++++ 3 files changed, 104 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java index 47c8cb9b2c50..e645104fc7b9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java @@ -61,6 +61,7 @@ final class MessagePump { private final AtomicInteger activeHandlerCount = new AtomicInteger(0); private final Object drainLock = new Object(); private final ThreadLocal isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); + private volatile boolean closing; /** * Instantiate {@link MessagePump} that pumps messages emitted by the given {@code client}. The messages @@ -146,6 +147,10 @@ private void handleMessage(ServiceBusReceivedMessage message) { activeHandlerCount.incrementAndGet(); isHandlerThread.set(Boolean.TRUE); try { + if (closing) { + logger.atVerbose().log("Skipping handler execution, pump is closing."); + return; + } instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { final Disposable lockRenewDisposable; if (enableAutoLockRenew) { @@ -219,6 +224,7 @@ private void abandon(ServiceBusReceivedMessage message) { * @return true if all handlers completed within the timeout, false otherwise. */ boolean drainHandlers(Duration timeout) { + closing = true; if (isHandlerThread.get()) { // Re-entrant call from within a message handler (e.g., user called close() inside processMessage). 
// Waiting here would self-deadlock because this thread's handler incremented the counter and diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index 3174aa24ae08..ae22439717d9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -373,6 +373,7 @@ private static final class RollingSessionReceiver extends AtomicReference isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); + private volatile boolean closing; private final Consumer processMessage; private final Consumer processError; private final boolean enableAutoDisposition; @@ -469,6 +470,7 @@ private Mono terminate(TerminalSignalType signalType, Scheduler workerSche * @param timeout the maximum time to wait for in-flight handlers to complete. */ private void drainHandlers(Duration timeout) { + closing = true; if (isHandlerThread.get()) { // Re-entrant call from within a session message handler (e.g., user called close() inside processMessage). 
// Waiting here would self-deadlock because this thread's handler incremented the counter and @@ -541,6 +543,10 @@ private void handleMessage(Message qpidMessage) { activeHandlerCount.incrementAndGet(); isHandlerThread.set(Boolean.TRUE); try { + if (closing) { + logger.atVerbose().log("Skipping handler execution, session pump is closing."); + return; + } instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> { logger.atVerbose() .addKeyValue(SESSION_ID_KEY, message.getSessionId()) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java index 382749cb4364..50ab536801a6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -402,4 +402,96 @@ public void v2DrainFromWithinHandlerShouldNotDeadlock() throws InterruptedExcept // Clean up. subscription.get().dispose(); } + + /** + * Verifies that the closing flag prevents new message handlers from processing after + * {@code drainHandlers()} is called. + *
+ * <p>
+ * Race condition scenario: with {@code flatMap(concurrency=2)}, two messages are emitted.
+ * The first handler blocks (simulating in-flight work). {@code drainHandlers()} is called
+ * on a separate thread, which sets {@code closing = true} and waits for the first handler
+ * to complete. A second message arrives while closing is true. The second handler should
+ * see the closing flag and skip processing.
+ * <p>
+ * Without the closing flag, the second handler could start real work (including settlement)
+ * between drain returning and subscription disposal, reintroducing the original failure mode.
+ */ + @Test + public void v2ClosingFlagPreventsNewHandlersAfterDrainStarts() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceiverAsyncClient client = mock(ServiceBusReceiverAsyncClient.class); + + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.isAutoLockRenewRequested()).thenReturn(false); + when(client.complete(any())).thenReturn(Mono.empty()); + doNothing().when(client).close(); + + // Emit message1 immediately, then message2 after a short delay (to ensure message1's handler starts first). + // Use concurrency=2 so flatMap can dispatch both handlers concurrently. + when(client.nonSessionProcessorReceiveV2()) + .thenReturn(Flux.concat( + Flux.just(message1), + Flux.just(message2).delayElements(Duration.ofMillis(200)), + Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final AtomicBoolean handler2ProcessMessageInvoked = new AtomicBoolean(false); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + // If this executes, the closing flag did NOT prevent the second handler. 
+ handler2ProcessMessageInvoked.set(true); + } + }; + + final MessagePump pump = new MessagePump(client, messageConsumer, e -> { + }, 2, false); + + final AtomicReference subscription = new AtomicReference<>(); + subscription.set(pump.begin().subscribe()); + + // Wait for handler1 to start. + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started processing"); + + // Call drainHandlers on a separate thread. This sets closing=true and waits for handler1. + final CountDownLatch drainDone = new CountDownLatch(1); + final AtomicBoolean drainResult = new AtomicBoolean(false); + final Thread drainThread = new Thread(() -> { + drainResult.set(pump.drainHandlers(Duration.ofSeconds(10))); + drainDone.countDown(); + }); + drainThread.start(); + + // Give time for drain to start (sets closing=true) and for message2 to arrive. + Thread.sleep(500); + + // Release handler1 so the drain can complete. + handler1CanProceed.countDown(); + + // Wait for drain to finish. + assertTrue(drainDone.await(5, TimeUnit.SECONDS), "Drain should complete after handler1 finishes"); + assertTrue(drainResult.get(), "Drain should return true (all handlers completed)"); + + // The second message's handler should NOT have invoked processMessage because closing was true. + assertFalse(handler2ProcessMessageInvoked.get(), + "Second handler should have been skipped by the closing flag"); + + // Clean up. + subscription.get().dispose(); + } } From aaa21c9956d5e6a610bf55d5adc017d5359de12e Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 17:19:28 -0800 Subject: [PATCH 4/9] Use ThreadLocal.remove() instead of set(FALSE) in handler finally blocks On pooled threads (Reactor boundedElastic), set(FALSE) leaves a stale entry in the ThreadLocalMap after the pump is GC'd. remove() clears the entry immediately, following Java best practice for ThreadLocal cleanup on long-lived worker threads. 
--- .../main/java/com/azure/messaging/servicebus/MessagePump.java | 2 +- .../azure/messaging/servicebus/ServiceBusProcessorClient.java | 2 +- .../com/azure/messaging/servicebus/SessionsMessagePump.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java index e645104fc7b9..c757cfc264f0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java @@ -170,7 +170,7 @@ private void handleMessage(ServiceBusReceivedMessage message) { return error; }); } finally { - isHandlerThread.set(Boolean.FALSE); + isHandlerThread.remove(); if (activeHandlerCount.decrementAndGet() == 0) { synchronized (drainLock) { drainLock.notifyAll(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 8fe4ad2a94b1..cbfe149e1871 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -497,7 +497,7 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) { tracer.endSpan(exception, span, scope); } } finally { - isV1HandlerThread.set(Boolean.FALSE); + isV1HandlerThread.remove(); if (activeV1HandlerCount.decrementAndGet() == 0) { synchronized (v1DrainLock) { v1DrainLock.notifyAll(); diff --git 
a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index ae22439717d9..de82550750bc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -564,7 +564,7 @@ private void handleMessage(Message qpidMessage) { return error; }); } finally { - isHandlerThread.set(Boolean.FALSE); + isHandlerThread.remove(); if (activeHandlerCount.decrementAndGet() == 0) { synchronized (drainLock) { drainLock.notifyAll(); From d8ec586fe59776110f992f139d1f6ad236bd0408 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 17:29:42 -0800 Subject: [PATCH 5/9] Fix spotless formatting in graceful shutdown test --- .../ServiceBusProcessorGracefulShutdownTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java index 50ab536801a6..70d0b07506ba 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -435,10 +435,9 @@ public void v2ClosingFlagPreventsNewHandlersAfterDrainStarts() throws Interrupte // Emit message1 immediately, then message2 after a short delay (to ensure message1's handler starts first). // Use concurrency=2 so flatMap can dispatch both handlers concurrently. 
when(client.nonSessionProcessorReceiveV2()) - .thenReturn(Flux.concat( - Flux.just(message1), - Flux.just(message2).delayElements(Duration.ofMillis(200)), - Flux.never()) + .thenReturn(Flux + .concat(Flux.just(message1), Flux.just(message2).delayElements(Duration.ofMillis(200)), + Flux.never()) .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); final CountDownLatch handler1Started = new CountDownLatch(1); @@ -488,8 +487,7 @@ public void v2ClosingFlagPreventsNewHandlersAfterDrainStarts() throws Interrupte assertTrue(drainResult.get(), "Drain should return true (all handlers completed)"); // The second message's handler should NOT have invoked processMessage because closing was true. - assertFalse(handler2ProcessMessageInvoked.get(), - "Second handler should have been skipped by the closing flag"); + assertFalse(handler2ProcessMessageInvoked.get(), "Second handler should have been skipped by the closing flag"); // Clean up. subscription.get().dispose(); From 94a93a1c5078d7d1c1de99eaa4ab897f84b82d89 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 19:30:15 -0800 Subject: [PATCH 6/9] Re-entrant drain waits for other concurrent handlers before closing When a handler calls close() re-entrantly, the drain now waits for OTHER concurrent handlers to complete (threshold=1) before cancelling subscriptions and closing the client. Previously the drain returned immediately, which could interrupt concurrent handlers mid-settlement. Applied consistently across V1 (ServiceBusProcessorClient), V2 (MessagePump), and sessions (SessionsMessagePump). Notification threshold updated from == 0 to <= 1 so the re-entrant waiter gets notified. 
--- .../messaging/servicebus/MessagePump.java | 22 ++-- .../servicebus/ServiceBusProcessorClient.java | 22 ++-- .../servicebus/SessionsMessagePump.java | 22 ++-- ...rviceBusProcessorGracefulShutdownTest.java | 120 ++++++++++++++++-- 4 files changed, 153 insertions(+), 33 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java index c757cfc264f0..8c9981d977d0 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java @@ -171,7 +171,7 @@ private void handleMessage(ServiceBusReceivedMessage message) { }); } finally { isHandlerThread.remove(); - if (activeHandlerCount.decrementAndGet() == 0) { + if (activeHandlerCount.decrementAndGet() <= 1) { synchronized (drainLock) { drainLock.notifyAll(); } @@ -225,19 +225,25 @@ private void abandon(ServiceBusReceivedMessage message) { */ boolean drainHandlers(Duration timeout) { closing = true; + final int threshold; if (isHandlerThread.get()) { // Re-entrant call from within a message handler (e.g., user called close() inside processMessage). - // Waiting here would self-deadlock because this thread's handler incremented the counter and - // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will - // complete naturally after this handler returns. - logger.atWarning() + // Cannot wait for this thread's own handler to complete (would self-deadlock), but we can + // wait for OTHER concurrent handlers to finish settlement before the underlying client is disposed. 
+ threshold = 1; + if (activeHandlerCount.get() <= threshold) { + return true; + } + logger.atInfo() + .addKeyValue("otherActiveHandlers", activeHandlerCount.get() - 1) .log("drainHandlers called from within a message handler (re-entrant). " - + "Skipping drain to avoid self-deadlock."); - return false; + + "Waiting for other active handlers to complete."); + } else { + threshold = 0; } final long deadline = System.nanoTime() + timeout.toNanos(); synchronized (drainLock) { - while (activeHandlerCount.get() > 0) { + while (activeHandlerCount.get() > threshold) { final long remainingNanos = deadline - System.nanoTime(); if (remainingNanos <= 0) { logger.atWarning() diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index cbfe149e1871..8ce207261ed9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -498,7 +498,7 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) { } } finally { isV1HandlerThread.remove(); - if (activeV1HandlerCount.decrementAndGet() == 0) { + if (activeV1HandlerCount.decrementAndGet() <= 1) { synchronized (v1DrainLock) { v1DrainLock.notifyAll(); } @@ -587,18 +587,24 @@ private ServiceBusReceiverAsyncClient createNewReceiver() { * @param timeout the maximum time to wait for handlers to complete. */ private void drainV1Handlers(Duration timeout) { + final int threshold; if (isV1HandlerThread.get()) { // Re-entrant call from within a V1 message handler (e.g., user called close() inside processMessage). 
- // Waiting here would self-deadlock because this thread's handler incremented the counter and - // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will - // complete naturally after this handler returns. - LOGGER.warning("drainV1Handlers called from within a V1 message handler (re-entrant). " - + "Skipping drain to avoid self-deadlock."); - return; + // Cannot wait for this thread's own handler to complete (would self-deadlock), but we can + // wait for OTHER concurrent handlers to finish settlement before cancelling subscriptions + // and closing the underlying client. + threshold = 1; + if (activeV1HandlerCount.get() <= threshold) { + return; + } + LOGGER.info("drainV1Handlers called from within a V1 message handler (re-entrant). " + + "Waiting for {} other active handler(s) to complete.", activeV1HandlerCount.get() - 1); + } else { + threshold = 0; } final long deadline = System.nanoTime() + timeout.toNanos(); synchronized (v1DrainLock) { - while (activeV1HandlerCount.get() > 0) { + while (activeV1HandlerCount.get() > threshold) { final long remainingNanos = deadline - System.nanoTime(); if (remainingNanos <= 0) { LOGGER.warning("V1 drain timeout expired with {} active handlers still running.", diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java index de82550750bc..d47310c138df 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java @@ -471,19 +471,25 @@ private Mono terminate(TerminalSignalType signalType, Scheduler workerSche */ private void drainHandlers(Duration timeout) { closing = true; + final int threshold; if (isHandlerThread.get()) { // Re-entrant 
call from within a session message handler (e.g., user called close() inside processMessage). - // Waiting here would self-deadlock because this thread's handler incremented the counter and - // cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will - // complete naturally after this handler returns. - logger.atWarning() + // Cannot wait for this thread's own handler to complete (would self-deadlock), but we can + // wait for OTHER concurrent handlers to finish settlement before disposing the worker scheduler. + threshold = 1; + if (activeHandlerCount.get() <= threshold) { + return; + } + logger.atInfo() + .addKeyValue("otherActiveHandlers", activeHandlerCount.get() - 1) .log("drainHandlers called from within a session message handler (re-entrant). " - + "Skipping drain to avoid self-deadlock."); - return; + + "Waiting for other active handlers to complete."); + } else { + threshold = 0; } final long deadline = System.nanoTime() + timeout.toNanos(); synchronized (drainLock) { - while (activeHandlerCount.get() > 0) { + while (activeHandlerCount.get() > threshold) { final long remainingNanos = deadline - System.nanoTime(); if (remainingNanos <= 0) { logger.atWarning() @@ -565,7 +571,7 @@ private void handleMessage(Message qpidMessage) { }); } finally { isHandlerThread.remove(); - if (activeHandlerCount.decrementAndGet() == 0) { + if (activeHandlerCount.decrementAndGet() <= 1) { synchronized (drainLock) { drainLock.notifyAll(); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java index 70d0b07506ba..c66a7cd408ce 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java +++ 
b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -52,6 +52,12 @@ * Tests drain in {@code ServiceBusProcessorClient.close()} → {@code drainV1Handlers()} *
 * <li>Drain Timeout — {@link #v2DrainShouldRespectTimeout()}:
 * Tests {@code MessagePump.drainHandlers()} timeout behavior directly</li>
+ * <li>Re-entrant (single) — {@link #v2DrainFromWithinHandlerShouldNotDeadlock()}:
+ * Tests re-entrant drain with no other concurrent handlers (returns true immediately)</li>
+ * <li>Closing Flag — {@link #v2ClosingFlagPreventsNewHandlersAfterDrainStarts()}:
+ * Tests that the closing flag prevents new handler dispatch during drain</li>
+ * <li>Re-entrant (concurrent) — {@link #v1ReentrantCloseWaitsForOtherConcurrentHandlers()}:
+ * Tests re-entrant drain with concurrent handlers — waits for other handlers before closing</li>
  • V2 Session — Not directly unit-testable. The drain in * {@code SessionsMessagePump.RollingSessionReceiver.terminate()} uses the identical * {@code AtomicInteger} + {@code Object} monitor wait/notifyAll pattern as {@code MessagePump}. @@ -339,15 +345,16 @@ public void v2DrainShouldRespectTimeout() throws InterruptedException { /** * Verifies that calling {@code drainHandlers()} from within a message handler (re-entrant) * does not deadlock. This simulates a user calling {@code processor.close()} from inside - * their {@code processMessage} callback. + * their {@code processMessage} callback when only this handler is active (no concurrent handlers). *

    * Without the re-entrancy guard, the handler thread would enter {@code drainHandlers()}, * which waits for {@code activeHandlerCount} to reach 0. But the handler itself has * incremented the counter and won't decrement it until it returns — classic self-deadlock. *

    *

    - * The fix detects the re-entrant call via a {@link ThreadLocal} flag and skips the drain, - * returning {@code false} with a warning log. + * The fix detects the re-entrant call via a {@link ThreadLocal} flag and uses a threshold of 1 + * (only wait for OTHER handlers). With no other handlers active, it returns {@code true} + * immediately. *

    */ @Test @@ -368,7 +375,7 @@ public void v2DrainFromWithinHandlerShouldNotDeadlock() throws InterruptedExcept final CountDownLatch handlerStarted = new CountDownLatch(1); final CountDownLatch handlerDone = new CountDownLatch(1); - final AtomicBoolean drainReturnedFalse = new AtomicBoolean(false); + final AtomicBoolean drainReturnedTrue = new AtomicBoolean(false); // Create the pump first, then reference it inside the handler via AtomicReference. final AtomicReference pumpRef = new AtomicReference<>(); @@ -376,9 +383,10 @@ public void v2DrainFromWithinHandlerShouldNotDeadlock() throws InterruptedExcept final Consumer messageConsumer = (messageContext) -> { handlerStarted.countDown(); // Simulate user calling close() from within processMessage, which triggers drainHandlers(). - // With the re-entrancy guard, this should return false immediately instead of deadlocking. + // With only the current handler active (no other concurrent handlers), the re-entrant drain + // should return true immediately (nothing to drain) instead of deadlocking. boolean result = pumpRef.get().drainHandlers(Duration.ofSeconds(5)); - drainReturnedFalse.set(!result); + drainReturnedTrue.set(result); handlerDone.countDown(); }; @@ -395,9 +403,9 @@ public void v2DrainFromWithinHandlerShouldNotDeadlock() throws InterruptedExcept assertTrue(handlerDone.await(5, TimeUnit.SECONDS), "Handler should have completed without deadlocking on re-entrant drainHandlers()"); - // The re-entrant drain should have returned false (skipped). - assertTrue(drainReturnedFalse.get(), - "Re-entrant drainHandlers() should return false (skip drain to avoid self-deadlock)"); + // Re-entrant drain with no other concurrent handlers should return true (nothing to drain). + assertTrue(drainReturnedTrue.get(), + "Re-entrant drainHandlers() with no other concurrent handlers should return true"); // Clean up. 
subscription.get().dispose(); @@ -492,4 +500,98 @@ public void v2ClosingFlagPreventsNewHandlersAfterDrainStarts() throws Interrupte // Clean up. subscription.get().dispose(); } + + /** + * Verifies that when a V1 handler calls {@code close()} re-entrantly with other concurrent + * handlers running, the re-entrant drain waits for those other handlers to complete before + * proceeding to cancel subscriptions and close the underlying client. + *

    + * With {@code maxConcurrentCalls=2}, two handlers run concurrently on separate + * {@code boundedElastic} threads. Handler B calls {@code processorClient.close()} while + * Handler A is still processing. {@code drainV1Handlers()} detects the re-entrant call + * (threshold=1) and waits until only the calling handler remains before allowing + * {@code close()} to proceed with subscription cancellation and client disposal. + *

    + *

    + * Without this fix, the re-entrant drain would return immediately, and {@code close()} + * would cancel subscriptions and call {@code asyncClient.close()} while Handler A + * is mid-settlement, reintroducing the original failure mode from issue #45716. + *

    + */ + @Test + public void v1ReentrantCloseWaitsForOtherConcurrentHandlers() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + // Two messages arrive on separate parallel rails, processed concurrently. + when(asyncClient.receiveMessagesWithContext()).thenReturn(Flux.just(message1, message2) + .map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final CountDownLatch handler2Started = new CountDownLatch(1); + final AtomicBoolean handler1Completed = new AtomicBoolean(false); + final AtomicReference processorRef = new AtomicReference<>(); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + handler1Completed.set(true); + } else { + handler2Started.countDown(); + // Wait for handler1 to start before calling close(), ensuring both handlers + // are running concurrently when the re-entrant close occurs. 
+ try { + handler1Started.await(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + // Re-entrant close from within a handler. The drain should wait for handler1 + // (the other concurrent handler) before proceeding to close the client. + processorRef.get().close(); + } + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(2); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + processorRef.set(processorClient); + + processorClient.start(); + + // Wait for both handlers to start. + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started processing"); + assertTrue(handler2Started.await(5, TimeUnit.SECONDS), "Handler2 should have started processing"); + + // Give handler2's close() a moment to enter drainV1Handlers and start waiting. + Thread.sleep(300); + + // Handler1 is still running, handler2 is blocked in close() waiting for handler1 to finish. + verify(asyncClient, never()).close(); + assertFalse(handler1Completed.get(), "Handler1 should still be in-flight"); + + // Release handler1. + handler1CanProceed.countDown(); + + // Handler2's close() should now complete (handler1 finished, drain threshold reached). + verify(asyncClient, timeout(5000)).close(); + assertTrue(handler1Completed.get(), "Handler1 should have completed before client was closed"); + } } From 18c7f4ae7f2a01578870459e21d82fc15ec887ff Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 20:10:29 -0800 Subject: [PATCH 7/9] Add close() Javadoc for drain behavior and try/finally cleanup in tests - Document that close() blocks while draining in-flight handlers (up to 30s) before cancelling subscriptions and closing the underlying client. 
- Add try/finally in V2 and V1 drain tests to release latches and join threads on assertion failure, preventing leaked threads from blocking the test suite. --- .../servicebus/ServiceBusProcessorClient.java | 7 ++ ...rviceBusProcessorGracefulShutdownTest.java | 70 +++++++++++-------- 2 files changed, 47 insertions(+), 30 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 8ce207261ed9..20a6864312a7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -351,6 +351,13 @@ public synchronized void stop() { /** * Stops message processing and closes the processor. The receiving links and sessions are closed and calling * {@link #start()} will create a new processing cycle with new links and new sessions. + * + *

+ * <p>This method blocks while waiting for in-flight message handlers to complete (up to 30 seconds) before
+ * cancelling subscriptions and closing the underlying client. This ensures handlers can finish message
+ * settlement without encountering a disposed receiver. Callers should avoid invoking {@code close()} on
+ * latency-sensitive threads. If the drain timeout expires, the processor proceeds with shutdown regardless.</p>

    + * + * @see Issue #45716 */ @Override public synchronized void close() { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java index c66a7cd408ce..d818b3be521a 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -159,24 +159,29 @@ public void v2CloseShouldWaitForInFlightHandlerBeforeClosingClient() throws Inte }); disposeThread.start(); - // Give dispose a moment to start; it should be blocked in drainHandlers(). - Thread.sleep(200); - - // Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose). - verify(client, never()).close(); - assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); - - // Now let the handler complete. - handlerCanProceed.countDown(); - - // Wait for dispose to finish. - assertTrue(disposeDone.await(5, TimeUnit.SECONDS), "Dispose should complete after handler finishes"); - assertTrue(handlerCompleted.get(), "Handler should have completed"); - - // Verify the client was closed (after the handler completed and drain returned). - verify(client, timeout(2000)).close(); - // Verify complete was called (auto-disposition is enabled). - verify(client).complete(any()); + try { + // Give dispose a moment to start; it should be blocked in drainHandlers(). + Thread.sleep(200); + + // Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose). + verify(client, never()).close(); + assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); + + // Now let the handler complete. 
+ handlerCanProceed.countDown(); + + // Wait for dispose to finish. + assertTrue(disposeDone.await(5, TimeUnit.SECONDS), "Dispose should complete after handler finishes"); + assertTrue(handlerCompleted.get(), "Handler should have completed"); + + // Verify the client was closed (after the handler completed and drain returned). + verify(client, timeout(2000)).close(); + // Verify complete was called (auto-disposition is enabled). + verify(client).complete(any()); + } finally { + handlerCanProceed.countDown(); + disposeThread.join(5000); + } } /** @@ -253,22 +258,27 @@ public void v1CloseShouldWaitForInFlightHandlerBeforeClosingClient() throws Inte }); closeThread.start(); - // Give close a moment to start; it should be blocked in drainV1Handlers(). - Thread.sleep(200); + try { + // Give close a moment to start; it should be blocked in drainV1Handlers(). + Thread.sleep(200); - // Verify: client has NOT been closed yet (handler is still running, drain is blocking close). - verify(asyncClient, never()).close(); - assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); + // Verify: client has NOT been closed yet (handler is still running, drain is blocking close). + verify(asyncClient, never()).close(); + assertFalse(handlerCompleted.get(), "Handler should still be in-flight"); - // Now let the handler complete. - handlerCanProceed.countDown(); + // Now let the handler complete. + handlerCanProceed.countDown(); - // Wait for close to finish. - assertTrue(closeDone.await(5, TimeUnit.SECONDS), "Close should complete after handler finishes"); - assertTrue(handlerCompleted.get(), "Handler should have completed"); + // Wait for close to finish. + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "Close should complete after handler finishes"); + assertTrue(handlerCompleted.get(), "Handler should have completed"); - // Verify the client was closed (after the handler completed). 
- verify(asyncClient, timeout(2000)).close(); + // Verify the client was closed (after the handler completed). + verify(asyncClient, timeout(2000)).close(); + } finally { + handlerCanProceed.countDown(); + closeThread.join(5000); + } } /** From db626827363fb237bf3b0b5e8a484213f1228813 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 20:49:48 -0800 Subject: [PATCH 8/9] Add V1 closing flag to prevent handler dispatch during drain-to-cancel window --- .../servicebus/ServiceBusProcessorClient.java | 6 ++ ...rviceBusProcessorGracefulShutdownTest.java | 95 ++++++++++++++++++- 2 files changed, 100 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index 20a6864312a7..f1415f426506 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -210,6 +210,7 @@ public final class ServiceBusProcessorClient implements AutoCloseable { private final AtomicInteger activeV1HandlerCount = new AtomicInteger(0); private final Object v1DrainLock = new Object(); private final ThreadLocal isV1HandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE); + private volatile boolean v1Closing; /** * Constructor to create a sessions-enabled processor. @@ -472,6 +473,10 @@ public void onNext(ServiceBusMessageContext serviceBusMessageContext) { activeV1HandlerCount.incrementAndGet(); isV1HandlerThread.set(Boolean.TRUE); try { + if (v1Closing) { + LOGGER.verbose("Skipping V1 handler execution, processor is closing."); + return; + } Context span = serviceBusMessageContext.getMessage() != null ? 
serviceBusMessageContext.getMessage().getContext() : Context.NONE; @@ -594,6 +599,7 @@ private ServiceBusReceiverAsyncClient createNewReceiver() { * @param timeout the maximum time to wait for handlers to complete. */ private void drainV1Handlers(Duration timeout) { + v1Closing = true; final int threshold; if (isV1HandlerThread.get()) { // Re-entrant call from within a V1 message handler (e.g., user called close() inside processMessage). diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java index d818b3be521a..37a79e83aaa6 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -55,7 +55,9 @@ *
  • Re-entrant (single) — {@link #v2DrainFromWithinHandlerShouldNotDeadlock()}: * Tests re-entrant drain with no other concurrent handlers (returns true immediately)
  • *
 * <li>Closing Flag — {@link #v2ClosingFlagPreventsNewHandlersAfterDrainStarts()}:
- * Tests that the closing flag prevents new handler dispatch during drain</li>
+ * Tests that the V2 closing flag prevents new handler dispatch during drain</li>
+ * <li>V1 Closing Flag — {@link #v1ClosingFlagPreventsNewHandlersAfterDrainStarts()}:
+ * Tests that the V1 closing flag prevents new handler dispatch in the drain-to-cancel window</li>
  • Re-entrant (concurrent) — {@link #v1ReentrantCloseWaitsForOtherConcurrentHandlers()}: * Tests re-entrant drain with concurrent handlers — waits for other handlers before closing
  • *
  • V2 Session — Not directly unit-testable. The drain in @@ -604,4 +606,95 @@ public void v1ReentrantCloseWaitsForOtherConcurrentHandlers() throws Interrupted verify(asyncClient, timeout(5000)).close(); assertTrue(handler1Completed.get(), "Handler1 should have completed before client was closed"); } + + /** + * Verifies that the V1 closing flag prevents new message handlers from executing user + * callback/settlement after {@code drainV1Handlers()} is triggered. + *

    + * Race condition scenario: with {@code maxConcurrentCalls=1}, a single message is + * in-flight when {@code close()} is called. {@code drainV1Handlers()} sets + * {@code v1Closing = true} and waits for the handler. Meanwhile, the subscriber still + * has an outstanding {@code request(1)}, so a second message can arrive via + * {@code onNext} during the drain-to-cancel window. The closing flag ensures that + * no user callback runs for messages arriving after shutdown begins. + *

    + */ + @Test + public void v1ClosingFlagPreventsNewHandlersAfterDrainStarts() throws InterruptedException { + final ServiceBusReceivedMessage message1 = mock(ServiceBusReceivedMessage.class); + final ServiceBusReceivedMessage message2 = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient = mock(ServiceBusReceiverAsyncClient.class); + + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient); + when(asyncClient.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(asyncClient.getEntityPath()).thenReturn("entityPath"); + when(asyncClient.isConnectionClosed()).thenReturn(false); + when(asyncClient.getInstrumentation()).thenReturn(INSTRUMENTATION); + // Emit message1 immediately, then message2 after a delay (simulating a message arriving + // during the drain-to-cancel window). + when(asyncClient.receiveMessagesWithContext()).thenReturn(Flux + .concat(Flux.just(message1), Flux.just(message2).delayElements(Duration.ofMillis(300)), + Flux.never()) + .map(ServiceBusMessageContext::new) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + doNothing().when(asyncClient).close(); + + final CountDownLatch handler1Started = new CountDownLatch(1); + final CountDownLatch handler1CanProceed = new CountDownLatch(1); + final AtomicBoolean handler2ProcessMessageInvoked = new AtomicBoolean(false); + + final Consumer messageConsumer = (messageContext) -> { + if (messageContext.getMessage() == message1) { + handler1Started.countDown(); + try { + handler1CanProceed.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } else { + // If this executes, the V1 closing flag did NOT prevent the second handler. 
+ handler2ProcessMessageInvoked.set(true); + } + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + processorClient.start(); + + // Wait for handler1 to start processing. + assertTrue(handler1Started.await(5, TimeUnit.SECONDS), "Handler1 should have started processing"); + + // Close the processor on a separate thread. This sets v1Closing=true and drains handler1. + final CountDownLatch closeDone = new CountDownLatch(1); + final Thread closeThread = new Thread(() -> { + processorClient.close(); + closeDone.countDown(); + }); + closeThread.start(); + + try { + // Give close a moment to enter drainV1Handlers (sets v1Closing=true). + Thread.sleep(200); + + // Release handler1 so the drain completes. After drain returns, close() proceeds to + // cancel subscriptions. Message2 may arrive in this window via the outstanding request(1). + handler1CanProceed.countDown(); + + // Wait for close to finish. + assertTrue(closeDone.await(5, TimeUnit.SECONDS), "Close should complete"); + } finally { + handler1CanProceed.countDown(); + closeThread.join(5000); + } + + // The second message's handler should NOT have invoked processMessage because v1Closing was true. 
+ assertFalse(handler2ProcessMessageInvoked.get(), + "Second handler should have been skipped by the V1 closing flag"); + } } From f361b40475a8b6f3416983d223f43341ca247d36 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Mon, 2 Mar 2026 21:44:38 -0800 Subject: [PATCH 9/9] Reset v1Closing flag in start() to support processor restart after close --- .../servicebus/ServiceBusProcessorClient.java | 3 + ...rviceBusProcessorGracefulShutdownTest.java | 66 +++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java index f1415f426506..d9b255cf502d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java @@ -309,6 +309,9 @@ public synchronized void start() { return; } + // Reset shutdown-only state so the processor can restart after a close() cycle. + v1Closing = false; + if (wasStopped) { wasStopped = false; LOGGER.warning( diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java index 37a79e83aaa6..db17d376a732 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java @@ -58,6 +58,8 @@ * Tests that the V2 closing flag prevents new handler dispatch during drain
  • *
 * <li>V1 Closing Flag — {@link #v1ClosingFlagPreventsNewHandlersAfterDrainStarts()}:
 * Tests that the V1 closing flag prevents new handler dispatch in the drain-to-cancel window</li>
+ * <li>V1 Restart — {@link #v1StartAfterCloseResetsClosingFlag()}:
+ * Tests that {@code start()} after {@code close()} resets {@code v1Closing} so handlers run</li>
  • Re-entrant (concurrent) — {@link #v1ReentrantCloseWaitsForOtherConcurrentHandlers()}: * Tests re-entrant drain with concurrent handlers — waits for other handlers before closing
  • *
  • V2 Session — Not directly unit-testable. The drain in @@ -697,4 +699,68 @@ public void v1ClosingFlagPreventsNewHandlersAfterDrainStarts() throws Interrupte assertFalse(handler2ProcessMessageInvoked.get(), "Second handler should have been skipped by the V1 closing flag"); } + + /** + * Verifies that calling {@code start()} after {@code close()} resets the {@code v1Closing} + * flag so that the processor can begin a new processing cycle. Without the reset, all + * {@code onNext} calls would short-circuit and no messages would be processed. + */ + @Test + public void v1StartAfterCloseResetsClosingFlag() throws InterruptedException { + final ServiceBusReceivedMessage message = mock(ServiceBusReceivedMessage.class); + + final ServiceBusReceiverClientBuilder receiverBuilder = mock(ServiceBusReceiverClientBuilder.class); + final ServiceBusReceiverAsyncClient asyncClient1 = mock(ServiceBusReceiverAsyncClient.class); + final ServiceBusReceiverAsyncClient asyncClient2 = mock(ServiceBusReceiverAsyncClient.class); + + // First call returns asyncClient1 (constructor), second returns asyncClient2 (restart after close). + when(receiverBuilder.buildAsyncClientForProcessor()).thenReturn(asyncClient1, asyncClient2); + + for (ServiceBusReceiverAsyncClient client : new ServiceBusReceiverAsyncClient[] { + asyncClient1, + asyncClient2 }) { + when(client.getFullyQualifiedNamespace()).thenReturn("FQDN"); + when(client.getEntityPath()).thenReturn("entityPath"); + when(client.isConnectionClosed()).thenReturn(false); + when(client.getInstrumentation()).thenReturn(INSTRUMENTATION); + doNothing().when(client).close(); + } + + // First cycle: emit nothing (just close immediately). + when(asyncClient1.receiveMessagesWithContext()).thenReturn( + Flux.never().publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + + // Second cycle: emit one message, then never complete. 
+ when(asyncClient2.receiveMessagesWithContext()).thenReturn(Flux.just(message) + .map(ServiceBusMessageContext::new) + .concatWith(Flux.never()) + .publishOn(reactor.core.scheduler.Schedulers.boundedElastic())); + + final CountDownLatch messageProcessed = new CountDownLatch(1); + + final Consumer messageConsumer = (messageContext) -> { + messageProcessed.countDown(); + }; + + final ServiceBusProcessorClientOptions options + = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1); + final ServiceBusProcessorClient processorClient + = new ServiceBusProcessorClient(receiverBuilder, "entityPath", null, null, messageConsumer, error -> { + }, options); + + try { + // First cycle: start then close (sets v1Closing=true during drain). + processorClient.start(); + processorClient.close(); + + // Second cycle: start again. If v1Closing is not reset, onNext will skip the handler. + processorClient.start(); + + // Verify the handler runs, proving v1Closing was reset. + assertTrue(messageProcessed.await(5, TimeUnit.SECONDS), + "Handler should run after restart, proving v1Closing was reset"); + } finally { + processorClient.close(); + } + } }