From 3617baee7349f0993d6e651a8e6d2df8bbc6e1fc Mon Sep 17 00:00:00 2001 From: MiMatus Date: Sun, 18 Jan 2026 19:07:16 +0100 Subject: [PATCH] feat: Add distributed semaphore #1 --- README.md | 40 +++ src/Semaphore/DistributedSemaphore.php | 135 ++++++++ src/Semaphore/GroupedException.php | 21 ++ src/Semaphore/SemaphoreCollection.php | 59 ++++ .../SemaphoreCollectionInterface.php | 29 ++ tests/Unit/DistributedSemaphoreTest.php | 317 ++++++++++++++++++ tests/Unit/SemaphoreCollectionTest.php | 70 ++++ 7 files changed, 671 insertions(+) create mode 100644 src/Semaphore/DistributedSemaphore.php create mode 100644 src/Semaphore/GroupedException.php create mode 100644 src/Semaphore/SemaphoreCollection.php create mode 100644 src/Semaphore/SemaphoreCollectionInterface.php create mode 100644 tests/Unit/DistributedSemaphoreTest.php create mode 100644 tests/Unit/SemaphoreCollectionTest.php diff --git a/README.md b/README.md index 361622d..0a6b983 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,46 @@ $locked(function (Closure $suspension): void { // Lock is released after callback execution ``` +### Distributed semaphore +```php +$semaphores = new SemaphoreCollection([ + new RedisSemaphore( + redisClient: $redis1, + maxConcurrentLocks: 3, + ), + new RedisSemaphore( + redisClient: $redis2, + maxConcurrentLocks: 3, + ), + new RedisSemaphore( + redisClient: $redis3, + maxConcurrentLocks: 3, + ), +]); + +$locksmith = new Locksmith( + semaphore: new DistributedSemaphore( + semaphores: $semaphores, + quorum: 2, + ), +); +$resource = new Resource( + namespace: 'test-resource', // Namespace/identifier for resource + version: 1, // Optional resouce version +); +$locked = $locksmith->locked( + $resource, + lockTTLNs: 1_000_000_000, // How long should be resource locked + maxLockWaitNs: 500_000_000, // How long to wait for lock acquisition - error if exceeded + minSuspensionDelayNs: 10_000 // Minimum delay between retries when lock acquisition fails +); +$locked(function (Closure $suspension): void { + // Critical section - code executed under lock + + $suspension(); // Optional - cooperative suspension point to allow other lock acquisition attempts or allow lock TTL checks for long running processes +}); +// Lock is released after callback execution +``` ## Development ### Commits diff --git a/src/Semaphore/DistributedSemaphore.php b/src/Semaphore/DistributedSemaphore.php new file mode 100644 index 0000000..f2a6a34 --- /dev/null +++ b/src/Semaphore/DistributedSemaphore.php @@ -0,0 +1,135 @@ +quorum > count($this->semaphores)) { + throw new LogicException('Acquire quorum cannot be greater than number of semaphores'); + } + } + + /** + * @param Closure(): void $suspension + * @throws GroupedException + * @throws RuntimeException + */ + #[\Override] + public function lock( + ResourceInterface $resource, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ): void { + $successfulLocks = 0; + $startTime = $this->timeProvider->getCurrentTimeNanoseconds(); + $exceptions = []; + $semaphores = clone $this->semaphores; + + do { + $semaphore = $this->semaphores->getRandom(); + $lockTTLNs -= $this->timeProvider->getCurrentTimeNanoseconds() - $startTime; + if ($lockTTLNs <= 0) { + break; + } + + try { + $semaphore->lock($resource, $token, (int) $lockTTLNs, $suspension); + $successfulLocks++; + $semaphores = $semaphores->without($semaphore); + } catch (Throwable $e) { + $exceptions[] = $e; + } + + if ($successfulLocks >= $this->quorum) { + return; + } + + $suspension(); + } while ($lockTTLNs > 0); + + // Rollback successful locks + try { + $this->unlock($resource, $token); + } catch (Throwable $e) { + $exceptions[] = $e; + } + + throw new GroupedException('Failed to acquire lock quorum', $exceptions); + } + + /** + * @throws GroupedException + */ + #[\Override] + public function unlock(ResourceInterface $resource, #[\SensitiveParameter] string $token): void + { + $successfulUnlocks = 0; + $exceptions = []; + + foreach ($this->semaphores as $semaphore) { + /** @var SemaphoreInterface $semaphore */ + try { + $semaphore->unlock($resource, $token); + $successfulUnlocks++; + } catch (Throwable $e) { + $exceptions[] = $e; + } + } + + if ($successfulUnlocks < $this->quorum) { + throw new GroupedException('Failed to release lock quorum', $exceptions); + } + } + + /** + * @throws GroupedException + */ + #[\Override] + public function isLocked(ResourceInterface $resource): bool + { + $lockedCount = 0; + $exceptions = []; + + foreach ($this->semaphores as $semaphore) { + /** @var SemaphoreInterface $semaphore */ + try { + if ($semaphore->isLocked($resource)) { + $lockedCount++; + } + } catch (Throwable $e) { + $exceptions[] = $e; + } + + if ($lockedCount >= $this->quorum) { + return true; + } + } + + if (count($exceptions) >= $this->quorum) { + throw new GroupedException('Failed to determine lock status quorum', $exceptions); + } + + return false; + } +} diff --git a/src/Semaphore/GroupedException.php b/src/Semaphore/GroupedException.php new file mode 100644 index 0000000..48ec332 --- /dev/null +++ b/src/Semaphore/GroupedException.php @@ -0,0 +1,21 @@ + $exceptions + */ + public function __construct( + string $message, + public array $exceptions, + ) { + parent::__construct($message); + } +} diff --git a/src/Semaphore/SemaphoreCollection.php b/src/Semaphore/SemaphoreCollection.php new file mode 100644 index 0000000..26e36f4 --- /dev/null +++ b/src/Semaphore/SemaphoreCollection.php @@ -0,0 +1,59 @@ + + */ +class SemaphoreCollection implements SemaphoreCollectionInterface +{ + /** + * @param list $semaphores + */ + public function __construct( + private array $semaphores, + private Randomizer $randomizer = new Randomizer(new Xoshiro256StarStar()), + ) {} + + #[\Override] + public function count(): int + { + return count($this->semaphores); + } + + #[\Override] + public function without(SemaphoreInterface $semaphore): static + { + $newSemaphores = array_filter($this->semaphores, static fn(SemaphoreInterface $s) => $s !== $semaphore); + + return new self(array_values($newSemaphores), $this->randomizer); + } + + /** + * @return T + */ + #[\Override] + public function getRandom(): SemaphoreInterface + { + /** @var int */ + $key = $this->randomizer->pickArrayKeys($this->semaphores, 1)[0]; + return $this->semaphores[$key]; + } + + /** + * @return \Traversable + */ + #[\Override] + public function getIterator(): \Traversable + { + return new ArrayIterator($this->semaphores); + } +} diff --git a/src/Semaphore/SemaphoreCollectionInterface.php b/src/Semaphore/SemaphoreCollectionInterface.php new file mode 100644 index 0000000..f0bdd42 --- /dev/null +++ b/src/Semaphore/SemaphoreCollectionInterface.php @@ -0,0 +1,29 @@ + + */ + #[\Override] + public function getIterator(): Traversable; +} diff --git a/tests/Unit/DistributedSemaphoreTest.php b/tests/Unit/DistributedSemaphoreTest.php new file mode 100644 index 0000000..fc738cf --- /dev/null +++ b/tests/Unit/DistributedSemaphoreTest.php @@ -0,0 +1,317 @@ +method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + + $currentTime += 500_000_001; + $lockAttempt++; + throw new \RuntimeException('Lock failed on semaphore 1'); + }); + + $semaphore2 + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + + $currentTime += 500_000_001; + $lockAttempt++; + throw new \RuntimeException('Lock failed on semaphore 2'); + }); + + $semaphore1->expects(self::atLeastOnce())->method('unlock'); + $semaphore2->expects(self::atLeastOnce())->method('unlock'); + + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime): int { + return $currentTime; + }); + + $this->expectExceptionObject(new GroupedException('Failed to acquire lock quorum', [])); + + $distributedSemaphore->lock( + resource: new Resource(namespace: 'test-resource'), + token: 'test-token', + lockTTLNs: 1_000_000_000, // 1 second + suspension: static function (): void {}, + ); + } + + public function testAcquiredLockWithoutErrors(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore1 = self::createStub(SemaphoreInterface::class); + $semaphore2 = self::createStub(SemaphoreInterface::class); + $timeProvider = self::createStub(TimeProvider::class); + $currentTime = 0; + + $distributedSemaphore = new DistributedSemaphore( + semaphores: new SemaphoreCollection([$semaphore1, $semaphore2]), + quorum: 1, + timeProvider: $timeProvider, + ); + + $lockAttempt = 0; + $lockedSemaphore1 = false; + $lockedSemaphore2 = false; + $semaphore1 + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt, &$lockedSemaphore1): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + self::assertFalse($lockedSemaphore1, 'Semaphore 1 should not have been locked yet'); + + $currentTime += 500_000_001; + $lockAttempt++; + $lockedSemaphore1 = true; + }); + + $semaphore2 + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt, &$lockedSemaphore2): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + self::assertEquals(1_000_000_000 - ($lockAttempt * 500_000_001), $lockTTLNs); + self::assertFalse($lockedSemaphore2, 'Semaphore 2 should not have been locked yet'); + + $currentTime += 500_000_001; + $lockAttempt++; + $lockedSemaphore2 = true; + }); + + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime): int { + return $currentTime; + }); + + $distributedSemaphore->lock( + resource: new Resource(namespace: 'test-resource'), + token: 'test-token', + lockTTLNs: 1_000_000_000, // 1 second + suspension: static function (): void {}, + ); + + self::assertTrue($lockedSemaphore1 || $lockedSemaphore2, 'At least one semaphore should have been locked'); + } + + public function testAcquiredLockWithErrors(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + $timeProvider = self::createStub(TimeProvider::class); + $currentTime = 0; + + $distributedSemaphore = new DistributedSemaphore( + semaphores: new SemaphoreCollection([$semaphore, $semaphore, $semaphore, $semaphore, $semaphore]), + quorum: 3, + timeProvider: $timeProvider, + ); + + $lockAttempt = 0; + $locksAquired = 0; + $semaphore + ->method('lock') + ->willReturnCallback(static function ( + ResourceInterface $r, + #[\SensitiveParameter] string $token, + int $lockTTLNs, + Closure $suspension, + ) use (&$currentTime, $resource, &$lockAttempt, &$locksAquired): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + + $currentTime += 10_000; + + if (($lockAttempt % 3) === 0) { + $lockAttempt++; + $locksAquired++; + } else { + $lockAttempt++; + throw new \RuntimeException('Lock failed on semaphore'); + } + }); + + $timeProvider + ->method('getCurrentTimeNanoseconds') + ->willReturnCallback(static function () use (&$currentTime): int { + return $currentTime; + }); + + $distributedSemaphore->lock( + resource: new Resource(namespace: 'test-resource'), + token: 'test-token', + lockTTLNs: 1_000_000_000, // 1 second + suspension: static function (): void {}, + ); + + self::assertEquals(3, $locksAquired, 'Exactly three semaphores should have been locked'); + } + + public function testUnlockWithErrors(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + + $distributedSemaphore = new DistributedSemaphore(semaphores: new SemaphoreCollection([ + $semaphore, + $semaphore, + $semaphore, + $semaphore, + $semaphore, + ]), quorum: 3); + + $unlockAttempt = 0; + $unlocksPerformed = 0; + $semaphore + ->method('unlock') + ->willReturnCallback(static function (ResourceInterface $r, #[\SensitiveParameter] string $token) use ( + $resource, + &$unlockAttempt, + &$unlocksPerformed, + ): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + + if (($unlockAttempt % 3) === 0) { + $unlockAttempt++; + throw new \RuntimeException('Unlock failed on semaphore'); + } else { + $unlockAttempt++; + $unlocksPerformed++; + } + }); + + $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); + + self::assertEquals(3, $unlocksPerformed, 'Exactly three semaphores should have been unlocked'); + } + + public function testUnableToUnlock(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + + $distributedSemaphore = new DistributedSemaphore(semaphores: new SemaphoreCollection([ + $semaphore, + $semaphore, + $semaphore, + $semaphore, + $semaphore, + ]), quorum: 3); + + $unlockAttempt = 0; + $unlocksPerformed = 0; + $semaphore + ->method('unlock') + ->willReturnCallback(static function (ResourceInterface $r, #[\SensitiveParameter] string $token) use ( + $resource, + &$unlockAttempt, + &$unlocksPerformed, + ): void { + self::assertEquals($resource, $r); + self::assertEquals('test-token', $token); + + if (($unlockAttempt % 3) === 0) { + $unlocksPerformed++; + $unlockAttempt++; + } else { + $unlockAttempt++; + throw new \RuntimeException('Unlock failed on semaphore'); + } + }); + + $this->expectExceptionObject(new GroupedException('Failed to release lock quorum', [])); + + $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); + } + + public function testIsLockedOnQuorum(): void + { + $resource = new Resource(namespace: 'test-resource'); + $semaphore = self::createStub(SemaphoreInterface::class); + + $distributedSemaphore = new DistributedSemaphore(semaphores: new SemaphoreCollection([ + $semaphore, + $semaphore, + $semaphore, + $semaphore, + $semaphore, + ]), quorum: 3); + + $isLockedAttempt = 0; + $semaphore + ->method('isLocked') + ->willReturnCallback(static function (ResourceInterface $r) use ($resource, &$isLockedAttempt): bool { + self::assertEquals($resource, $r); + + if (($isLockedAttempt % 3) === 0) { + $isLockedAttempt++; + throw new \RuntimeException('Unlock failed on semaphore'); + } else { + $isLockedAttempt++; + return true; + } + }); + + self::assertTrue($distributedSemaphore->isLocked(resource: new Resource(namespace: 'test-resource'))); + } +} diff --git a/tests/Unit/SemaphoreCollectionTest.php b/tests/Unit/SemaphoreCollectionTest.php new file mode 100644 index 0000000..d75e725 --- /dev/null +++ b/tests/Unit/SemaphoreCollectionTest.php @@ -0,0 +1,70 @@ +createStub(SemaphoreInterface::class); + $semaphore2 = $this->createStub(SemaphoreInterface::class); + $semaphore3 = $this->createStub(SemaphoreInterface::class); + + $collection = new SemaphoreCollection([ + $semaphore1, + $semaphore2, + $semaphore3, + ]); + + $semaphores = []; + foreach ($collection as $semaphore) { + $semaphores[] = $semaphore; + } + + self::assertSame([$semaphore1, $semaphore2, $semaphore3], $semaphores); + } + + public function testCount(): void + { + $semaphore1 = $this->createStub(SemaphoreInterface::class); + $semaphore2 = $this->createStub(SemaphoreInterface::class); + + $collection = new SemaphoreCollection([ + $semaphore1, + $semaphore2, + ]); + + self::assertSame(2, $collection->count()); + } + + public function testWithout(): void + { + $semaphore1 = $this->createStub(SemaphoreInterface::class); + $semaphore2 = $this->createStub(SemaphoreInterface::class); + $semaphore3 = $this->createStub(SemaphoreInterface::class); + + $collection = new SemaphoreCollection([ + $semaphore1, + $semaphore2, + $semaphore3, + ]); + + $newCollection = $collection->without($semaphore2); + + self::assertSame(3, $collection->count()); + self::assertSame(2, $newCollection->count()); + + $semaphores = []; + foreach ($newCollection as $semaphore) { + $semaphores[] = $semaphore; + } + + self::assertSame([$semaphore1, $semaphore3], $semaphores); + } +}