Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/Amqp/src/AmqpBackedMessageChannelBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public function withPublisherConfirms(bool $enabled): self
return $this;
}

public function withDelayStrategy(string $delayStrategyReferenceName): self
{
$this->getAmqpOutboundChannelAdapter()->withDelayStrategy($delayStrategyReferenceName);

return $this;
}

public function getMessageChannelName(): string
{
return $this->channelName;
Expand Down
4 changes: 3 additions & 1 deletion packages/Amqp/src/AmqpOutboundChannelAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Ecotone\Messaging\Support\Assert;
use Enqueue\AmqpExt\AmqpContext as AmqpExtContext;
use Enqueue\AmqpLib\AmqpContext as AmqpLibContext;
use Enqueue\AmqpTools\DelayStrategy;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\Impl\AmqpTopic;

Expand Down Expand Up @@ -43,6 +44,7 @@ public function __construct(
private OutboundMessageConverter $outboundMessageConverter,
private ConversionService $conversionService,
private AmqpTransactionInterceptor $amqpTransactionInterceptor,
private ?DelayStrategy $delayStrategy = null
) {
}

Expand Down Expand Up @@ -87,7 +89,7 @@ public function handle(Message $message): void
$context = $this->connectionFactory->createContext();
$this->connectionFactory->getProducer()
->setTimeToLive($outboundMessage->getTimeToLive())
->setDelayStrategy(new HeadersExchangeDelayStrategy())
->setDelayStrategy($this->delayStrategy ?? new HeadersExchangeDelayStrategy())
->setDeliveryDelay($outboundMessage->getDeliveryDelay())
// this allow for having queue per delay instead of queue per delay + exchangeName
->send(new AmqpTopic($exchangeName), $messageToSend);
Expand Down
11 changes: 10 additions & 1 deletion packages/Amqp/src/AmqpOutboundChannelAdapterBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class AmqpOutboundChannelAdapterBuilder extends EnqueueOutboundChannelAdapterBui
private bool $defaultPersistentDelivery = self::DEFAULT_PERSISTENT_MODE;
private array $staticHeadersToAdd = [];
private bool $publisherConfirms = true;
private ?string $delayStrategyReferenceName = null;

private function __construct(string $exchangeName, string $amqpConnectionFactoryReferenceName)
{
Expand Down Expand Up @@ -65,6 +66,13 @@ public function withPublisherConfirms(bool $publisherConfirms): self
return $this;
}

public function withDelayStrategy(string $delayStrategyReferenceName): self
{
$this->delayStrategyReferenceName = $delayStrategyReferenceName;

return $this;
}

/**
* @param string $headerName
*
Expand Down Expand Up @@ -140,6 +148,7 @@ public function compile(MessagingContainerBuilder $builder): Definition
$outboundMessageConverter,
new Reference(ConversionService::REFERENCE_NAME),
Reference::to(AmqpTransactionInterceptor::class),
$this->delayStrategyReferenceName ? new Reference($this->delayStrategyReferenceName) : null,
]);
}
}
}
25 changes: 25 additions & 0 deletions packages/Amqp/tests/Fixture/AmqpChannel/SpyDelayStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace Test\Ecotone\Amqp\Fixture\AmqpChannel;

use Ecotone\Amqp\HeadersExchangeDelayStrategy;
use Enqueue\AmqpTools\DelayStrategy;
use Interop\Amqp\AmqpContext;
use Interop\Amqp\AmqpDestination;
use Interop\Amqp\AmqpMessage;

class SpyDelayStrategy implements DelayStrategy
{
public static bool $wasCalled = false;

public function __construct()
{
self::$wasCalled = false;
}

public function delayMessage(AmqpContext $context, AmqpDestination $dest, AmqpMessage $message, int $delay): void
{
self::$wasCalled = true;
(new HeadersExchangeDelayStrategy())->delayMessage($context, $dest, $message, $delay);
}
}
28 changes: 28 additions & 0 deletions packages/Amqp/tests/Integration/AmqpChannelAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
use RuntimeException;
use stdClass;
use Test\Ecotone\Amqp\AmqpMessagingTestCase;
use Test\Ecotone\Amqp\Fixture\AmqpChannel\SpyDelayStrategy;
use Test\Ecotone\Amqp\Fixture\AmqpConsumer\AmqpConsumerExample;
use Test\Ecotone\Amqp\Fixture\Handler\ExceptionalMessageHandler;

Expand Down Expand Up @@ -894,6 +895,33 @@ public function test_sending_and_receiving_from_fanout_exchange()
));
}

public function test_using_custom_delay_strategy_from_channel_builder()
{
$queueName = Uuid::uuid4()->toString();
$customDelayStrategy = new SpyDelayStrategy();

$ecotoneLite = $this->bootstrapForTesting(
[],
array_merge($this->getConnectionFactoryReferences(), ['customDelayStrategy' => $customDelayStrategy]),
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::AMQP_PACKAGE]))
->withExtensionObjects([
AmqpBackedMessageChannelBuilder::create($queueName)
->withDelayStrategy('customDelayStrategy'),
])
);

/** @var PollableChannel $messageChannel */
$messageChannel = $ecotoneLite->getMessageChannelByName($queueName);
$messageChannel->send(
MessageBuilder::withPayload('some')
->setHeader(MessageHeaders::DELIVERY_DELAY, 10)
->build()
);

$this->assertTrue(SpyDelayStrategy::$wasCalled);
}

/**
* @param string $queueName
* @return EnqueueMessageChannel
Expand Down