Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,20 @@ jobs:
strategy:
matrix:
php-version: ["8.3", "8.4", "8.5"]
redis-image: ["redis:7-alpine", "redis:8-alpine", "valkey/valkey:9-alpine"]

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Build and Start Services
# Pass the matrix value as an environment variable to Docker Compose
run: PHP_VERSION=${{ matrix.php-version }} docker compose up redis -d --build
run: PHP_VERSION=${{ matrix.php-version }} REDIS_IMAGE=${{ matrix.redis-image }} docker compose up redis -d --build

- name: Unit Tests
run: |
make run-tests

- name: Stop Services
if: always()
run: PHP_VERSION=${{ matrix.php-version }} docker compose down
run: PHP_VERSION=${{ matrix.php-version }} REDIS_IMAGE=${{ matrix.redis-image }} docker compose down
34 changes: 32 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,25 @@ composer require mimatus/locksmith
- [x] Redlock algorithm for Redis semaphore
- [x] Predis support for Redis semaphore
- [x] AMPHP Redis client support for Redis semaphore
- [ ] First class support and tests for Valkey/KeyDB
- [x] First class support and tests for Redis 7 | Redis 8 | Valkey 9
- [ ] Feedback and API stabilization
- [ ] Redis Cluster support
- [ ] Documentation improvements
- [ ] MySQL/MariaDB/PostgreSQL semaphore implementation

## Usage

> [!NOTE]
> Project is still in early stages of development, so API is not stable yet and may change. Feedback is very welcome to help shape the API and make it more intuitive and easy to use.

### In-Memory semaphore

For single-process scenarios you can use in-memory semaphore implementation. It allows to limit concurrent access to resource within single process (e.g., number of concurrent HTTP requests, background jobs, or other tasks).

It's suitable mainly for concurrent PHP - AMPHP, Swoole, ReactPHP, etc.

It's not suitable for multi-process scenarios (e.g., multiple PHP-FPM workers, multiple servers) as each process/server will have its own instance of in-memory semaphore. For multi-process scenarios you should use Redis-based semaphore implementation.

```php

$locksmith = new Locksmith(
Expand Down Expand Up @@ -61,6 +72,18 @@ $locked(function (Closure $suspension): void {
```

### Redis semaphore

For distributed scenarios you can use Redis-based semaphore implementation.

Supported Redis servers:
- Redis 7+
- Valkey 9+

Supported Redis clients:
- PhpRedis
- Predis
- AMPHP Redis client

```php

$redis = new Redis();
Expand Down Expand Up @@ -96,6 +119,13 @@ $locked(function (Closure $suspension): void {
```

### Distributed semaphore
Distributed semaphore allows to use multiple semaphore instances (e.g., multiple Redis instances) to achieve higher availability and fault tolerance. It uses quorum-based approach - single lock is successful only if the defined quorum of semaphores is reached.

Implementation of distributed semaphore is based on [Redlock algorithm](https://redis.io/docs/latest/develop/clients/patterns/distributed-locks/#the-redlock-algorithm) with some adjustments to fit the `Semaphore` interface and allow cooperative suspension points.

> [!NOTE]
> It's important to note that while distributed semaphore can be used Redis instances, it does not have first class support for Redis Cluster or Sentinel. First class support for Redis Cluster is on the roadmap, but in the meantime you can use distributed semaphore with multiple independent Redis instances as a workaround.

```php
$semaphores = new SemaphoreCollection([
new RedisSemaphore(
Expand All @@ -120,7 +150,7 @@ $locksmith = new Locksmith(
);
$resource = new Resource(
namespace: 'test-resource', // Namespace/identifier for resource
version: 1, // Optional resouce version
version: 1, // Optional resource version
);
$locked = $locksmith->locked(
$resource,
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ services:
REDIS_HOST: redis

redis:
image: redis:7-alpine
image: ${REDIS_IMAGE:-redis:7-alpine}
ports:
- "6379:6379"
- "6379:6379"
4 changes: 2 additions & 2 deletions src/Semaphore/DistributedSemaphore.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public function lock(
$semaphores = clone $this->semaphores;

do {
$semaphore = $this->semaphores->getRandom();
$semaphore = $semaphores->getRandom();
$lockTTLNs -= $this->timeProvider->getCurrentTimeNanoseconds() - $startTime;
if ($lockTTLNs <= 0) {
if ($lockTTLNs <= 0 || $semaphore === null) {
break;
}

Expand Down
12 changes: 8 additions & 4 deletions src/Semaphore/Redis/PhpRedisClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

namespace MiMatus\Locksmith\Semaphore\Redis;

use Redis;
use RedisCluster;
use RedisSentinel;
use RuntimeException;

class PhpRedisClient implements RedisClientInterface
{
public function __construct(
private \Redis $redis,
private Redis|RedisCluster $redis,
) {}

/**
Expand All @@ -22,13 +25,14 @@ public function eval(string $script, array $keys = [], array $args = []): mixed
{
try {
/** @var mixed */
$result = $this->redis->eval($script, [...$keys, ...$args], count($keys));
$result = $this->redis->eval($script, [...$keys, ...$args], count($keys)); // @mago-ignore analysis:invalid-method-access RedisCluster
} catch (\RedisException $e) {
throw new RuntimeException('Redis eval failed: ' . $e->getMessage(), 0, $e);
}

if ($result === false) {
$errorMessage = $this->redis->getLastError() ?? 'Unknown error';
/** @var string */
$errorMessage = $this->redis->getLastError() ?? 'Unknown error'; // @mago-ignore analysis:invalid-method-access RedisCluster
throw new RuntimeException('Redis eval failed: ' . $errorMessage);
}
return $result;
Expand All @@ -41,7 +45,7 @@ public function eval(string $script, array $keys = [], array $args = []): mixed
public function exists(string $key): bool
{
try {
return $this->redis->exists($key) > 0;
return (bool) $this->redis->exists($key); // @mago-ignore analysis:invalid-method-access RedisCluster
} catch (\RedisException $e) {
throw new RuntimeException('Redis exists check failed: ' . $e->getMessage(), 0, $e);
}
Expand Down
8 changes: 6 additions & 2 deletions src/Semaphore/SemaphoreCollection.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@ public function without(SemaphoreInterface $semaphore): static
}

/**
* @return T
* @return ?T
*/
#[\Override]
public function getRandom(): SemaphoreInterface
public function getRandom(): ?SemaphoreInterface
{
if ($this->semaphores === []) {
return null;
}

/** @var int */
$key = $this->randomizer->pickArrayKeys($this->semaphores, 1)[0];
return $this->semaphores[$key];
Expand Down
4 changes: 2 additions & 2 deletions src/Semaphore/SemaphoreCollectionInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ interface SemaphoreCollectionInterface extends IteratorAggregate, Countable
public function without(SemaphoreInterface $semaphore): static;

/**
* @return T
* @return ?T
*/
public function getRandom(): SemaphoreInterface;
public function getRandom(): ?SemaphoreInterface;

/**
* @return Traversable<T>
Expand Down
20 changes: 13 additions & 7 deletions tests/Unit/DistributedSemaphoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,9 @@ public function testAcquiredLockWithErrors(): void
$semaphore = self::createStub(SemaphoreInterface::class);
$timeProvider = self::createStub(TimeProvider::class);
$currentTime = 0;

$distributedSemaphore = new DistributedSemaphore(
semaphores: new SemaphoreCollection([$semaphore, $semaphore, $semaphore, $semaphore, $semaphore]),
quorum: 3,
timeProvider: $timeProvider,
);

$lockAttempt = 0;
$locksAquired = 0;

$semaphore
->method('lock')
->willReturnCallback(static function (
Expand Down Expand Up @@ -196,6 +190,18 @@ public function testAcquiredLockWithErrors(): void
return $currentTime;
});

$distributedSemaphore = new DistributedSemaphore(
semaphores: new SemaphoreCollection([
clone $semaphore,
clone $semaphore,
clone $semaphore,
clone $semaphore,
clone $semaphore,
]),
quorum: 3,
timeProvider: $timeProvider,
);

$distributedSemaphore->lock(
resource: new Resource(namespace: 'test-resource'),
token: 'test-token', // @mago-ignore lint:no-literal-password
Expand Down