From e826f87f350728aa07ec1b14dc440dcf818c9fe0 Mon Sep 17 00:00:00 2001 From: MiMatus Date: Sat, 7 Feb 2026 17:55:27 +0100 Subject: [PATCH] feat: Multiple redis clients #3 --- Makefile | 2 +- README.md | 18 +- composer.json | 11 +- composer.lock | 1393 ++++++++++++++++- src/Locksmith.php | 73 +- src/Semaphore/InMemorySemaphore.php | 5 +- src/Semaphore/Redis/AmPhpRedisClient.php | 43 + src/Semaphore/Redis/PhpRedisClient.php | 49 + src/Semaphore/Redis/PredisRedisClient.php | 52 + src/Semaphore/Redis/RedisClientInterface.php | 22 + src/Semaphore/{ => Redis}/RedisSemaphore.php | 44 +- .../AmpPhp/AmPhpRedisSemaphoreTest.php | 77 + tests/Integration/AmpPhp/LocksmithTest.php | 78 + .../PhpRedis/PhpRedisSemaphoreTest.php | 77 + .../Predis/PredisSemaphoreTest.php | 76 + tests/{Unit => }/SemaphoreTestCase.php | 92 +- tests/Unit/DistributedSemaphoreTest.php | 10 +- tests/Unit/InMemorySemaphoreTest.php | 1 + tests/Unit/LocksmithTest.php | 1 - tests/Unit/RedisSemaphoreTest.php | 46 - 20 files changed, 2002 insertions(+), 168 deletions(-) create mode 100644 src/Semaphore/Redis/AmPhpRedisClient.php create mode 100644 src/Semaphore/Redis/PhpRedisClient.php create mode 100644 src/Semaphore/Redis/PredisRedisClient.php create mode 100644 src/Semaphore/Redis/RedisClientInterface.php rename src/Semaphore/{ => Redis}/RedisSemaphore.php (81%) create mode 100644 tests/Integration/AmpPhp/AmPhpRedisSemaphoreTest.php create mode 100644 tests/Integration/AmpPhp/LocksmithTest.php create mode 100644 tests/Integration/PhpRedis/PhpRedisSemaphoreTest.php create mode 100644 tests/Integration/Predis/PredisSemaphoreTest.php rename tests/{Unit => }/SemaphoreTestCase.php (80%) delete mode 100644 tests/Unit/RedisSemaphoreTest.php diff --git a/Makefile b/Makefile index fa7db69..9d98e62 100644 --- a/Makefile +++ b/Makefile @@ -26,4 +26,4 @@ mago-format: # Run code formatting via mago. .PHONY: run-tests run-tests: # Run unit tests via PHPUnit. - docker compose run --rm php ./vendor/bin/phpunit --colors=always --configuration ./tests/phpunit.xml ./tests/Unit/ \ No newline at end of file + docker compose run --rm php ./vendor/bin/phpunit --colors=always --configuration ./tests/phpunit.xml ./tests/ \ No newline at end of file diff --git a/README.md b/README.md index 3fbdaf0..e4a2470 100644 --- a/README.md +++ b/README.md @@ -22,11 +22,12 @@ composer require mimatus/locksmith ## Roadmap - [x] Basic in-memory & Redis semaphore implementation +- [x] Redlock algorithm for Redis semaphore +- [x] Predis support for Redis semaphore +- [x] AMPHP Redis client support for Redis semaphore +- [ ] First class support and tests for Valkey/KeyDB - [ ] Feedback and API stabilization - [ ] Documentation improvements -- [x] Redlock algorithm for Redis semaphore -- [ ] Predis support for Redis semaphore -- [ ] AMPHP Redis client support for Redis semaphore - [ ] MySQL/MariaDB/PostgreSQL semaphore implementation ## Usage @@ -64,9 +65,10 @@ $locked(function (Closure $suspension): void { $redis = new Redis(); $redis->connect('redis'); +$phpRedisCleint = new PhpRedisClient($redis); $semaphore = new RedisSemaphore( - redisClient: $redis, + redisClient: $phpRedisCleint, maxConcurrentLocks: 3, // Max concurrent locks ); @@ -97,15 +99,15 @@ $locked(function (Closure $suspension): void { ```php $semaphores = new SemaphoreCollection([ new RedisSemaphore( - redisClient: $redis1, + redisClient: $redisClient1, maxConcurrentLocks: 3, ), new RedisSemaphore( - redisClient: $redis2, + redisClient: $redisClient2, maxConcurrentLocks: 3, ), new RedisSemaphore( - redisClient: $redis3, + redisClient: $redisClient3, maxConcurrentLocks: 3, ), ]); @@ -155,7 +157,7 @@ mago-analyze: Run static analysis via mago. mago-format: Run code formatting via mago. mago-lint-fix: Run linting with auto-fix via mago. mago-lint: Run linting via mago. -run-tests: Run unit tests via PHPUnit. +run-tests: Run tests via PHPUnit. ``` ## License diff --git a/composer.json b/composer.json index 9fb15c6..e9d80c9 100644 --- a/composer.json +++ b/composer.json @@ -3,10 +3,13 @@ "type": "library", "license": "MIT", "require": { - "php": ">=8.3" + "php": ">=8.3", + "revolt/event-loop": "^1.0" }, "suggest": { - "ext-redis": "To use this library with the PHP Redis extension." + "ext-redis": "To use this library with the PHP Redis extension.", + "predis/predis": "To use this library with the Predis client.", + "amphp/redis": "To use this library with the AMPHP Redis client." }, "autoload": { "psr-4": { @@ -20,6 +23,8 @@ }, "require-dev": { "ext-redis": "*", - "phpunit/phpunit": "^12.5" + "phpunit/phpunit": "^12.5", + "amphp/redis": "^2.0", + "predis/predis": "^3.3" } } diff --git a/composer.lock b/composer.lock index 1122af3..7e6dc8b 100644 --- a/composer.lock +++ b/composer.lock @@ -4,9 +4,1171 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "6a525cd42cbc85c10c1d7a10d546c274", - "packages": [], + "content-hash": "c62b27a2a38173fb3d61673fc12d5989", + "packages": [ + { + "name": "revolt/event-loop", + "version": "v1.0.8", + "source": { + "type": "git", + "url": "https://github.com/revoltphp/event-loop.git", + "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/revoltphp/event-loop/zipball/b6fc06dce8e9b523c9946138fa5e62181934f91c", + "reference": "b6fc06dce8e9b523c9946138fa5e62181934f91c", + "shasum": "" + }, + "require": { + "php": ">=8.1" + }, + "require-dev": { + "ext-json": "*", + "jetbrains/phpstorm-stubs": "^2019.3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.15" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-main": "1.x-dev" + } + }, + "autoload": { + "psr-4": { + "Revolt\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Cees-Jan Kiewiet", + "email": "ceesjank@gmail.com" + }, + { + "name": "Christian Lück", + "email": "christian@clue.engineering" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Rock-solid event loop for concurrent PHP applications.", + "keywords": [ + "async", + "asynchronous", + "concurrency", + "event", + "event-loop", + "non-blocking", + "scheduler" + ], + "support": { + "issues": "https://github.com/revoltphp/event-loop/issues", + "source": "https://github.com/revoltphp/event-loop/tree/v1.0.8" + }, + "time": "2025-08-27T21:33:23+00:00" + } + ], "packages-dev": [ + { + "name": "amphp/amp", + "version": "v3.1.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/amp.git", + "reference": "fa0ab33a6f47a82929c38d03ca47ebb71086a93f" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/amp/zipball/fa0ab33a6f47a82929c38d03ca47ebb71086a93f", + "reference": "fa0ab33a6f47a82929c38d03ca47ebb71086a93f", + "shasum": "" + }, + "require": { + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "5.23.1" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Future/functions.php", + "src/Internal/functions.php" + ], + "psr-4": { + "Amp\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Bob Weinand", + "email": "bobwei9@hotmail.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Daniel Lowrey", + "email": "rdlowrey@php.net" + } + ], + "description": "A non-blocking concurrency framework for PHP applications.", + "homepage": "https://amphp.org/amp", + "keywords": [ + "async", + "asynchronous", + "awaitable", + "concurrency", + "event", + "event-loop", + "future", + "non-blocking", + "promise" + ], + "support": { + "issues": "https://github.com/amphp/amp/issues", + "source": "https://github.com/amphp/amp/tree/v3.1.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-08-27T21:42:00+00:00" + }, + { + "name": "amphp/byte-stream", + "version": "v2.1.2", + "source": { + "type": "git", + "url": "https://github.com/amphp/byte-stream.git", + "reference": "55a6bd071aec26fa2a3e002618c20c35e3df1b46" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/byte-stream/zipball/55a6bd071aec26fa2a3e002618c20c35e3df1b46", + "reference": "55a6bd071aec26fa2a3e002618c20c35e3df1b46", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/parser": "^1.1", + "amphp/pipeline": "^1", + "amphp/serialization": "^1", + "amphp/sync": "^2", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2.3" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "5.22.1" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Internal/functions.php" + ], + "psr-4": { + "Amp\\ByteStream\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "A stream abstraction to make working with non-blocking I/O simple.", + "homepage": "https://amphp.org/byte-stream", + "keywords": [ + "amp", + "amphp", + "async", + "io", + "non-blocking", + "stream" + ], + "support": { + "issues": "https://github.com/amphp/byte-stream/issues", + "source": "https://github.com/amphp/byte-stream/tree/v2.1.2" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-03-16T17:10:27+00:00" + }, + { + "name": "amphp/cache", + "version": "v2.0.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/cache.git", + "reference": "46912e387e6aa94933b61ea1ead9cf7540b7797c" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/cache/zipball/46912e387e6aa94933b61ea1ead9cf7540b7797c", + "reference": "46912e387e6aa94933b61ea1ead9cf7540b7797c", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/serialization": "^1", + "amphp/sync": "^2", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.4" + }, + "type": "library", + "autoload": { + "psr-4": { + "Amp\\Cache\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Daniel Lowrey", + "email": "rdlowrey@php.net" + } + ], + "description": "A fiber-aware cache API based on Amp and Revolt.", + "homepage": "https://amphp.org/cache", + "support": { + "issues": "https://github.com/amphp/cache/issues", + "source": "https://github.com/amphp/cache/tree/v2.0.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-04-19T03:38:06+00:00" + }, + { + "name": "amphp/dns", + "version": "v2.4.0", + "source": { + "type": "git", + "url": "https://github.com/amphp/dns.git", + "reference": "78eb3db5fc69bf2fc0cb503c4fcba667bc223c71" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/dns/zipball/78eb3db5fc69bf2fc0cb503c4fcba667bc223c71", + "reference": "78eb3db5fc69bf2fc0cb503c4fcba667bc223c71", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/cache": "^2", + "amphp/parser": "^1", + "amphp/process": "^2", + "daverandom/libdns": "^2.0.2", + "ext-filter": "*", + "ext-json": "*", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "5.20" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Dns\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Chris Wright", + "email": "addr@daverandom.com" + }, + { + "name": "Daniel Lowrey", + "email": "rdlowrey@php.net" + }, + { + "name": "Bob Weinand", + "email": "bobwei9@hotmail.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + } + ], + "description": "Async DNS resolution for Amp.", + "homepage": "https://github.com/amphp/dns", + "keywords": [ + "amp", + "amphp", + "async", + "client", + "dns", + "resolve" + ], + "support": { + "issues": "https://github.com/amphp/dns/issues", + "source": "https://github.com/amphp/dns/tree/v2.4.0" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-01-19T15:43:40+00:00" + }, + { + "name": "amphp/parser", + "version": "v1.1.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/parser.git", + "reference": "3cf1f8b32a0171d4b1bed93d25617637a77cded7" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/parser/zipball/3cf1f8b32a0171d4b1bed93d25617637a77cded7", + "reference": "3cf1f8b32a0171d4b1bed93d25617637a77cded7", + "shasum": "" + }, + "require": { + "php": ">=7.4" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.4" + }, + "type": "library", + "autoload": { + "psr-4": { + "Amp\\Parser\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "A generator parser to make streaming parsers simple.", + "homepage": "https://github.com/amphp/parser", + "keywords": [ + "async", + "non-blocking", + "parser", + "stream" + ], + "support": { + "issues": "https://github.com/amphp/parser/issues", + "source": "https://github.com/amphp/parser/tree/v1.1.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-03-21T19:16:53+00:00" + }, + { + "name": "amphp/pipeline", + "version": "v1.2.3", + "source": { + "type": "git", + "url": "https://github.com/amphp/pipeline.git", + "reference": "7b52598c2e9105ebcddf247fc523161581930367" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/pipeline/zipball/7b52598c2e9105ebcddf247fc523161581930367", + "reference": "7b52598c2e9105ebcddf247fc523161581930367", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "php": ">=8.1", + "revolt/event-loop": "^1" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.18" + }, + "type": "library", + "autoload": { + "psr-4": { + "Amp\\Pipeline\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Asynchronous iterators and operators.", + "homepage": "https://amphp.org/pipeline", + "keywords": [ + "amp", + "amphp", + "async", + "io", + "iterator", + "non-blocking" + ], + "support": { + "issues": "https://github.com/amphp/pipeline/issues", + "source": "https://github.com/amphp/pipeline/tree/v1.2.3" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-03-16T16:33:53+00:00" + }, + { + "name": "amphp/process", + "version": "v2.0.3", + "source": { + "type": "git", + "url": "https://github.com/amphp/process.git", + "reference": "52e08c09dec7511d5fbc1fb00d3e4e79fc77d58d" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/process/zipball/52e08c09dec7511d5fbc1fb00d3e4e79fc77d58d", + "reference": "52e08c09dec7511d5fbc1fb00d3e4e79fc77d58d", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/sync": "^2", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "^5.4" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Process\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Bob Weinand", + "email": "bobwei9@hotmail.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "A fiber-aware process manager based on Amp and Revolt.", + "homepage": "https://amphp.org/process", + "support": { + "issues": "https://github.com/amphp/process/issues", + "source": "https://github.com/amphp/process/tree/v2.0.3" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-04-19T03:13:44+00:00" + }, + { + "name": "amphp/redis", + "version": "v2.0.3", + "source": { + "type": "git", + "url": "https://github.com/amphp/redis.git", + "reference": "1572c2fec2849d272570919e998f9a3c1a5b1703" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/redis/zipball/1572c2fec2849d272570919e998f9a3c1a5b1703", + "reference": "1572c2fec2849d272570919e998f9a3c1a5b1703", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/cache": "^2", + "amphp/parser": "^1", + "amphp/pipeline": "^1", + "amphp/serialization": "^1", + "amphp/socket": "^2", + "amphp/sync": "^2", + "league/uri": "^7", + "php": ">=8.1", + "psr/log": "^1|^2|^3", + "revolt/event-loop": "^1" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "amphp/process": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "5.22" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Internal/functions.php" + ], + "psr-4": { + "Amp\\Redis\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + } + ], + "description": "Efficient asynchronous communication with Redis servers, enabling scalable and responsive data storage and retrieval.", + "homepage": "https://amphp.org/redis", + "keywords": [ + "amp", + "amphp", + "async", + "client", + "redis", + "revolt" + ], + "support": { + "issues": "https://github.com/amphp/redis/issues", + "source": "https://github.com/amphp/redis/tree/v2.0.3" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2025-01-15T04:14:11+00:00" + }, + { + "name": "amphp/serialization", + "version": "v1.0.0", + "source": { + "type": "git", + "url": "https://github.com/amphp/serialization.git", + "reference": "693e77b2fb0b266c3c7d622317f881de44ae94a1" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/serialization/zipball/693e77b2fb0b266c3c7d622317f881de44ae94a1", + "reference": "693e77b2fb0b266c3c7d622317f881de44ae94a1", + "shasum": "" + }, + "require": { + "php": ">=7.1" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "dev-master", + "phpunit/phpunit": "^9 || ^8 || ^7" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Serialization\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Serialization tools for IPC and data storage in PHP.", + "homepage": "https://github.com/amphp/serialization", + "keywords": [ + "async", + "asynchronous", + "serialization", + "serialize" + ], + "support": { + "issues": "https://github.com/amphp/serialization/issues", + "source": "https://github.com/amphp/serialization/tree/master" + }, + "time": "2020-03-25T21:39:07+00:00" + }, + { + "name": "amphp/socket", + "version": "v2.3.1", + "source": { + "type": "git", + "url": "https://github.com/amphp/socket.git", + "reference": "58e0422221825b79681b72c50c47a930be7bf1e1" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/socket/zipball/58e0422221825b79681b72c50c47a930be7bf1e1", + "reference": "58e0422221825b79681b72c50c47a930be7bf1e1", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/byte-stream": "^2", + "amphp/dns": "^2", + "ext-openssl": "*", + "kelunik/certificate": "^1.1", + "league/uri": "^6.5 | ^7", + "league/uri-interfaces": "^2.3 | ^7", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "amphp/process": "^2", + "phpunit/phpunit": "^9", + "psalm/phar": "5.20" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php", + "src/Internal/functions.php", + "src/SocketAddress/functions.php" + ], + "psr-4": { + "Amp\\Socket\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Daniel Lowrey", + "email": "rdlowrey@gmail.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Non-blocking socket connection / server implementations based on Amp and Revolt.", + "homepage": "https://github.com/amphp/socket", + "keywords": [ + "amp", + "async", + "encryption", + "non-blocking", + "sockets", + "tcp", + "tls" + ], + "support": { + "issues": "https://github.com/amphp/socket/issues", + "source": "https://github.com/amphp/socket/tree/v2.3.1" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-04-21T14:33:03+00:00" + }, + { + "name": "amphp/sync", + "version": "v2.3.0", + "source": { + "type": "git", + "url": "https://github.com/amphp/sync.git", + "reference": "217097b785130d77cfcc58ff583cf26cd1770bf1" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/amphp/sync/zipball/217097b785130d77cfcc58ff583cf26cd1770bf1", + "reference": "217097b785130d77cfcc58ff583cf26cd1770bf1", + "shasum": "" + }, + "require": { + "amphp/amp": "^3", + "amphp/pipeline": "^1", + "amphp/serialization": "^1", + "php": ">=8.1", + "revolt/event-loop": "^1 || ^0.2" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "amphp/phpunit-util": "^3", + "phpunit/phpunit": "^9", + "psalm/phar": "5.23" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "Amp\\Sync\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" + }, + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + }, + { + "name": "Stephen Coakley", + "email": "me@stephencoakley.com" + } + ], + "description": "Non-blocking synchronization primitives for PHP based on Amp and Revolt.", + "homepage": "https://github.com/amphp/sync", + "keywords": [ + "async", + "asynchronous", + "mutex", + "semaphore", + "synchronization" + ], + "support": { + "issues": "https://github.com/amphp/sync/issues", + "source": "https://github.com/amphp/sync/tree/v2.3.0" + }, + "funding": [ + { + "url": "https://github.com/amphp", + "type": "github" + } + ], + "time": "2024-08-03T19:31:26+00:00" + }, + { + "name": "daverandom/libdns", + "version": "v2.1.0", + "source": { + "type": "git", + "url": "https://github.com/DaveRandom/LibDNS.git", + "reference": "b84c94e8fe6b7ee4aecfe121bfe3b6177d303c8a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/DaveRandom/LibDNS/zipball/b84c94e8fe6b7ee4aecfe121bfe3b6177d303c8a", + "reference": "b84c94e8fe6b7ee4aecfe121bfe3b6177d303c8a", + "shasum": "" + }, + "require": { + "ext-ctype": "*", + "php": ">=7.1" + }, + "suggest": { + "ext-intl": "Required for IDN support" + }, + "type": "library", + "autoload": { + "files": [ + "src/functions.php" + ], + "psr-4": { + "LibDNS\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "DNS protocol implementation written in pure PHP", + "keywords": [ + "dns" + ], + "support": { + "issues": "https://github.com/DaveRandom/LibDNS/issues", + "source": "https://github.com/DaveRandom/LibDNS/tree/v2.1.0" + }, + "time": "2024-04-12T12:12:48+00:00" + }, + { + "name": "kelunik/certificate", + "version": "v1.1.3", + "source": { + "type": "git", + "url": "https://github.com/kelunik/certificate.git", + "reference": "7e00d498c264d5eb4f78c69f41c8bd6719c0199e" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/kelunik/certificate/zipball/7e00d498c264d5eb4f78c69f41c8bd6719c0199e", + "reference": "7e00d498c264d5eb4f78c69f41c8bd6719c0199e", + "shasum": "" + }, + "require": { + "ext-openssl": "*", + "php": ">=7.0" + }, + "require-dev": { + "amphp/php-cs-fixer-config": "^2", + "phpunit/phpunit": "^6 | 7 | ^8 | ^9" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.x-dev" + } + }, + "autoload": { + "psr-4": { + "Kelunik\\Certificate\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Niklas Keller", + "email": "me@kelunik.com" + } + ], + "description": "Access certificate details and transform between different formats.", + "keywords": [ + "DER", + "certificate", + "certificates", + "openssl", + "pem", + "x509" + ], + "support": { + "issues": "https://github.com/kelunik/certificate/issues", + "source": "https://github.com/kelunik/certificate/tree/v1.1.3" + }, + "time": "2023-02-03T21:26:53+00:00" + }, + { + "name": "league/uri", + "version": "7.8.0", + "source": { + "type": "git", + "url": "https://github.com/thephpleague/uri.git", + "reference": "4436c6ec8d458e4244448b069cc572d088230b76" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/thephpleague/uri/zipball/4436c6ec8d458e4244448b069cc572d088230b76", + "reference": "4436c6ec8d458e4244448b069cc572d088230b76", + "shasum": "" + }, + "require": { + "league/uri-interfaces": "^7.8", + "php": "^8.1", + "psr/http-factory": "^1" + }, + "conflict": { + "league/uri-schemes": "^1.0" + }, + "suggest": { + "ext-bcmath": "to improve IPV4 host parsing", + "ext-dom": "to convert the URI into an HTML anchor tag", + "ext-fileinfo": "to create Data URI from file contennts", + "ext-gmp": "to improve IPV4 host parsing", + "ext-intl": "to handle IDN host with the best performance", + "ext-uri": "to use the PHP native URI class", + "jeremykendall/php-domain-parser": "to further parse the URI host and resolve its Public Suffix and Top Level Domain", + "league/uri-components": "to provide additional tools to manipulate URI objects components", + "league/uri-polyfill": "to backport the PHP URI extension for older versions of PHP", + "php-64bit": "to improve IPV4 host parsing", + "rowbot/url": "to handle URLs using the WHATWG URL Living Standard specification", + "symfony/polyfill-intl-idn": "to handle IDN host via the Symfony polyfill if ext-intl is not present" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "7.x-dev" + } + }, + "autoload": { + "psr-4": { + "League\\Uri\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Ignace Nyamagana Butera", + "email": "nyamsprod@gmail.com", + "homepage": "https://nyamsprod.com" + } + ], + "description": "URI manipulation library", + "homepage": "https://uri.thephpleague.com", + "keywords": [ + "URN", + "data-uri", + "file-uri", + "ftp", + "hostname", + "http", + "https", + "middleware", + "parse_str", + "parse_url", + "psr-7", + "query-string", + "querystring", + "rfc2141", + "rfc3986", + "rfc3987", + "rfc6570", + "rfc8141", + "uri", + "uri-template", + "url", + "ws" + ], + "support": { + "docs": "https://uri.thephpleague.com", + "forum": "https://thephpleague.slack.com", + "issues": "https://github.com/thephpleague/uri-src/issues", + "source": "https://github.com/thephpleague/uri/tree/7.8.0" + }, + "funding": [ + { + "url": "https://github.com/sponsors/nyamsprod", + "type": "github" + } + ], + "time": "2026-01-14T17:24:56+00:00" + }, + { + "name": "league/uri-interfaces", + "version": "7.8.0", + "source": { + "type": "git", + "url": "https://github.com/thephpleague/uri-interfaces.git", + "reference": "c5c5cd056110fc8afaba29fa6b72a43ced42acd4" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/thephpleague/uri-interfaces/zipball/c5c5cd056110fc8afaba29fa6b72a43ced42acd4", + "reference": "c5c5cd056110fc8afaba29fa6b72a43ced42acd4", + "shasum": "" + }, + "require": { + "ext-filter": "*", + "php": "^8.1", + "psr/http-message": "^1.1 || ^2.0" + }, + "suggest": { + "ext-bcmath": "to improve IPV4 host parsing", + "ext-gmp": "to improve IPV4 host parsing", + "ext-intl": "to handle IDN host with the best performance", + "php-64bit": "to improve IPV4 host parsing", + "rowbot/url": "to handle URLs using the WHATWG URL Living Standard specification", + "symfony/polyfill-intl-idn": "to handle IDN host via the Symfony polyfill if ext-intl is not present" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "7.x-dev" + } + }, + "autoload": { + "psr-4": { + "League\\Uri\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Ignace Nyamagana Butera", + "email": "nyamsprod@gmail.com", + "homepage": "https://nyamsprod.com" + } + ], + "description": "Common tools for parsing and resolving RFC3987/RFC3986 URI", + "homepage": "https://uri.thephpleague.com", + "keywords": [ + "data-uri", + "file-uri", + "ftp", + "hostname", + "http", + "https", + "parse_str", + "parse_url", + "psr-7", + "query-string", + "querystring", + "rfc3986", + "rfc3987", + "rfc6570", + "uri", + "url", + "ws" + ], + "support": { + "docs": "https://uri.thephpleague.com", + "forum": "https://thephpleague.slack.com", + "issues": "https://github.com/thephpleague/uri-src/issues", + "source": "https://github.com/thephpleague/uri-interfaces/tree/7.8.0" + }, + "funding": [ + { + "url": "https://github.com/sponsors/nyamsprod", + "type": "github" + } + ], + "time": "2026-01-15T06:54:53+00:00" + }, { "name": "myclabs/deep-copy", "version": "1.13.4", @@ -682,6 +1844,227 @@ ], "time": "2025-12-15T06:05:34+00:00" }, + { + "name": "predis/predis", + "version": "v3.3.0", + "source": { + "type": "git", + "url": "https://github.com/predis/predis.git", + "reference": "153097374b39a2f737fe700ebcd725642526cdec" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/predis/predis/zipball/153097374b39a2f737fe700ebcd725642526cdec", + "reference": "153097374b39a2f737fe700ebcd725642526cdec", + "shasum": "" + }, + "require": { + "php": "^7.2 || ^8.0", + "psr/http-message": "^1.0|^2.0" + }, + "require-dev": { + "friendsofphp/php-cs-fixer": "^3.3", + "phpstan/phpstan": "^1.9", + "phpunit/phpcov": "^6.0 || ^8.0", + "phpunit/phpunit": "^8.0 || ~9.4.4" + }, + "suggest": { + "ext-relay": "Faster connection with in-memory caching (>=0.6.2)" + }, + "type": "library", + "autoload": { + "psr-4": { + "Predis\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Till Krüss", + "homepage": "https://till.im", + "role": "Maintainer" + } + ], + "description": "A flexible and feature-complete Redis/Valkey client for PHP.", + "homepage": "http://github.com/predis/predis", + "keywords": [ + "nosql", + "predis", + "redis" + ], + "support": { + "issues": "https://github.com/predis/predis/issues", + "source": "https://github.com/predis/predis/tree/v3.3.0" + }, + "funding": [ + { + "url": "https://github.com/sponsors/tillkruss", + "type": "github" + } + ], + "time": "2025-11-24T17:48:50+00:00" + }, + { + "name": "psr/http-factory", + "version": "1.1.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-factory.git", + "reference": "2b4765fddfe3b508ac62f829e852b1501d3f6e8a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-factory/zipball/2b4765fddfe3b508ac62f829e852b1501d3f6e8a", + "reference": "2b4765fddfe3b508ac62f829e852b1501d3f6e8a", + "shasum": "" + }, + "require": { + "php": ">=7.1", + "psr/http-message": "^1.0 || ^2.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "PSR-17: Common interfaces for PSR-7 HTTP message factories", + "keywords": [ + "factory", + "http", + "message", + "psr", + "psr-17", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-factory" + }, + "time": "2024-04-15T12:06:14+00:00" + }, + { + "name": "psr/http-message", + "version": "2.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/http-message.git", + "reference": "402d35bcb92c70c026d1a6a9883f06b2ead23d71" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/http-message/zipball/402d35bcb92c70c026d1a6a9883f06b2ead23d71", + "reference": "402d35bcb92c70c026d1a6a9883f06b2ead23d71", + "shasum": "" + }, + "require": { + "php": "^7.2 || ^8.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "2.0.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Http\\Message\\": "src/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "Common interface for HTTP messages", + "homepage": "https://github.com/php-fig/http-message", + "keywords": [ + "http", + "http-message", + "psr", + "psr-7", + "request", + "response" + ], + "support": { + "source": "https://github.com/php-fig/http-message/tree/2.0" + }, + "time": "2023-04-04T09:54:51+00:00" + }, + { + "name": "psr/log", + "version": "3.0.2", + "source": { + "type": "git", + "url": "https://github.com/php-fig/log.git", + "reference": "f16e1d5863e37f8d8c2a01719f5b34baa2b714d3" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/log/zipball/f16e1d5863e37f8d8c2a01719f5b34baa2b714d3", + "reference": "f16e1d5863e37f8d8c2a01719f5b34baa2b714d3", + "shasum": "" + }, + "require": { + "php": ">=8.0.0" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.x-dev" + } + }, + "autoload": { + "psr-4": { + "Psr\\Log\\": "src" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "https://www.php-fig.org/" + } + ], + "description": "Common interface for logging libraries", + "homepage": "https://github.com/php-fig/log", + "keywords": [ + "log", + "psr", + "psr-3" + ], + "support": { + "source": "https://github.com/php-fig/log/tree/3.0.2" + }, + "time": "2024-09-11T13:17:53+00:00" + }, { "name": "sebastian/cli-parser", "version": "4.2.0", @@ -1688,8 +3071,10 @@ "prefer-stable": false, "prefer-lowest": false, "platform": { - "php": ">=8.2" + "php": ">=8.3" + }, + "platform-dev": { + "ext-redis": "*" }, - "platform-dev": {}, "plugin-api-version": "2.9.0" } diff --git a/src/Locksmith.php b/src/Locksmith.php index 96d4bca..6489555 100644 --- a/src/Locksmith.php +++ b/src/Locksmith.php @@ -5,10 +5,10 @@ namespace MiMatus\Locksmith; use Closure; -use Fiber; use MiMatus\Locksmith\Semaphore\TimeProvider; use Random\Engine; use Random\Engine\Xoshiro256StarStar; +use Revolt\EventLoop; use RuntimeException; use Throwable; @@ -37,10 +37,24 @@ public function locked( return function (Closure $callback) use ($resource, $lockTTLNs, $maxLockWaitNs, $minSuspensionDelayNs): mixed { $token = bin2hex($this->randomEngine->generate()); + $startTimeNs = $this->timeProvider->getCurrentTimeNanoseconds(); + + $suspender = function () use ($startTimeNs, $lockTTLNs, $minSuspensionDelayNs) { + $remainingLockTTLNs = (int) ( + $lockTTLNs + - ($this->timeProvider->getCurrentTimeNanoseconds() - $startTimeNs) + ); + if ($remainingLockTTLNs <= 0) { + throw new RuntimeException('Unable to get result under TTL'); + } + /** @var non-negative-int $remainingLockTTLNs */ + + $this->getResultUnderTTL(static fn() => null, $remainingLockTTLNs, $minSuspensionDelayNs); + }; $this->getResultUnderTTL( - new Fiber(function () use ($token, $resource, $lockTTLNs): void { - $this->semaphore->lock($resource, $token, $lockTTLNs, Fiber::suspend(...)); - }), + function () use ($token, $resource, $lockTTLNs, $suspender): void { + $this->semaphore->lock($resource, $token, $lockTTLNs, $suspender); + }, $maxLockWaitNs, $minSuspensionDelayNs, ); @@ -48,13 +62,13 @@ public function locked( try { /** @var T */ return $this->getResultUnderTTL( - new Fiber(function () use ($callback, $resource): mixed { + function () use ($callback, $resource, $suspender): mixed { if (!$this->semaphore->isLocked($resource)) { throw new RuntimeException('Lock has been lost during process'); } - return $callback(Fiber::suspend(...)); - }), + return $callback($suspender); + }, $lockTTLNs, $minSuspensionDelayNs, ); @@ -67,37 +81,44 @@ public function locked( /** * @template T * @throws Throwable - * @param Fiber $fiber + * @param Closure(): T $task * @param non-negative-int $ttlNanoseconds * @return T */ - private function getResultUnderTTL(Fiber $fiber, int $ttlNanoseconds, int $minSuspensionDelayNs): mixed + private function getResultUnderTTL(Closure $task, int $ttlNanoseconds, int $minSuspensionDelayNs): mixed { - $start = $this->timeProvider->getCurrentTimeNanoseconds(); + $suspension = EventLoop::getSuspension(); - $fiber->start(); + $startTime = $this->timeProvider->getCurrentTimeNanoseconds(); - while (!$fiber->isTerminated()) { - if ($ttlNanoseconds < ($this->timeProvider->getCurrentTimeNanoseconds() - $start)) { - throw new RuntimeException('Unable to get result under TTL'); + $deferId = EventLoop::delay($minSuspensionDelayNs / 1_000_000, function () use ( + $task, + $suspension, + $startTime, + $ttlNanoseconds, + ): void { + try { + $result = $task(); + } catch (Throwable $e) { + $suspension->throw($e); + return; } - if (Fiber::getCurrent() !== null) { - Fiber::suspend(); - } elseif ($minSuspensionDelayNs > 0) { - /** @var positive-int $delay */ - $delay = $minSuspensionDelayNs / 1000; - usleep($delay); + // Check if TTL has been exceeded before resuming the fiber - there might have been a blocking operation in the task that caused us to exceed the TTL + if (($this->timeProvider->getCurrentTimeNanoseconds() - $startTime) >= $ttlNanoseconds) { + $suspension->throw(new RuntimeException('Unable to get result under TTL')); + return; } + $suspension->resume($result); + }); - if (!$fiber->isSuspended()) { - throw new RuntimeException('Fiber error, fiber is not suspended nor terminated'); - } + EventLoop::delay($ttlNanoseconds / 1_000_000, static function () use ($deferId, $suspension) { + EventLoop::cancel($deferId); - $fiber->resume(); - } + $suspension->throw(new RuntimeException('Unable to get result under TTL')); + }); /** @var T */ - return $fiber->getReturn(); + return $suspension->suspend(); } } diff --git a/src/Semaphore/InMemorySemaphore.php b/src/Semaphore/InMemorySemaphore.php index daafff4..d99e6dd 100644 --- a/src/Semaphore/InMemorySemaphore.php +++ b/src/Semaphore/InMemorySemaphore.php @@ -62,11 +62,10 @@ public function lock( $higherVersion = $resource->getVersion() > $storedResource['version']; $maxLocksReached = $activeLocks >= $this->maxConcurrentLocks; - if ($maxLocksReached || $higherVersion) { - $suspension(); - } else { + if (!$maxLocksReached && !$higherVersion) { break; } + $suspension(); } while (true); $resourceExpiration = $currentTime + $lockTTLNs; diff --git a/src/Semaphore/Redis/AmPhpRedisClient.php b/src/Semaphore/Redis/AmPhpRedisClient.php new file mode 100644 index 0000000..d7ebef8 --- /dev/null +++ b/src/Semaphore/Redis/AmPhpRedisClient.php @@ -0,0 +1,43 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + #[\Override] + public function eval(string $script, array $keys = [], array $args = []): mixed + { + try { + return $this->redis->eval($script, $keys, $args); + } catch (\RedisException $e) { + throw new RuntimeException('Redis eval failed: ' . $e->getMessage(), 0, $e); + } + } + + /** + * @throws RuntimeException + */ + #[\Override] + public function exists(string $key): bool + { + try { + return $this->redis->has($key); + } catch (\RedisException $e) { + throw new RuntimeException('Redis exists check failed: ' . $e->getMessage(), 0, $e); + } + } +} diff --git a/src/Semaphore/Redis/PhpRedisClient.php b/src/Semaphore/Redis/PhpRedisClient.php new file mode 100644 index 0000000..d4b1430 --- /dev/null +++ b/src/Semaphore/Redis/PhpRedisClient.php @@ -0,0 +1,49 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + #[\Override] + public function eval(string $script, array $keys = [], array $args = []): mixed + { + try { + /** @var mixed */ + $result = $this->redis->eval($script, [...$keys, ...$args], count($keys)); + } catch (\RedisException $e) { + throw new RuntimeException('Redis eval failed: ' . $e->getMessage(), 0, $e); + } + + if ($result === false) { + $errorMessage = $this->redis->getLastError() ?? 'Unknown error'; + throw new RuntimeException('Redis eval failed: ' . $errorMessage); + } + return $result; + } + + /** + * @throws RuntimeException + */ + #[\Override] + public function exists(string $key): bool + { + try { + return $this->redis->exists($key) > 0; + } catch (\RedisException $e) { + throw new RuntimeException('Redis exists check failed: ' . $e->getMessage(), 0, $e); + } + } +} diff --git a/src/Semaphore/Redis/PredisRedisClient.php b/src/Semaphore/Redis/PredisRedisClient.php new file mode 100644 index 0000000..4530aca --- /dev/null +++ b/src/Semaphore/Redis/PredisRedisClient.php @@ -0,0 +1,52 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + #[\Override] + public function eval(string $script, array $keys = [], array $args = []): mixed + { + try { + /** @var mixed */ + $response = $this->redis->eval($script, count($keys), ...[...$keys, ...$args]); + } catch (PredisException $e) { + throw new RuntimeException('Redis eval failed: ' . $e->getMessage(), 0, $e); + } + + if ($response instanceof ErrorResponseInterface && !$this->redis->getOptions()->exceptions) { + throw new RuntimeException($response->getMessage()); + } + + return $response; + } + + /** + * @throws RuntimeException + */ + #[\Override] + public function exists(string $key): bool + { + try { + return $this->redis->exists($key) > 0; + } catch (PredisException $e) { + throw new RuntimeException('Redis exists check failed: ' . $e->getMessage(), 0, $e); + } + } +} diff --git a/src/Semaphore/Redis/RedisClientInterface.php b/src/Semaphore/Redis/RedisClientInterface.php new file mode 100644 index 0000000..145c915 --- /dev/null +++ b/src/Semaphore/Redis/RedisClientInterface.php @@ -0,0 +1,22 @@ + $keys + * @param list $args + * @throws RuntimeException + */ + public function eval(string $script, array $keys = [], array $args = []): mixed; + + /** + * @throws RuntimeException + */ + public function exists(string $key): bool; +} diff --git a/src/Semaphore/RedisSemaphore.php b/src/Semaphore/Redis/RedisSemaphore.php similarity index 81% rename from src/Semaphore/RedisSemaphore.php rename to src/Semaphore/Redis/RedisSemaphore.php index 515beaa..1c2c015 100644 --- a/src/Semaphore/RedisSemaphore.php +++ b/src/Semaphore/Redis/RedisSemaphore.php @@ -2,21 +2,25 @@ declare(strict_types=1); -namespace MiMatus\Locksmith\Semaphore; +namespace MiMatus\Locksmith\Semaphore\Redis; use Closure; use MiMatus\Locksmith\ResourceInterface; +use MiMatus\Locksmith\Semaphore\Redis\RedisClientInterface; use MiMatus\Locksmith\SemaphoreInterface; use RedisException; use RuntimeException; +use Throwable; readonly class RedisSemaphore implements SemaphoreInterface { + private const string RedisKeyPrefix = 'locksmith:semaphore:'; + /** * @param positive-int $maxConcurrentLocks */ public function __construct( - public \Redis $redisClient, + public RedisClientInterface $redisClient, private int $maxConcurrentLocks = 1, ) {} @@ -91,22 +95,21 @@ public function lock( do { try { /** @var bool|int */ - $result = $this->redisClient->eval( + $this->redisClient->eval( $luaScript, [ - $resource->getNamespace(), + self::RedisKeyPrefix . $resource->getNamespace(), + ], + [ $token, - $milisecondsTTL, - $resource->getVersion(), - $this->maxConcurrentLocks, + (string) $milisecondsTTL, + (string) $resource->getVersion(), + (string) $this->maxConcurrentLocks, ], - 1, ); - if ($result !== false) { - break; - } - - $errorMessage = $this->redisClient->getLastError() ?? 'Unknown error'; + break; + } catch (Throwable $e) { + $errorMessage = $e->getMessage(); if (str_contains($errorMessage, 'MiMatus_VERSION_MISMATCH')) { throw new RuntimeException('Lock version mismatch'); } @@ -121,8 +124,6 @@ public function lock( continue; } - throw new RuntimeException(message: 'Redis Error: ' . $errorMessage); - } catch (RedisException $e) { throw new RuntimeException(message: 'Redis Error: ' . $e->getMessage(), previous: $e); } } while (true); @@ -158,11 +159,7 @@ public function unlock(ResourceInterface $resource, #[\SensitiveParameter] strin LUA; try { - /** @var bool|int */ - $result = $this->redisClient->eval($luaScript, [$resource->getNamespace(), $token], 1); - if ($result === false) { - throw new RuntimeException('Redis Error: ' . ($this->redisClient->getLastError() ?? 'Unknown error')); - } + $this->redisClient->eval($luaScript, [self::RedisKeyPrefix . $resource->getNamespace()], [$token]); } catch (RedisException $e) { throw new RuntimeException(message: 'Redis Error: ' . $e->getMessage(), previous: $e); } @@ -175,14 +172,9 @@ public function unlock(ResourceInterface $resource, #[\SensitiveParameter] strin public function isLocked(ResourceInterface $resource): bool { try { - /** @var bool|int */ - $exists = $this->redisClient->exists($resource->getNamespace()); + return $this->redisClient->exists(self::RedisKeyPrefix . $resource->getNamespace()); } catch (RedisException $e) { throw new RuntimeException(message: 'Redis Error: ' . $e->getMessage(), previous: $e); } - if ($exists === false) { - throw new RuntimeException('Redis Error: ' . ($this->redisClient->getLastError() ?? 'Unknown error')); - } - return $exists === 1; } } diff --git a/tests/Integration/AmpPhp/AmPhpRedisSemaphoreTest.php b/tests/Integration/AmpPhp/AmPhpRedisSemaphoreTest.php new file mode 100644 index 0000000..d73fe9c --- /dev/null +++ b/tests/Integration/AmpPhp/AmPhpRedisSemaphoreTest.php @@ -0,0 +1,77 @@ +redis = createRedisClient('tcp://redis:6379'); + } + + protected function tearDown(): void + { + parent::tearDown(); + $this->redis->flushAll(); + } + + protected function advanceTime(int $nanoseconds): void + { + usleep((int) $nanoseconds / 1000); + parent::advanceTime($nanoseconds); + } + + protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): SemaphoreInterface + { + $rediClient = new AmPhpRedisClient($this->redis); + return new RedisSemaphore(redisClient: $rediClient, maxConcurrentLocks: $maxConcurrentLocks); + } + + public function testLockingOccupiedKey(): void + { + $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); + + $this->redis->set('test-lock-key', 'occupied'); + + $resource = new Resource(namespace: 'test-lock-key'); + $semaphore->lock( + resource: $resource, + token: 'test-lock-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 5_000_000_000, + suspension: static function (): void { + self::fail('Lock should not have been acquired, suspension should not be called'); + }, + ); + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $this->redis->delete('test-lock-key'); // Deleting same key does not affect lock + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $semaphore->unlock(resource: $resource, token: 'test-lock-token'); // @mago-ignore lint:no-literal-password + + self::assertFalse($semaphore->isLocked(resource: $resource), 'Resource should be unlocked'); + } +} diff --git a/tests/Integration/AmpPhp/LocksmithTest.php b/tests/Integration/AmpPhp/LocksmithTest.php new file mode 100644 index 0000000..92e731f --- /dev/null +++ b/tests/Integration/AmpPhp/LocksmithTest.php @@ -0,0 +1,78 @@ +semaphore = new RedisSemaphore( + redisClient: new \MiMatus\Locksmith\Semaphore\Redis\AmPhpRedisClient(redis: createRedisClient( + 'tcp://redis:6379', + )), + ); + + $this->timeProvider = new TimeProvider(); + } + + public function testConcurrentTask(): void + { + $sharedCounter = 0; + + $task = static function (Closure $suspension) use (&$sharedCounter) { + $suspension(); // Suspend! This allows the other fiber to run. + $sharedCounter += 1; + }; + + $task2 = static function (Closure $suspension) use (&$sharedCounter) { + self::assertSame(0, $sharedCounter, 'Task 2 should not see the incremented counter value'); + $suspension(); // Suspend! This allows the other fiber to run. + $sharedCounter += 1; + }; + + $locksmith = new Locksmith(semaphore: $this->semaphore, timeProvider: $this->timeProvider); + + $locked = $locksmith->locked( + resource: new Resource(namespace: 'test-lock key'), + lockTTLNs: 5_000_000_000, + maxLockWaitNs: 1_000_000_000, + minSuspensionDelayNs: 10_000, + ); + $locked2 = $locksmith->locked( + resource: new Resource(namespace: 'test-lock key2'), + lockTTLNs: 5_000_000_000, + maxLockWaitNs: 1_000_000_000, + minSuspensionDelayNs: 10_000, + ); + + $future1 = async(static fn() => $locked($task)); + $future2 = async(static fn() => $locked2($task2)); + + await([$future2, $future1]); + + self::assertSame(2, $sharedCounter); + } +} diff --git a/tests/Integration/PhpRedis/PhpRedisSemaphoreTest.php b/tests/Integration/PhpRedis/PhpRedisSemaphoreTest.php new file mode 100644 index 0000000..a15c749 --- /dev/null +++ b/tests/Integration/PhpRedis/PhpRedisSemaphoreTest.php @@ -0,0 +1,77 @@ +redis = new Redis(); + $this->redis->connect('redis'); + } + + protected function tearDown(): void + { + parent::tearDown(); + $this->redis->flushAll(); + $this->redis->close(); + } + + protected function advanceTime(int $nanoseconds): void + { + usleep((int) $nanoseconds / 1000); + parent::advanceTime($nanoseconds); + } + + protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): SemaphoreInterface + { + $rediClient = new PhpRedisClient($this->redis); + return new RedisSemaphore(redisClient: $rediClient, maxConcurrentLocks: $maxConcurrentLocks); + } + + public function testLockingOccupiedKey(): void + { + $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); + + $this->redis->set('test-lock-key', 'occupied'); + + $resource = new Resource(namespace: 'test-lock-key'); + $semaphore->lock( + resource: $resource, + token: 'test-lock-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 5_000_000_000, + suspension: static function (): void { + self::fail('Lock should not have been acquired, suspension should not be called'); + }, + ); + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $this->redis->del('test-lock-key'); // Deleting same key does not affect lock + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $semaphore->unlock(resource: $resource, token: 'test-lock-token'); // @mago-ignore lint:no-literal-password + + self::assertFalse($semaphore->isLocked(resource: $resource), 'Resource should be unlocked'); + } +} diff --git a/tests/Integration/Predis/PredisSemaphoreTest.php b/tests/Integration/Predis/PredisSemaphoreTest.php new file mode 100644 index 0000000..8c73ab0 --- /dev/null +++ b/tests/Integration/Predis/PredisSemaphoreTest.php @@ -0,0 +1,76 @@ +redis = new Client('tcp://redis:6379'); + } + + protected function tearDown(): void + { + parent::tearDown(); + $this->redis->flushAll(); + $this->redis->disconnect(); + } + + protected function advanceTime(int $nanoseconds): void + { + usleep((int) $nanoseconds / 1000); + parent::advanceTime($nanoseconds); + } + + protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): SemaphoreInterface + { + $rediClient = new PredisRedisClient($this->redis); + return new RedisSemaphore(redisClient: $rediClient, maxConcurrentLocks: $maxConcurrentLocks); + } + + public function testLockingOccupiedKey(): void + { + $semaphore = $this->createSemaphore(timeProvider: $this->timeProvider); + + $this->redis->set('test-lock-key', 'occupied'); + + $resource = new Resource(namespace: 'test-lock-key'); + $semaphore->lock( + resource: $resource, + token: 'test-lock-token', // @mago-ignore lint:no-literal-password + lockTTLNs: 5_000_000_000, + suspension: static function (): void { + self::fail('Lock should not have been acquired, suspension should not be called'); + }, + ); + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $this->redis->del('test-lock-key'); // Deleting same key does not affect lock + + self::assertTrue($semaphore->isLocked(resource: $resource), 'Resource should be locked'); + + $semaphore->unlock(resource: $resource, token: 'test-lock-token'); // @mago-ignore lint:no-literal-password + + self::assertFalse($semaphore->isLocked(resource: $resource), 'Resource should be unlocked'); + } +} diff --git a/tests/Unit/SemaphoreTestCase.php b/tests/SemaphoreTestCase.php similarity index 80% rename from tests/Unit/SemaphoreTestCase.php rename to tests/SemaphoreTestCase.php index 315516a..af94e3e 100644 --- a/tests/Unit/SemaphoreTestCase.php +++ b/tests/SemaphoreTestCase.php @@ -2,10 +2,9 @@ declare(strict_types=1); -namespace MiMatus\Locksmith\Tests\Unit; +namespace MiMatus\Locksmith\Tests; use Exception; -use Fiber; use MiMatus\Locksmith\Resource; use MiMatus\Locksmith\Semaphore\TimeProvider; use MiMatus\Locksmith\SemaphoreInterface; @@ -17,9 +16,9 @@ abstract class SemaphoreTestCase extends TestCase { - private TimeProvider&Stub $timeProvider; + protected TimeProvider&Stub $timeProvider; - private int $currentTime = 0; + protected int $currentTime = 0; /** * @throws Exception @@ -106,19 +105,20 @@ public function testLockingAlreadyLockedKey(): void self::fail('Suspension should not be called when lock is available'); }); - $called = false; - $fiber = new Fiber(static function () use ($semaphore, $resource, &$called): void { - $semaphore->lock($resource, 'test-lock-token-2', 5_000_000_000, static function () use (&$called): void { - $called = true; - Fiber::suspend(); - }); - }); - - $fiber->start(); - - self::assertTrue($called); - self::assertTrue($fiber->isSuspended()); - self::assertFalse($fiber->isTerminated()); + $isSuspended = static function () use ($resource, $semaphore): bool { + try { + $semaphore->lock($resource, 'test-lock-token-2', 5_000_000_000, static function () use ( + &$called, + ): void { + throw new RuntimeException('SUSPENSION_CALLED'); + }); + } catch (Throwable $e) { + return $e->getMessage() === 'SUSPENSION_CALLED' ? true : throw $e; + } + return false; + }; + + self::assertTrue($isSuspended()); self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); @@ -126,7 +126,7 @@ public function testLockingAlreadyLockedKey(): void self::assertFalse($semaphore->isLocked($resource), 'Lock should be released after unlock'); - $fiber->resume(); + self::assertFalse($isSuspended()); self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); $semaphore->unlock($resource, 'test-lock-token-2'); @@ -148,19 +148,20 @@ public function testLockingHigherVersion(): void self::fail('Suspension should not be called when lock is available'); }); - $called = false; - $fiber = new Fiber(static function () use ($semaphore, $resource2, &$called): void { - $semaphore->lock($resource2, 'test-lock-token-2', 5_000_000_000, static function () use (&$called): void { - $called = true; - Fiber::suspend(); - }); - }); - - $fiber->start(); - - self::assertTrue($called); - self::assertTrue($fiber->isSuspended()); - self::assertFalse($fiber->isTerminated()); + $isSuspended = static function () use ($resource2, $semaphore): bool { + try { + $semaphore->lock($resource2, 'test-lock-token-2', 5_000_000_000, static function () use ( + &$called, + ): void { + throw new RuntimeException('SUSPENSION_CALLED'); + }); + } catch (Throwable $e) { + return $e->getMessage() === 'SUSPENSION_CALLED' ? true : throw $e; + } + return false; + }; + + self::assertTrue($isSuspended()); self::assertTrue($semaphore->isLocked($resource1), 'Lock should be held'); @@ -168,7 +169,7 @@ public function testLockingHigherVersion(): void self::assertFalse($semaphore->isLocked($resource1), 'Lock should be released after unlock'); - $fiber->resume(); + self::assertFalse($isSuspended()); self::assertTrue($semaphore->isLocked($resource2), 'Lock should be held'); $semaphore->unlock($resource2, 'test-lock-token-2'); @@ -193,19 +194,20 @@ public function testSemaphoreWithCapacity(): void self::fail('Suspension should not be called when lock is available'); }); - $called = false; - $fiber = new Fiber(static function () use ($semaphore, $resource, &$called): void { - $semaphore->lock($resource, 'test-lock-token-3', 5_000_000_000, static function () use (&$called): void { - $called = true; - Fiber::suspend(); - }); - }); - - $fiber->start(); - - self::assertTrue($called); - self::assertTrue($fiber->isSuspended()); - self::assertFalse($fiber->isTerminated()); + $isSuspended = static function () use ($resource, $semaphore): bool { + try { + $semaphore->lock($resource, 'test-lock-token-3', 5_000_000_000, static function () use ( + &$called, + ): void { + throw new RuntimeException('SUSPENSION_CALLED'); + }); + } catch (Throwable $e) { + return $e->getMessage() === 'SUSPENSION_CALLED' ? true : throw $e; + } + return false; + }; + + self::assertTrue($isSuspended()); self::assertTrue($semaphore->isLocked($resource), 'Lock should be held'); diff --git a/tests/Unit/DistributedSemaphoreTest.php b/tests/Unit/DistributedSemaphoreTest.php index fc738cf..a50b0b9 100644 --- a/tests/Unit/DistributedSemaphoreTest.php +++ b/tests/Unit/DistributedSemaphoreTest.php @@ -78,7 +78,7 @@ public function testUnableToAcquireLockQuorumUnderTTL(): void $distributedSemaphore->lock( resource: new Resource(namespace: 'test-resource'), - token: 'test-token', + token: 'test-token', // @mago-ignore lint:no-literal-password lockTTLNs: 1_000_000_000, // 1 second suspension: static function (): void {}, ); @@ -145,7 +145,7 @@ public function testAcquiredLockWithoutErrors(): void $distributedSemaphore->lock( resource: new Resource(namespace: 'test-resource'), - token: 'test-token', + token: 'test-token', // @mago-ignore lint:no-literal-password lockTTLNs: 1_000_000_000, // 1 second suspension: static function (): void {}, ); @@ -198,7 +198,7 @@ public function testAcquiredLockWithErrors(): void $distributedSemaphore->lock( resource: new Resource(namespace: 'test-resource'), - token: 'test-token', + token: 'test-token', // @mago-ignore lint:no-literal-password lockTTLNs: 1_000_000_000, // 1 second suspension: static function (): void {}, ); @@ -240,7 +240,7 @@ public function testUnlockWithErrors(): void } }); - $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); + $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); // @mago-ignore lint:no-literal-password self::assertEquals(3, $unlocksPerformed, 'Exactly three semaphores should have been unlocked'); } @@ -281,7 +281,7 @@ public function testUnableToUnlock(): void $this->expectExceptionObject(new GroupedException('Failed to release lock quorum', [])); - $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); + $distributedSemaphore->unlock(resource: new Resource(namespace: 'test-resource'), token: 'test-token'); // @mago-ignore lint:no-literal-password } public function testIsLockedOnQuorum(): void diff --git a/tests/Unit/InMemorySemaphoreTest.php b/tests/Unit/InMemorySemaphoreTest.php index 9cb3573..6d9dd92 100644 --- a/tests/Unit/InMemorySemaphoreTest.php +++ b/tests/Unit/InMemorySemaphoreTest.php @@ -7,6 +7,7 @@ use MiMatus\Locksmith\Semaphore\InMemorySemaphore; use MiMatus\Locksmith\Semaphore\TimeProvider; use MiMatus\Locksmith\SemaphoreInterface; +use MiMatus\Locksmith\Tests\SemaphoreTestCase; class InMemorySemaphoreTest extends SemaphoreTestCase { diff --git a/tests/Unit/LocksmithTest.php b/tests/Unit/LocksmithTest.php index f35f57c..15cfc86 100644 --- a/tests/Unit/LocksmithTest.php +++ b/tests/Unit/LocksmithTest.php @@ -52,7 +52,6 @@ public function testUnableToAcquireLockTimeout(): void int $lockTTLNs, Closure $suspension, ) use (&$currentTime): void { - $suspension(); $currentTime += 500_000_001; $suspension(); }); diff --git a/tests/Unit/RedisSemaphoreTest.php b/tests/Unit/RedisSemaphoreTest.php deleted file mode 100644 index 54b604b..0000000 --- a/tests/Unit/RedisSemaphoreTest.php +++ /dev/null @@ -1,46 +0,0 @@ -redis = new Redis(); - $this->redis->connect('redis'); - } - - protected function tearDown(): void - { - parent::tearDown(); - $this->redis->flushAll(); - $this->redis->close(); - } - - protected function advanceTime(int $nanoseconds): void - { - usleep((int) $nanoseconds / 1000); - parent::advanceTime($nanoseconds); - } - - protected function createSemaphore(TimeProvider $timeProvider, $maxConcurrentLocks = 1): SemaphoreInterface - { - return new RedisSemaphore(redisClient: $this->redis, maxConcurrentLocks: $maxConcurrentLocks); - } -}