From cad558d907770a7a23e1913b767f2243e2448aff Mon Sep 17 00:00:00 2001 From: bakhritdinov Date: Fri, 16 Jan 2026 11:19:26 +0500 Subject: [PATCH 1/2] feat(amqp): Allow configurable delay strategy for backed channel This commit introduces the ability to provide a custom DelayStrategy for AMQP-backed message channels. The AmqpBackedMessageChannelBuilder now has a withDelayStrategy method, which passes the strategy down to the AmqpOutboundChannelAdapter. This allows users to override the default delaying behavior. An integration test has been added to verify that the custom strategy is correctly wired and invoked. --- .../src/AmqpBackedMessageChannelBuilder.php | 8 ++++++ .../Amqp/src/AmqpOutboundChannelAdapter.php | 4 ++- .../src/AmqpOutboundChannelAdapterBuilder.php | 12 +++++++- .../Fixture/AmqpChannel/SpyDelayStrategy.php | 25 +++++++++++++++++ .../Integration/AmqpChannelAdapterTest.php | 28 +++++++++++++++++++ 5 files changed, 75 insertions(+), 2 deletions(-) create mode 100644 packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php diff --git a/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php b/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php index 28d44602d..dffdb3066 100644 --- a/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php +++ b/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php @@ -4,6 +4,7 @@ use Ecotone\Enqueue\EnqueueMessageChannelBuilder; use Enqueue\AmqpExt\AmqpConnectionFactory; +use Enqueue\AmqpTools\DelayStrategy; /** * Class AmqpBackedMessageChannelBuilder @@ -71,6 +72,13 @@ public function withPublisherConfirms(bool $enabled): self return $this; } + public function withDelayStrategy(DelayStrategy $delayStrategy): self + { + $this->getAmqpOutboundChannelAdapter()->withDelayStrategy($delayStrategy); + + return $this; + } + public function getMessageChannelName(): string { return $this->channelName; diff --git a/packages/Amqp/src/AmqpOutboundChannelAdapter.php b/packages/Amqp/src/AmqpOutboundChannelAdapter.php index 1062ebe96..0639fa791 100644 --- a/packages/Amqp/src/AmqpOutboundChannelAdapter.php +++ b/packages/Amqp/src/AmqpOutboundChannelAdapter.php @@ -13,6 +13,7 @@ use Ecotone\Messaging\Support\Assert; use Enqueue\AmqpExt\AmqpContext as AmqpExtContext; use Enqueue\AmqpLib\AmqpContext as AmqpLibContext; +use Enqueue\AmqpTools\DelayStrategy; use Interop\Amqp\AmqpMessage; use Interop\Amqp\Impl\AmqpTopic; @@ -43,6 +44,7 @@ public function __construct( private OutboundMessageConverter $outboundMessageConverter, private ConversionService $conversionService, private AmqpTransactionInterceptor $amqpTransactionInterceptor, + private ?DelayStrategy $delayStrategy = null ) { } @@ -87,7 +89,7 @@ public function handle(Message $message): void $context = $this->connectionFactory->createContext(); $this->connectionFactory->getProducer() ->setTimeToLive($outboundMessage->getTimeToLive()) - ->setDelayStrategy(new HeadersExchangeDelayStrategy()) + ->setDelayStrategy($this->delayStrategy ?? new HeadersExchangeDelayStrategy()) ->setDeliveryDelay($outboundMessage->getDeliveryDelay()) // this allow for having queue per delay instead of queue per delay + exchangeName ->send(new AmqpTopic($exchangeName), $messageToSend); diff --git a/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php b/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php index 813b31a95..7c71d8136 100644 --- a/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php +++ b/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php @@ -12,6 +12,7 @@ use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Messaging\Conversion\ConversionService; +use Enqueue\AmqpTools\DelayStrategy; /** * licence Apache-2.0 @@ -28,6 +29,7 @@ class AmqpOutboundChannelAdapterBuilder extends EnqueueOutboundChannelAdapterBui private bool $defaultPersistentDelivery = self::DEFAULT_PERSISTENT_MODE; private array $staticHeadersToAdd = []; private bool $publisherConfirms = true; + private ?DelayStrategy $delayStrategy = null; private function __construct(string $exchangeName, string $amqpConnectionFactoryReferenceName) { @@ -65,6 +67,13 @@ public function withPublisherConfirms(bool $publisherConfirms): self return $this; } + public function withDelayStrategy(DelayStrategy $delayStrategy): self + { + $this->delayStrategy = $delayStrategy; + + return $this; + } + /** * @param string $headerName * @@ -140,6 +149,7 @@ public function compile(MessagingContainerBuilder $builder): Definition $outboundMessageConverter, new Reference(ConversionService::REFERENCE_NAME), Reference::to(AmqpTransactionInterceptor::class), + $this->delayStrategy ]); } -} +} \ No newline at end of file diff --git a/packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php b/packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php new file mode 100644 index 000000000..b70ff0073 --- /dev/null +++ b/packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php @@ -0,0 +1,25 @@ +delayMessage($context, $dest, $message, $delay); + } +} \ No newline at end of file diff --git a/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php b/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php index 3c2c0d5d6..fac899e78 100644 --- a/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php +++ b/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php @@ -42,6 +42,7 @@ use RuntimeException; use stdClass; use Test\Ecotone\Amqp\AmqpMessagingTestCase; +use Test\Ecotone\Amqp\Fixture\AmqpChannel\SpyDelayStrategy; use Test\Ecotone\Amqp\Fixture\AmqpConsumer\AmqpConsumerExample; use Test\Ecotone\Amqp\Fixture\Handler\ExceptionalMessageHandler; @@ -894,6 +895,33 @@ public function test_sending_and_receiving_from_fanout_exchange() )); } + public function test_using_custom_delay_strategy_from_channel_builder() + { + $queueName = Uuid::uuid4()->toString(); + $customDelayStrategy = new SpyDelayStrategy(); + + $ecotoneLite = $this->bootstrapForTesting( + [], + array_merge($this->getConnectionFactoryReferences(), ['customDelayStrategy' => $customDelayStrategy]), + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE])) + ->withExtensionObjects([ + AmqpBackedMessageChannelBuilder::create($queueName) + ->withDelayStrategy($customDelayStrategy), + ]) + ); + + /** @var PollableChannel $messageChannel */ + $messageChannel = $ecotoneLite->getMessageChannelByName($queueName); + $messageChannel->send( + MessageBuilder::withPayload('some') + ->setHeader(MessageHeaders::DELIVERY_DELAY, 10) + ->build() + ); + + $this->assertTrue(SpyDelayStrategy::$wasCalled); + } + /** * @param string $queueName * @return EnqueueMessageChannel From 84dcdd97eaca61e79e243650c5dc9af7dbdb5a83 Mon Sep 17 00:00:00 2001 From: Bobur Bakhritdinov Date: Mon, 19 Jan 2026 10:17:08 +0500 Subject: [PATCH 2/2] refactor(amqp): Use reference for delay strategy The DelayStrategy is now passed by reference name to the AmqpBackedMessageChannelBuilder and AmqpOutboundChannelAdapterBuilder. This allows for better reusability of the delay strategy in the dependency container. --- packages/Amqp/src/AmqpBackedMessageChannelBuilder.php | 5 ++--- packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php | 9 ++++----- .../Amqp/tests/Integration/AmqpChannelAdapterTest.php | 2 +- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php b/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php index dffdb3066..6fafaf28d 100644 --- a/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php +++ b/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php @@ -4,7 +4,6 @@ use Ecotone\Enqueue\EnqueueMessageChannelBuilder; use Enqueue\AmqpExt\AmqpConnectionFactory; -use Enqueue\AmqpTools\DelayStrategy; /** * Class AmqpBackedMessageChannelBuilder @@ -72,9 +71,9 @@ public function withPublisherConfirms(bool $enabled): self return $this; } - public function withDelayStrategy(DelayStrategy $delayStrategy): self + public function withDelayStrategy(string $delayStrategyReferenceName): self { - $this->getAmqpOutboundChannelAdapter()->withDelayStrategy($delayStrategy); + $this->getAmqpOutboundChannelAdapter()->withDelayStrategy($delayStrategyReferenceName); return $this; } diff --git a/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php b/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php index 7c71d8136..e8019661b 100644 --- a/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php +++ b/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php @@ -12,7 +12,6 @@ use Ecotone\Messaging\Config\Container\MessagingContainerBuilder; use Ecotone\Messaging\Config\Container\Reference; use Ecotone\Messaging\Conversion\ConversionService; -use Enqueue\AmqpTools\DelayStrategy; /** * licence Apache-2.0 @@ -29,7 +28,7 @@ class AmqpOutboundChannelAdapterBuilder extends EnqueueOutboundChannelAdapterBui private bool $defaultPersistentDelivery = self::DEFAULT_PERSISTENT_MODE; private array $staticHeadersToAdd = []; private bool $publisherConfirms = true; - private ?DelayStrategy $delayStrategy = null; + private ?string $delayStrategyReferenceName = null; private function __construct(string $exchangeName, string $amqpConnectionFactoryReferenceName) { @@ -67,9 +66,9 @@ public function withPublisherConfirms(bool $publisherConfirms): self return $this; } - public function withDelayStrategy(DelayStrategy $delayStrategy): self + public function withDelayStrategy(string $delayStrategyReferenceName): self { - $this->delayStrategy = $delayStrategy; + $this->delayStrategyReferenceName = $delayStrategyReferenceName; return $this; } @@ -149,7 +148,7 @@ public function compile(MessagingContainerBuilder $builder): Definition $outboundMessageConverter, new Reference(ConversionService::REFERENCE_NAME), Reference::to(AmqpTransactionInterceptor::class), - $this->delayStrategy + $this->delayStrategyReferenceName ? new Reference($this->delayStrategyReferenceName) : null, ]); } } \ No newline at end of file diff --git a/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php b/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php index fac899e78..8975b275c 100644 --- a/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php +++ b/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php @@ -907,7 +907,7 @@ public function test_using_custom_delay_strategy_from_channel_builder() ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE])) ->withExtensionObjects([ AmqpBackedMessageChannelBuilder::create($queueName) - ->withDelayStrategy($customDelayStrategy), + ->withDelayStrategy('customDelayStrategy'), ]) );