Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -56,6 +58,10 @@ final class MessagePump {
private final boolean enableAutoLockRenew;
private final Scheduler workerScheduler;
private final ServiceBusReceiverInstrumentation instrumentation;
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
private final Object drainLock = new Object();
private final ThreadLocal<Boolean> isHandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE);
private volatile boolean closing;

/**
* Instantiate {@link MessagePump} that pumps messages emitted by the given {@code client}. The messages
Expand Down Expand Up @@ -138,24 +144,39 @@ private Mono<Void> pollConnectionState() {
}

private void handleMessage(ServiceBusReceivedMessage message) {
instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> {
final Disposable lockRenewDisposable;
if (enableAutoLockRenew) {
lockRenewDisposable = client.beginLockRenewal(message);
} else {
lockRenewDisposable = Disposables.disposed();
activeHandlerCount.incrementAndGet();
isHandlerThread.set(Boolean.TRUE);
try {
if (closing) {
logger.atVerbose().log("Skipping handler execution, pump is closing.");
return;
}
final Throwable error = notifyMessage(message);
if (enableAutoDisposition) {
if (error == null) {
complete(message);
instrumentation.instrumentProcess(message, ReceiverKind.PROCESSOR, msg -> {
final Disposable lockRenewDisposable;
if (enableAutoLockRenew) {
lockRenewDisposable = client.beginLockRenewal(message);
} else {
abandon(message);
lockRenewDisposable = Disposables.disposed();
}
final Throwable error = notifyMessage(message);
if (enableAutoDisposition) {
if (error == null) {
complete(message);
} else {
abandon(message);
}
}
lockRenewDisposable.dispose();
return error;
});
} finally {
isHandlerThread.remove();
if (activeHandlerCount.decrementAndGet() <= 1) {
synchronized (drainLock) {
drainLock.notifyAll();
}
}
lockRenewDisposable.dispose();
return error;
});
}
}

private Throwable notifyMessage(ServiceBusReceivedMessage message) {
Expand Down Expand Up @@ -193,6 +214,57 @@ private void abandon(ServiceBusReceivedMessage message) {
}
}

/**
* Wait for all in-flight message handlers to complete, up to the specified timeout.
* This is called during processor close to ensure graceful shutdown — all messages currently
* being processed are allowed to complete (including settlement) before the underlying client
* is disposed.
*
* @param timeout the maximum time to wait for in-flight handlers to complete.
* @return true if all handlers completed within the timeout, false otherwise.
*/
boolean drainHandlers(Duration timeout) {
closing = true;
final int threshold;
if (isHandlerThread.get()) {
// Re-entrant call from within a message handler (e.g., user called close() inside processMessage).
// Cannot wait for this thread's own handler to complete (would self-deadlock), but we can
// wait for OTHER concurrent handlers to finish settlement before the underlying client is disposed.
threshold = 1;
if (activeHandlerCount.get() <= threshold) {
return true;
}
logger.atInfo()
.addKeyValue("otherActiveHandlers", activeHandlerCount.get() - 1)
.log("drainHandlers called from within a message handler (re-entrant). "
+ "Waiting for other active handlers to complete.");
} else {
threshold = 0;
}
final long deadline = System.nanoTime() + timeout.toNanos();
synchronized (drainLock) {
while (activeHandlerCount.get() > threshold) {
final long remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
logger.atWarning()
.addKeyValue("activeHandlers", activeHandlerCount.get())
.log("Drain timeout expired with active handlers still running.");
return false;
}
try {
final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
final int nanos = (int) (remainingNanos % 1_000_000);
drainLock.wait(millis, nanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.atWarning().log("Drain interrupted while waiting for in-flight handlers.");
return false;
}
}
}
return true;
}

private void logCPUResourcesConcurrencyMismatch() {
final int cores = Runtime.getRuntime().availableProcessors();
final int poolSize = DEFAULT_BOUNDED_ELASTIC_SIZE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ static final class RollingMessagePump extends AtomicBoolean {
private static final RuntimeException DISPOSED_ERROR
= new RuntimeException("The Processor closure disposed the RollingMessagePump.");
private static final Duration NEXT_PUMP_BACKOFF = Duration.ofSeconds(5);
private static final Duration DRAIN_TIMEOUT = Duration.ofSeconds(30);
private final ClientLogger logger;
private final Kind kind;
private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder nonSessionBuilder;
Expand All @@ -151,6 +152,7 @@ static final class RollingMessagePump extends AtomicBoolean {
private final Boolean enableAutoDisposition;
private final Disposable.Composite disposable = Disposables.composite();
private final AtomicReference<String> clientIdentifier = new AtomicReference<>();
private volatile MessagePump currentPump;

/**
* Instantiate {@link RollingMessagePump} that stream messages using {@link MessagePump}.
Expand Down Expand Up @@ -228,6 +230,7 @@ Mono<Void> beginIntern() {
clientIdentifier.set(client.getIdentifier());
final MessagePump pump
= new MessagePump(client, processMessage, processError, concurrency, enableAutoDisposition);
currentPump = pump;
return pump.begin();
}, client -> {
client.close();
Expand Down Expand Up @@ -256,6 +259,15 @@ String getClientIdentifier() {
}

void dispose() {
// Drain in-flight message handlers BEFORE disposing the subscription.
// Disposing cancels the reactive chain, which interrupts handler threads (via Reactor's
// publishOn worker disposal). Draining first ensures handlers can complete message
// settlement before the client is closed.
// See https://github.com/Azure/azure-sdk-for-java/issues/45716
final MessagePump pump = currentPump;
if (pump != null) {
pump.drainHandlers(DRAIN_TIMEOUT);
}
disposable.dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import reactor.core.Disposable;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

Expand Down Expand Up @@ -186,6 +188,7 @@
public final class ServiceBusProcessorClient implements AutoCloseable {

private static final int SCHEDULER_INTERVAL_IN_SECONDS = 10;
private static final Duration V1_DRAIN_TIMEOUT = Duration.ofSeconds(30);
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusProcessorClient.class);
private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder;
private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder;
Expand All @@ -203,6 +206,11 @@ public final class ServiceBusProcessorClient implements AutoCloseable {
private Disposable monitorDisposable;
private boolean wasStopped = false;
private final ServiceBusProcessor processorV2;
// V1 handler tracking for graceful shutdown (not used in V2 path).
private final AtomicInteger activeV1HandlerCount = new AtomicInteger(0);
private final Object v1DrainLock = new Object();
private final ThreadLocal<Boolean> isV1HandlerThread = ThreadLocal.withInitial(() -> Boolean.FALSE);
private volatile boolean v1Closing;

/**
* Constructor to create a sessions-enabled processor.
Expand Down Expand Up @@ -301,6 +309,9 @@ public synchronized void start() {
return;
}

// Reset shutdown-only state so the processor can restart after a close() cycle.
v1Closing = false;

if (wasStopped) {
wasStopped = false;
LOGGER.warning(
Expand Down Expand Up @@ -344,6 +355,13 @@ public synchronized void stop() {
/**
* Stops message processing and closes the processor. The receiving links and sessions are closed and calling
* {@link #start()} will create a new processing cycle with new links and new sessions.
*
* <p>This method blocks while waiting for in-flight message handlers to complete (up to 30 seconds) before
* cancelling subscriptions and closing the underlying client. This ensures handlers can finish message
* settlement without encountering a disposed receiver. Callers should avoid invoking {@code close()} on
* latency-sensitive threads. If the drain timeout expires, the processor proceeds with shutdown regardless.</p>
*
* @see <a href="https://github.com/Azure/azure-sdk-for-java/issues/45716">Issue #45716</a>
*/
@Override
public synchronized void close() {
Expand All @@ -352,6 +370,12 @@ public synchronized void close() {
return;
}
isRunning.set(false);
// Drain in-flight V1 message handlers BEFORE cancelling subscriptions.
// Cancelling subscriptions triggers Reactor's publishOn worker disposal, which interrupts
// handler threads. Draining first ensures handlers can complete message settlement
// before the underlying client is closed.
// See https://github.com/Azure/azure-sdk-for-java/issues/45716
drainV1Handlers(V1_DRAIN_TIMEOUT);
receiverSubscriptions.keySet().forEach(Subscription::cancel);
receiverSubscriptions.clear();
if (monitorDisposable != null) {
Expand Down Expand Up @@ -449,36 +473,51 @@ public void onSubscribe(Subscription subscription) {
@SuppressWarnings("try")
@Override
public void onNext(ServiceBusMessageContext serviceBusMessageContext) {
Context span = serviceBusMessageContext.getMessage() != null
? serviceBusMessageContext.getMessage().getContext()
: Context.NONE;
Exception exception = null;
AutoCloseable scope = tracer.makeSpanCurrent(span);
activeV1HandlerCount.incrementAndGet();
isV1HandlerThread.set(Boolean.TRUE);
try {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext
= new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);

try {
processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
if (v1Closing) {
LOGGER.verbose("Skipping V1 handler execution, processor is closing.");
return;
}
Context span = serviceBusMessageContext.getMessage() != null
? serviceBusMessageContext.getMessage().getContext()
: Context.NONE;
Exception exception = null;
AutoCloseable scope = tracer.makeSpanCurrent(span);
try {
if (serviceBusMessageContext.hasError()) {
handleError(serviceBusMessageContext.getThrowable());
} else {
ServiceBusReceivedMessageContext serviceBusReceivedMessageContext
= new ServiceBusReceivedMessageContext(receiverClient, serviceBusMessageContext);

try {
processMessage.accept(serviceBusReceivedMessageContext);
} catch (Exception ex) {
handleError(new ServiceBusException(ex, ServiceBusErrorSource.USER_CALLBACK));

if (!processorOptions.isDisableAutoComplete()) {
LOGGER.warning("Error when processing message. Abandoning message.", ex);
abandonMessage(serviceBusMessageContext, receiverClient);
}
exception = ex;
}
exception = ex;
}
}
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
if (isRunning.get()) {
LOGGER.verbose("Requesting 1 more message from upstream");
subscription.request(1);
}
} finally {
tracer.endSpan(exception, span, scope);
}
} finally {
tracer.endSpan(exception, span, scope);
isV1HandlerThread.remove();
if (activeV1HandlerCount.decrementAndGet() <= 1) {
synchronized (v1DrainLock) {
v1DrainLock.notifyAll();
}
}
}
}

Expand Down Expand Up @@ -555,4 +594,49 @@ private ServiceBusReceiverAsyncClient createNewReceiver() {
? this.sessionReceiverBuilder.buildAsyncClientForProcessor()
: this.receiverBuilder.buildAsyncClientForProcessor();
}

/**
* Wait for all in-flight V1 message handlers to complete, up to the specified timeout.
* Called during V1 close to ensure graceful shutdown before disposing the underlying client.
*
* @param timeout the maximum time to wait for handlers to complete.
*/
private void drainV1Handlers(Duration timeout) {
v1Closing = true;
final int threshold;
if (isV1HandlerThread.get()) {
// Re-entrant call from within a V1 message handler (e.g., user called close() inside processMessage).
// Cannot wait for this thread's own handler to complete (would self-deadlock), but we can
// wait for OTHER concurrent handlers to finish settlement before cancelling subscriptions
// and closing the underlying client.
threshold = 1;
if (activeV1HandlerCount.get() <= threshold) {
return;
}
LOGGER.info("drainV1Handlers called from within a V1 message handler (re-entrant). "
+ "Waiting for {} other active handler(s) to complete.", activeV1HandlerCount.get() - 1);
} else {
threshold = 0;
}
final long deadline = System.nanoTime() + timeout.toNanos();
synchronized (v1DrainLock) {
while (activeV1HandlerCount.get() > threshold) {
final long remainingNanos = deadline - System.nanoTime();
if (remainingNanos <= 0) {
LOGGER.warning("V1 drain timeout expired with {} active handlers still running.",
activeV1HandlerCount.get());
return;
}
try {
final long millis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
final int nanos = (int) (remainingNanos % 1_000_000);
v1DrainLock.wait(millis, nanos);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warning("V1 drain interrupted while waiting for in-flight handlers.");
return;
}
}
}
}
}
Loading
Loading