Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,46 @@ $locked(function (Closure $suspension): void {
// Lock is released after callback execution
```

### Distributed semaphore
```php
$semaphores = new SemaphoreCollection([
new RedisSemaphore(
redisClient: $redis1,
maxConcurrentLocks: 3,
),
new RedisSemaphore(
redisClient: $redis2,
maxConcurrentLocks: 3,
),
new RedisSemaphore(
redisClient: $redis3,
maxConcurrentLocks: 3,
),
]);

$locksmith = new Locksmith(
semaphore: new DistributedSemaphore(
semaphores: $semaphores,
quorum: 2,
),
);
$resource = new Resource(
namespace: 'test-resource', // Namespace/identifier for resource
version: 1, // Optional resouce version
);
$locked = $locksmith->locked(
$resource,
lockTTLNs: 1_000_000_000, // How long should be resource locked
maxLockWaitNs: 500_000_000, // How long to wait for lock acquisition - error if exceeded
minSuspensionDelayNs: 10_000 // Minimum delay between retries when lock acquisition fails
);
$locked(function (Closure $suspension): void {
// Critical section - code executed under lock

$suspension(); // Optional - cooperative suspension point to allow other lock acquisition attempts or allow lock TTL checks for long running processes
});
// Lock is released after callback execution
```
## Development

### Commits
Expand Down
135 changes: 135 additions & 0 deletions src/Semaphore/DistributedSemaphore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
<?php

declare(strict_types=1);

namespace MiMatus\Locksmith\Semaphore;

use Closure;
use LogicException;
use MiMatus\Locksmith\ResourceInterface;
use MiMatus\Locksmith\SemaphoreInterface;
use Random\Engine\Xoshiro256StarStar;
use Random\Randomizer;
use RuntimeException;
use Throwable;

readonly class DistributedSemaphore implements SemaphoreInterface
{
/**
* @throws LogicException
*/
public function __construct(
private SemaphoreCollectionInterface $semaphores,
private int $quorum,
private TimeProvider $timeProvider = new TimeProvider(),
private Randomizer $randomizer = new Randomizer(new Xoshiro256StarStar()),
) {
if ($this->quorum > count($this->semaphores)) {
throw new LogicException('Acquire quorum cannot be greater than number of semaphores');
}
}

/**
* @param Closure(): void $suspension
* @throws GroupedException
* @throws RuntimeException
*/
#[\Override]
public function lock(
ResourceInterface $resource,
#[\SensitiveParameter] string $token,
int $lockTTLNs,
Closure $suspension,
): void {
$successfulLocks = 0;
$startTime = $this->timeProvider->getCurrentTimeNanoseconds();
$exceptions = [];
$semaphores = clone $this->semaphores;

do {
$semaphore = $this->semaphores->getRandom();
$lockTTLNs -= $this->timeProvider->getCurrentTimeNanoseconds() - $startTime;
if ($lockTTLNs <= 0) {
break;
}

try {
$semaphore->lock($resource, $token, (int) $lockTTLNs, $suspension);
$successfulLocks++;
$semaphores = $semaphores->without($semaphore);
} catch (Throwable $e) {
$exceptions[] = $e;
}

if ($successfulLocks >= $this->quorum) {
return;
}

$suspension();
} while ($lockTTLNs > 0);

// Rollback successful locks
try {
$this->unlock($resource, $token);
} catch (Throwable $e) {
$exceptions[] = $e;
}

throw new GroupedException('Failed to acquire lock quorum', $exceptions);
}

/**
* @throws GroupedException
*/
#[\Override]
public function unlock(ResourceInterface $resource, #[\SensitiveParameter] string $token): void
{
$successfulUnlocks = 0;
$exceptions = [];

foreach ($this->semaphores as $semaphore) {
/** @var SemaphoreInterface $semaphore */
try {
$semaphore->unlock($resource, $token);
$successfulUnlocks++;
} catch (Throwable $e) {
$exceptions[] = $e;
}
}

if ($successfulUnlocks < $this->quorum) {
throw new GroupedException('Failed to release lock quorum', $exceptions);
}
}

/**
* @throws GroupedException
*/
#[\Override]
public function isLocked(ResourceInterface $resource): bool
{
$lockedCount = 0;
$exceptions = [];

foreach ($this->semaphores as $semaphore) {
/** @var SemaphoreInterface $semaphore */
try {
if ($semaphore->isLocked($resource)) {
$lockedCount++;
}
} catch (Throwable $e) {
$exceptions[] = $e;
}

if ($lockedCount >= $this->quorum) {
return true;
}
}

if (count($exceptions) >= $this->quorum) {
throw new GroupedException('Failed to determine lock status quorum', $exceptions);
}

return false;
}
}
21 changes: 21 additions & 0 deletions src/Semaphore/GroupedException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace MiMatus\Locksmith\Semaphore;

use Exception;
use Throwable;

class GroupedException extends Exception
{
/**
* @param list<Throwable> $exceptions
*/
public function __construct(
string $message,
public array $exceptions,
) {
parent::__construct($message);
}
}
59 changes: 59 additions & 0 deletions src/Semaphore/SemaphoreCollection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace MiMatus\Locksmith\Semaphore;

use ArrayIterator;
use MiMatus\Locksmith\SemaphoreInterface;
use Random\Engine\Xoshiro256StarStar;
use Random\Randomizer;

/**
* @template T of SemaphoreInterface
* @implements SemaphoreCollectionInterface<T>
*/
class SemaphoreCollection implements SemaphoreCollectionInterface
{
/**
* @param list<T> $semaphores
*/
public function __construct(
private array $semaphores,
private Randomizer $randomizer = new Randomizer(new Xoshiro256StarStar()),
) {}

#[\Override]
public function count(): int
{
return count($this->semaphores);
}

#[\Override]
public function without(SemaphoreInterface $semaphore): static
{
$newSemaphores = array_filter($this->semaphores, static fn(SemaphoreInterface $s) => $s !== $semaphore);

return new self(array_values($newSemaphores), $this->randomizer);
}

/**
* @return T
*/
#[\Override]
public function getRandom(): SemaphoreInterface
{
/** @var int */
$key = $this->randomizer->pickArrayKeys($this->semaphores, 1)[0];
return $this->semaphores[$key];
}

/**
* @return \Traversable<T>
*/
#[\Override]
public function getIterator(): \Traversable
{
return new ArrayIterator($this->semaphores);
}
}
29 changes: 29 additions & 0 deletions src/Semaphore/SemaphoreCollectionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace MiMatus\Locksmith\Semaphore;

use Countable;
use IteratorAggregate;
use MiMatus\Locksmith\SemaphoreInterface;
use Traversable;

/**
* @template T of SemaphoreInterface
*/
interface SemaphoreCollectionInterface extends IteratorAggregate, Countable
{
public function without(SemaphoreInterface $semaphore): static;

/**
* @return T
*/
public function getRandom(): SemaphoreInterface;

/**
* @return Traversable<T>
*/
#[\Override]
public function getIterator(): Traversable;
}
Loading