Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo.QueueFilter;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.StartAndShutdown;

public class MQFaultStrategy {
public class MQFaultStrategy implements StartAndShutdown {
private LatencyFaultTolerance<String> latencyFaultTolerance;
private volatile boolean sendLatencyFaultEnable;
private volatile boolean startDetectorEnable;
Expand Down Expand Up @@ -130,6 +131,11 @@ public void startDetector() {
this.latencyFaultTolerance.startDetector();
}

@Override
public void start() throws Exception {
this.startDetector();
}

public void shutdown() {
this.latencyFaultTolerance.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@
package org.apache.rocketmq.proxy.service.route;

import com.google.common.base.MoreObjects;
import java.util.Objects;
import org.apache.rocketmq.common.message.MessageQueue;

public class AddressableMessageQueue implements Comparable<AddressableMessageQueue> {

private final MessageQueue messageQueue;
public class AddressableMessageQueue extends MessageQueue {
private final String brokerAddr;

public AddressableMessageQueue(MessageQueue messageQueue, String brokerAddr) {
this.messageQueue = messageQueue;
super(messageQueue);
this.brokerAddr = brokerAddr;
}

public String getBrokerAddr() {
return brokerAddr;
}

public MessageQueue getMessageQueue() {
return new MessageQueue(getTopic(), getBrokerName(), getQueueId());
}

@Override
public int compareTo(AddressableMessageQueue o) {
return messageQueue.compareTo(o.messageQueue);
public int hashCode() {
return super.hashCode();
}

@Override
Expand All @@ -43,39 +48,13 @@ public boolean equals(Object o) {
if (!(o instanceof AddressableMessageQueue)) {
return false;
}
AddressableMessageQueue queue = (AddressableMessageQueue) o;
return Objects.equals(messageQueue, queue.messageQueue);
}

@Override
public int hashCode() {
return messageQueue == null ? 1 : messageQueue.hashCode();
}

public int getQueueId() {
return this.messageQueue.getQueueId();
}

public String getBrokerName() {
return this.messageQueue.getBrokerName();
}

public String getTopic() {
return messageQueue.getTopic();
}

public MessageQueue getMessageQueue() {
return messageQueue;
}

public String getBrokerAddr() {
return brokerAddr;
return super.equals(o);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("messageQueue", messageQueue)
.add("messageQueue", super.toString())
.add("brokerAddr", brokerAddr)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.service.route;

public class DefaultMessageQueuePriorityProvider implements MessageQueuePriorityProvider<AddressableMessageQueue> {
@Override
public int priorityOf(AddressableMessageQueue queue) {
return 10;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.proxy.service.route;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.message.MessageQueue;

@FunctionalInterface
public interface MessageQueuePenalizer<Q extends MessageQueue> {

/**
* Returns the penalty value for the given MessageQueue; lower is better.
*/
int penaltyOf(Q messageQueue);

/**
* Aggregates penalties from multiple penalizers for the same MessageQueue (by summing them up).
*/
static <Q extends MessageQueue> int evaluatePenalty(Q messageQueue, List<MessageQueuePenalizer<Q>> penalizers) {
Objects.requireNonNull(messageQueue, "messageQueue");
if (penalizers == null || penalizers.isEmpty()) {
return 0;
}
return penalizers.stream()
.mapToInt(p -> p.penaltyOf(messageQueue))
.sum();
}

/**
* Selects the queue with the lowest evaluated penalty from the given queue list.
*
* <p>The method iterates through all queues exactly once, but starts from a rotating index
* derived from {@code startIndex} (round-robin) to avoid always scanning from position 0 .</p>
*
* <p>For each queue, it computes a penalty via {@link #evaluatePenalty} using
* the provided {@code penalizers}. The queue with the smallest penalty is selected.</p>
*
* <p>Short-circuit rule: if any queue has a {@code penalty<= 0}, it is returned immediately,
* since no better result than 0 is expected.</p>
*
* @param queues candidate queues to select from
* @param penalizers penalty evaluators applied to each queue
* @param startIndex atomic counter used to determine the rotating start position (round-robin)
* @param <Q> queue type
* @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queues} is null/empty
*/
static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenalty(List<Q> queues,
List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
if (queues == null || queues.isEmpty()) {
return null;
}
Q bestQueue = null;
int bestPenalty = Integer.MAX_VALUE;

for (int i = 0; i < queues.size(); i++) {
int index = Math.floorMod(startIndex.getAndIncrement(), queues.size());
Q messageQueue = queues.get(index);
int penalty = evaluatePenalty(messageQueue, penalizers);

// Short-circuit: cannot do better than 0
if (penalty <= 0) {
return Pair.of(messageQueue, penalty);
}

if (penalty < bestPenalty) {
bestPenalty = penalty;
bestQueue = messageQueue;
}
}
return Pair.of(bestQueue, bestPenalty);
}

/**
* Selects a queue with the lowest computed penalty from multiple priority groups.
*
* <p>The input {@code queuesWithPriority} is a list of queue groups ordered by priority.
* For each priority group, this method delegates to {@link #selectLeastPenalty} to pick the best queue
* within that group and obtain its penalty.</p>
*
* <p>Short-circuit rule: if any priority group yields a queue whose {@code penalty <= 0},
* that result is returned immediately.</p>
*
* <p>Otherwise, it returns the queue with the smallest positive penalty among all groups.
* If multiple groups produce the same minimum penalty, the first encountered one wins.</p>
*
* @param queuesWithPriority priority-ordered groups of queues; each inner list represents one priority level
* @param penalizers penalty calculators used by {@code selectLeastPenalty} to score queues
* @param startIndex round-robin start index forwarded to {@code selectLeastPenalty} to reduce contention/hotspots
* @param <Q> queue type
* @return a {@code Pair} of (selected queue, penalty), or {@code null} if {@code queuesWithPriority} is null/empty
*/
static <Q extends MessageQueue> Pair<Q, Integer> selectLeastPenaltyWithPriority(List<List<Q>> queuesWithPriority,
List<MessageQueuePenalizer<Q>> penalizers, AtomicInteger startIndex) {
if (queuesWithPriority == null || queuesWithPriority.isEmpty()) {
return null;
}
if (queuesWithPriority.size() == 1) {
return selectLeastPenalty(queuesWithPriority.get(0), penalizers, startIndex);
}
Q bestQueue = null;
int bestPenalty = Integer.MAX_VALUE;
for (List<Q> queues : queuesWithPriority) {
Pair<Q, Integer> queueAndPenalty = selectLeastPenalty(queues, penalizers, startIndex);
int penalty = queueAndPenalty.getRight();
if (queueAndPenalty.getRight() <= 0) {
return queueAndPenalty;
}
if (penalty < bestPenalty) {
bestPenalty = penalty;
bestQueue = queueAndPenalty.getLeft();
}
}
return Pair.of(bestQueue, bestPenalty);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.proxy.service.route;

import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;

@FunctionalInterface
public interface MessageQueuePriorityProvider<Q extends MessageQueue> {
/**
* smaller value is higher priority
* */
int priorityOf(Q q);

static <Q extends MessageQueue> List<List<Q>> buildPriorityGroups(List<Q> queues, MessageQueuePriorityProvider<Q> provider) {
if (queues == null || queues.isEmpty()) {
return java.util.Collections.emptyList();
}

java.util.Map<Integer, List<Q>> buckets = new java.util.TreeMap<>();
for (Q q : queues) {
int p = provider.priorityOf(q);
buckets.computeIfAbsent(p, k -> new java.util.ArrayList<>()).add(q);
}
return new java.util.ArrayList<>(buckets.values());
}
}
Loading
Loading