diff --git a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
index 4fbec13860b..b476cb205e4 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/subscription/RocksdbGroupConfigTransferTest.java
@@ -17,6 +17,15 @@
package org.apache.rocketmq.broker.subscription;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
import org.apache.rocketmq.common.BrokerConfig;
@@ -34,15 +43,6 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -78,24 +78,28 @@ public void destroy() {
if (notToBeExecuted()) {
return;
}
- Path pathToBeDeleted = Paths.get(basePath);
-
- try {
- Files.walk(pathToBeDeleted)
- .sorted(Comparator.reverseOrder())
- .forEach(path -> {
- try {
- Files.delete(path);
- } catch (IOException e) {
- // ignore
- }
- });
- } catch (IOException e) {
- // ignore
- }
+
if (rocksDBSubscriptionGroupManager != null) {
rocksDBSubscriptionGroupManager.stop();
}
+
+ Path root = Paths.get(basePath);
+ if (Files.notExists(root)) {
+ return;
+ }
+
+ try (Stream The method iterates through all queues exactly once, but starts from a rotating index
+ * derived from {@code startIndex} (round-robin) to avoid always scanning from position 0 . For each queue, it computes a penalty via {@link #evaluatePenalty} using
+ * the provided {@code penalizers}. The queue with the smallest penalty is selected. Short-circuit rule: if any queue has a {@code penalty<= 0}, it is returned immediately,
+ * since no better result than 0 is expected. The input {@code queuesWithPriority} is a list of queue groups ordered by priority.
+ * For each priority group, this method delegates to {@link #selectLeastPenalty} to pick the best queue
+ * within that group and obtain its penalty. Short-circuit rule: if any priority group yields a queue whose {@code penalty <= 0},
+ * that result is returned immediately. Otherwise, it returns the queue with the smallest positive penalty among all groups.
+ * If multiple groups produce the same minimum penalty, the first encountered one wins.
+ * The priority value follows the convention that smaller numeric values indicate higher priority.
+ * For example, priority 0 is higher than priority 1.
+ *
+ * Smaller values indicate higher priority. For example:
+ * {
+
+ /**
+ * Returns the penalty value for the given MessageQueue; lower is better.
+ */
+ int penaltyOf(Q messageQueue);
+
+ /**
+ * Aggregates penalties from multiple penalizers for the same MessageQueue (by summing them up).
+ */
+ static
int evaluatePenalty(Q messageQueue, List
p : penalizers) {
+ sum += p.penaltyOf(messageQueue);
+ }
+ return sum;
+ }
+
+ /**
+ * Selects the queue with the lowest evaluated penalty from the given queue list.
+ *
+ *
queue type
+ * @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queues} is null/empty
+ */
+ static
Pair
selectLeastPenalty(List
queues,
+ List
queue type
+ * @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queuesWithPriority} is null/empty
+ */
+ static
Pair
selectLeastPenaltyWithPriority(List
> queuesWithPriority,
+ List
queues : queuesWithPriority) {
+ Pair
queueAndPenalty = selectLeastPenalty(queues, penalizers, startIndex);
+ int penalty = queueAndPenalty.getRight();
+ if (queueAndPenalty.getRight() <= 0) {
+ return queueAndPenalty;
+ }
+ if (penalty < bestPenalty) {
+ bestPenalty = penalty;
+ bestQueue = queueAndPenalty.getLeft();
+ }
+ }
+ return Pair.of(bestQueue, bestPenalty);
+ }
+}
\ No newline at end of file
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
new file mode 100644
index 00000000000..57b6e65fe5c
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.route;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+/**
+ * A functional interface for providing priority values for message queues.
+ * This interface allows custom priority determination logic to be applied to message queues,
+ * enabling queue selection and routing based on priority levels.
+ *
the type of message queue, must extend {@link MessageQueue}
+ */
+@FunctionalInterface
+public interface MessageQueuePriorityProvider
{
+
+ /**
+ * Determines the priority value of the given message queue.
+ *
+ *
+ *
+ * This static utility method takes a list of message queues and a priority provider, + * then organizes the queues into groups based on their priority values. + * The returned list is ordered from highest priority to lowest priority. + *
+ * + * @paramthe type of message queue, must extend {@link MessageQueue} + * @param queues the list of message queues to group by priority, can be null or empty + * @param provider the priority provider to determine the priority of each queue + * @return a list of lists, where each inner list contains queues of the same priority level, + * ordered from highest priority (smallest value) to lowest priority (largest value). + * Returns an empty list if the input queues are null or empty. + */ + staticList> buildPriorityGroups(List
queues, MessageQueuePriorityProviderprovider) { + if (queues == null || queues.isEmpty()) { + return Collections.emptyList(); + } + + Map> buckets = new TreeMap<>(); + for (Q q : queues) { + int p = provider.priorityOf(q); + buckets.computeIfAbsent(p, k -> new ArrayList<>()).add(q); + } + return new ArrayList<>(buckets.values()); + } +} diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java index f25fb907ef2..0b028fa461a 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelector.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; import com.google.common.math.IntMath; import java.util.ArrayList; import java.util.Collections; @@ -30,13 +29,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; -import org.apache.rocketmq.client.latency.MQFaultStrategy; +import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.protocol.route.QueueData; +import static org.apache.rocketmq.proxy.service.route.MessageQueuePenalizer.selectLeastPenaltyWithPriority; +import static org.apache.rocketmq.proxy.service.route.MessageQueuePriorityProvider.buildPriorityGroups; + public class MessageQueueSelector { private static final int BROKER_ACTING_QUEUE_ID = -1; @@ -47,9 +49,18 @@ public class MessageQueueSelector { private final Map brokerNameQueueMap = new ConcurrentHashMap<>(); private final AtomicInteger queueIndex; private final AtomicInteger brokerIndex; - private MQFaultStrategy mqFaultStrategy; + private final List > penalizers = new ArrayList<>(); + + // ordered by priority asc (smaller => higher priority) + private final List > queuesWithPriority; + private final List
> brokerActingQueuesWithPriority; + + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read) { + this(topicRouteWrapper, read, null); + } - public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy mqFaultStrategy, boolean read) { + public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, boolean read, + MessageQueuePriorityProvider
priorityProvider) { if (read) { this.queues.addAll(buildRead(topicRouteWrapper)); } else { @@ -59,7 +70,12 @@ public MessageQueueSelector(TopicRouteWrapper topicRouteWrapper, MQFaultStrategy Random random = new Random(); this.queueIndex = new AtomicInteger(random.nextInt()); this.brokerIndex = new AtomicInteger(random.nextInt()); - this.mqFaultStrategy = mqFaultStrategy; + + if (priorityProvider == null) { + priorityProvider = new DefaultMessageQueuePriorityProvider(); + } + this.queuesWithPriority = buildPriorityGroups(queues, priorityProvider); + this.brokerActingQueuesWithPriority = buildPriorityGroups(brokerActingQueues, priorityProvider); } private static List buildRead(TopicRouteWrapper topicRoute) { @@ -138,7 +154,7 @@ private static List buildWrite(TopicRouteWrapper topicR private void buildBrokerActingQueues(String topic, List normalQueues) { for (AddressableMessageQueue mq : normalQueues) { AddressableMessageQueue brokerActingQueue = new AddressableMessageQueue( - new MessageQueue(topic, mq.getMessageQueue().getBrokerName(), BROKER_ACTING_QUEUE_ID), + new MessageQueue(topic, mq.getBrokerName(), BROKER_ACTING_QUEUE_ID), mq.getBrokerAddr()); if (!brokerActingQueues.contains(brokerActingQueue)) { @@ -160,38 +176,15 @@ public AddressableMessageQueue selectOne(boolean onlyBroker) { } public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { - if (mqFaultStrategy != null && mqFaultStrategy.isSendLatencyFaultEnable()) { - List messageQueueList = null; - MessageQueue messageQueue = null; + if (CollectionUtils.isNotEmpty(penalizers)) { + Pair queueAndPenalty; if (onlyBroker) { - messageQueueList = transferAddressableQueues(brokerActingQueues); + queueAndPenalty = selectLeastPenaltyWithPriority(brokerActingQueuesWithPriority, penalizers, brokerIndex); } else { - messageQueueList = transferAddressableQueues(queues); + queueAndPenalty = selectLeastPenaltyWithPriority(queuesWithPriority, penalizers, queueIndex); } - AddressableMessageQueue addressableMessageQueue = null; - - // use both available filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getAvailableFilter(), mqFaultStrategy.getReachableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; - } - - // use available filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getAvailableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; - } - - // no available filter, then use reachable filter. - messageQueue = selectOneMessageQueue(messageQueueList, onlyBroker ? brokerIndex : queueIndex, - mqFaultStrategy.getReachableFilter()); - addressableMessageQueue = transferQueue2Addressable(messageQueue); - if (addressableMessageQueue != null) { - return addressableMessageQueue; + if (queueAndPenalty != null && queueAndPenalty.getLeft() != null) { + return queueAndPenalty.getLeft(); } } @@ -199,46 +192,6 @@ public AddressableMessageQueue selectOneByPipeline(boolean onlyBroker) { return selectOne(onlyBroker); } - private MessageQueue selectOneMessageQueue(List messageQueueList, AtomicInteger sendQueue, TopicPublishInfo.QueueFilter...filter) { - if (messageQueueList == null || messageQueueList.isEmpty()) { - return null; - } - if (filter != null && filter.length != 0) { - for (int i = 0; i < messageQueueList.size(); i++) { - int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); - MessageQueue mq = messageQueueList.get(index); - boolean filterResult = true; - for (TopicPublishInfo.QueueFilter f: filter) { - Preconditions.checkNotNull(f); - filterResult &= f.filter(mq); - } - if (filterResult) { - return mq; - } - } - } - return null; - } - - public List transferAddressableQueues(List addressableMessageQueueList) { - if (addressableMessageQueueList == null) { - return null; - } - - return addressableMessageQueueList.stream() - .map(AddressableMessageQueue::getMessageQueue) - .collect(Collectors.toList()); - } - - private AddressableMessageQueue transferQueue2Addressable(MessageQueue messageQueue) { - for (AddressableMessageQueue amq: queues) { - if (amq.getMessageQueue().equals(messageQueue)) { - return amq; - } - } - return null; - } - public AddressableMessageQueue selectNextOne(AddressableMessageQueue last) { boolean onlyBroker = last.getQueueId() < 0; AddressableMessageQueue newOne = last; @@ -275,12 +228,10 @@ public List getBrokerActingQueues() { return brokerActingQueues; } - public MQFaultStrategy getMQFaultStrategy() { - return mqFaultStrategy; - } - - public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { - this.mqFaultStrategy = mqFaultStrategy; + public void addPenalizer(MessageQueuePenalizer penalizer) { + if (penalizer != null) { + this.penalizers.add(penalizer); + } } @Override diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java index 898e529f8cb..a0d768d6dae 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java @@ -17,7 +17,8 @@ package org.apache.rocketmq.proxy.service.route; import com.google.common.base.MoreObjects; -import org.apache.rocketmq.client.latency.MQFaultStrategy; +import java.util.List; +import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.remoting.protocol.route.TopicRouteData; public class MessageQueueView { @@ -27,11 +28,24 @@ public class MessageQueueView { private final MessageQueueSelector writeSelector; private final TopicRouteWrapper topicRouteWrapper; - public MessageQueueView(String topic, TopicRouteData topicRouteData, MQFaultStrategy mqFaultStrategy) { + + public MessageQueueView(String topic, TopicRouteData topicRouteData, List > penalizer) { + this(topic, topicRouteData, penalizer, null); + } + + public MessageQueueView(String topic, TopicRouteData topicRouteData, List > penalizer, + MessageQueuePriorityProvider priorityProvider) { this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic); - this.readSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, true); - this.writeSelector = new MessageQueueSelector(topicRouteWrapper, mqFaultStrategy, false); + this.readSelector = new MessageQueueSelector(topicRouteWrapper, true, priorityProvider); + this.writeSelector = new MessageQueueSelector(topicRouteWrapper, false, priorityProvider); + + if (CollectionUtils.isNotEmpty(penalizer)) { + for (MessageQueuePenalizer p : penalizer) { + this.readSelector.addPenalizer(p); + this.writeSelector.addPenalizer(p); + } + } } public TopicRouteData getTopicRouteData() { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java index bcdf8140bc5..dae30057461 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java @@ -19,11 +19,11 @@ import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; - +import com.google.common.annotations.VisibleForTesting; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.ClientConfig; @@ -32,12 +32,10 @@ import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.latency.Resolver; import org.apache.rocketmq.client.latency.ServiceDetector; -import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.thread.ThreadPoolMonitor; import org.apache.rocketmq.common.utils.AbstractStartAndShutdown; -import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.proxy.common.Address; @@ -53,19 +51,15 @@ public abstract class TopicRouteService extends AbstractStartAndShutdown { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); - private final MQClientAPIFactory mqClientAPIFactory; - private MQFaultStrategy mqFaultStrategy; - + private final MQFaultStrategy mqFaultStrategy; protected final LoadingCache topicCache; - protected final ScheduledExecutorService scheduledExecutorService; protected final ThreadPoolExecutor cacheRefreshExecutor; + protected final List > penalizers = new ArrayList<>(); + protected MessageQueuePriorityProvider priorityProvider = new DefaultMessageQueuePriorityProvider(); public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { ProxyConfig config = ConfigurationManager.getProxyConfig(); - this.scheduledExecutorService = ThreadUtils.newSingleThreadScheduledExecutor( - new ThreadFactoryImpl("TopicRouteService_") - ); this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor( config.getTopicRouteServiceThreadPoolNums(), config.getTopicRouteServiceThreadPoolNums(), @@ -74,7 +68,6 @@ public TopicRouteService(MQClientAPIFactory mqClientAPIFactory) { "TopicRouteCacheRefresh", config.getTopicRouteServiceThreadPoolQueueCapacity() ); - this.mqClientAPIFactory = mqClientAPIFactory; this.topicCache = Caffeine.newBuilder().maximumSize(config.getTopicRouteServiceCacheMaxNum()) .expireAfterAccess(config.getTopicRouteServiceCacheExpiredSeconds(), TimeUnit.SECONDS) @@ -134,6 +127,8 @@ public String resolve(String name) { } } }, serviceDetector); + + this.penalizers.addAll(buildPenalizerByMQFaultStrategy(mqFaultStrategy)); this.init(); } @@ -146,22 +141,7 @@ private Optional pickTopic() { } protected void init() { - this.appendShutdown(this.scheduledExecutorService::shutdown); - this.appendStartAndShutdown(this.mqClientAPIFactory); - } - - @Override - public void shutdown() throws Exception { - if (this.mqFaultStrategy.isStartDetectorEnable()) { - mqFaultStrategy.shutdown(); - } - } - - @Override - public void start() throws Exception { - if (this.mqFaultStrategy.isStartDetectorEnable()) { - this.mqFaultStrategy.startDetector(); - } + this.appendStartAndShutdown(this.mqFaultStrategy); } public ClientConfig extractClientConfigFromProxyConfig(ProxyConfig proxyConfig) { @@ -220,10 +200,36 @@ protected static boolean isTopicRouteValid(TopicRouteData routeData) { protected MessageQueueView buildMessageQueueView(String topic, TopicRouteData topicRouteData) { if (isTopicRouteValid(topicRouteData)) { - MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, TopicRouteService.this.getMqFaultStrategy()); + MessageQueueView tmp = new MessageQueueView(topic, topicRouteData, this.penalizers, this.priorityProvider); log.debug("load topic route from namesrv. topic: {}, queue: {}", topic, tmp); return tmp; } return MessageQueueView.WRAPPED_EMPTY_QUEUE; } + + public void setPriorityProvider(MessageQueuePriorityProvider priorityProvider) { + this.priorityProvider = priorityProvider; + } + + public void addPenalizer(MessageQueuePenalizer penalizer) { + this.penalizers.add(penalizer); + } + + @VisibleForTesting + public static List > buildPenalizerByMQFaultStrategy(MQFaultStrategy mqFaultStrategy) { + List > penalizers = new ArrayList<>(); + penalizers.add(messageQueue -> { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getAvailableFilter().filter(messageQueue)) { + return 0; + } + return 10; + }); + penalizers.add(messageQueue -> { + if (!mqFaultStrategy.isSendLatencyFaultEnable() || mqFaultStrategy.getReachableFilter().filter(messageQueue)) { + return 0; + } + return 100; + }); + return penalizers; + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java index a64867ddfe1..870aa0424fd 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivityTest.java @@ -59,6 +59,7 @@ import org.junit.Before; import org.junit.Test; +import static org.apache.rocketmq.proxy.service.route.TopicRouteService.buildPenalizerByMQFaultStrategy; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertThrows; @@ -379,7 +380,7 @@ public void testSendNormalMessageQueueSelector() { MQFaultStrategy mqFaultStrategy = mock(MQFaultStrategy.class); when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); when(mqFaultStrategy.isSendLatencyFaultEnable()).thenReturn(false); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, null); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); AddressableMessageQueue secondSelect = selector.select(ProxyContext.create(), messageQueueView); @@ -415,10 +416,7 @@ public void testSendNormalMessageQueueSelectorPipeLine() throws Exception { mqFaultStrategy.updateFaultItem(BROKER_NAME2, 1000, true, true); mqFaultStrategy.updateFaultItem(BROKER_NAME, 1000, true, false); - TopicRouteService topicRouteService = mock(TopicRouteService.class); - when(topicRouteService.getMqFaultStrategy()).thenReturn(mqFaultStrategy); - MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, topicRouteService.getMqFaultStrategy()); - + MessageQueueView messageQueueView = new MessageQueueView(TOPIC, topicRouteData, buildPenalizerByMQFaultStrategy(mqFaultStrategy)); AddressableMessageQueue firstSelect = selector.select(ProxyContext.create(), messageQueueView); assertEquals(firstSelect.getBrokerName(), BROKER_NAME2); diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java new file mode 100644 index 00000000000..f31d973cce5 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePenalizerTest.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class MessageQueuePenalizerTest { + + /** + * Test evaluatePenalty with null messageQueue should throw NullPointerException + */ + @Test(expected = NullPointerException.class) + public void testEvaluatePenalty_NullMessageQueue() { + List > penalizers = new ArrayList<>(); + penalizers.add(mq -> 10); + MessageQueuePenalizer.evaluatePenalty(null, penalizers); + } + + /** + * Test evaluatePenalty with null penalizers should return 0 + */ + @Test + public void testEvaluatePenalty_NullPenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, null); + assertEquals(0, penalty); + } + + /** + * Test evaluatePenalty with empty penalizers should return 0 + */ + @Test + public void testEvaluatePenalty_EmptyPenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, Collections.emptyList()); + assertEquals(0, penalty); + } + + /** + * Test evaluatePenalty aggregates penalties from multiple penalizers by summing them up + */ + @Test + public void testEvaluatePenalty_MultiplePenalizers() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + List > penalizers = Arrays.asList( + q -> 10, + q -> 20, + q -> 5 + ); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers); + assertEquals(35, penalty); + } + + /** + * Test evaluatePenalty with negative penalties (sum should still work) + */ + @Test + public void testEvaluatePenalty_NegativePenalties() { + MessageQueue mq = new MessageQueue("topic", "broker", 0); + List > penalizers = Arrays.asList( + q -> -5, + q -> 10, + q -> -3 + ); + int penalty = MessageQueuePenalizer.evaluatePenalty(mq, penalizers); + assertEquals(2, penalty); + } + + /** + * Test selectLeastPenalty with null queues should return null + */ + @Test + public void testSelectLeastPenalty_NullQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(null, penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenalty with empty queues should return null + */ + @Test + public void testSelectLeastPenalty_EmptyQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty( + Collections.emptyList(), penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenalty selects the queue with the lowest penalty + */ + @Test + public void testSelectLeastPenalty_LowestPenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // Penalizer that assigns different penalties based on queue id + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 10 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty short-circuits when penalty <= 0 + */ + @Test + public void testSelectLeastPenalty_ShortCircuitZeroPenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // mq1 has penalty 0, should short-circuit + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? 0 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty short-circuits when penalty is negative + */ + @Test + public void testSelectLeastPenalty_ShortCircuitNegativePenalty() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // mq1 has penalty -5, should short-circuit + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 50 : (mq.getQueueId() == 1 ? -5 : 30) + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(-5, result.getRight().intValue()); + } + + /** + * Test selectLeastPenalty with round-robin behavior (rotating start index) + * Verifies that startIndex affects the iteration order + */ + @Test + public void testSelectLeastPenalty_RoundRobinStartIndex() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + // All queues have penalty 0, so whichever is encountered first will be returned + List > penalizers = Collections.singletonList(mq -> 0); + + // Starting from index 0 + AtomicInteger startIndex1 = new AtomicInteger(0); + Pair result1 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex1); + assertNotNull(result1); + assertEquals(mq0, result1.getLeft()); + + // Starting from index 1 + AtomicInteger startIndex2 = new AtomicInteger(1); + Pair result2 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex2); + assertNotNull(result2); + assertEquals(mq1, result2.getLeft()); + + // Starting from index 2 + AtomicInteger startIndex3 = new AtomicInteger(2); + Pair result3 = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex3); + assertNotNull(result3); + assertEquals(mq2, result3.getLeft()); + } + + /** + * Test selectLeastPenalty increments startIndex for each iteration + */ + @Test + public void testSelectLeastPenalty_IncrementStartIndex() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + List > penalizers = Collections.singletonList(mq -> 10); + + AtomicInteger startIndex = new AtomicInteger(0); + MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + // After iterating through 3 queues, startIndex should be incremented 3 times + assertEquals(3, startIndex.get()); + } + + /** + * Test selectLeastPenalty handles startIndex wrapping with Math.floorMod + */ + @Test + public void testSelectLeastPenalty_StartIndexWrapping() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + MessageQueue mq2 = new MessageQueue("topic", "broker", 2); + List queues = Arrays.asList(mq0, mq1, mq2); + + List > penalizers = Collections.singletonList(mq -> 0); + + // Start with large index to test wrapping + AtomicInteger startIndex = new AtomicInteger(100); + Pair result = MessageQueuePenalizer.selectLeastPenalty(queues, penalizers, startIndex); + + assertNotNull(result); + // 100 % 3 = 1, so should start from mq1 + assertEquals(mq1, result.getLeft()); + } + + /** + * Test selectLeastPenaltyWithPriority with null queuesWithPriority should return null + */ + @Test + public void testSelectLeastPenaltyWithPriority_NullQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + null, penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenaltyWithPriority with empty queuesWithPriority should return null + */ + @Test + public void testSelectLeastPenaltyWithPriority_EmptyQueues() { + List > penalizers = Collections.singletonList(mq -> 10); + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + Collections.emptyList(), penalizers, startIndex); + assertNull(result); + } + + /** + * Test selectLeastPenaltyWithPriority with single priority group delegates to selectLeastPenalty + */ + @Test + public void testSelectLeastPenaltyWithPriority_SinglePriorityGroup() { + MessageQueue mq0 = new MessageQueue("topic", "broker", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker", 1); + List queues = Arrays.asList(mq0, mq1); + + List > penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 0 ? 20 : 10 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + Collections.singletonList(queues), penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority selects queue with lowest penalty across multiple priority groups + */ + @Test + public void testSelectLeastPenaltyWithPriority_MultiplePriorityGroups() { + // Priority group 1 (higher priority) + MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker-high", 1); + List highPriorityQueues = Arrays.asList(mq0, mq1); + + // Priority group 2 (lower priority) + MessageQueue mq2 = new MessageQueue("topic", "broker-low", 0); + MessageQueue mq3 = new MessageQueue("topic", "broker-low", 1); + List lowPriorityQueues = Arrays.asList(mq2, mq3); + + List > queuesWithPriority = Arrays.asList(highPriorityQueues, lowPriorityQueues); + + // Assign penalties: high-priority queues have higher penalties, low-priority have lower + List
> penalizers = Collections.singletonList( + mq -> mq.getBrokerName().equals("broker-high") ? 50 : 10 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should select from low-priority group because it has lower penalty + assertTrue(result.getLeft().getBrokerName().equals("broker-low")); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority short-circuits when a priority group yields penalty <= 0 + */ + @Test + public void testSelectLeastPenaltyWithPriority_ShortCircuitZeroPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker-high", 0); + List highPriorityQueues = Collections.singletonList(mq0); + + // Priority group 2 + MessageQueue mq1 = new MessageQueue("topic", "broker-low", 0); + List lowPriorityQueues = Collections.singletonList(mq1); + + List > queuesWithPriority = Arrays.asList(highPriorityQueues, lowPriorityQueues); + + // First group has penalty 0, should short-circuit + List
> penalizers = Collections.singletonList( + mq -> mq.getBrokerName().equals("broker-high") ? 0 : 100 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq0, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority when first group encounters zero penalty during iteration + */ + @Test + public void testSelectLeastPenaltyWithPriority_FirstGroupHasZeroPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker1", 1); + List group1 = Arrays.asList(mq0, mq1); + + // Priority group 2 + MessageQueue mq2 = new MessageQueue("topic", "broker2", 0); + List group2 = Collections.singletonList(mq2); + + List > queuesWithPriority = Arrays.asList(group1, group2); + + // mq1 in first group has penalty 0 + List
> penalizers = Collections.singletonList( + mq -> mq.getQueueId() == 1 && mq.getBrokerName().equals("broker1") ? 0 : 50 + ); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + assertEquals(mq1, result.getLeft()); + assertEquals(0, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority returns first encountered minimum when multiple groups have same minimum penalty + */ + @Test + public void testSelectLeastPenaltyWithPriority_SameMinimumPenalty() { + // Priority group 1 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + List group1 = Collections.singletonList(mq0); + + // Priority group 2 + MessageQueue mq1 = new MessageQueue("topic", "broker2", 0); + List group2 = Collections.singletonList(mq1); + + // Priority group 3 + MessageQueue mq2 = new MessageQueue("topic", "broker3", 0); + List group3 = Collections.singletonList(mq2); + + List > queuesWithPriority = Arrays.asList(group1, group2, group3); + + // All have same penalty + List
> penalizers = Collections.singletonList(mq -> 10); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should return first encountered (from group1) + assertEquals(mq0, result.getLeft()); + assertEquals(10, result.getRight().intValue()); + } + + /** + * Test selectLeastPenaltyWithPriority with complex scenario: + * Multiple priority groups with varying penalties + */ + @Test + public void testSelectLeastPenaltyWithPriority_ComplexScenario() { + // Priority group 1: penalties 100, 90 + MessageQueue mq0 = new MessageQueue("topic", "broker1", 0); + MessageQueue mq1 = new MessageQueue("topic", "broker1", 1); + List group1 = Arrays.asList(mq0, mq1); + + // Priority group 2: penalties 50, 30 + MessageQueue mq2 = new MessageQueue("topic", "broker2", 0); + MessageQueue mq3 = new MessageQueue("topic", "broker2", 1); + List group2 = Arrays.asList(mq2, mq3); + + // Priority group 3: penalties 80, 20 + MessageQueue mq4 = new MessageQueue("topic", "broker3", 0); + MessageQueue mq5 = new MessageQueue("topic", "broker3", 1); + List group3 = Arrays.asList(mq4, mq5); + + List > queuesWithPriority = Arrays.asList(group1, group2, group3); + + List
> penalizers = Collections.singletonList(mq -> { + if (mq.getBrokerName().equals("broker1")) { + return mq.getQueueId() == 0 ? 100 : 90; + } else if (mq.getBrokerName().equals("broker2")) { + return mq.getQueueId() == 0 ? 50 : 30; + } else { + return mq.getQueueId() == 0 ? 80 : 20; + } + }); + + AtomicInteger startIndex = new AtomicInteger(0); + Pair result = MessageQueuePenalizer.selectLeastPenaltyWithPriority( + queuesWithPriority, penalizers, startIndex); + + assertNotNull(result); + // Should select mq5 from group3 with penalty 20 (the global minimum) + assertEquals(mq5, result.getLeft()); + assertEquals(20, result.getRight().intValue()); + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java new file mode 100644 index 00000000000..22f2a68e8b0 --- /dev/null +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueuePriorityProviderTest.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.proxy.service.route; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class MessageQueuePriorityProviderTest { + + @Test + public void testPriorityOfWithLambda() { + // Test functional interface implementation using lambda + MessageQueuePriorityProvider provider = mq -> mq.getQueueId(); + + MessageQueue queue1 = new MessageQueue("topic", "broker", 0); + MessageQueue queue2 = new MessageQueue("topic", "broker", 5); + MessageQueue queue3 = new MessageQueue("topic", "broker", 10); + + assertEquals(0, provider.priorityOf(queue1)); + assertEquals(5, provider.priorityOf(queue2)); + assertEquals(10, provider.priorityOf(queue3)); + } + + @Test + public void testPriorityOfWithConstantValue() { + // Test with constant priority + MessageQueuePriorityProvider constantProvider = mq -> 1; + + MessageQueue queue1 = new MessageQueue("topic1", "broker1", 0); + MessageQueue queue2 = new MessageQueue("topic2", "broker2", 5); + + assertEquals(1, constantProvider.priorityOf(queue1)); + assertEquals(1, constantProvider.priorityOf(queue2)); + } + + @Test + public void testPriorityOfBasedOnBrokerName() { + // Test priority based on broker name hash + MessageQueuePriorityProvider brokerProvider = + mq -> mq.getBrokerName().hashCode() % 10; + + MessageQueue queue1 = new MessageQueue("topic", "broker-a", 0); + MessageQueue queue2 = new MessageQueue("topic", "broker-b", 0); + + int priority1 = brokerProvider.priorityOf(queue1); + int priority2 = brokerProvider.priorityOf(queue2); + + // Priorities should be deterministic for the same broker + assertEquals(priority1, brokerProvider.priorityOf(queue1)); + assertEquals(priority2, brokerProvider.priorityOf(queue2)); + } + + @Test + public void testBuildPriorityGroupsWithNullList() { + MessageQueuePriorityProvider provider = mq -> 0; + List > result = MessageQueuePriorityProvider.buildPriorityGroups(null, provider); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testBuildPriorityGroupsWithEmptyList() { + MessageQueuePriorityProvider
provider = mq -> 0; + List > result = MessageQueuePriorityProvider.buildPriorityGroups( + Collections.emptyList(), provider); + + assertNotNull(result); + assertTrue(result.isEmpty()); + } + + @Test + public void testBuildPriorityGroupsWithSinglePriority() { + MessageQueuePriorityProvider
provider = mq -> 0; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker1", 0), + new MessageQueue("topic", "broker1", 1), + new MessageQueue("topic", "broker1", 2) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(3, result.get(0).size()); + } + + @Test + public void testBuildPriorityGroupsWithMultiplePriorities() { + // Priority based on queue ID: 0->high, 1->medium, 2->low + MessageQueuePriorityProvider
provider = mq -> { + if (mq.getQueueId() < 2) return 0; // High priority + if (mq.getQueueId() < 4) return 1; // Medium priority + return 2; // Low priority + }; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 0), // priority 0 + new MessageQueue("topic", "broker", 1), // priority 0 + new MessageQueue("topic", "broker", 2), // priority 1 + new MessageQueue("topic", "broker", 3), // priority 1 + new MessageQueue("topic", "broker", 4), // priority 2 + new MessageQueue("topic", "broker", 5) // priority 2 + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // First group (highest priority 0) + assertEquals(2, result.get(0).size()); + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(1, result.get(0).get(1).getQueueId()); + + // Second group (medium priority 1) + assertEquals(2, result.get(1).size()); + assertEquals(2, result.get(1).get(0).getQueueId()); + assertEquals(3, result.get(1).get(1).getQueueId()); + + // Third group (low priority 2) + assertEquals(2, result.get(2).size()); + assertEquals(4, result.get(2).get(0).getQueueId()); + assertEquals(5, result.get(2).get(1).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsOrderedByPriority() { + // Test that groups are ordered from high to low priority (ascending numeric value) + MessageQueuePriorityProvider
provider = mq -> mq.getQueueId(); + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 5), + new MessageQueue("topic", "broker", 0), + new MessageQueue("topic", "broker", 3), + new MessageQueue("topic", "broker", 1) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(4, result.size()); + + // Verify order: 0, 1, 3, 5 (ascending) + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(1, result.get(1).get(0).getQueueId()); + assertEquals(3, result.get(2).get(0).getQueueId()); + assertEquals(5, result.get(3).get(0).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsWithNegativePriorities() { + // Test with negative priority values + MessageQueuePriorityProvider
provider = mq -> mq.getQueueId() - 5; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker", 0), // priority -5 + new MessageQueue("topic", "broker", 5), // priority 0 + new MessageQueue("topic", "broker", 10) // priority 5 + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // Verify order: -5, 0, 5 (ascending) + assertEquals(0, result.get(0).get(0).getQueueId()); + assertEquals(5, result.get(1).get(0).getQueueId()); + assertEquals(10, result.get(2).get(0).getQueueId()); + } + + @Test + public void testBuildPriorityGroupsWithMixedBrokers() { + // Priority based on broker name + MessageQueuePriorityProvider
provider = mq -> { + if (mq.getBrokerName().equals("broker-high")) return 0; + if (mq.getBrokerName().equals("broker-medium")) return 1; + return 2; + }; + + List queues = Arrays.asList( + new MessageQueue("topic", "broker-high", 0), + new MessageQueue("topic", "broker-low", 0), + new MessageQueue("topic", "broker-medium", 0), + new MessageQueue("topic", "broker-high", 1), + new MessageQueue("topic", "broker-medium", 1) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // High priority group + assertEquals(2, result.get(0).size()); + assertEquals("broker-high", result.get(0).get(0).getBrokerName()); + assertEquals("broker-high", result.get(0).get(1).getBrokerName()); + + // Medium priority group + assertEquals(2, result.get(1).size()); + assertEquals("broker-medium", result.get(1).get(0).getBrokerName()); + + // Low priority group + assertEquals(1, result.get(2).size()); + assertEquals("broker-low", result.get(2).get(0).getBrokerName()); + } + + @Test + public void testBuildPriorityGroupsPreservesQueueOrder() { + // Test that queues with same priority maintain their relative order + MessageQueuePriorityProvider
provider = mq -> 0; + + List queues = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + queues.add(new MessageQueue("topic", "broker", i)); + } + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(10, result.get(0).size()); + + // Verify order is maintained + for (int i = 0; i < 10; i++) { + assertEquals(i, result.get(0).get(i).getQueueId()); + } + } + + @Test + public void testBuildPriorityGroupsWithCustomMessageQueue() { + // Test with extended MessageQueue type + class CustomMessageQueue extends MessageQueue { + private int customPriority; + + public CustomMessageQueue(String topic, String brokerName, int queueId, int customPriority) { + super(topic, brokerName, queueId); + this.customPriority = customPriority; + } + + public int getCustomPriority() { + return customPriority; + } + } + + MessageQueuePriorityProvider
provider = + CustomMessageQueue::getCustomPriority; + + List queues = Arrays.asList( + new CustomMessageQueue("topic", "broker", 0, 2), + new CustomMessageQueue("topic", "broker", 1, 0), + new CustomMessageQueue("topic", "broker", 2, 1) + ); + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(3, result.size()); + + // Verify order by custom priority: 0, 1, 2 + assertEquals(0, result.get(0).get(0).getCustomPriority()); + assertEquals(1, result.get(1).get(0).getCustomPriority()); + assertEquals(2, result.get(2).get(0).getCustomPriority()); + } + + @Test + public void testBuildPriorityGroupsWithLargeNumberOfQueues() { + // Test with large number of queues + MessageQueuePriorityProvider
provider = mq -> mq.getQueueId() % 5; + + List queues = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + queues.add(new MessageQueue("topic", "broker", i)); + } + + List > result = MessageQueuePriorityProvider.buildPriorityGroups(queues, provider); + + assertNotNull(result); + assertEquals(5, result.size()); // 5 different priorities (0-4) + + // Each group should have 20 queues (100 / 5) + for (List
group : result) { + assertEquals(20, group.size()); + } + } +} diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java index d150f87c409..e44ed28f4a6 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/route/MessageQueueSelectorTest.java @@ -30,12 +30,12 @@ public class MessageQueueSelectorTest extends BaseServiceTest { public void testReadMessageQueue() { queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_READ); queueData.setReadQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, true); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), true); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) { @@ -58,12 +58,12 @@ public void testReadMessageQueue() { public void testWriteMessageQueue() { queueData.setPerm(PermName.PERM_WRITE); queueData.setReadQueueNums(0); - MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + MessageQueueSelector messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); assertTrue(messageQueueSelector.getQueues().isEmpty()); queueData.setPerm(PermName.PERM_WRITE); queueData.setWriteQueueNums(3); - messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), null, false); + messageQueueSelector = new MessageQueueSelector(new TopicRouteWrapper(topicRouteData, TOPIC), false); assertEquals(3, messageQueueSelector.getQueues().size()); assertEquals(1, messageQueueSelector.getBrokerActingQueues().size()); for (int i = 0; i < messageQueueSelector.getQueues().size(); i++) {