From 0430601db09d71288910ce38e26b8fba7a579952 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E5=BB=BA=E5=86=9B?= <631583871@qq.com> Date: Fri, 27 Jun 2025 20:29:56 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#583]:=20=E6=96=B0=E5=A2=9E=20RocketMQ?= =?UTF-8?q?MessageHandler=20=E5=92=8C=20RocketMQMessagePostProcessor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 RocketMQMessageHandler 用户处理消息前预处理,可用于处理链路追踪 - 新增 RocketMQMessagePostProcessor 类,用于处理消息发送前预处理消息,用来传递链路ID --- .../ListenerContainerConfiguration.java | 5 +- .../MessagePostProcessorConfiguration.java | 37 +++++++ .../RocketMQAutoConfiguration.java | 12 ++- .../RocketMQMessageHandlerConfiguration.java | 38 +++++++ .../spring/core/RocketMQTemplate.java | 12 ++- .../DefaultRocketMQListenerContainer.java | 50 ++++++--- .../support/RocketMQMessageHandler.java | 26 +++++ .../support/RocketMQMessageHandlerChain.java | 23 ++++ ...etMQMessageListenerContainerRegistrar.java | 7 +- .../support/RocketMQMessagePostProcessor.java | 30 ++++++ .../support/RocketMQMessageHandlerTest.java | 102 ++++++++++++++++++ .../RocketMQMessagePostProcessorTest.java | 84 +++++++++++++++ .../ExtTemplateResetConfiguration.java | 6 ++ .../ListenerContainerConfiguration.java | 9 +- .../MessagePostProcessorConfiguration.java | 37 +++++++ .../RocketMQAutoConfiguration.java | 12 ++- .../RocketMQMessageHandlerConfiguration.java | 34 ++++++ .../client/core/RocketMQClientTemplate.java | 12 ++- .../support/RocketMQMessageHandler.java | 27 +++++ .../support/RocketMQMessageHandlerChain.java | 24 +++++ .../support/RocketMQMessagePostProcessor.java | 30 ++++++ 21 files changed, 588 insertions(+), 29 deletions(-) create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessagePostProcessorConfiguration.java create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQMessageHandlerConfiguration.java create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandler.java create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerChain.java create mode 100644 rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessor.java create mode 100644 rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerTest.java create mode 100644 rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessorTest.java create mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/MessagePostProcessorConfiguration.java create mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQMessageHandlerConfiguration.java create mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandler.java create mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandlerChain.java create mode 100644 rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessagePostProcessor.java diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java index 726217c2..9d34fefe 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.spring.autoconfigure; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.RocketMQMessageHandler; import org.apache.rocketmq.spring.support.RocketMQMessageListenerContainerRegistrar; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; @@ -28,7 +29,7 @@ @ConditionalOnMissingBean(RocketMQMessageListenerContainerRegistrar.class) public class ListenerContainerConfiguration { @Bean - public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) { - return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties); + public RocketMQMessageListenerContainerRegistrar rocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties, RocketMQMessageHandler rocketMQMessageHandler) { + return new RocketMQMessageListenerContainerRegistrar(rocketMQMessageConverter, environment, rocketMQProperties, rocketMQMessageHandler); } } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessagePostProcessorConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessagePostProcessorConfiguration.java new file mode 100644 index 00000000..6f7bfcbe --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/MessagePostProcessorConfiguration.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spring.autoconfigure; + + +import org.apache.rocketmq.spring.support.RocketMQMessagePostProcessor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @see RocketMQMessagePostProcessor + */ +@Configuration +@ConditionalOnMissingBean(RocketMQMessagePostProcessor.class) +class MessagePostProcessorConfiguration { + + @Bean + public RocketMQMessagePostProcessor createMessagePostProcessor() { + return new RocketMQMessagePostProcessor(); + } + +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java index 45d26642..a0261cd0 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java @@ -25,6 +25,7 @@ import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.RocketMQMessagePostProcessor; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,9 +52,10 @@ @EnableConfigurationProperties(RocketMQProperties.class) @ConditionalOnClass({MQAdmin.class}) @ConditionalOnProperty(prefix = "rocketmq", value = "name-server", matchIfMissing = true) -@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, - ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class}) -@AutoConfigureAfter({MessageConverterConfiguration.class}) +@Import({MessageConverterConfiguration.class, RocketMQMessageHandlerConfiguration.class, ListenerContainerConfiguration.class, ExtProducerResetConfiguration.class, + ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class, + MessagePostProcessorConfiguration.class}) +@AutoConfigureAfter({MessageConverterConfiguration.class, MessagePostProcessorConfiguration.class, RocketMQMessageHandlerConfiguration.class}) @AutoConfigureBefore({RocketMQTransactionConfiguration.class}) public class RocketMQAutoConfiguration implements ApplicationContextAware { private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class); @@ -166,7 +168,8 @@ public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocket @Bean(destroyMethod = "destroy") @Conditional(ProducerOrConsumerPropertyCondition.class) @ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) - public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter) { + public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessageConverter, + RocketMQMessagePostProcessor rocketMQMessagePostProcessor) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); if (applicationContext.containsBean(PRODUCER_BEAN_NAME)) { rocketMQTemplate.setProducer((DefaultMQProducer) applicationContext.getBean(PRODUCER_BEAN_NAME)); @@ -175,6 +178,7 @@ public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter rocketMQMessag rocketMQTemplate.setConsumer((DefaultLitePullConsumer) applicationContext.getBean(CONSUMER_BEAN_NAME)); } rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); + rocketMQTemplate.setMessagePostProcessor(rocketMQMessagePostProcessor.getMessagePostProcessor()); return rocketMQTemplate; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQMessageHandlerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQMessageHandlerConfiguration.java new file mode 100644 index 00000000..28303fd4 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQMessageHandlerConfiguration.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.spring.autoconfigure; + +import org.apache.rocketmq.spring.support.RocketMQMessageConverter; +import org.apache.rocketmq.spring.support.RocketMQMessageHandler; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @see RocketMQMessageConverter + */ +@Configuration +@ConditionalOnMissingBean(RocketMQMessageHandler.class) +class RocketMQMessageHandlerConfiguration { + + @Bean + public RocketMQMessageHandler createRocketMQMessageHandler() { + return (message, consumer) -> consumer.doHandler(message); + } + +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java index b4fba546..5f0881cf 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java @@ -72,6 +72,8 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate imp private RocketMQMessageConverter rocketMQMessageConverter = new RocketMQMessageConverter(); + private MessagePostProcessor messagePostProcessor; + public DefaultMQProducer getProducer() { return producer; } @@ -104,6 +106,14 @@ public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) { this.messageQueueSelector = messageQueueSelector; } + public MessagePostProcessor getMessagePostProcessor() { + return messagePostProcessor; + } + + public void setMessagePostProcessor(MessagePostProcessor messagePostProcessor) { + this.messagePostProcessor = messagePostProcessor; + } + public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) { this.producer.setAsyncSenderExecutor(asyncSenderExecutor); } @@ -1191,7 +1201,7 @@ public TransactionSendResult sendMessageInTransaction(final String destination, private org.apache.rocketmq.common.message.Message createRocketMqMessage( String destination, Message message) { - Message msg = this.doConvert(message.getPayload(), message.getHeaders(), null); + Message msg = this.doConvert(message.getPayload(), message.getHeaders(), messagePostProcessor); return RocketMQUtil.convertToRocketMessage(getMessageConverter(), charset, destination, msg); } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index c3bf81b2..cda35cd9 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -140,6 +140,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, private String instanceName; + private RocketMQMessageHandler rocketMQMessageHandler = (message, chain) -> chain.doHandler(message); + public long getSuspendCurrentQueueTimeMillis() { return suspendCurrentQueueTimeMillis; } @@ -318,6 +320,14 @@ public void setInstanceName(String instanceName) { this.instanceName = instanceName; } + public RocketMQMessageHandler getRocketMQMessageHandler() { + return rocketMQMessageHandler; + } + + public void setRocketMQMessageHandler(RocketMQMessageHandler rocketMQMessageHandler) { + this.rocketMQMessageHandler = rocketMQMessageHandler; + } + public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) { this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown; return this; @@ -426,15 +436,21 @@ public class DefaultMessageListenerConcurrently implements MessageListenerConcur @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { for (MessageExt messageExt : msgs) { - log.debug("received msg: {}", messageExt); try { - long now = System.currentTimeMillis(); - DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class); - container.handleMessage(messageExt); - long costTime = System.currentTimeMillis() - now; - log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + rocketMQMessageHandler.doHandler(messageExt, messageExt1 -> { + log.debug("received msg: {}", messageExt1); + try { + long now = System.currentTimeMillis(); + DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class); + container.handleMessage(messageExt1); + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt1.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt1.getMsgId(), messageExt1.getTopic(), messageExt1.getReconsumeTimes(), e); + throw e; + } + }); } catch (Exception e) { - log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } @@ -450,15 +466,21 @@ public class DefaultMessageListenerOrderly implements MessageListenerOrderly { @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { for (MessageExt messageExt : msgs) { - log.debug("received msg: {}", messageExt); try { - long now = System.currentTimeMillis(); - DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class); - container.handleMessage(messageExt); - long costTime = System.currentTimeMillis() - now; - log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime); + rocketMQMessageHandler.doHandler(messageExt, messageExt1 -> { + log.debug("received msg: {}", messageExt1); + try { + long now = System.currentTimeMillis(); + DefaultRocketMQListenerContainer container = applicationContext.getBean(name, DefaultRocketMQListenerContainer.class); + container.handleMessage(messageExt1); + long costTime = System.currentTimeMillis() - now; + log.debug("consume {} cost: {} ms", messageExt1.getMsgId(), costTime); + } catch (Exception e) { + log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt1.getMsgId(), messageExt1.getTopic(), messageExt1.getReconsumeTimes(), e); + throw e; + } + }); } catch (Exception e) { - log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e); context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandler.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandler.java new file mode 100644 index 00000000..025994f3 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandler.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spring.support; + + +import org.apache.rocketmq.common.message.MessageExt; + +public interface RocketMQMessageHandler { + + void doHandler(MessageExt message, RocketMQMessageHandlerChain chain) throws Exception; + +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerChain.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerChain.java new file mode 100644 index 00000000..a461a819 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerChain.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spring.support; + +import org.apache.rocketmq.common.message.MessageExt; + +public interface RocketMQMessageHandlerChain { + void doHandler(MessageExt message) throws Exception; +} diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java index a27c187f..8f365c94 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessageListenerContainerRegistrar.java @@ -56,11 +56,15 @@ public class RocketMQMessageListenerContainerRegistrar implements ApplicationCon private final List containers = new ArrayList<>(); + private final RocketMQMessageHandler rocketMQMessageHandler; + public RocketMQMessageListenerContainerRegistrar(RocketMQMessageConverter rocketMQMessageConverter, - ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) { + ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties, + RocketMQMessageHandler rocketMQMessageHandler) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; + this.rocketMQMessageHandler = rocketMQMessageHandler; } @Override @@ -146,6 +150,7 @@ private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String container.setRocketMQReplyListener((RocketMQReplyListener) bean); } container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); + container.setRocketMQMessageHandler(rocketMQMessageHandler); container.setName(name); String namespace = environment.resolvePlaceholders(annotation.namespace()); diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessor.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessor.java new file mode 100644 index 00000000..df95f758 --- /dev/null +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spring.support; + +import org.springframework.messaging.core.MessagePostProcessor; + +/** + * @see MessagePostProcessor + */ +public class RocketMQMessagePostProcessor { + + public MessagePostProcessor getMessagePostProcessor() { + return null; + } + +} diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerTest.java new file mode 100644 index 00000000..9707a480 --- /dev/null +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessageHandlerTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spring.support; + +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.junit.Test; +import org.springframework.beans.BeansException; +import org.springframework.context.support.GenericApplicationContext; + +import java.lang.reflect.Field; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RocketMQMessageHandlerTest { + + @Test + public void testRocketMQMessageHandler() throws Exception { + DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer(); + + listenerContainer.setApplicationContext(new GenericApplicationContext(){ + @Override + public T getBean(String name, Class requiredType) throws BeansException { + return (T) listenerContainer; + } + }); + listenerContainer.setRocketMQMessageHandler(new RocketMQMessageHandler() { + @Override + public void doHandler(MessageExt message, RocketMQMessageHandlerChain chain) throws Exception { + message.putUserProperty("test", "test"); + chain.doHandler(message); + } + }); + + listenerContainer.setRocketMQListener(new RocketMQListener() { + @Override + public void onMessage(MessageExt message) { + assertEquals(message.getProperties().get("test"), "test"); + } + }); + + Field messageTypeField = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType"); + messageTypeField.setAccessible(true); + messageTypeField.set(listenerContainer,MessageExt.class); + + DefaultRocketMQListenerContainer.DefaultMessageListenerConcurrently concurrently = listenerContainer.new DefaultMessageListenerConcurrently(); + MessageExt messageExt = new MessageExt(); + messageExt.getProperties(); + concurrently.consumeMessage(Arrays.asList(messageExt),null); + } + + @Test + public void testRocketMQMessageHandler1() throws Exception { + DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer(); + + listenerContainer.setApplicationContext(new GenericApplicationContext(){ + @Override + public T getBean(String name, Class requiredType) throws BeansException { + return (T) listenerContainer; + } + }); + listenerContainer.setRocketMQMessageHandler(new RocketMQMessageHandler() { + @Override + public void doHandler(MessageExt message, RocketMQMessageHandlerChain chain) throws Exception { + message.putUserProperty("test", "test"); + chain.doHandler(message); + } + }); + + listenerContainer.setRocketMQListener(new RocketMQListener() { + @Override + public void onMessage(MessageExt message) { + assertEquals(message.getProperties().get("test"), "test"); + } + }); + + Field messageTypeField = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType"); + messageTypeField.setAccessible(true); + messageTypeField.set(listenerContainer,MessageExt.class); + + DefaultRocketMQListenerContainer.DefaultMessageListenerOrderly concurrently = listenerContainer.new DefaultMessageListenerOrderly(); + MessageExt messageExt = new MessageExt(); + messageExt.getProperties(); + concurrently.consumeMessage(Arrays.asList(messageExt),null); + } + +} \ No newline at end of file diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessorTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessorTest.java new file mode 100644 index 00000000..c336aeb5 --- /dev/null +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/RocketMQMessagePostProcessorTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.spring.support; + +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.junit.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.core.MessagePostProcessor; +import org.springframework.messaging.support.MessageBuilder; + +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class RocketMQMessagePostProcessorTest { + + @Test + public void testMessagePostProcessor() { + + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setProducer(new DefaultMQProducer()); + rocketMQTemplate.setMessagePostProcessor(new MessagePostProcessor() { + @Override + public Message postProcessMessage(Message message) { + throw new RuntimeException("postProcessMessage"); + } + }); + try { + rocketMQTemplate.syncSend("test", "payload"); + } catch (MessagingException e) { + assertThat(e).hasMessageContaining("postProcessMessage"); + } + } + + @Test + public void testMessagePostProcessor2() { + + RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); + rocketMQTemplate.setProducer(new DefaultMQProducer() { + @Override + public SendResult send(org.apache.rocketmq.common.message.Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + SendResult sendResult = new SendResult(); + if (Objects.equals(msg.getProperties().get("test"), "test")) { + sendResult.setSendStatus(SendStatus.SEND_OK); + } else { + sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT); + } + return sendResult; + } + }); + rocketMQTemplate.setMessagePostProcessor(new MessagePostProcessor() { + @Override + public Message postProcessMessage(Message message) { + MessageBuilder builder = MessageBuilder.fromMessage(message); + builder.setHeader("test", "test"); + return builder.build(); + } + }); + SendResult sendResult = rocketMQTemplate.syncSend("test", "payload"); + assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); + } +} \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java index ba8021e4..76063548 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtTemplateResetConfiguration.java @@ -18,6 +18,7 @@ import org.apache.rocketmq.client.annotation.ExtProducerResetConfiguration; import org.apache.rocketmq.client.support.RocketMQMessageConverter; +import org.apache.rocketmq.client.support.RocketMQMessagePostProcessor; import org.apache.rocketmq.client.support.RocketMQUtil; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; @@ -54,11 +55,15 @@ public class ExtTemplateResetConfiguration implements ApplicationContextAware, S private RocketMQMessageConverter rocketMQMessageConverter; + private RocketMQMessagePostProcessor rocketMQMessagePostProcessor; + public ExtTemplateResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter, + RocketMQMessagePostProcessor rocketMQMessagePostProcessor, ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; + this.rocketMQMessagePostProcessor = rocketMQMessagePostProcessor; } @Override @@ -90,6 +95,7 @@ private void registerTemplate(String beanName, Object bean) { RocketMQClientTemplate rocketMQTemplate = (RocketMQClientTemplate) bean; rocketMQTemplate.setProducerBuilder(producerBuilder); rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); + rocketMQTemplate.setMessagePostProcessor(rocketMQMessagePostProcessor.getMessagePostProcessor()); String topic = environment.resolvePlaceholders(annotation.topic()); log.info("Set real producer to {} using topic {}", beanName, topic); } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java index 450d846e..1d674b8b 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ListenerContainerConfiguration.java @@ -20,6 +20,7 @@ import org.apache.rocketmq.client.core.RocketMQListener; import org.apache.rocketmq.client.support.DefaultListenerContainer; import org.apache.rocketmq.client.support.RocketMQMessageConverter; +import org.apache.rocketmq.client.support.RocketMQMessageHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -52,11 +53,15 @@ public class ListenerContainerConfiguration implements ApplicationContextAware { private final List containers = new ArrayList<>(); + private RocketMQMessageHandler rocketMQMessageHandler; + public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter, - ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) { + ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties, + RocketMQMessageHandler rocketMQMessageHandler) { this.rocketMQMessageConverter = rocketMQMessageConverter; this.environment = environment; this.rocketMQProperties = rocketMQProperties; + this.rocketMQMessageHandler = rocketMQMessageHandler; } @Override @@ -95,7 +100,7 @@ private DefaultListenerContainer createRocketMQListenerContainer(String name, Ob DefaultListenerContainer container = new DefaultListenerContainer(); container.setName(name); container.setRocketMQMessageListener(annotation); - container.setMessageListener((RocketMQListener) bean); + container.setMessageListener(messageView -> rocketMQMessageHandler.doHandler(messageView, message -> ((RocketMQListener) bean).consume(message))); container.setAccessKey(environment.resolvePlaceholders(annotation.accessKey())); container.setSecretKey(environment.resolvePlaceholders(annotation.secretKey())); container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/MessagePostProcessorConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/MessagePostProcessorConfiguration.java new file mode 100644 index 00000000..be579748 --- /dev/null +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/MessagePostProcessorConfiguration.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.autoconfigure; + + +import org.apache.rocketmq.client.support.RocketMQMessagePostProcessor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @see RocketMQMessagePostProcessor + */ +@Configuration +@ConditionalOnMissingBean(RocketMQMessagePostProcessor.class) +class MessagePostProcessorConfiguration { + + @Bean + public RocketMQMessagePostProcessor createMessagePostProcessor() { + return new RocketMQMessagePostProcessor(); + } + +} diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 346311f2..94eadd1e 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.autoconfigure; import org.apache.rocketmq.client.support.RocketMQMessageConverter; +import org.apache.rocketmq.client.support.RocketMQMessagePostProcessor; import org.apache.rocketmq.client.support.RocketMQUtil; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientServiceProvider; @@ -50,9 +51,10 @@ @Configuration @EnableConfigurationProperties(RocketMQProperties.class) -@Import({MessageConverterConfiguration.class, ListenerContainerConfiguration.class, ExtTemplateResetConfiguration.class, - ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class}) -@AutoConfigureAfter({MessageConverterConfiguration.class}) +@Import({MessageConverterConfiguration.class, RocketMQMessageHandlerConfiguration.class, ListenerContainerConfiguration.class, ExtTemplateResetConfiguration.class, + ExtConsumerResetConfiguration.class, RocketMQTransactionConfiguration.class, RocketMQListenerConfiguration.class, + MessagePostProcessorConfiguration.class}) +@AutoConfigureAfter({MessageConverterConfiguration.class, MessagePostProcessorConfiguration.class,RocketMQMessageHandlerConfiguration.class}) @AutoConfigureBefore({RocketMQTransactionConfiguration.class}) public class RocketMQAutoConfiguration implements ApplicationContextAware { private static final Logger log = LoggerFactory.getLogger(RocketMQAutoConfiguration.class); @@ -126,7 +128,8 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr @Bean(destroyMethod = "destroy") @Conditional(ProducerOrConsumerPropertyCondition.class) @ConditionalOnMissingBean(name = ROCKETMQ_TEMPLATE_DEFAULT_GLOBAL_NAME) - public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter rocketMQMessageConverter) { + public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter rocketMQMessageConverter, + RocketMQMessagePostProcessor rocketMQMessagePostProcessor) { RocketMQClientTemplate rocketMQClientTemplate = new RocketMQClientTemplate(); if (applicationContext.containsBean(PRODUCER_BUILDER_BEAN_NAME)) { @@ -136,6 +139,7 @@ public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter ro rocketMQClientTemplate.setSimpleConsumerBuilder((SimpleConsumerBuilder) applicationContext.getBean(SIMPLE_CONSUMER_BUILDER_BEAN_NAME)); } rocketMQClientTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); + rocketMQClientTemplate.setMessagePostProcessor(rocketMQMessagePostProcessor.getMessagePostProcessor()); return rocketMQClientTemplate; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQMessageHandlerConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQMessageHandlerConfiguration.java new file mode 100644 index 00000000..f17fbb49 --- /dev/null +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQMessageHandlerConfiguration.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.autoconfigure; + +import org.apache.rocketmq.client.support.RocketMQMessageHandler; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConditionalOnMissingBean(RocketMQMessageHandler.class) +class RocketMQMessageHandlerConfiguration { + + @Bean + public RocketMQMessageHandler createRocketMQMessageHandler() { + return (message, consumer) -> consumer.doHandler(message); + } + +} diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQClientTemplate.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQClientTemplate.java index 4213cb4b..1a464754 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQClientTemplate.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQClientTemplate.java @@ -33,6 +33,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; import org.springframework.messaging.core.AbstractMessageSendingTemplate; +import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.messaging.support.MessageBuilder; import java.io.IOException; @@ -59,6 +60,8 @@ public class RocketMQClientTemplate extends AbstractMessageSendingTemplate ackAsync(MessageView messageView) { private org.apache.rocketmq.client.apis.message.Message createRocketMQMessage(String destination, Message message, Duration messageDelayTime, String messageGroup) { - Message msg = this.doConvert(message.getPayload(), message.getHeaders(), null); + Message msg = this.doConvert(message.getPayload(), message.getHeaders(), messagePostProcessor); return RocketMQUtil.convertToClientMessage(getMessageConverter(), charset, destination, msg, messageDelayTime, messageGroup); } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandler.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandler.java new file mode 100644 index 00000000..8e58b4ed --- /dev/null +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandler.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.support; + + +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; + +public interface RocketMQMessageHandler { + + ConsumeResult doHandler(MessageView message, RocketMQMessageHandlerChain consumer); + +} diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandlerChain.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandlerChain.java new file mode 100644 index 00000000..a0db9afa --- /dev/null +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessageHandlerChain.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.support; + +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; + +public interface RocketMQMessageHandlerChain { + ConsumeResult doHandler(MessageView message); +} diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessagePostProcessor.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessagePostProcessor.java new file mode 100644 index 00000000..b15b37ad --- /dev/null +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQMessagePostProcessor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.support; + +import org.springframework.messaging.core.MessagePostProcessor; + +/** + * @see MessagePostProcessor + */ +public class RocketMQMessagePostProcessor { + + public MessagePostProcessor getMessagePostProcessor() { + return null; + } + +}