diff --git a/composer.json b/composer.json index e9d80c9..53da913 100644 --- a/composer.json +++ b/composer.json @@ -3,8 +3,7 @@ "type": "library", "license": "MIT", "require": { - "php": ">=8.3", - "revolt/event-loop": "^1.0" + "php": ">=8.3" }, "suggest": { "ext-redis": "To use this library with the PHP Redis extension.", @@ -25,6 +24,7 @@ "ext-redis": "*", "phpunit/phpunit": "^12.5", "amphp/redis": "^2.0", - "predis/predis": "^3.3" + "predis/predis": "^3.3", + "revolt/event-loop": "^1.0" } } diff --git a/composer.lock b/composer.lock index 7e6dc8b..a41e7b8 100644 --- a/composer.lock +++ b/composer.lock @@ -4,81 +4,8 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "c62b27a2a38173fb3d61673fc12d5989", - "packages": [ - { - "name": "revolt/event-loop", - "version": "v1.0.8", - "source": { - "type": "git", - "url": "https://github.com/revoltphp/event-loop.git", - "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c" - }, - "dist": { - "type": "zip", - "url": "https://api.github.com/repos/revoltphp/event-loop/zipball/b6fc06dce8e9b523c9946138fa5e62181934f91c", - "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c", - "shasum": "" - }, - "require": { - "php": ">=8.1" - }, - "require-dev": { - "ext-json": "*", - "jetbrains/phpstorm-stubs": "^2019.3", - "phpunit/phpunit": "^9", - "psalm/phar": "^5.15" - }, - "type": "library", - "extra": { - "branch-alias": { - "dev-main": "1.x-dev" - } - }, - "autoload": { - "psr-4": { - "Revolt\\": "src" - } - }, - "notification-url": "https://packagist.org/downloads/", - "license": [ - "MIT" - ], - "authors": [ - { - "name": "Aaron Piotrowski", - "email": "aaron@trowski.com" - }, - { - "name": "Cees-Jan Kiewiet", - "email": "ceesjank@gmail.com" - }, - { - "name": "Christian Lück", - "email": "christian@clue.engineering" - }, - { - "name": "Niklas Keller", - "email": "me@kelunik.com" - } - ], - "description": "Rock-solid event loop for concurrent PHP applications.", - "keywords": [ - "async", - "asynchronous", - "concurrency", - "event", - "event-loop", - "non-blocking", - "scheduler" - ], - "support": { - "issues": "https://github.com/revoltphp/event-loop/issues", - "source": "https://github.com/revoltphp/event-loop/tree/v1.0.8" - }, - "time": "2025-08-27T21:33:23+00:00" - } - ], + "content-hash": "347b9fd5107b5d2a529b4c76beca8030", + "packages": [], "packages-dev": [ { "name": "amphp/amp", @@ -2065,6 +1992,78 @@ }, "time": "2024-09-11T13:17:53+00:00" }, + { + "name": "revolt/event-loop", + "version": "v1.0.8", + "source": { + "type": "git", + "url": "https://github.com/revoltphp/event-loop.git", + "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/revoltphp/event-loop/zipball/b6fc06dce8e9b523c9946138fa5e62181934f91c", + "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c", + "shasum": "" + }, + "require": { + "php": ">=8.1" + }, + "require-dev": { + "ext-json": "*", + "jetbrains/phpstorm-stubs": "^2019.3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.15" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-main": "1.x-dev" + } + }, + "autoload": { + "psr-4": { + "Revolt\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Cees-Jan Kiewiet", + "email": "ceesjank@gmail.com" + }, + { + "name": "Christian Lück", + "email": "christian@clue.engineering" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Rock-solid event loop for concurrent PHP applications.", + "keywords": [ + "async", + "asynchronous", + "concurrency", + "event", + "event-loop", + "non-blocking", + "scheduler" + ], + "support": { + "issues": "https://github.com/revoltphp/event-loop/issues", + "source": "https://github.com/revoltphp/event-loop/tree/v1.0.8" + }, + "time": "2025-08-27T21:33:23+00:00" + }, { "name": "sebastian/cli-parser", "version": "4.2.0", diff --git a/src/FiberTaskExecutor.php b/src/FiberTaskExecutor.php new file mode 100644 index 0000000..1912787 --- /dev/null +++ b/src/FiberTaskExecutor.php @@ -0,0 +1,66 @@ +timeProvider->getCurrentTimeNanoseconds(); + + $fiber = new Fiber($task); + $fiber->start(); + + while (!$fiber->isTerminated()) { + if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) { + throw new RuntimeException('Unable to get result under TTL'); + } + + if (Fiber::getCurrent() !== null) { + Fiber::suspend(); + } elseif ($minSuspensionDelayNs > 0) { + /** @var positive-int $delay */ + $delay = $minSuspensionDelayNs / 1000; + usleep($delay); + } + + if (!$fiber->isSuspended()) { + throw new RuntimeException('Fiber error, fiber is not suspended nor terminated'); + } + + $fiber->resume(); + } + + if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) { + throw new RuntimeException('Unable to get result under TTL'); + } + + /** @var T */ + return $fiber->getReturn(); + } +} diff --git a/src/Locksmith.php b/src/Locksmith.php index 6489555..a01144d 100644 --- a/src/Locksmith.php +++ b/src/Locksmith.php @@ -14,11 +14,22 @@ readonly class Locksmith { + private TaskExecutorInterface $taskExecutor; + public function __construct( private SemaphoreInterface $semaphore, private TimeProvider $timeProvider = new TimeProvider(), private Engine $randomEngine = new Xoshiro256StarStar(), - ) {} + ?TaskExecutorInterface $taskExecutor = null, + ) { + $this->taskExecutor = + $taskExecutor + ?? ( + class_exists(EventLoop::class) + ? new RevoltTaskExecutor($timeProvider) + : new FiberTaskExecutor($timeProvider) + ); + } /** * @template T @@ -49,9 +60,9 @@ public function locked( } /** @var non-negative-int $remainingLockTTLNs */ - $this->getResultUnderTTL(static fn() => null, $remainingLockTTLNs, $minSuspensionDelayNs); + $this->taskExecutor->getResultUnderTTL(static fn() => null, $remainingLockTTLNs, $minSuspensionDelayNs); }; - $this->getResultUnderTTL( + $this->taskExecutor->getResultUnderTTL( function () use ($token, $resource, $lockTTLNs, $suspender): void { $this->semaphore->lock($resource, $token, $lockTTLNs, $suspender); }, @@ -61,7 +72,7 @@ function () use ($token, $resource, $lockTTLNs, $suspender): void { try { /** @var T */ - return $this->getResultUnderTTL( + return $this->taskExecutor->getResultUnderTTL( function () use ($callback, $resource, $suspender): mixed { if (!$this->semaphore->isLocked($resource)) { throw new RuntimeException('Lock has been lost during process'); @@ -77,48 +88,4 @@ function () use ($callback, $resource, $suspender): mixed { } }; } - - /** - * @template T - * @throws Throwable - * @param Closure(): T $task - * @param non-negative-int $ttlNanoseconds - * @return T - */ - private function getResultUnderTTL(Closure $task, int $ttlNanoseconds, int $minSuspensionDelayNs): mixed - { - $suspension = EventLoop::getSuspension(); - - $startTime = $this->timeProvider->getCurrentTimeNanoseconds(); - - $deferId = EventLoop::delay($minSuspensionDelayNs / 1_000_000, function () use ( - $task, - $suspension, - $startTime, - $ttlNanoseconds, - ): void { - try { - $result = $task(); - } catch (Throwable $e) { - $suspension->throw($e); - return; - } - - // Check if TTL has been exceeded before resuming the fiber - there might have been a blocking operation in the task that caused us to exceed the TTL - if (($this->timeProvider->getCurrentTimeNanoseconds() - $startTime) >= $ttlNanoseconds) { - $suspension->throw(new RuntimeException('Unable to get result under TTL')); - return; - } - $suspension->resume($result); - }); - - EventLoop::delay($ttlNanoseconds / 1_000_000, static function () use ($deferId, $suspension) { - EventLoop::cancel($deferId); - - $suspension->throw(new RuntimeException('Unable to get result under TTL')); - }); - - /** @var T */ - return $suspension->suspend(); - } } diff --git a/src/RevoltTaskExecutor.php b/src/RevoltTaskExecutor.php new file mode 100644 index 0000000..30a43ed --- /dev/null +++ b/src/RevoltTaskExecutor.php @@ -0,0 +1,68 @@ +timeProvider->getCurrentTimeNanoseconds(); + + $deferId = EventLoop::delay($minSuspensionDelayNs / 1_000_000, function () use ( + $task, + $suspension, + $startTime, + $ttlNanoseconds, + ): void { + try { + $result = $task(); + } catch (Throwable $e) { + $suspension->throw($e); + return; + } + + // Check if TTL has been exceeded before resuming the fiber - there might have been a blocking operation in the task that caused us to exceed the TTL + if (($this->timeProvider->getCurrentTimeNanoseconds() - $startTime) >= $ttlNanoseconds) { + $suspension->throw(new RuntimeException('Unable to get result under TTL')); + return; + } + $suspension->resume($result); + }); + + EventLoop::delay($ttlNanoseconds / 1_000_000, static function () use ($deferId, $suspension) { + EventLoop::cancel($deferId); + + $suspension->throw(new RuntimeException('Unable to get result under TTL')); + }); + + /** @var T */ + return $suspension->suspend(); + } +} diff --git a/src/TaskExecutorInterface.php b/src/TaskExecutorInterface.php new file mode 100644 index 0000000..c2f2f5c --- /dev/null +++ b/src/TaskExecutorInterface.php @@ -0,0 +1,24 @@ +semaphore = $this->createMock(SemaphoreInterface::class); + $this->timeProvider = $this->createMock(TimeProvider::class); + $this->randomEngine = $this->createMock(Engine::class); + $this->taskExecutor = $this->createTaskExecutor($this->timeProvider); + } + + abstract protected function createTaskExecutor(TimeProvider $timeProvider): TaskExecutorInterface; + + public function testUnableToAcquireLockTimeout(): void + { + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->atLeastOnce()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + $currentTime += 500_000_001; + $suspension(); + }); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: new Resource(namespace: 'test-resource', version: 1), + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); + $locked(static function (): void { + self::fail('Lock should not be acquired'); + }); + } + + public function testUnableToAcquireLock(): void + { + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->atLeastOnce()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + throw new RuntimeException('error'); + }); + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: new Resource(namespace: 'test-resource', version: 1), + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('error')); + $locked(static function (): void { + self::fail('Lock should not be acquired'); + }); + } + + public function testLostDuringExecution(): void + { + $resource = new Resource(namespace: 'test-resource', version: 1); + + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->once()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + // Lock acquired immediately + }); + + $this->semaphore + ->expects($this->once()) + ->method('isLocked') + ->with(self::equalTo($resource)) + ->willReturn(false); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: $resource, + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('Lock has been lost during process')); + $locked(static function (Closure $suspension): void { + $suspension(); // Simulate some processing and force lock check + }); + } + + public function testUnableToUnlock(): void + { + $resource = new Resource(namespace: 'test-resource', version: 1); + + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->once()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + // Lock acquired immediately + }); + + $this->semaphore + ->expects($this->once()) + ->method('unlock') + ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( + &$currentTime, + ): void { + throw new RuntimeException('error during unlock'); + }); + + $this->semaphore + ->expects($this->once()) + ->method('isLocked') + ->with(self::equalTo($resource)) + ->willReturn(false); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: $resource, + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $this->expectExceptionObject(new RuntimeException('error during unlock')); + $locked(static function (Closure $suspension): void { + // Simulate some processing + }); + } + + public function testLocked(): void + { + $resource = new Resource(namespace: 'test-resource', version: 1); + + $currentTime = 0; + $this->timeProvider + ->expects($this->atLeastOnce()) + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $this->semaphore + ->expects($this->once()) + ->method('lock') + ->willReturnCallback(static function ( + Resource $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime): void { + // Lock acquired immediately + }); + + $this->semaphore + ->expects($this->once()) + ->method('unlock') + ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( + &$currentTime, + ): void { + // Unlock successful + }); + + $this->semaphore + ->expects($this->once()) + ->method('isLocked') + ->with(self::equalTo($resource)) + ->willReturn(true); + + $this->randomEngine + ->expects($this->once()) + ->method('generate') + ->willReturn('token'); + + $locksmith = new Locksmith( + semaphore: $this->semaphore, + timeProvider: $this->timeProvider, + randomEngine: $this->randomEngine, + taskExecutor: $this->taskExecutor, + ); + + $locked = $locksmith->locked( + resource: $resource, + lockTTLNs: 1_000_000_000, + maxLockWaitNs: 500_000_000, + minSuspensionDelayNs: 10_000, + ); + + $called = false; + $locked(static function (Closure $suspension) use (&$called): void { + // Simulate some processing + $called = true; + }); + + self::assertTrue($called, 'Locked callback executed'); + } +} diff --git a/tests/Unit/FiberTaskExecutorTest.php b/tests/Unit/FiberTaskExecutorTest.php new file mode 100644 index 0000000..0b6b2d2 --- /dev/null +++ b/tests/Unit/FiberTaskExecutorTest.php @@ -0,0 +1,119 @@ +getResultUnderTTL( + static function () use ($timeProvider) { + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + + $this->assertSame('result', $result); + } + + public function testResultRetrievalTookTooLongBlocking(): void + { + $currentTime = 0; + $timeProvider = self::createStub(TimeProvider::class); + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $taskExecutor = new FiberTaskExecutor($timeProvider); + + $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); + + $taskExecutor->getResultUnderTTL( + static function () use ($timeProvider, &$currentTime) { + $currentTime += 1_500_000_000; // Simulate a long-running task (1.5 seconds) + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + } + + public function testResultRetrievalNonBlocking(): void + { + $currentTime = 0; + $timeProvider = self::createStub(TimeProvider::class); + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $taskExecutor = new FiberTaskExecutor($timeProvider); + + $fiber = new Fiber(static function () use ($taskExecutor, $timeProvider, &$currentTime) { + return $taskExecutor->getResultUnderTTL( + static function () use ($timeProvider, &$currentTime) { + $currentTime += 100_000_000; // Simulate a short task (0.1 seconds) + Fiber::suspend(); // Suspend to trigger the TTL check in the executor + $currentTime += 100_000_000; // Simulate a short task (0.1 seconds) + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + }); + + $fiber->start(); + $this->assertSame(100_000_000, $currentTime); + + $fiber->resume(); + + $this->assertSame(200_000_000, $currentTime); + $this->assertTrue($fiber->isTerminated()); + $this->assertSame('result', $fiber->getReturn()); + } + + public function testResultRetrievalTookTooLongNonBlocking(): void + { + $currentTime = 0; + $timeProvider = self::createStub(TimeProvider::class); + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime) { + return $currentTime; + }); + + $taskExecutor = new FiberTaskExecutor($timeProvider); + + $fiber = new Fiber(static function () use ($taskExecutor, $timeProvider, &$currentTime) { + return $taskExecutor->getResultUnderTTL( + static function () use ($timeProvider, &$currentTime) { + $currentTime += 1_500_000_000; // Simulate a long-running task (1.5 seconds) + Fiber::suspend(); // Suspend to trigger the TTL check in the executor + return 'result'; + }, + ttlNanoseconds: 1_000_000_000, // Set TTL to 1 second + minSuspensionDelayNs: 100_000_000, // Set minimum suspension delay to 0.1 seconds + ); + }); + + $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); + + $fiber->start(); + $this->assertSame(1_500_000_000, $currentTime); + } +} diff --git a/tests/Unit/LocksmithTest.php b/tests/Unit/LocksmithTest.php index 15cfc86..b271eec 100644 --- a/tests/Unit/LocksmithTest.php +++ b/tests/Unit/LocksmithTest.php @@ -4,306 +4,15 @@ namespace MiMatus\Locksmith\Tests\Unit; -use Closure; -use Exception; -use MiMatus\Locksmith\Locksmith; -use MiMatus\Locksmith\Resource; +use MiMatus\Locksmith\FiberTaskExecutor; use MiMatus\Locksmith\Semaphore\TimeProvider; -use MiMatus\Locksmith\SemaphoreInterface; -use Override; -use PHPUnit\Framework\MockObject\MockObject; -use PHPUnit\Framework\TestCase; -use Random\Engine; -use RuntimeException; +use MiMatus\Locksmith\TaskExecutorInterface; +use MiMatus\Locksmith\Tests\LocksmithTestCase; -class LocksmithTest extends TestCase +class LocksmithTest extends LocksmithTestCase { - private SemaphoreInterface&MockObject $semaphore; - private TimeProvider&MockObject $timeProvider; - private Engine&MockObject $randomEngine; - - /** - * @throws Exception - */ - #[Override] - protected function setUp(): void - { - $this->semaphore = $this->createMock(SemaphoreInterface::class); - $this->timeProvider = $this->createMock(TimeProvider::class); - $this->randomEngine = $this->createMock(Engine::class); - } - - public function testUnableToAcquireLockTimeout(): void - { - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->atLeastOnce()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - int $lockTTLNs, - Closure $suspension, - ) use (&$currentTime): void { - $currentTime += 500_000_001; - $suspension(); - }); - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked( - resource: new Resource(namespace: 'test-resource', version: 1), - lockTTLNs: 1_000_000_000, - maxLockWaitNs: 500_000_000, - minSuspensionDelayNs: 10_000, - ); - - $this->expectExceptionObject(new RuntimeException('Unable to get result under TTL')); - $locked(static function (): void { - self::fail('Lock should not be acquired'); - }); - } - - public function testUnableToAcquireLock(): void - { - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->atLeastOnce()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - int $lockTTLNs, - Closure $suspension, - ) use (&$currentTime): void { - throw new RuntimeException('error'); - }); - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked( - resource: new Resource(namespace: 'test-resource', version: 1), - lockTTLNs: 1_000_000_000, - maxLockWaitNs: 500_000_000, - minSuspensionDelayNs: 10_000, - ); - - $this->expectExceptionObject(new RuntimeException('error')); - $locked(static function (): void { - self::fail('Lock should not be acquired'); - }); - } - - public function testLostDuringExecution(): void - { - $resource = new Resource(namespace: 'test-resource', version: 1); - - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->once()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - int $lockTTLNs, - Closure $suspension, - ) use (&$currentTime): void { - // Lock acquired immediately - }); - - $this->semaphore - ->expects($this->once()) - ->method('isLocked') - ->with(self::equalTo($resource)) - ->willReturn(false); - - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked( - resource: $resource, - lockTTLNs: 1_000_000_000, - maxLockWaitNs: 500_000_000, - minSuspensionDelayNs: 10_000, - ); - - $this->expectExceptionObject(new RuntimeException('Lock has been lost during process')); - $locked(static function (Closure $suspension): void { - $suspension(); // Simulate some processing and force lock check - }); - } - - public function testUnableToUnlock(): void - { - $resource = new Resource(namespace: 'test-resource', version: 1); - - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->once()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - int $lockTTLNs, - Closure $suspension, - ) use (&$currentTime): void { - // Lock acquired immediately - }); - - $this->semaphore - ->expects($this->once()) - ->method('unlock') - ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( - &$currentTime, - ): void { - throw new RuntimeException('error during unlock'); - }); - - $this->semaphore - ->expects($this->once()) - ->method('isLocked') - ->with(self::equalTo($resource)) - ->willReturn(false); - - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked( - resource: $resource, - lockTTLNs: 1_000_000_000, - maxLockWaitNs: 500_000_000, - minSuspensionDelayNs: 10_000, - ); - - $this->expectExceptionObject(new RuntimeException('error during unlock')); - $locked(static function (Closure $suspension): void { - // Simulate some processing - }); - } - - public function testLocked(): void + protected function createTaskExecutor(TimeProvider $timeProvider): TaskExecutorInterface { - $resource = new Resource(namespace: 'test-resource', version: 1); - - $currentTime = 0; - $this->timeProvider - ->expects($this->atLeastOnce()) - ->method('getCurrentTimeNanoseconds') - ->willReturnCallback(static function () use (&$currentTime) { - return $currentTime; - }); - - $this->semaphore - ->expects($this->once()) - ->method('lock') - ->willReturnCallback(static function ( - Resource $resource, - #[\SensitiveParameter] string $token, - int $lockTTLNs, - Closure $suspension, - ) use (&$currentTime): void { - // Lock acquired immediately - }); - - $this->semaphore - ->expects($this->once()) - ->method('unlock') - ->willReturnCallback(static function (Resource $resource, #[\SensitiveParameter] string $token) use ( - &$currentTime, - ): void { - // Unlock successful - }); - - $this->semaphore - ->expects($this->once()) - ->method('isLocked') - ->with(self::equalTo($resource)) - ->willReturn(true); - - $this->randomEngine - ->expects($this->once()) - ->method('generate') - ->willReturn('token'); - - $locksmith = new Locksmith( - semaphore: $this->semaphore, - timeProvider: $this->timeProvider, - randomEngine: $this->randomEngine, - ); - - $locked = $locksmith->locked( - resource: $resource, - lockTTLNs: 1_000_000_000, - maxLockWaitNs: 500_000_000, - minSuspensionDelayNs: 10_000, - ); - - $called = false; - $locked(static function (Closure $suspension) use (&$called): void { - // Simulate some processing - $called = true; - }); - - self::assertTrue($called, 'Locked callback executed'); + return new FiberTaskExecutor($timeProvider); } }