Add graceful shutdown drain to ServiceBusProcessorClient#48192
Add graceful shutdown drain to ServiceBusProcessorClient#48192EldertGrootenboer wants to merge 9 commits intoAzure:mainfrom
Conversation
When ServiceBusProcessorClient.close() is called while message handlers are still executing, the receiver was disposed immediately, causing in-flight handlers to fail with IllegalStateException. Add drain-before-dispose logic using an AtomicInteger handler counter and Object monitor wait/notify to all processor shutdown paths: - MessagePump (V2 non-session) - ServiceBusProcessor.RollingMessagePump (V2 non-session lifecycle) - SessionsMessagePump.RollingSessionReceiver (V2 session) - ServiceBusProcessorClient V1 close path The drain executes before subscription cancellation/disposal, with a configurable timeout (default 30s) to prevent indefinite blocking. Includes 3 regression tests in ServiceBusProcessorGracefulShutdownTest.
There was a problem hiding this comment.
Pull request overview
This PR adds “drain before dispose” behavior to Service Bus processor shutdown paths to avoid failing in-flight message handlers (and their settlement calls) with IllegalStateException when the underlying receiver is disposed during close().
Changes:
- Track in-flight handler execution and block shutdown briefly to allow handlers to complete (with a 30s timeout).
- Apply draining to V2 non-session (
MessagePump/RollingMessagePump), V2 session (RollingSessionReceiver.terminate()), and V1 (ServiceBusProcessorClient.close()). - Add regression tests covering V2 non-session and V1 shutdown draining plus a drain-timeout test.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java | Adds handler counting + drainHandlers(Duration) used to block shutdown until in-flight handlers finish or timeout. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java | Wires draining into RollingMessagePump.dispose() before disposing the subscription. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java | Adds per-session handler counting + drain during termination before disposing the worker scheduler. |
| sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java | Adds V1 handler counting + drainV1Handlers(Duration) invoked during close() before subscription cancellation. |
| sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java | Adds new unit tests validating drain behavior for V2 non-session, V1, and drain timeout. |
Comments suppressed due to low confidence (1)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:458
SessionsMessagePump.RollingSessionReceivernow includes new drain-before-dispose behavior, but the added logic isn’t covered by the new regression tests. There are already isolated unit tests forSessionsMessagePumpbehavior; it should be possible to extend them to verify that termination waits for an in-flight handler (or respects the timeout) for the session path as well.
Please add a unit test that exercises session termination while a handler is blocked, to prevent regressions in this new shutdown behavior.
// Drain in-flight message handlers BEFORE disposing the worker scheduler.
// Disposing the scheduler interrupts handler threads (via ScheduledExecutorService.shutdownNow()).
// Draining first ensures handlers can complete message settlement before threads are interrupted.
// See https://github.com/Azure/azure-sdk-for-java/issues/45716
drainHandlers(DRAIN_TIMEOUT);
workerScheduler.dispose();
...bus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java
Show resolved
Hide resolved
...aging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java
Show resolved
Hide resolved
...e-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java
Show resolved
Hide resolved
- Add ThreadLocal<Boolean> flag to detect when drainHandlers is called from within a message handler (e.g., user calls close() inside processMessage callback) - Guard all three drain paths: MessagePump.drainHandlers(), ServiceBusProcessorClient.drainV1Handlers(), and SessionsMessagePump.RollingSessionReceiver.drainHandlers() - When re-entrant call detected, skip drain with warning log and return immediately to avoid self-deadlock - Add v2DrainFromWithinHandlerShouldNotDeadlock test verifying the guard prevents deadlock when drain is called from handler thread
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (3)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:161
- This test uses a fixed
Thread.sleep(200)to “give dispose a moment to start”. Fixed sleeps are prone to flakiness on slow/loaded CI agents (either too short or unnecessarily long). Prefer a synchronization point that directly observes the expected state (e.g.,assertFalse(disposeDone.await(...)), a latch signaled right before/after entering drain, or Mockito’s timed verification APIs) so the test doesn’t depend on timing heuristics.
// Give dispose a moment to start; it should be blocked in drainHandlers().
Thread.sleep(200);
// Verify: client has NOT been closed yet (handler is still running, drain is blocking dispose).
verify(client, never()).close();
assertFalse(handlerCompleted.get(), "Handler should still be in-flight");
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:256
- This test uses a fixed
Thread.sleep(200)to “give close a moment to start”. As written, it can be flaky under variable scheduling/CPU contention. Use an explicit synchronization condition (e.g.,assertFalse(closeDone.await(...))or a latch that confirms the close thread is blocked in the drain) instead of a fixed sleep.
// Give close a moment to start; it should be blocked in drainV1Handlers().
Thread.sleep(200);
// Verify: client has NOT been closed yet (handler is still running, drain is blocking close).
verify(asyncClient, never()).close();
assertFalse(handlerCompleted.get(), "Handler should still be in-flight");
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:54
- The class-level "Coverage Matrix" JavaDoc is now out of sync with the actual tests in this class: it lists only the V2/V1/timeout scenarios, but the class also includes a re-entrant drain regression test (
v2DrainFromWithinHandlerShouldNotDeadlock). Please update the matrix to reflect all covered scenarios so readers don’t miss this important case.
* <h3>Coverage Matrix</h3>
* <ul>
* <li><b>V2 Non-Session</b> — {@link #v2CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
* Tests drain in {@code RollingMessagePump.dispose()} → {@code MessagePump.drainHandlers()}</li>
* <li><b>V1 Non-Session</b> — {@link #v1CloseShouldWaitForInFlightHandlerBeforeClosingClient()}:
* Tests drain in {@code ServiceBusProcessorClient.close()} → {@code drainV1Handlers()}</li>
* <li><b>Drain Timeout</b> — {@link #v2DrainShouldRespectTimeout()}:
* Tests {@code MessagePump.drainHandlers()} timeout behavior directly</li>
...e-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessor.java
Show resolved
Hide resolved
After drainHandlers() returns but before the Flux subscription is disposed, flatMap can dispatch a new handler that attempts settlement on a closing client. Add a volatile boolean closing flag to MessagePump and SessionsMessagePump.RollingSessionReceiver, set at the start of drainHandlers() and checked at the top of handleMessage(). Handlers that see the flag skip processing and return immediately. V1 path is unaffected (isRunning already gates subscription.request). New test: v2ClosingFlagPreventsNewHandlersAfterDrainStarts.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (3)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java:176
isHandlerThreadis an instanceThreadLocal. Setting it back toFALSEleaves an entry in the thread’sThreadLocalMapfor eachMessagePumpinstance that ever ran on that pooled thread. Since pumps can be recreated (rolling/retry) and worker threads are long-lived, this can accumulate stale entries and increase memory usage over time. Prefer callingisHandlerThread.remove()in thefinallyblock instead ofset(Boolean.FALSE)to ensure the entry is cleared.
} finally {
isHandlerThread.set(Boolean.FALSE);
if (activeHandlerCount.decrementAndGet() == 0) {
synchronized (drainLock) {
drainLock.notifyAll();
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java:503
isV1HandlerThreadis an instanceThreadLocal. Resetting it withset(Boolean.FALSE)keeps a per-processor entry in the thread’sThreadLocalMap, which can accumulate on pooled threads over the lifetime of the application. PreferisV1HandlerThread.remove()in thefinallyblock to fully clear the entry after each callback.
} finally {
isV1HandlerThread.set(Boolean.FALSE);
if (activeV1HandlerCount.decrementAndGet() == 0) {
synchronized (v1DrainLock) {
v1DrainLock.notifyAll();
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:571
isHandlerThreadis an instanceThreadLocal. Setting it toFALSEleaves an entry behind in the thread’sThreadLocalMap, and since session pumps/receivers can be recreated while using long-lived pooled threads, this can accumulate stale entries over time. PreferisHandlerThread.remove()in thefinallyblock to clear theThreadLocalafter each handler execution.
} finally {
isHandlerThread.set(Boolean.FALSE);
if (activeHandlerCount.decrementAndGet() == 0) {
synchronized (drainLock) {
drainLock.notifyAll();
}
On pooled threads (Reactor boundedElastic), set(FALSE) leaves a stale entry in the ThreadLocalMap after the pump is GC'd. remove() clears the entry immediately, following Java best practice for ThreadLocal cleanup on long-lived worker threads.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (4)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:157
- These
Thread.sleep(...)calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())...,timeout(ms)with zero invocations) so the test waits deterministically without assuming scheduler timing.
Thread.sleep(200);
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:251
- These
Thread.sleep(...)calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())...,timeout(ms)with zero invocations) so the test waits deterministically without assuming scheduler timing.
Thread.sleep(200);
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:481
- These
Thread.sleep(...)calls make the tests timing-dependent and can be flaky under load/slow CI. Prefer synchronizing on explicit signals (additional latches) or Mockito’s time-based verification (verify(mock, after(ms).never())...,timeout(ms)with zero invocations) so the test waits deterministically without assuming scheduler timing.
Thread.sleep(500);
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:483
- The drain implementation (counter + lock + re-entrancy guard + timeout loop) is duplicated across
MessagePump,SessionsMessagePump, and V1 (ServiceBusProcessorClient). To reduce divergence risk and make future fixes (e.g., re-entrancy behavior) consistent, consider extracting a small shared helper (package-private) that encapsulates the counter/lock/wait-notify pattern and exposes a clear “drain result”.
private void drainHandlers(Duration timeout) {
closing = true;
if (isHandlerThread.get()) {
// Re-entrant call from within a session message handler (e.g., user called close() inside processMessage).
// Waiting here would self-deadlock because this thread's handler incremented the counter and
// cannot decrement it until it returns. Skip the drain — remaining handlers (if any) will
// complete naturally after this handler returns.
logger.atWarning()
.log("drainHandlers called from within a session message handler (re-entrant). "
+ "Skipping drain to avoid self-deadlock.");
return;
}
...us/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java
Show resolved
Hide resolved
...aging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java
Show resolved
Hide resolved
When a handler calls close() re-entrantly, the drain now waits for OTHER concurrent handlers to complete (threshold=1) before cancelling subscriptions and closing the client. Previously the drain returned immediately, which could interrupt concurrent handlers mid-settlement. Applied consistently across V1 (ServiceBusProcessorClient), V2 (MessagePump), and sessions (SessionsMessagePump). Notification threshold updated from == 0 to <= 1 so the re-entrant waiter gets notified.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (3)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java:367
- The public Javadoc for
close()doesn’t mention that it can now block while draining in-flight handlers (up to the 30s timeout) before cancelling subscriptions/closing the client. This is a behavioral change that callers should be aware of (e.g., avoid calling close on latency-sensitive threads, and understand what happens on timeout/interruption). Consider updating the method Javadoc to document the drain + timeout semantics.
/**
* Stops message processing and closes the processor. The receiving links and sessions are closed and calling
* {@link #start()} will create a new processing cycle with new links and new sessions.
*/
@Override
public synchronized void close() {
if (processorV2 != null) {
processorV2.close();
return;
}
isRunning.set(false);
// Drain in-flight V1 message handlers BEFORE cancelling subscriptions.
// Cancelling subscriptions triggers Reactor's publishOn worker disposal, which interrupts
// handler threads. Draining first ensures handlers can complete message settlement
// before the underlying client is closed.
// See https://github.com/Azure/azure-sdk-for-java/issues/45716
drainV1Handlers(V1_DRAIN_TIMEOUT);
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:160
- This test starts a non-daemon
Threadto runpump.dispose()but doesn’t ensure it’s always unblocked/joined in afinallypath. If an assertion fails beforehandlerCanProceed.countDown(), the dispose thread can remain blocked indrainHandlers()for up to the full timeout, potentially delaying the suite or keeping the JVM alive. Consider usingtry/finallycleanup (release latches + join/interrupt the thread) or run the work via an executor/future with a bounded timeout.
final CountDownLatch disposeDone = new CountDownLatch(1);
final Thread disposeThread = new Thread(() -> {
pump.dispose();
disposeDone.countDown();
});
disposeThread.start();
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:255
- Similar to the dispose-thread case, this test starts a non-daemon
Threadthat can block insideprocessorClient.close()until the handler latch is released. If the test fails beforehandlerCanProceed.countDown(), the thread may remain blocked for up to the drain timeout and leak into subsequent tests. Consider ensuring the latch is released and the thread is joined/terminated in afinallycleanup path.
final CountDownLatch closeDone = new CountDownLatch(1);
final Thread closeThread = new Thread(() -> {
processorClient.close();
closeDone.countDown();
});
closeThread.start();
- Document that close() blocks while draining in-flight handlers (up to 30s) before cancelling subscriptions and closing the underlying client. - Add try/finally in V2 and V1 drain tests to release latches and join threads on assertion failure, preventing leaked threads from blocking the test suite.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java:461
terminate(...)now callsdrainHandlers(DRAIN_TIMEOUT)for all terminal signals (COMPLETED/ERRORED/CANCELED). Becausebegin()wiresterminate(TerminalSignalType.ERRORED, ...), a transient receive error can now block termination for up to 30s, delaying session pump recovery/rolling to the next pump and impacting availability. Consider limiting draining to the explicit shutdown path (e.g.,TerminalSignalType.CANCELED) or using a much smaller timeout for ERRORED so error recovery isn’t artificially delayed by the full drain timeout.
private Mono<Void> terminate(TerminalSignalType signalType, Scheduler workerScheduler) {
final State<ServiceBusSessionReactorReceiver> state = super.getAndSet(TERMINATED);
if (state == TERMINATED) {
return Mono.empty();
}
// By the time one of the terminal handlers in RollingSessionReceiver.begin() invokes this terminate(,) API,
// it is guaranteed that the ServiceBusSessionReactorReceiver instance in 'state' (i.e. current session internal
// receiver) is closed by the backing MessageFlux. Hence, we only need to dispose the resources not managed
// by the ServiceBusSessionReactorReceiver.
logger.atInfo().log("Roller terminated. rollerId:" + rollerId + " signal:" + signalType);
nextSessionStream.close();
// Drain in-flight message handlers BEFORE disposing the worker scheduler.
// Disposing the scheduler interrupts handler threads (via ScheduledExecutorService.shutdownNow()).
// Draining first ensures handlers can complete message settlement before threads are interrupted.
// See https://github.com/Azure/azure-sdk-for-java/issues/45716
drainHandlers(DRAIN_TIMEOUT);
workerScheduler.dispose();
return Mono.empty();
...aging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/MessagePump.java:232
drainHandlers()setsclosing=truebut the upstreamnonSessionProcessorReceiveV2()subscription remains active untildisposable.dispose()runs. WhiledrainHandlers()is waiting on a long-running in-flight handler, the processor can continue receiving additional messages, which will be skipped (no settlement) and may be locked/held until the receiver is finally disposed. Consider stopping/pausing upstream message delivery as soon as shutdown begins (while still letting already-started handlers finish) to avoid hoarding locks and increasing redeliveries during close.
boolean drainHandlers(Duration timeout) {
closing = true;
final int threshold;
if (isHandlerThread.get()) {
// Re-entrant call from within a message handler (e.g., user called close() inside processMessage).
// Cannot wait for this thread's own handler to complete (would self-deadlock), but we can
// wait for OTHER concurrent handlers to finish settlement before the underlying client is disposed.
...aging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusProcessorClient.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:666
- This test is intended to assert that
v1Closingprevents a second handler from running during the drain→cancel window, but withmaxConcurrentCalls=1the V1 subscriber only callssubscription.request(1)whenisRunningis true. Sinceclose()setsisRunning=falsebefore draining, the firstonNextwill not request the next message, somessage2may never be delivered andhandler2ProcessMessageInvokedcan remain false without actually exercising the closing-flag path.
To make the assertion meaningful, restructure the test to guarantee a second onNext attempt after v1Closing is set (e.g., use maxConcurrentCalls=2 so another subscriber has an outstanding request, or use a controllable publisher/sink and explicitly coordinate demand/emission).
final ServiceBusProcessorClientOptions options
= new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1);
final ServiceBusProcessorClient processorClient
...e-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/SessionsMessagePump.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (2)
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:513
- This test relies on a fixed
Thread.sleep(500)to assume the delayed second message has been emitted/processed before assertions. On slower/loaded CI,message2may arrive after the assertion, which can produce a false negative (the closing-flag regression wouldn’t be detected). Prefer synchronizing deterministically (e.g., add a latch/flag that is tripped whenhandleMessageis invoked formessage2, and await it before asserting).
// Give time for drain to start (sets closing=true) and for message2 to arrive.
Thread.sleep(500);
// Release handler1 so the drain can complete.
handler1CanProceed.countDown();
// Wait for drain to finish.
assertTrue(drainDone.await(5, TimeUnit.SECONDS), "Drain should complete after handler1 finishes");
assertTrue(drainResult.get(), "Drain should return true (all handlers completed)");
// The second message's handler should NOT have invoked processMessage because closing was true.
assertFalse(handler2ProcessMessageInvoked.get(), "Second handler should have been skipped by the closing flag");
sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusProcessorGracefulShutdownTest.java:690
- This test’s timing makes it possible for
message2to never be delivered beforeclose()cancels subscriptions (it’s delayed 300ms, but the test releases handler1 after a 200ms sleep andclose()can cancel immediately). In that case the assertion passes even if the V1 closing-flag check is broken. Make the test deterministic by explicitly waiting untilonNextis invoked formessage2(e.g., a latch) before allowingclose()to proceed to cancellation, then assert that the handler work was skipped.
try {
// Give close a moment to enter drainV1Handlers (sets v1Closing=true).
Thread.sleep(200);
// Release handler1 so the drain completes. After drain returns, close() proceeds to
// cancel subscriptions. Message2 may arrive in this window via the outstanding request(1).
handler1CanProceed.countDown();
Fixes #45716
When
ServiceBusProcessorClient.close()is called while message handlers are still executing, the receiver is disposed immediately, causing in-flight handlers to fail withIllegalStateException: Cannot perform operation on a disposed receiver.What this PR does
Adds drain-before-dispose logic to all processor shutdown paths. An
AtomicIntegerhandler counter withObjectmonitor wait/notify blocksclose()until all in-flight message handlers complete (or a 30-second timeout expires) before subscription cancellation/disposal:MessagePump(V2 non-session) —drainHandlers()addedServiceBusProcessor.RollingMessagePump(V2 lifecycle) — callspump.drainHandlers()beforedisposable.dispose()SessionsMessagePump.RollingSessionReceiver(V2 session) — per-session drain interminate()beforeworkerScheduler.dispose()ServiceBusProcessorClient(V1) —drainV1Handlers()beforereceiverSubscriptions.cancel()This mirrors the .NET SDK's
StopProcessingAsyncbehavior which awaitsTask.WhenAllon in-flight handlers before disposing.Tests
3 new regression tests in
ServiceBusProcessorGracefulShutdownTest:Full module test suite: 947 tests pass, 0 failures, 0 errors.