diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 7b1701c61a0..a9a2a29718b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1663,11 +1663,35 @@ protected void shutdownScheduledExecutorService(ScheduledExecutorService schedul if (scheduledExecutorService == null) { return; } + + // Graceful shutdown: stop accepting new tasks and wait for submitted tasks to complete scheduledExecutorService.shutdown(); + try { - scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignore) { - BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted! ", ignore); + // Wait for tasks to complete, at most 60 seconds + if (!scheduledExecutorService.awaitTermination(60000, TimeUnit.MILLISECONDS)) { + // If timeout, force shutdown all tasks + BrokerController.LOG.warn("ScheduledExecutorService did not terminate gracefully, forcing shutdown..."); + List pendingTasks = scheduledExecutorService.shutdownNow(); + + if (!pendingTasks.isEmpty()) { + BrokerController.LOG.warn("ScheduledExecutorService had {} pending tasks that were cancelled", + pendingTasks.size()); + } + + // Wait again for a period to ensure all tasks are terminated + if (!scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + BrokerController.LOG.error("ScheduledExecutorService did not terminate after forced shutdown"); + } else { + BrokerController.LOG.info("ScheduledExecutorService terminated successfully after forced shutdown"); + } + } else { + BrokerController.LOG.debug("ScheduledExecutorService terminated gracefully"); + } + } catch (InterruptedException e) { + // If interrupted during waiting, force shutdown + BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted, forcing shutdown...", e); + scheduledExecutorService.shutdownNow(); Thread.currentThread().interrupt(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java b/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java index dec42351d9f..7735c26e919 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java @@ -75,7 +75,7 @@ public static PutMessageResult checkBeforePutMessage(BrokerController brokerCont if (!brokerController.getMessageStore().getRunningFlags().isWriteable()) { long value = PRINT_TIMES.getAndIncrement(); if ((value % 50000) == 0) { - LOG.warn("message store is not writeable, so putMessage is forbidden " + brokerController.getMessageStore().getRunningFlags().getFlagBits()); + LOG.warn("message store is not writeable, putMessage is forbidden " + brokerController.getMessageStore().getRunningFlags().getFlagBits()); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);