From 88f9de5eaf12067a9a52a3d725e396902122bedf Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 29 Dec 2025 15:11:47 +0800 Subject: [PATCH 01/16] fix Change-Id: I64287b60b56d5526dd299bfa3989581953e1a2a8 --- .../route/AddressableMessageQueue.java | 50 ++------ .../service/route/MessageQueuePenalizer.java | 84 ++++++++++++++ .../service/route/MessageQueueSelector.java | 109 +++++------------- 3 files changed, 120 insertions(+), 123 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java index ca877f3278f..c831b4d9f84 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java @@ -16,23 +16,22 @@ */ package org.apache.rocketmq.proxy.service.route; -import com.google.common.base.MoreObjects; -import java.util.Objects; import org.apache.rocketmq.common.message.MessageQueue; -public class AddressableMessageQueue implements Comparable { - - private final MessageQueue messageQueue; +public class AddressableMessageQueue extends MessageQueue { private final String brokerAddr; public AddressableMessageQueue(MessageQueue messageQueue, String brokerAddr) { - this.messageQueue = messageQueue; + super(messageQueue); this.brokerAddr = brokerAddr; } - @Override - public int compareTo(AddressableMessageQueue o) { - return messageQueue.compareTo(o.messageQueue); + public String getBrokerAddr() { + return brokerAddr; + } + + public MessageQueue getMessageQueue() { + return new MessageQueue(getTopic(), getBrokerName(), getQueueId()); } @Override @@ -43,40 +42,11 @@ public boolean equals(Object o) { if (!(o instanceof AddressableMessageQueue)) { return false; } - AddressableMessageQueue queue = (AddressableMessageQueue) o; - return Objects.equals(messageQueue, queue.messageQueue); - } - - @Override - public int hashCode() { - return messageQueue == null ? 1 : messageQueue.hashCode(); - } - - public int getQueueId() { - return this.messageQueue.getQueueId(); - } - - public String getBrokerName() { - return this.messageQueue.getBrokerName(); - } - - public String getTopic() { - return messageQueue.getTopic(); - } - - public MessageQueue getMessageQueue() { - return messageQueue; - } - - public String getBrokerAddr() { - return brokerAddr; + return super.equals(o); } @Override public String toString() { - return MoreObjects.toStringHelper(this) - .add("messageQueue", messageQueue) - .add("brokerAddr", brokerAddr) - .toString(); + return super.toString(); } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java new file mode 100644 index 00000000000..fd479eacb2e --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java @@ -0,0 +1,84 @@ +package org.apache.rocketmq.proxy.service.route; + +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.rocketmq.common.message.MessageQueue; + +@FunctionalInterface +public interface MessageQueuePenalizer { + + /** + * Returns the penalty value for the given MessageQueue; lower is better. + */ + int penaltyOf(Q messageQueue); + + /** + * Aggregates penalties from multiple penalizers for the same MessageQueue (by summing them up). + */ + static int evaluatePenalty(Q messageQueue, List> penalizers) { + Objects.requireNonNull(messageQueue, "messageQueue"); + if (penalizers == null || penalizers.isEmpty()) { + return 0; + } + return penalizers.stream() + .mapToInt(p -> p.penaltyOf(messageQueue)) + .sum(); + } + + /** + * Selects the MessageQueue with the smallest total penalty. + * Returns null if queues is null/empty. + */ + static Pair selectLeastPenalty(List queues, + List> penalizers, AtomicInteger startIndex) { + if (queues == null || queues.isEmpty()) { + return null; + } + Q bestQueue = null; + int bestPenalty = Integer.MAX_VALUE; + + for (int i = 0; i < queues.size(); i++) { + int index = Math.floorMod(startIndex.getAndIncrement(), queues.size()); + Q messageQueue = queues.get(index); + int penalty = evaluatePenalty(messageQueue, penalizers); + + // Short-circuit: cannot do better than 0 + if (penalty <= 0) { + return Pair.of(messageQueue, penalty); + } + + if (penalty < bestPenalty) { + bestPenalty = penalty; + bestQueue = messageQueue; + } + } + return Pair.of(bestQueue, bestPenalty); + } + + /** + * Selects the MessageQueue with the smallest total penalty. + * Returns null if queuesWithPriority is null/empty. + */ + static Pair selectLeastPenaltyWithPriority(List> queuesWithPriority, + List> penalizers, AtomicInteger startIndex) { + if (queuesWithPriority == null || queuesWithPriority.isEmpty()) { + return null; + } + Q bestQueue = null; + int bestPenalty = Integer.MAX_VALUE; + for (List queues : queuesWithPriority) { + Pair queueAndPenalty = selectLeastPenalty(queues, penalizers, startIndex); + int penalty = queueAndPenalty.getRight(); + if (queueAndPenalty.getRight() <= 0) { + return queueAndPenalty; + } + if (penalty < bestPenalty) { + bestPenalty = penalty; + bestQueue = queueAndPenalty.getLeft(); + } + } + return Pair.of(bestQueue, bestPenalty); + } +} \ No newline at end of file diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java index f25fb907ef2..0bdd92b86f1 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; import com.google.common.math.IntMath; import java.util.ArrayList; import java.util.Collections; @@ -30,13 +29,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; +import static org.apache.rocketmq.proxy.service.route.MessageQueuePenalizer.selectLeastPenalty; + public class MessageQueueSelector { private static final int BROKER_ACTING_QUEUE_ID = -1; @@ -47,7 +49,7 @@ public class MessageQueueSelector { private final Map brokerNameQueueMap = new ConcurrentHashMap<>(); private final AtomicInteger queueIndex; private final AtomicInteger brokerIndex; - private MQFaultStrategy mqFaultStrategy; + private final List> penalizers = new ArrayList<>(); public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) { if (read) { @@ -59,7 +61,18 @@ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy Random random = new Random(); this.queueIndex = new AtomicInteger(random.nextInt()); this.brokerIndex = new AtomicInteger(random.nextInt()); - this.mqFaultStrategy = mqFaultStrategy; + this.addPenalizer(messageQueue -> { + if (mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { + return 0; + } + return 10; + }); + this.addPenalizer(messageQueue -> { + if (mqFaultStrategy.getReachableFilter().filter(messageQueue)) { + return 0; + } + return 100; + }); } private static List buildRead(TopicRouteWrapper topicRoute) { @@ -138,7 +151,7 @@ private static List buildWrite(TopicRouteWrapper topicR private void buildBrokerActingQueues(String topic, List normalQueues) { for (AddressableMessageQueue mq : normalQueues) { AddressableMessageQueue brokerActingQueue = new AddressableMessageQueue( - new MessageQueue(topic, mq.getMessageQueue().getBrokerName(), BROKER_ACTING_QUEUE_ID), + new MessageQueue(topic, mq.getBrokerName(), BROKER_ACTING_QUEUE_ID), mq.getBrokerAddr()); if (!brokerActingQueues.contains(brokerActingQueue)) { @@ -160,38 +173,12 @@ public AddressableMessageQueue selectOne(boolean onlyBroker) { } public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { - if (mqFaultStrategy != null && mqFaultStrategy.isSendLatencyFaultEnable()) { - List messageQueueList = null; - MessageQueue messageQueue = null; - if (onlyBroker) { - messageQueueList = transferAddressableQueues(brokerActingQueues); - } else { - messageQueueList = transferAddressableQueues(queues); - } - AddressableMessageQueue addressableMessageQueue = null; - - // use both available filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getAvailableFilter(), mqFaultStrategy.getReachableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; - } - - // use available filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getAvailableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; - } - - // no available filter, then use reachable filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getReachableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; + if (CollectionUtils.isNotEmpty(penalizers)) { + List messageQueueList = onlyBroker ? brokerActingQueues : queues ; + Pair queueAndPenalty = selectLeastPenalty(messageQueueList, penalizers, onlyBroker ? brokerIndex : queueIndex); + AddressableMessageQueue messageQueue = queueAndPenalty.getLeft(); + if (messageQueue != null) { + return messageQueue; } } @@ -199,46 +186,6 @@ public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { return selectOne(onlyBroker); } - private MessageQueue selectOneMessageQueue(List messageQueueList, AtomicInteger sendQueue, TopicPublishInfo.QueueFilter...filter) { - if (messageQueueList == null || messageQueueList.isEmpty()) { - return null; - } - if (filter != null && filter.length != 0) { - for (int i = 0; i < messageQueueList.size(); i++) { - int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); - MessageQueue mq = messageQueueList.get(index); - boolean filterResult = true; - for (TopicPublishInfo.QueueFilter f: filter) { - Preconditions.checkNotNull(f); - filterResult &= f.filter(mq); - } - if (filterResult) { - return mq; - } - } - } - return null; - } - - public List transferAddressableQueues(List addressableMessageQueueList) { - if (addressableMessageQueueList == null) { - return null; - } - - return addressableMessageQueueList.stream() - .map(AddressableMessageQueue::getMessageQueue) - .collect(Collectors.toList()); - } - - private AddressableMessageQueue transferQueue2Addressable(MessageQueue messageQueue) { - for (AddressableMessageQueue amq: queues) { - if (amq.getMessageQueue().equals(messageQueue)) { - return amq; - } - } - return null; - } - public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) { boolean onlyBroker = last.getQueueId() < 0; AddressableMessageQueue newOne = last; @@ -275,12 +222,8 @@ public List getBrokerActingQueues() { return brokerActingQueues; } - public MQFaultStrategy getMQFaultStrategy() { - return mqFaultStrategy; - } - - public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { - this.mqFaultStrategy = mqFaultStrategy; + public void addPenalizer(MessageQueuePenalizer penalizer) { + this.penalizers.add(penalizer); } @Override From adc23160e9f9f023057c9ec0345fffcadb690aab Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 29 Dec 2025 15:12:22 +0800 Subject: [PATCH 02/16] fix Change-Id: Icab066eb50669ef95d05b1b140cdb3c810bceec2 --- .../service/route/MessageQueuePenalizer.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java index fd479eacb2e..005be4eecc4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.proxy.service.route; import java.util.List; From 3ca62b48bdb3cd6dcb5b119ba92b8524d19de0f9 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 29 Dec 2025 15:40:24 +0800 Subject: [PATCH 03/16] fix Change-Id: Ibf9ab37aa9b47734fe2ea76917d7dc30ad7d6526 --- .../service/route/MessageQueuePenalizer.java | 37 +++++++++++++++++-- .../service/route/MessageQueueSelector.java | 9 ++--- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java index 005be4eecc4..df492e03be8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java @@ -44,8 +44,22 @@ static int evaluatePenalty(Q messageQueue, ListThe method iterates through all queues exactly once, but starts from a rotating index + * derived from {@code startIndex} (round-robin) to avoid always scanning from position 0 .

+ * + *

For each queue, it computes a penalty via {@link #evaluatePenalty} using + * the provided {@code penalizers}. The queue with the smallest penalty is selected.

+ * + *

Short-circuit rule: if any queue has a {@code penalty<= 0}, it is returned immediately, + * since no better result than 0 is expected.

+ * + * @param queues candidate queues to select from + * @param penalizers penalty evaluators applied to each queue + * @param startIndex atomic counter used to determine the rotating start position (round-robin) + * @param queue type + * @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queues} is null/empty */ static Pair selectLeastPenalty(List queues, List> penalizers, AtomicInteger startIndex) { @@ -74,8 +88,23 @@ static Pair selectLeastPenalty(List queu } /** - * Selects the MessageQueue with the smallest total penalty. - * Returns null if queuesWithPriority is null/empty. + * Selects a queue with the lowest computed penalty from multiple priority groups. + * + *

The input {@code queuesWithPriority} is a list of queue groups ordered by priority. + * For each priority group, this method delegates to {@link #selectLeastPenalty} to pick the best queue + * within that group and obtain its penalty.

+ * + *

Short-circuit rule: if any priority group yields a queue whose {@code penalty <= 0}, + * that result is returned immediately.

+ * + *

Otherwise, it returns the queue with the smallest positive penalty among all groups. + * If multiple groups produce the same minimum penalty, the first encountered one wins.

+ * + * @param queuesWithPriority priority-ordered groups of queues; each inner list represents one priority level + * @param penalizers penalty calculators used by {@code selectLeastPenalty} to score queues + * @param startIndex round-robin start index forwarded to {@code selectLeastPenalty} to reduce contention/hotspots + * @param queue type + * @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queuesWithPriority} is null/empty */ static Pair selectLeastPenaltyWithPriority(List> queuesWithPriority, List> penalizers, AtomicInteger startIndex) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java index 0bdd92b86f1..41d3c66293f 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java @@ -174,11 +174,10 @@ public AddressableMessageQueue selectOne(boolean onlyBroker) { public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { if (CollectionUtils.isNotEmpty(penalizers)) { - List messageQueueList = onlyBroker ? brokerActingQueues : queues ; - Pair queueAndPenalty = selectLeastPenalty(messageQueueList, penalizers, onlyBroker ? brokerIndex : queueIndex); - AddressableMessageQueue messageQueue = queueAndPenalty.getLeft(); - if (messageQueue != null) { - return messageQueue; + Pair queueAndPenalty = selectLeastPenalty(onlyBroker ? brokerActingQueues : queues, + penalizers, onlyBroker ? brokerIndex : queueIndex); + if (queueAndPenalty != null && queueAndPenalty.getLeft() != null) { + return queueAndPenalty.getLeft(); } } From af6c3b8dc41b50af14dcfafca382a9ee0069fbe9 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 29 Dec 2025 16:59:54 +0800 Subject: [PATCH 04/16] fix Change-Id: I1dee395ecb3adb61b1414cc2bd3ae1f0ec25e0fd --- .../route/AddressableMessageQueue.java | 5 +++ .../service/route/MessageQueuePenalizer.java | 2 +- .../service/route/TopicRouteService.java | 32 ++++++++++--------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java index c831b4d9f84..c6f57f6c4f8 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java @@ -34,6 +34,11 @@ public MessageQueue getMessageQueue() { return new MessageQueue(getTopic(), getBrokerName(), getQueueId()); } + @Override + public int hashCode() { + return super.hashCode(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java index df492e03be8..92c5ce154f2 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue; @FunctionalInterface -public interface MessageQueuePenalizer { +public interface MessageQueuePenalizer { /** * Returns the penalty value for the given MessageQueue; lower is better. diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index bcdf8140bc5..50910d7044e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -19,7 +19,6 @@ import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; - import java.time.Duration; import java.util.List; import java.util.Optional; @@ -37,6 +36,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; +import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -134,6 +134,22 @@ public String resolve(String name) { } } }, serviceDetector); + + this.appendStartAndShutdown(new StartAndShutdown() { + @Override + public void shutdown() throws Exception { + if (mqFaultStrategy.isStartDetectorEnable()) { + mqFaultStrategy.shutdown(); + } + } + + @Override + public void start() throws Exception { + if (mqFaultStrategy.isStartDetectorEnable()) { + mqFaultStrategy.startDetector(); + } + } + }); this.init(); } @@ -150,20 +166,6 @@ protected void init() { this.appendStartAndShutdown(this.mqClientAPIFactory); } - @Override - public void shutdown() throws Exception { - if (this.mqFaultStrategy.isStartDetectorEnable()) { - mqFaultStrategy.shutdown(); - } - } - - @Override - public void start() throws Exception { - if (this.mqFaultStrategy.isStartDetectorEnable()) { - this.mqFaultStrategy.startDetector(); - } - } - public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) { ClientConfig tempClientConfig = new ClientConfig(); tempClientConfig.setSendLatencyEnable(proxyConfig.getSendLatencyEnable()); From b1150d09c6d0cd03dcb572fa330707eb3d809f07 Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 30 Dec 2025 10:59:55 +0800 Subject: [PATCH 05/16] fix Change-Id: Iae0b9b5cdc334901527864242ee8ceef044c8eac --- .../client/latency/MQFaultStrategy.java | 8 ++++- .../service/route/MessageQueueSelector.java | 19 +++-------- .../proxy/service/route/MessageQueueView.java | 16 ++++++--- .../service/route/TopicRouteService.java | 33 ++++++++----------- .../route/MessageQueueSelectorTest.java | 8 ++--- 5 files changed, 41 insertions(+), 43 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java index 69fb533e5ad..76875378df6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -21,8 +21,9 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.StartAndShutdown; -public class MQFaultStrategy { +public class MQFaultStrategy implements StartAndShutdown { private LatencyFaultTolerance latencyFaultTolerance; private volatile boolean sendLatencyFaultEnable; private volatile boolean startDetectorEnable; @@ -130,6 +131,11 @@ public void startDetector() { this.latencyFaultTolerance.startDetector(); } + @Override + public void start() throws Exception { + this.startDetector(); + } + public void shutdown() { this.latencyFaultTolerance.shutdown(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java index 41d3c66293f..5d06b4f69a6 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java @@ -32,7 +32,6 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; @@ -51,7 +50,7 @@ public class MessageQueueSelector { private final AtomicInteger brokerIndex; private final List> penalizers = new ArrayList<>(); - public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) { + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { if (read) { this.queues.addAll(buildRead(topicRouteWrapper)); } else { @@ -61,18 +60,6 @@ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy Random random = new Random(); this.queueIndex = new AtomicInteger(random.nextInt()); this.brokerIndex = new AtomicInteger(random.nextInt()); - this.addPenalizer(messageQueue -> { - if (mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { - return 0; - } - return 10; - }); - this.addPenalizer(messageQueue -> { - if (mqFaultStrategy.getReachableFilter().filter(messageQueue)) { - return 0; - } - return 100; - }); } private static List buildRead(TopicRouteWrapper topicRoute) { @@ -222,7 +209,9 @@ public List getBrokerActingQueues() { } public void addPenalizer(MessageQueuePenalizer penalizer) { - this.penalizers.add(penalizer); + if (penalizer != null) { + this.penalizers.add(penalizer); + } } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java index 898e529f8cb..35cb27beaa7 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java @@ -17,7 +17,8 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.client.latency.MQFaultStrategy; +import java.util.List; +import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; public class MessageQueueView { @@ -27,11 +28,18 @@ public class MessageQueueView { private final MessageQueueSelector writeSelector; private final TopicRouteWrapper topicRouteWrapper; - public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) { + public MessageQueueView(String topic, TopicRouteData topicRouteData, List> penalizer) { this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); - this.readSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, true); - this.writeSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, false); + this.readSelector = new MessageQueueSelector(topicRouteWrapper, true); + this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false); + + if (CollectionUtils.isNotEmpty(penalizer)) { + for (MessageQueuePenalizer p : penalizer) { + this.readSelector.addPenalizer(p); + this.writeSelector.addPenalizer(p); + } + } } public TopicRouteData getTopicRouteData() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 50910d7044e..064766e2177 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -20,6 +20,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; @@ -36,7 +37,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.common.utils.StartAndShutdown; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -53,12 +53,11 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - private final MQClientAPIFactory mqClientAPIFactory; - private MQFaultStrategy mqFaultStrategy; - + private final MQFaultStrategy mqFaultStrategy; protected final LoadingCache topicCache; protected final ScheduledExecutorService scheduledExecutorService; protected final ThreadPoolExecutor cacheRefreshExecutor; + protected final List> penalizers = new ArrayList<>(); public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); @@ -74,7 +73,6 @@ public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { "TopicRouteCacheRefresh", config.getTopicRouteServiceThreadPoolQueueCapacity() ); - this.mqClientAPIFactory = mqClientAPIFactory; this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()) .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS) @@ -135,20 +133,17 @@ public String resolve(String name) { } }, serviceDetector); - this.appendStartAndShutdown(new StartAndShutdown() { - @Override - public void shutdown() throws Exception { - if (mqFaultStrategy.isStartDetectorEnable()) { - mqFaultStrategy.shutdown(); - } + this.penalizers.add(messageQueue -> { + if (mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { + return 0; } - - @Override - public void start() throws Exception { - if (mqFaultStrategy.isStartDetectorEnable()) { - mqFaultStrategy.startDetector(); - } + return 10; + }); + this.penalizers.add(messageQueue -> { + if (mqFaultStrategy.getReachableFilter().filter(messageQueue)) { + return 0; } + return 100; }); this.init(); } @@ -162,8 +157,8 @@ private Optional pickTopic() { } protected void init() { + this.appendStartAndShutdown(this.mqFaultStrategy); this.appendShutdown(this.scheduledExecutorService::shutdown); - this.appendStartAndShutdown(this.mqClientAPIFactory); } public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) { @@ -222,7 +217,7 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, TopicRouteService.this.getMqFaultStrategy()); + MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, this.penalizers); log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java index d150f87c409..e44ed28f4a6 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java @@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest { public void testReadMessageQueue() { queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { @@ -58,12 +58,12 @@ public void testReadMessageQueue() { public void testWriteMessageQueue() { queueData.setPerm(PermName.PERM_WRITE); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_WRITE); queueData.setWriteQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { From f65f35e57ed1b48168a1a726373bf83ed01a088d Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 30 Dec 2025 11:06:27 +0800 Subject: [PATCH 06/16] fix Change-Id: Ib5984f3e4aef46d32be03f61c5fd843e8470ef70 --- .../proxy/service/route/TopicRouteService.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 064766e2177..9ebda7a8850 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.ClientConfig; @@ -32,12 +31,10 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.latency.Resolver; import org.apache.rocketmq.client.latency.ServiceDetector; -import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.Address; @@ -55,16 +52,12 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { private final MQFaultStrategy mqFaultStrategy; protected final LoadingCache topicCache; - protected final ScheduledExecutorService scheduledExecutorService; protected final ThreadPoolExecutor cacheRefreshExecutor; protected final List> penalizers = new ArrayList<>(); public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); - this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( - new ThreadFactoryImpl("TopicRouteService_") - ); this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( config.getTopicRouteServiceThreadPoolNums(), config.getTopicRouteServiceThreadPoolNums(), @@ -134,13 +127,13 @@ public String resolve(String name) { }, serviceDetector); this.penalizers.add(messageQueue -> { - if (mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { return 0; } return 10; }); this.penalizers.add(messageQueue -> { - if (mqFaultStrategy.getReachableFilter().filter(messageQueue)) { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getReachableFilter().filter(messageQueue)) { return 0; } return 100; @@ -158,7 +151,6 @@ private Optional pickTopic() { protected void init() { this.appendStartAndShutdown(this.mqFaultStrategy); - this.appendShutdown(this.scheduledExecutorService::shutdown); } public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) { From afd97f3ea3549dccacc902f0310e92c94360f25d Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 30 Dec 2025 13:57:44 +0800 Subject: [PATCH 07/16] fix Change-Id: I5b506cbeee7c810b2401484309ab3ab769720cad --- .../service/route/TopicRouteService.java | 32 ++++++++++++------- .../v2/producer/SendMessageActivityTest.java | 8 ++--- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 9ebda7a8850..99ab1de60fc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -19,6 +19,7 @@ import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -126,18 +127,7 @@ public String resolve(String name) { } }, serviceDetector); - this.penalizers.add(messageQueue -> { - if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { - return 0; - } - return 10; - }); - this.penalizers.add(messageQueue -> { - if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getReachableFilter().filter(messageQueue)) { - return 0; - } - return 100; - }); + this.penalizers.addAll(buildPenalizerByMQFaultStrategy(mqFaultStrategy)); this.init(); } @@ -215,4 +205,22 @@ protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData to } return MessageQueueView.WRAPPED_EMPTY_QUEUE; } + + @VisibleForTesting + public static List> buildPenalizerByMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { + List> penalizers = new ArrayList<>(); + penalizers.add(messageQueue -> { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { + return 0; + } + return 10; + }); + penalizers.add(messageQueue -> { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getReachableFilter().filter(messageQueue)) { + return 0; + } + return 100; + }); + return penalizers; + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java index a64867ddfe1..870aa0424fd 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java @@ -59,6 +59,7 @@ import org.junit.Before; import org.junit.Test; +import static org.apache.rocketmq.proxy.service.route.TopicRouteService.buildPenalizerByMQFaultStrategy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThrows; @@ -379,7 +380,7 @@ public void testSendNormalMessageQueueSelector() { MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class); when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView); @@ -415,10 +416,7 @@ public void testSendNormalMessageQueueSelectorPipeLine() throws Exception { mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true); mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false); - TopicRouteService topicRouteService = mock(TopicRouteService.class); - when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); - + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, buildPenalizerByMQFaultStrategy(mqFaultStrategy)); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); assertEquals(firstSelect.getBrokerName(), BROKER_NAME2); From 81da4bc2f5bb89e0154b6d2008b006748974db36 Mon Sep 17 00:00:00 2001 From: qianye Date: Sun, 4 Jan 2026 13:52:35 +0800 Subject: [PATCH 08/16] add priorityProvider Change-Id: I11ca84aeb92c2c03a7750622034284e8bca5a2d9 --- .../DefaultMessageQueuePriorityProvider.java | 25 +++++++++++ .../service/route/MessageQueuePenalizer.java | 3 ++ .../route/MessageQueuePriorityProvider.java | 42 +++++++++++++++++++ .../service/route/MessageQueueSelector.java | 26 ++++++++++-- .../proxy/service/route/MessageQueueView.java | 10 ++++- 5 files changed, 101 insertions(+), 5 deletions(-) create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java create mode 100644 proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java new file mode 100644 index 00000000000..2714b98e64c --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +public class DefaultMessageQueuePriorityProvider implements MessageQueuePriorityProvider { + @Override + public int priorityOf(AddressableMessageQueue queue) { + return 10; + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java index 92c5ce154f2..02a5a19637e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizer.java @@ -111,6 +111,9 @@ static Pair selectLeastPenaltyWithPriority( if (queuesWithPriority == null || queuesWithPriority.isEmpty()) { return null; } + if (queuesWithPriority.size() == 1) { + return selectLeastPenalty(queuesWithPriority.get(0), penalizers, startIndex); + } Q bestQueue = null; int bestPenalty = Integer.MAX_VALUE; for (List queues : queuesWithPriority) { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java new file mode 100644 index 00000000000..47b0a862e26 --- /dev/null +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; + +@FunctionalInterface +public interface MessageQueuePriorityProvider { + /** + * smaller value is higher priority + * */ + int priorityOf(Q q); + + static List> buildPriorityGroups(List queues, MessageQueuePriorityProvider provider) { + if (queues == null || queues.isEmpty()) { + return java.util.Collections.emptyList(); + } + + java.util.Map> buckets = new java.util.TreeMap<>(); + for (Q q : queues) { + int p = provider.priorityOf(q); + buckets.computeIfAbsent(p, k -> new java.util.ArrayList<>()).add(q); + } + return new java.util.ArrayList<>(buckets.values()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java index 5d06b4f69a6..0b028fa461a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java @@ -36,7 +36,8 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; -import static org.apache.rocketmq.proxy.service.route.MessageQueuePenalizer.selectLeastPenalty; +import static org.apache.rocketmq.proxy.service.route.MessageQueuePenalizer.selectLeastPenaltyWithPriority; +import static org.apache.rocketmq.proxy.service.route.MessageQueuePriorityProvider.buildPriorityGroups; public class MessageQueueSelector { private static final int BROKER_ACTING_QUEUE_ID = -1; @@ -50,7 +51,16 @@ public class MessageQueueSelector { private final AtomicInteger brokerIndex; private final List> penalizers = new ArrayList<>(); + // ordered by priority asc (smaller => higher priority) + private final List> queuesWithPriority; + private final List> brokerActingQueuesWithPriority; + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { + this(topicRouteWrapper, read, null); + } + + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read, + MessageQueuePriorityProvider priorityProvider) { if (read) { this.queues.addAll(buildRead(topicRouteWrapper)); } else { @@ -60,6 +70,12 @@ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { Random random = new Random(); this.queueIndex = new AtomicInteger(random.nextInt()); this.brokerIndex = new AtomicInteger(random.nextInt()); + + if (priorityProvider == null) { + priorityProvider = new DefaultMessageQueuePriorityProvider(); + } + this.queuesWithPriority = buildPriorityGroups(queues, priorityProvider); + this.brokerActingQueuesWithPriority = buildPriorityGroups(brokerActingQueues, priorityProvider); } private static List buildRead(TopicRouteWrapper topicRoute) { @@ -161,8 +177,12 @@ public AddressableMessageQueue selectOne(boolean onlyBroker) { public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { if (CollectionUtils.isNotEmpty(penalizers)) { - Pair queueAndPenalty = selectLeastPenalty(onlyBroker ? brokerActingQueues : queues, - penalizers, onlyBroker ? brokerIndex : queueIndex); + Pair queueAndPenalty; + if (onlyBroker) { + queueAndPenalty = selectLeastPenaltyWithPriority(brokerActingQueuesWithPriority, penalizers, brokerIndex); + } else { + queueAndPenalty = selectLeastPenaltyWithPriority(queuesWithPriority, penalizers, queueIndex); + } if (queueAndPenalty != null && queueAndPenalty.getLeft() != null) { return queueAndPenalty.getLeft(); } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java index 35cb27beaa7..a0d768d6dae 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java @@ -28,11 +28,17 @@ public class MessageQueueView { private final MessageQueueSelector writeSelector; private final TopicRouteWrapper topicRouteWrapper; + public MessageQueueView(String topic, TopicRouteData topicRouteData, List> penalizer) { + this(topic, topicRouteData, penalizer, null); + } + + public MessageQueueView(String topic, TopicRouteData topicRouteData, List> penalizer, + MessageQueuePriorityProvider priorityProvider) { this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); - this.readSelector = new MessageQueueSelector(topicRouteWrapper, true); - this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false); + this.readSelector = new MessageQueueSelector(topicRouteWrapper, true, priorityProvider); + this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false, priorityProvider); if (CollectionUtils.isNotEmpty(penalizer)) { for (MessageQueuePenalizer p : penalizer) { From c3b2a5abd57da4d297dada72cfd1dd0d35a85a2c Mon Sep 17 00:00:00 2001 From: qianye Date: Sun, 4 Jan 2026 14:02:06 +0800 Subject: [PATCH 09/16] fix Change-Id: I953dd2b7f2c86d81910e945554a3fc95d77d9335 --- .../proxy/service/route/AddressableMessageQueue.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java index c6f57f6c4f8..19f2c0db852 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/AddressableMessageQueue.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.proxy.service.route; +import com.google.common.base.MoreObjects; import org.apache.rocketmq.common.message.MessageQueue; public class AddressableMessageQueue extends MessageQueue { @@ -52,6 +53,9 @@ public boolean equals(Object o) { @Override public String toString() { - return super.toString(); + return MoreObjects.toStringHelper(this) + .add("messageQueue", super.toString()) + .add("brokerAddr", brokerAddr) + .toString(); } } From c1e91d64f20a84061212baf64d7b8d84e8c58044 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 5 Jan 2026 10:54:25 +0800 Subject: [PATCH 10/16] add test Change-Id: If805d662e500437e65142b6065d55f0c774a13df --- .../route/MessageQueuePenalizerTest.java | 472 ++++++++++++++++++ 1 file changed, 472 insertions(+) create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java new file mode 100644 index 00000000000..f31d973cce5 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class MessageQueuePenalizerTest { + + /** + * Test evaluatePenalty with null messageQueue should throw NullPointerException + */ + @Test(expected = NullPointerException.class) + public void testEvaluatePenalty_NullMessageQueue() { + List> penalizers = new ArrayList<>(); + penalizers.add(mq -> 10); + MessageQueuePenalizer.evaluatePenalty(null, penalizers); + } + + /** + * Test evaluatePenalty with null penalizers should return 0 + */ + @Test + public void testEvaluatePenalty_NullPenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, null); + assertEquals(0, penalty); + } + + /** + * Test evaluatePenalty with empty penalizers should return 0 + */ + @Test + public void testEvaluatePenalty_EmptyPenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, Collections.emptyList()); + assertEquals(0, penalty); + } + + /** + * Test evaluatePenalty aggregates penalties from multiple penalizers by summing them up + */ + @Test + public void testEvaluatePenalty_MultiplePenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + List> penalizers = Arrays.asList( + q -> 10, + q -> 20, + q -> 5 + ); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers); + assertEquals(35, penalty); + } + + /** + * Test evaluatePenalty with negative penalties (sum should still work) + */ + @Test + public void testEvaluatePenalty_NegativePenalties() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + List> penalizers = Arrays.asList( + q -> -5, + q -> 10, + q -> -3 + ); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers); + assertEquals(2, penalty); + } + + /** + * Test selectLeastPenalty with null queues should return null + */ + @Test + public void testSelectLeastPenalty_NullQueues() { + List> penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(null, penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenalty with empty queues should return null + */ + @Test + public void testSelectLeastPenalty_EmptyQueues() { + List> penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty( + Collections.emptyList(), penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenalty selects the queue with the lowest penalty + */ + @Test + public void testSelectLeastPenalty_LowestPenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // Penalizer that assigns different penalties based on queue id + List> penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 10 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty short-circuits when penalty <= 0 + */ + @Test + public void testSelectLeastPenalty_ShortCircuitZeroPenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // mq1 has penalty 0, should short-circuit + List> penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 0 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty short-circuits when penalty is negative + */ + @Test + public void testSelectLeastPenalty_ShortCircuitNegativePenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // mq1 has penalty -5, should short-circuit + List> penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? -5 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(-5, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty with round-robin behavior (rotating start index) + * Verifies that startIndex affects the iteration order + */ + @Test + public void testSelectLeastPenalty_RoundRobinStartIndex() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // All queues have penalty 0, so whichever is encountered first will be returned + List> penalizers = Collections.singletonList(mq -> 0); + + // Starting from index 0 + AtomicInteger startIndex1 = new AtomicInteger(0); + Pair result1 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex1); + assertNotNull(result1); + assertEquals(mq0, result1.getLeft()); + + // Starting from index 1 + AtomicInteger startIndex2 = new AtomicInteger(1); + Pair result2 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex2); + assertNotNull(result2); + assertEquals(mq1, result2.getLeft()); + + // Starting from index 2 + AtomicInteger startIndex3 = new AtomicInteger(2); + Pair result3 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex3); + assertNotNull(result3); + assertEquals(mq2, result3.getLeft()); + } + + /** + * Test selectLeastPenalty increments startIndex for each iteration + */ + @Test + public void testSelectLeastPenalty_IncrementStartIndex() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + List> penalizers = Collections.singletonList(mq -> 10); + + AtomicInteger startIndex = new AtomicInteger(0); + MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + // After iterating through 3 queues, startIndex should be incremented 3 times + assertEquals(3, startIndex.get()); + } + + /** + * Test selectLeastPenalty handles startIndex wrapping with Math.floorMod + */ + @Test + public void testSelectLeastPenalty_StartIndexWrapping() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + List> penalizers = Collections.singletonList(mq -> 0); + + // Start with large index to test wrapping + AtomicInteger startIndex = new AtomicInteger(100); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + // 100 % 3 = 1, so should start from mq1 + assertEquals(mq1, result.getLeft()); + } + + /** + * Test selectLeastPenaltyWithPriority with null queuesWithPriority should return null + */ + @Test + public void testSelectLeastPenaltyWithPriority_NullQueues() { + List> penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + null, penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenaltyWithPriority with empty queuesWithPriority should return null + */ + @Test + public void testSelectLeastPenaltyWithPriority_EmptyQueues() { + List> penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + Collections.emptyList(), penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenaltyWithPriority with single priority group delegates to selectLeastPenalty + */ + @Test + public void testSelectLeastPenaltyWithPriority_SinglePriorityGroup() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + List queues = Arrays.asList(mq0, mq1); + + List> penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 20 : 10 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + Collections.singletonList(queues), penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority selects queue with lowest penalty across multiple priority groups + */ + @Test + public void testSelectLeastPenaltyWithPriority_MultiplePriorityGroups() { + // Priority group 1 (higher priority) + MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker-high", 1); + List highPriorityQueues = Arrays.asList(mq0, mq1); + + // Priority group 2 (lower priority) + MessageQueue mq2 = new MessageQueue("topic", "broker-low", 0); + MessageQueue mq3 = new MessageQueue("topic", "broker-low", 1); + List lowPriorityQueues = Arrays.asList(mq2, mq3); + + List> queuesWithPriority = Arrays.asList(highPriorityQueues, lowPriorityQueues); + + // Assign penalties: high-priority queues have higher penalties, low-priority have lower + List> penalizers = Collections.singletonList( + mq -> mq.getBrokerName().equals("broker-high") ? 50 : 10 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should select from low-priority group because it has lower penalty + assertTrue(result.getLeft().getBrokerName().equals("broker-low")); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority short-circuits when a priority group yields penalty <= 0 + */ + @Test + public void testSelectLeastPenaltyWithPriority_ShortCircuitZeroPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0); + List highPriorityQueues = Collections.singletonList(mq0); + + // Priority group 2 + MessageQueue mq1 = new MessageQueue("topic", "broker-low", 0); + List lowPriorityQueues = Collections.singletonList(mq1); + + List> queuesWithPriority = Arrays.asList(highPriorityQueues, lowPriorityQueues); + + // First group has penalty 0, should short-circuit + List> penalizers = Collections.singletonList( + mq -> mq.getBrokerName().equals("broker-high") ? 0 : 100 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq0, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority when first group encounters zero penalty during iteration + */ + @Test + public void testSelectLeastPenaltyWithPriority_FirstGroupHasZeroPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker1", 1); + List group1 = Arrays.asList(mq0, mq1); + + // Priority group 2 + MessageQueue mq2 = new MessageQueue("topic", "broker2", 0); + List group2 = Collections.singletonList(mq2); + + List> queuesWithPriority = Arrays.asList(group1, group2); + + // mq1 in first group has penalty 0 + List> penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 1 && mq.getBrokerName().equals("broker1") ? 0 : 50 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority returns first encountered minimum when multiple groups have same minimum penalty + */ + @Test + public void testSelectLeastPenaltyWithPriority_SameMinimumPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + List group1 = Collections.singletonList(mq0); + + // Priority group 2 + MessageQueue mq1 = new MessageQueue("topic", "broker2", 0); + List group2 = Collections.singletonList(mq1); + + // Priority group 3 + MessageQueue mq2 = new MessageQueue("topic", "broker3", 0); + List group3 = Collections.singletonList(mq2); + + List> queuesWithPriority = Arrays.asList(group1, group2, group3); + + // All have same penalty + List> penalizers = Collections.singletonList(mq -> 10); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should return first encountered (from group1) + assertEquals(mq0, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority with complex scenario: + * Multiple priority groups with varying penalties + */ + @Test + public void testSelectLeastPenaltyWithPriority_ComplexScenario() { + // Priority group 1: penalties 100, 90 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker1", 1); + List group1 = Arrays.asList(mq0, mq1); + + // Priority group 2: penalties 50, 30 + MessageQueue mq2 = new MessageQueue("topic", "broker2", 0); + MessageQueue mq3 = new MessageQueue("topic", "broker2", 1); + List group2 = Arrays.asList(mq2, mq3); + + // Priority group 3: penalties 80, 20 + MessageQueue mq4 = new MessageQueue("topic", "broker3", 0); + MessageQueue mq5 = new MessageQueue("topic", "broker3", 1); + List group3 = Arrays.asList(mq4, mq5); + + List> queuesWithPriority = Arrays.asList(group1, group2, group3); + + List> penalizers = Collections.singletonList(mq -> { + if (mq.getBrokerName().equals("broker1")) { + return mq.getQueueId() == 0 ? 100 : 90; + } else if (mq.getBrokerName().equals("broker2")) { + return mq.getQueueId() == 0 ? 50 : 30; + } else { + return mq.getQueueId() == 0 ? 80 : 20; + } + }); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should select mq5 from group3 with penalty 20 (the global minimum) + assertEquals(mq5, result.getLeft()); + assertEquals(20, result.getRight().intValue()); + } +} From 67b1a5c2a18aebb1c426758c88d9a9ebfdf67d76 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 5 Jan 2026 11:24:32 +0800 Subject: [PATCH 11/16] add test Change-Id: I70a41904468954a177dad16a2fd8019b6082716c --- .../route/MessageQueuePriorityProvider.java | 42 ++- .../MessageQueuePriorityProviderTest.java | 311 ++++++++++++++++++ 2 files changed, 351 insertions(+), 2 deletions(-) create mode 100644 proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java index 47b0a862e26..007717fda2d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java @@ -20,13 +20,51 @@ import java.util.List; import org.apache.rocketmq.common.message.MessageQueue; +/** + * A functional interface for providing priority values for message queues. + * This interface allows custom priority determination logic to be applied to message queues, + * enabling queue selection and routing based on priority levels. + *

+ * The priority value follows the convention that smaller numeric values indicate higher priority. + * For example, priority 0 is higher than priority 1. + *

+ * + * @param the type of message queue, must extend {@link MessageQueue} + */ @FunctionalInterface public interface MessageQueuePriorityProvider { + /** - * smaller value is higher priority - * */ + * Determines the priority value of the given message queue. + *

+ * Smaller values indicate higher priority. For example: + *

    + *
  • Priority 0: Highest priority
  • + *
  • Priority 1: Medium priority
  • + *
  • Priority 2: Lower priority
  • + *
+ *

+ * + * @param q the message queue to evaluate + * @return the priority value, where smaller values indicate higher priority + */ int priorityOf(Q q); + /** + * Groups message queues by their priority levels and returns them in priority order. + *

+ * This static utility method takes a list of message queues and a priority provider, + * then organizes the queues into groups based on their priority values. + * The returned list is ordered from highest priority to lowest priority. + *

+ * + * @param the type of message queue, must extend {@link MessageQueue} + * @param queues the list of message queues to group by priority, can be null or empty + * @param provider the priority provider to determine the priority of each queue + * @return a list of lists, where each inner list contains queues of the same priority level, + * ordered from highest priority (smallest value) to lowest priority (largest value). + * Returns an empty list if the input queues are null or empty. + */ static List> buildPriorityGroups(List queues, MessageQueuePriorityProvider provider) { if (queues == null || queues.isEmpty()) { return java.util.Collections.emptyList(); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java new file mode 100644 index 00000000000..22f2a68e8b0 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class MessageQueuePriorityProviderTest { + + @Test + public void testPriorityOfWithLambda() { + // Test functional interface implementation using lambda + MessageQueuePriorityProvider provider = mq -> mq.getQueueId(); + + MessageQueue queue1 = new MessageQueue("topic", "broker", 0); + MessageQueue queue2 = new MessageQueue("topic", "broker", 5); + MessageQueue queue3 = new MessageQueue("topic", "broker", 10); + + assertEquals(0, provider.priorityOf(queue1)); + assertEquals(5, provider.priorityOf(queue2)); + assertEquals(10, provider.priorityOf(queue3)); + } + + @Test + public void testPriorityOfWithConstantValue() { + // Test with constant priority + MessageQueuePriorityProvider constantProvider = mq -> 1; + + MessageQueue queue1 = new MessageQueue("topic1", "broker1", 0); + MessageQueue queue2 = new MessageQueue("topic2", "broker2", 5); + + assertEquals(1, constantProvider.priorityOf(queue1)); + assertEquals(1, constantProvider.priorityOf(queue2)); + } + + @Test + public void testPriorityOfBasedOnBrokerName() { + // Test priority based on broker name hash + MessageQueuePriorityProvider brokerProvider = + mq -> mq.getBrokerName().hashCode() % 10; + + MessageQueue queue1 = new MessageQueue("topic", "broker-a", 0); + MessageQueue queue2 = new MessageQueue("topic", "broker-b", 0); + + int priority1 = brokerProvider.priorityOf(queue1); + int priority2 = brokerProvider.priorityOf(queue2); + + // Priorities should be deterministic for the same broker + assertEquals(priority1, brokerProvider.priorityOf(queue1)); + assertEquals(priority2, brokerProvider.priorityOf(queue2)); + } + + @Test + public void testBuildPriorityGroupsWithNullList() { + MessageQueuePriorityProvider provider = mq -> 0; + List> result = MessageQueuePriorityProvider.buildPriorityGroups(null, provider); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testBuildPriorityGroupsWithEmptyList() { + MessageQueuePriorityProvider provider = mq -> 0; + List> result = MessageQueuePriorityProvider.buildPriorityGroups( + Collections.emptyList(), provider); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testBuildPriorityGroupsWithSinglePriority() { + MessageQueuePriorityProvider provider = mq -> 0; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker1", 0), + new MessageQueue("topic", "broker1", 1), + new MessageQueue("topic", "broker1", 2) + ); + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(3, result.get(0).size()); + } + + @Test + public void testBuildPriorityGroupsWithMultiplePriorities() { + // Priority based on queue ID: 0->high, 1->medium, 2->low + MessageQueuePriorityProvider provider = mq -> { + if (mq.getQueueId() < 2) return 0; // High priority + if (mq.getQueueId() < 4) return 1; // Medium priority + return 2; // Low priority + }; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 0), // priority 0 + new MessageQueue("topic", "broker", 1), // priority 0 + new MessageQueue("topic", "broker", 2), // priority 1 + new MessageQueue("topic", "broker", 3), // priority 1 + new MessageQueue("topic", "broker", 4), // priority 2 + new MessageQueue("topic", "broker", 5) // priority 2 + ); + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // First group (highest priority 0) + assertEquals(2, result.get(0).size()); + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(1, result.get(0).get(1).getQueueId()); + + // Second group (medium priority 1) + assertEquals(2, result.get(1).size()); + assertEquals(2, result.get(1).get(0).getQueueId()); + assertEquals(3, result.get(1).get(1).getQueueId()); + + // Third group (low priority 2) + assertEquals(2, result.get(2).size()); + assertEquals(4, result.get(2).get(0).getQueueId()); + assertEquals(5, result.get(2).get(1).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsOrderedByPriority() { + // Test that groups are ordered from high to low priority (ascending numeric value) + MessageQueuePriorityProvider provider = mq -> mq.getQueueId(); + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 5), + new MessageQueue("topic", "broker", 0), + new MessageQueue("topic", "broker", 3), + new MessageQueue("topic", "broker", 1) + ); + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(4, result.size()); + + // Verify order: 0, 1, 3, 5 (ascending) + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(1, result.get(1).get(0).getQueueId()); + assertEquals(3, result.get(2).get(0).getQueueId()); + assertEquals(5, result.get(3).get(0).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsWithNegativePriorities() { + // Test with negative priority values + MessageQueuePriorityProvider provider = mq -> mq.getQueueId() - 5; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 0), // priority -5 + new MessageQueue("topic", "broker", 5), // priority 0 + new MessageQueue("topic", "broker", 10) // priority 5 + ); + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // Verify order: -5, 0, 5 (ascending) + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(5, result.get(1).get(0).getQueueId()); + assertEquals(10, result.get(2).get(0).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsWithMixedBrokers() { + // Priority based on broker name + MessageQueuePriorityProvider provider = mq -> { + if (mq.getBrokerName().equals("broker-high")) return 0; + if (mq.getBrokerName().equals("broker-medium")) return 1; + return 2; + }; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker-high", 0), + new MessageQueue("topic", "broker-low", 0), + new MessageQueue("topic", "broker-medium", 0), + new MessageQueue("topic", "broker-high", 1), + new MessageQueue("topic", "broker-medium", 1) + ); + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // High priority group + assertEquals(2, result.get(0).size()); + assertEquals("broker-high", result.get(0).get(0).getBrokerName()); + assertEquals("broker-high", result.get(0).get(1).getBrokerName()); + + // Medium priority group + assertEquals(2, result.get(1).size()); + assertEquals("broker-medium", result.get(1).get(0).getBrokerName()); + + // Low priority group + assertEquals(1, result.get(2).size()); + assertEquals("broker-low", result.get(2).get(0).getBrokerName()); + } + + @Test + public void testBuildPriorityGroupsPreservesQueueOrder() { + // Test that queues with same priority maintain their relative order + MessageQueuePriorityProvider provider = mq -> 0; + + List queues = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + queues.add(new MessageQueue("topic", "broker", i)); + } + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(10, result.get(0).size()); + + // Verify order is maintained + for (int i = 0; i < 10; i++) { + assertEquals(i, result.get(0).get(i).getQueueId()); + } + } + + @Test + public void testBuildPriorityGroupsWithCustomMessageQueue() { + // Test with extended MessageQueue type + class CustomMessageQueue extends MessageQueue { + private int customPriority; + + public CustomMessageQueue(String topic, String brokerName, int queueId, int customPriority) { + super(topic, brokerName, queueId); + this.customPriority = customPriority; + } + + public int getCustomPriority() { + return customPriority; + } + } + + MessageQueuePriorityProvider provider = + CustomMessageQueue::getCustomPriority; + + List queues = Arrays.asList( + new CustomMessageQueue("topic", "broker", 0, 2), + new CustomMessageQueue("topic", "broker", 1, 0), + new CustomMessageQueue("topic", "broker", 2, 1) + ); + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // Verify order by custom priority: 0, 1, 2 + assertEquals(0, result.get(0).get(0).getCustomPriority()); + assertEquals(1, result.get(1).get(0).getCustomPriority()); + assertEquals(2, result.get(2).get(0).getCustomPriority()); + } + + @Test + public void testBuildPriorityGroupsWithLargeNumberOfQueues() { + // Test with large number of queues + MessageQueuePriorityProvider provider = mq -> mq.getQueueId() % 5; + + List queues = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + queues.add(new MessageQueue("topic", "broker", i)); + } + + List> result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(5, result.size()); // 5 different priorities (0-4) + + // Each group should have 20 queues (100 / 5) + for (List group : result) { + assertEquals(20, group.size()); + } + } +} From 77ac97be6483dba90f5b7e5016d858d263999846 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 5 Jan 2026 11:25:30 +0800 Subject: [PATCH 12/16] add test Change-Id: Ic03e6591fc36dfb819607b51bbb13d958a7a3b0a --- .../service/route/MessageQueuePriorityProvider.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java index 007717fda2d..57b6e65fe5c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java @@ -17,7 +17,11 @@ package org.apache.rocketmq.proxy.service.route; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import org.apache.rocketmq.common.message.MessageQueue; /** @@ -67,14 +71,14 @@ public interface MessageQueuePriorityProvider { */ static List> buildPriorityGroups(List queues, MessageQueuePriorityProvider provider) { if (queues == null || queues.isEmpty()) { - return java.util.Collections.emptyList(); + return Collections.emptyList(); } - java.util.Map> buckets = new java.util.TreeMap<>(); + Map> buckets = new TreeMap<>(); for (Q q : queues) { int p = provider.priorityOf(q); - buckets.computeIfAbsent(p, k -> new java.util.ArrayList<>()).add(q); + buckets.computeIfAbsent(p, k -> new ArrayList<>()).add(q); } - return new java.util.ArrayList<>(buckets.values()); + return new ArrayList<>(buckets.values()); } } From 9795d4607d8ffcb19eec19b0c78ed6478bf92a8c Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 5 Jan 2026 18:05:40 +0800 Subject: [PATCH 13/16] fix Change-Id: I45f7faabfe8e31c973b3d79c3d88f1b02fd40376 --- .../proxy/service/route/TopicRouteService.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 99ab1de60fc..ec67b5dbfdc 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -55,6 +55,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { protected final LoadingCache topicCache; protected final ThreadPoolExecutor cacheRefreshExecutor; protected final List> penalizers = new ArrayList<>(); + protected MessageQueuePriorityProvider priorityProvider = new DefaultMessageQueuePriorityProvider(); public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); @@ -199,13 +200,21 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, this.penalizers); + MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, this.penalizers, priorityProvider); log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } return MessageQueueView.WRAPPED_EMPTY_QUEUE; } + public void setPriorityProvider(MessageQueuePriorityProvider priorityProvider) { + this.priorityProvider = priorityProvider; + } + + public void addPenalizer(MessageQueuePenalizer penalizer) { + this.penalizers.add(penalizer); + } + @VisibleForTesting public static List> buildPenalizerByMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { List> penalizers = new ArrayList<>(); From 68d239434332e1e3cef2197f99839c570d069295 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 5 Jan 2026 18:06:05 +0800 Subject: [PATCH 14/16] fix Change-Id: I6ef88ee71a5ee119ac9ec3aa1b956de60f2223da --- .../apache/rocketmq/proxy/service/route/TopicRouteService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index ec67b5dbfdc..43a4729487c 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -55,7 +55,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { protected final LoadingCache topicCache; protected final ThreadPoolExecutor cacheRefreshExecutor; protected final List> penalizers = new ArrayList<>(); - protected MessageQueuePriorityProvider priorityProvider = new DefaultMessageQueuePriorityProvider(); + protected MessageQueuePriorityProvider priorityProvider = null; public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); From 3243973062d0629316d89eca04533cfbca71a090 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 5 Jan 2026 18:06:53 +0800 Subject: [PATCH 15/16] fix Change-Id: Ia5415f869cc6e2e72b86fc4c333c3ed94d0c03af --- .../rocketmq/proxy/service/route/TopicRouteService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index 43a4729487c..dae30057461 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -55,7 +55,7 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { protected final LoadingCache topicCache; protected final ThreadPoolExecutor cacheRefreshExecutor; protected final List> penalizers = new ArrayList<>(); - protected MessageQueuePriorityProvider priorityProvider = null; + protected MessageQueuePriorityProvider priorityProvider = new DefaultMessageQueuePriorityProvider(); public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); @@ -200,7 +200,7 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, this.penalizers, priorityProvider); + MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, this.penalizers, this.priorityProvider); log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } From 0b27de880ab75de89d4307e02d055f17509171f0 Mon Sep 17 00:00:00 2001 From: qianye Date: Mon, 5 Jan 2026 18:07:11 +0800 Subject: [PATCH 16/16] fix Change-Id: Ied8e0d9cff181abfe7082e7c3af8dc9c4472ed55 --- .../service/route/DefaultMessageQueuePriorityProvider.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java index 2714b98e64c..90b0114f61b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/DefaultMessageQueuePriorityProvider.java @@ -20,6 +20,6 @@ public class DefaultMessageQueuePriorityProvider implements MessageQueuePriorityProvider { @Override public int priorityOf(AddressableMessageQueue queue) { - return 10; + return 0; } }