Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
"type": "library",
"license": "MIT",
"require": {
"php": ">=8.3",
"revolt/event-loop": "^1.0"
"php": ">=8.3"
},
"suggest": {
"ext-redis": "To use this library with the PHP Redis extension.",
Expand All @@ -25,6 +24,7 @@
"ext-redis": "*",
"phpunit/phpunit": "^12.5",
"amphp/redis": "^2.0",
"predis/predis": "^3.3"
"predis/predis": "^3.3",
"revolt/event-loop": "^1.0"
}
}
149 changes: 74 additions & 75 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 66 additions & 0 deletions src/FiberTaskExecutor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

declare(strict_types=1);

namespace MiMatus\Locksmith;

use Closure;
use Fiber;
use MiMatus\Locksmith\Semaphore\TimeProvider;
use Override;
use RuntimeException;
use Throwable;

/**
* @internal
*/
readonly class FiberTaskExecutor implements TaskExecutorInterface
{
public function __construct(
private TimeProvider $timeProvider = new TimeProvider(),
) {}

/**
* @template T
* @param Closure(): T $task
* @param non-negative-int $ttlNanoseconds
* @param non-negative-int $minSuspensionDelayNs
* @throws Throwable
* @return T
*/
#[Override]
public function getResultUnderTTL(Closure $task, int $ttlNanoseconds, int $minSuspensionDelayNs): mixed
{
$start = $this->timeProvider->getCurrentTimeNanoseconds();

$fiber = new Fiber($task);
$fiber->start();

while (!$fiber->isTerminated()) {
if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) {
throw new RuntimeException('Unable to get result under TTL');
}

if (Fiber::getCurrent() !== null) {
Fiber::suspend();
} elseif ($minSuspensionDelayNs > 0) {
/** @var positive-int $delay */
$delay = $minSuspensionDelayNs / 1000;
usleep($delay);
}

if (!$fiber->isSuspended()) {
throw new RuntimeException('Fiber error, fiber is not suspended nor terminated');
}

$fiber->resume();
}

if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) {
throw new RuntimeException('Unable to get result under TTL');
}

/** @var T */
return $fiber->getReturn();
}
}
63 changes: 15 additions & 48 deletions src/Locksmith.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,22 @@

readonly class Locksmith
{
private TaskExecutorInterface $taskExecutor;

public function __construct(
private SemaphoreInterface $semaphore,
private TimeProvider $timeProvider = new TimeProvider(),
private Engine $randomEngine = new Xoshiro256StarStar(),
) {}
?TaskExecutorInterface $taskExecutor = null,
) {
$this->taskExecutor =
$taskExecutor
?? (
class_exists(EventLoop::class)
? new RevoltTaskExecutor($timeProvider)
: new FiberTaskExecutor($timeProvider)
);
}

/**
* @template T
Expand Down Expand Up @@ -49,9 +60,9 @@ public function locked(
}
/** @var non-negative-int $remainingLockTTLNs */

$this->getResultUnderTTL(static fn() => null, $remainingLockTTLNs, $minSuspensionDelayNs);
$this->taskExecutor->getResultUnderTTL(static fn() => null, $remainingLockTTLNs, $minSuspensionDelayNs);
};
$this->getResultUnderTTL(
$this->taskExecutor->getResultUnderTTL(
function () use ($token, $resource, $lockTTLNs, $suspender): void {
$this->semaphore->lock($resource, $token, $lockTTLNs, $suspender);
},
Expand All @@ -61,7 +72,7 @@ function () use ($token, $resource, $lockTTLNs, $suspender): void {

try {
/** @var T */
return $this->getResultUnderTTL(
return $this->taskExecutor->getResultUnderTTL(
function () use ($callback, $resource, $suspender): mixed {
if (!$this->semaphore->isLocked($resource)) {
throw new RuntimeException('Lock has been lost during process');
Expand All @@ -77,48 +88,4 @@ function () use ($callback, $resource, $suspender): mixed {
}
};
}

/**
* @template T
* @throws Throwable
* @param Closure(): T $task
* @param non-negative-int $ttlNanoseconds
* @return T
*/
private function getResultUnderTTL(Closure $task, int $ttlNanoseconds, int $minSuspensionDelayNs): mixed
{
$suspension = EventLoop::getSuspension();

$startTime = $this->timeProvider->getCurrentTimeNanoseconds();

$deferId = EventLoop::delay($minSuspensionDelayNs / 1_000_000, function () use (
$task,
$suspension,
$startTime,
$ttlNanoseconds,
): void {
try {
$result = $task();
} catch (Throwable $e) {
$suspension->throw($e);
return;
}

// Check if TTL has been exceeded before resuming the fiber - there might have been a blocking operation in the task that caused us to exceed the TTL
if (($this->timeProvider->getCurrentTimeNanoseconds() - $startTime) >= $ttlNanoseconds) {
$suspension->throw(new RuntimeException('Unable to get result under TTL'));
return;
}
$suspension->resume($result);
});

EventLoop::delay($ttlNanoseconds / 1_000_000, static function () use ($deferId, $suspension) {
EventLoop::cancel($deferId);

$suspension->throw(new RuntimeException('Unable to get result under TTL'));
});

/** @var T */
return $suspension->suspend();
}
}
Loading