diff --git a/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php b/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php index 28d44602d..6fafaf28d 100644 --- a/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php +++ b/packages/Amqp/src/AmqpBackedMessageChannelBuilder.php @@ -71,6 +71,13 @@ public function withPublisherConfirms(bool $enabled): self return $this; } + public function withDelayStrategy(string $delayStrategyReferenceName): self + { + $this->getAmqpOutboundChannelAdapter()->withDelayStrategy($delayStrategyReferenceName); + + return $this; + } + public function getMessageChannelName(): string { return $this->channelName; diff --git a/packages/Amqp/src/AmqpOutboundChannelAdapter.php b/packages/Amqp/src/AmqpOutboundChannelAdapter.php index 1062ebe96..0639fa791 100644 --- a/packages/Amqp/src/AmqpOutboundChannelAdapter.php +++ b/packages/Amqp/src/AmqpOutboundChannelAdapter.php @@ -13,6 +13,7 @@ use Ecotone\Messaging\Support\Assert; use Enqueue\AmqpExt\AmqpContext as AmqpExtContext; use Enqueue\AmqpLib\AmqpContext as AmqpLibContext; +use Enqueue\AmqpTools\DelayStrategy; use Interop\Amqp\AmqpMessage; use Interop\Amqp\Impl\AmqpTopic; @@ -43,6 +44,7 @@ public function __construct( private OutboundMessageConverter $outboundMessageConverter, private ConversionService $conversionService, private AmqpTransactionInterceptor $amqpTransactionInterceptor, + private ?DelayStrategy $delayStrategy = null ) { } @@ -87,7 +89,7 @@ public function handle(Message $message): void $context = $this->connectionFactory->createContext(); $this->connectionFactory->getProducer() ->setTimeToLive($outboundMessage->getTimeToLive()) - ->setDelayStrategy(new HeadersExchangeDelayStrategy()) + ->setDelayStrategy($this->delayStrategy ?? new HeadersExchangeDelayStrategy()) ->setDeliveryDelay($outboundMessage->getDeliveryDelay()) // this allow for having queue per delay instead of queue per delay + exchangeName ->send(new AmqpTopic($exchangeName), $messageToSend); diff --git a/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php b/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php index 813b31a95..e8019661b 100644 --- a/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php +++ b/packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php @@ -28,6 +28,7 @@ class AmqpOutboundChannelAdapterBuilder extends EnqueueOutboundChannelAdapterBui private bool $defaultPersistentDelivery = self::DEFAULT_PERSISTENT_MODE; private array $staticHeadersToAdd = []; private bool $publisherConfirms = true; + private ?string $delayStrategyReferenceName = null; private function __construct(string $exchangeName, string $amqpConnectionFactoryReferenceName) { @@ -65,6 +66,13 @@ public function withPublisherConfirms(bool $publisherConfirms): self return $this; } + public function withDelayStrategy(string $delayStrategyReferenceName): self + { + $this->delayStrategyReferenceName = $delayStrategyReferenceName; + + return $this; + } + /** * @param string $headerName * @@ -140,6 +148,7 @@ public function compile(MessagingContainerBuilder $builder): Definition $outboundMessageConverter, new Reference(ConversionService::REFERENCE_NAME), Reference::to(AmqpTransactionInterceptor::class), + $this->delayStrategyReferenceName ? new Reference($this->delayStrategyReferenceName) : null, ]); } -} +} \ No newline at end of file diff --git a/packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php b/packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php new file mode 100644 index 000000000..b70ff0073 --- /dev/null +++ b/packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php @@ -0,0 +1,25 @@ +delayMessage($context, $dest, $message, $delay); + } +} \ No newline at end of file diff --git a/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php b/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php index 3c2c0d5d6..8975b275c 100644 --- a/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php +++ b/packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php @@ -42,6 +42,7 @@ use RuntimeException; use stdClass; use Test\Ecotone\Amqp\AmqpMessagingTestCase; +use Test\Ecotone\Amqp\Fixture\AmqpChannel\SpyDelayStrategy; use Test\Ecotone\Amqp\Fixture\AmqpConsumer\AmqpConsumerExample; use Test\Ecotone\Amqp\Fixture\Handler\ExceptionalMessageHandler; @@ -894,6 +895,33 @@ public function test_sending_and_receiving_from_fanout_exchange() )); } + public function test_using_custom_delay_strategy_from_channel_builder() + { + $queueName = Uuid::uuid4()->toString(); + $customDelayStrategy = new SpyDelayStrategy(); + + $ecotoneLite = $this->bootstrapForTesting( + [], + array_merge($this->getConnectionFactoryReferences(), ['customDelayStrategy' => $customDelayStrategy]), + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE])) + ->withExtensionObjects([ + AmqpBackedMessageChannelBuilder::create($queueName) + ->withDelayStrategy('customDelayStrategy'), + ]) + ); + + /** @var PollableChannel $messageChannel */ + $messageChannel = $ecotoneLite->getMessageChannelByName($queueName); + $messageChannel->send( + MessageBuilder::withPayload('some') + ->setHeader(MessageHeaders::DELIVERY_DELAY, 10) + ->build() + ); + + $this->assertTrue(SpyDelayStrategy::$wasCalled); + } + /** * @param string $queueName * @return EnqueueMessageChannel