From 014a951e7cb540e5a91b9c8b083bbd7358ea9d1e Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 24 Nov 2025 20:06:30 +0800 Subject: [PATCH 1/2] Optimize shutdownScheduledExecutorService to ensure complete task termination Change-Id: I288dcab67dcdb58acf154fd9d3f5f5c2f85fb32d Co-developed-by: Cursor --- .../rocketmq/broker/BrokerController.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 7b1701c61a0..4d80a4ac053 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1663,11 +1663,35 @@ protected void shutdownScheduledExecutorService(ScheduledExecutorService schedul if (scheduledExecutorService == null) { return; } + + // Graceful shutdown: stop accepting new tasks and wait for submitted tasks to complete scheduledExecutorService.shutdown(); + try { - scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignore) { - BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted! ", ignore); + // Wait for tasks to complete, at most 30 seconds + if (!scheduledExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS)) { + // If timeout, force shutdown all tasks + BrokerController.LOG.warn("ScheduledExecutorService did not terminate gracefully, forcing shutdown..."); + List pendingTasks = scheduledExecutorService.shutdownNow(); + + if (!pendingTasks.isEmpty()) { + BrokerController.LOG.warn("ScheduledExecutorService had {} pending tasks that were cancelled", + pendingTasks.size()); + } + + // Wait again for a period to ensure all tasks are terminated + if (!scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + BrokerController.LOG.error("ScheduledExecutorService did not terminate after forced shutdown"); + } else { + BrokerController.LOG.info("ScheduledExecutorService terminated successfully after forced shutdown"); + } + } else { + BrokerController.LOG.debug("ScheduledExecutorService terminated gracefully"); + } + } catch (InterruptedException e) { + // If interrupted during waiting, force shutdown + BrokerController.LOG.warn("shutdown ScheduledExecutorService was Interrupted, forcing shutdown...", e); + scheduledExecutorService.shutdownNow(); Thread.currentThread().interrupt(); } } From 31b7c8a66c5cbc30432ec0841d10c092f84f2e72 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Mon, 24 Nov 2025 20:06:53 +0800 Subject: [PATCH 2/2] Optimize shutdownScheduledExecutorService to ensure complete task termination Change-Id: Ib637e6b2eb79cc108cdb6eee9a24f492f5f0c129 --- .../org/apache/rocketmq/broker/BrokerController.java | 10 +++++----- .../org/apache/rocketmq/broker/util/HookUtils.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 4d80a4ac053..a9a2a29718b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -1668,17 +1668,17 @@ protected void shutdownScheduledExecutorService(ScheduledExecutorService schedul scheduledExecutorService.shutdown(); try { - // Wait for tasks to complete, at most 30 seconds - if (!scheduledExecutorService.awaitTermination(30000, TimeUnit.MILLISECONDS)) { + // Wait for tasks to complete, at most 60 seconds + if (!scheduledExecutorService.awaitTermination(60000, TimeUnit.MILLISECONDS)) { // If timeout, force shutdown all tasks BrokerController.LOG.warn("ScheduledExecutorService did not terminate gracefully, forcing shutdown..."); List pendingTasks = scheduledExecutorService.shutdownNow(); - + if (!pendingTasks.isEmpty()) { - BrokerController.LOG.warn("ScheduledExecutorService had {} pending tasks that were cancelled", + BrokerController.LOG.warn("ScheduledExecutorService had {} pending tasks that were cancelled", pendingTasks.size()); } - + // Wait again for a period to ensure all tasks are terminated if (!scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) { BrokerController.LOG.error("ScheduledExecutorService did not terminate after forced shutdown"); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java b/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java index dec42351d9f..7735c26e919 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/util/HookUtils.java @@ -75,7 +75,7 @@ public static PutMessageResult checkBeforePutMessage(BrokerController brokerCont if (!brokerController.getMessageStore().getRunningFlags().isWriteable()) { long value = PRINT_TIMES.getAndIncrement(); if ((value % 50000) == 0) { - LOG.warn("message store is not writeable, so putMessage is forbidden " + brokerController.getMessageStore().getRunningFlags().getFlagBits()); + LOG.warn("message store is not writeable, putMessage is forbidden " + brokerController.getMessageStore().getRunningFlags().getFlagBits()); } return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);