> futures = new ArrayList<>(redisDrivers.size());
for (RedisDriver driver : redisDrivers) {
- try {
- // Use setex to initialize with a long expiration (10x lock timeout)
- driver.setex(latchKey, countValue, config.getDefaultLockTimeoutMs() * 10);
- successfulNodes++;
- } catch (Exception e) {
- logger.warn("Failed to initialize latch on {}: {}", driver.getIdentifier(), e.getMessage());
- }
+ futures.add(CompletableFuture.supplyAsync(() -> {
+ try {
+ driver.setex(latchKey, countValue, expirationMs);
+ if (successCount.incrementAndGet() >= quorum) {
+ quorumLatch.countDown(); // Signal quorum reached
+ }
+ return true;
+ } catch (Exception e) {
+ logger.warn("Failed to initialize latch on {}: {}", driver.getIdentifier(), e.getMessage());
+ return false;
+ }
+ }));
}
- if (successfulNodes < config.getQuorum()) {
- logger.warn("Failed to initialize latch {} on quorum of nodes (only {} of {} succeeded)", latchKey,
- successfulNodes, redisDrivers.size());
- } else {
- logger.debug("Successfully initialized latch {} with count {} on {} nodes", latchKey, count,
- successfulNodes);
+ // Wait for quorum (not all nodes)
+ try {
+ quorumLatch.await();
+ logger.debug("Successfully initialized latch {} with count {} on quorum", latchKey, count);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Interrupted while initializing latch {}", latchKey);
}
}
@@ -309,29 +345,14 @@ public void onError(Throwable error) {
}
}
- /**
- * Publishes a notification that the latch has reached zero.
- */
- private void publishZeroNotification() {
- for (RedisDriver driver : redisDrivers) {
- try {
- long subscribers = driver.publish(channelKey, "zero");
- logger.debug("Published zero notification for latch {} to {} subscribers on {}", latchKey, subscribers,
- driver.getIdentifier());
- } catch (Exception e) {
- logger.debug("Failed to publish zero notification on {}: {}", driver.getIdentifier(), e.getMessage());
- }
- }
- }
-
/**
* Resets the latch to its initial count.
- *
+ *
*
* Warning: This is not part of the standard CountDownLatch API and should be used with caution. It's
* provided for scenarios where you need to reuse a latch.
*
- *
+ *
*
* This operation is not atomic and may lead to race conditions if called while other threads are waiting or
* counting down.
@@ -340,14 +361,18 @@ private void publishZeroNotification() {
public void reset() {
logger.debug("Resetting latch {} to initial count {}", latchKey, initialCount);
- // Delete the existing latch using DEL
+ // Delete the existing latch using parallel DEL
+ List> futures = new ArrayList<>(redisDrivers.size());
for (RedisDriver driver : redisDrivers) {
- try {
- driver.del(latchKey);
- } catch (Exception e) {
- logger.warn("Failed to delete latch on {}: {}", driver.getIdentifier(), e.getMessage());
- }
+ futures.add(CompletableFuture.runAsync(() -> {
+ try {
+ driver.del(latchKey);
+ } catch (Exception e) {
+ logger.warn("Failed to delete latch on {}: {}", driver.getIdentifier(), e.getMessage());
+ }
+ }));
}
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// Reset the local latch
localLatch = new CountDownLatch(1);
diff --git a/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java b/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java
index 28ca816..d77501c 100644
--- a/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java
+++ b/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java
@@ -39,6 +39,9 @@ public class JedisRedisDriver implements RedisDriver {
private static final String SET_IF_VALUE_MATCHES_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then "
+ " return redis.call('set', KEYS[1], ARGV[2], 'PX', ARGV[3]) " + "else " + " return nil " + "end";
+ private static final String DECR_AND_PUBLISH_IF_ZERO_SCRIPT = "local v = redis.call('decr', KEYS[1]); "
+ + "if v <= 0 then redis.call('publish', KEYS[2], ARGV[1]) end; " + "return v";
+
/**
* Strategy for CAS/CAD operations.
*/
@@ -291,6 +294,17 @@ public long decr(String key) throws RedisDriverException {
}
}
+ @Override
+ public long decrAndPublishIfZero(String key, String channel, String message) throws RedisDriverException {
+ try (Jedis jedis = jedisPool.getResource()) {
+ Object result = jedis.eval(DECR_AND_PUBLISH_IF_ZERO_SCRIPT, java.util.Arrays.asList(key, channel),
+ Collections.singletonList(message));
+ return result != null ? ((Number) result).longValue() : 0;
+ } catch (JedisException e) {
+ throw new RedisDriverException("Failed to execute DECR_AND_PUBLISH script on " + identifier, e);
+ }
+ }
+
@Override
public String get(String key) throws RedisDriverException {
try (Jedis jedis = jedisPool.getResource()) {
diff --git a/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java b/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java
index c1cb6ab..466d2c9 100644
--- a/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java
+++ b/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java
@@ -41,6 +41,9 @@ public class LettuceRedisDriver implements RedisDriver {
private static final String SET_IF_VALUE_MATCHES_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then "
+ " return redis.call('set', KEYS[1], ARGV[2], 'PX', ARGV[3]) " + "else " + " return nil " + "end";
+ private static final String DECR_AND_PUBLISH_IF_ZERO_SCRIPT = "local v = redis.call('decr', KEYS[1]); "
+ + "if v <= 0 then redis.call('publish', KEYS[2], ARGV[1]) end; " + "return v";
+
/**
* Strategy for CAS/CAD operations.
*/
@@ -315,6 +318,17 @@ public long decr(String key) throws RedisDriverException {
}
}
+ @Override
+ public long decrAndPublishIfZero(String key, String channel, String message) throws RedisDriverException {
+ try {
+ Object result = commands.eval(DECR_AND_PUBLISH_IF_ZERO_SCRIPT, io.lettuce.core.ScriptOutputType.INTEGER,
+ new String[]{key, channel}, message);
+ return result != null ? ((Number) result).longValue() : 0;
+ } catch (Exception e) {
+ throw new RedisDriverException("Failed to execute DECR_AND_PUBLISH script on " + identifier, e);
+ }
+ }
+
@Override
public String get(String key) throws RedisDriverException {
try {
diff --git a/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java b/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java
index cc4f50d..426af41 100644
--- a/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java
+++ b/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java
@@ -205,6 +205,22 @@ boolean setIfValueMatches(String key, String newValue, String expectedCurrentVal
*/
long decr(String key) throws RedisDriverException;
+ /**
+ * Atomically decrements the value of a key and publishes a message to a channel if the new value is zero or less.
+ * This combines DECR and conditional PUBLISH into a single atomic operation.
+ *
+ * @param key
+ * the key to decrement
+ * @param channel
+ * the channel to publish to if count reaches zero
+ * @param message
+ * the message to publish
+ * @return the value after decrementing
+ * @throws RedisDriverException
+ * if there's an error communicating with Redis
+ */
+ long decrAndPublishIfZero(String key, String channel, String message) throws RedisDriverException;
+
/**
* Gets the value of a key.
*
diff --git a/src/test/java/org/codarama/redlock4j/FairLockTest.java b/src/test/java/org/codarama/redlock4j/FairLockTest.java
new file mode 100644
index 0000000..2f30274
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/FairLockTest.java
@@ -0,0 +1,147 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j;
+
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.codarama.redlock4j.driver.RedisDriver;
+import org.codarama.redlock4j.driver.RedisDriverException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for FairLock using Mockito mocks.
+ */
+@ExtendWith(MockitoExtension.class)
+public class FairLockTest {
+
+ @Mock
+ private RedisDriver mockDriver1;
+
+ @Mock
+ private RedisDriver mockDriver2;
+
+ @Mock
+ private RedisDriver mockDriver3;
+
+ private RedlockConfiguration testConfig;
+ private List drivers;
+
+ @BeforeEach
+ void setUp() {
+ testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380)
+ .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10))
+ .build();
+
+ drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3);
+
+ lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379");
+ lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380");
+ lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381");
+ }
+
+ // ========== Basic Acquisition ==========
+
+ @Test
+ void shouldAcquireLockWhenAtFrontOfQueueAndQuorumSucceeds() throws RedisDriverException, InterruptedException {
+ setupSuccessfulAcquisition();
+
+ FairLock lock = new FairLock("test-fair", drivers, testConfig);
+ boolean acquired = lock.tryLock(1, TimeUnit.SECONDS);
+
+ assertTrue(acquired);
+ assertTrue(lock.isHeldByCurrentThread());
+ assertTrue(lock.getHoldCount() > 0);
+ }
+
+ @Test
+ void shouldFailWhenNotAtFrontOfQueue() throws RedisDriverException, InterruptedException {
+ // Mock: add to queue succeeds
+ when(mockDriver1.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true);
+ when(mockDriver2.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true);
+ when(mockDriver3.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true);
+
+ // Mock: someone else is at front
+ when(mockDriver1.zRange(anyString(), eq(0L), eq(0L))).thenReturn(Collections.singletonList("other-token"));
+ when(mockDriver2.zRange(anyString(), eq(0L), eq(0L))).thenReturn(Collections.singletonList("other-token"));
+ when(mockDriver3.zRange(anyString(), eq(0L), eq(0L))).thenReturn(Collections.singletonList("other-token"));
+
+ FairLock lock = new FairLock("test-fair", drivers, testConfig);
+
+ boolean acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
+
+ assertFalse(acquired);
+ assertFalse(lock.isHeldByCurrentThread());
+ }
+
+ // ========== Reentrancy ==========
+
+ @Test
+ void shouldSupportReentrantAcquisition() throws RedisDriverException, InterruptedException {
+ setupSuccessfulAcquisition();
+
+ FairLock lock = new FairLock("test-reentrant", drivers, testConfig);
+
+ assertTrue(lock.tryLock(1, TimeUnit.SECONDS));
+ assertEquals(1, lock.getHoldCount());
+
+ // Reentrant acquisition should succeed without Redis calls
+ assertTrue(lock.tryLock(1, TimeUnit.SECONDS));
+ assertEquals(2, lock.getHoldCount());
+
+ lock.unlock();
+ assertEquals(1, lock.getHoldCount());
+ assertTrue(lock.isHeldByCurrentThread());
+
+ lock.unlock();
+ assertEquals(0, lock.getHoldCount());
+ assertFalse(lock.isHeldByCurrentThread());
+ }
+
+ // ========== Utility Methods ==========
+
+ @Test
+ void shouldReportZeroValidityTimeWhenNotHeld() {
+ FairLock lock = new FairLock("test-validity", drivers, testConfig);
+ assertEquals(0, lock.getRemainingValidityTime());
+ }
+
+ @Test
+ void shouldThrowOnNewCondition() {
+ FairLock lock = new FairLock("test-condition", drivers, testConfig);
+ assertThrows(UnsupportedOperationException.class, lock::newCondition);
+ }
+
+ private void setupSuccessfulAcquisition() throws RedisDriverException {
+ when(mockDriver1.zAdd(anyString(), anyDouble(), anyString())).thenAnswer(inv -> {
+ String token = inv.getArgument(2);
+ lenient().when(mockDriver1.zRange(anyString(), eq(0L), eq(0L)))
+ .thenReturn(Collections.singletonList(token));
+ lenient().when(mockDriver2.zRange(anyString(), eq(0L), eq(0L)))
+ .thenReturn(Collections.singletonList(token));
+ lenient().when(mockDriver3.zRange(anyString(), eq(0L), eq(0L)))
+ .thenReturn(Collections.singletonList(token));
+ return true;
+ });
+ when(mockDriver2.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true);
+ when(mockDriver3.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true);
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/LockResultTest.java b/src/test/java/org/codarama/redlock4j/LockResultTest.java
new file mode 100644
index 0000000..9ecdbd0
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/LockResultTest.java
@@ -0,0 +1,93 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Unit tests for LockResult value object.
+ */
+public class LockResultTest {
+
+ @Test
+ void shouldStoreAcquiredState() {
+ LockResult result = new LockResult(true, 5000L, "lock-value-123", 2, 3);
+
+ assertTrue(result.isAcquired());
+ }
+
+ @Test
+ void shouldStoreNotAcquiredState() {
+ LockResult result = new LockResult(false, 0L, "lock-value-123", 1, 3);
+
+ assertFalse(result.isAcquired());
+ }
+
+ @Test
+ void shouldStoreValidityTime() {
+ LockResult result = new LockResult(true, 29500L, "lock-value-123", 3, 3);
+
+ assertEquals(29500L, result.getValidityTimeMs());
+ }
+
+ @Test
+ void shouldStoreLockValue() {
+ LockResult result = new LockResult(true, 5000L, "unique-lock-value", 2, 3);
+
+ assertEquals("unique-lock-value", result.getLockValue());
+ }
+
+ @Test
+ void shouldStoreNodeCounts() {
+ LockResult result = new LockResult(true, 5000L, "lock-value", 2, 3);
+
+ assertEquals(2, result.getSuccessfulNodes());
+ assertEquals(3, result.getTotalNodes());
+ }
+
+ @Test
+ void shouldHandleAllNodesSuccess() {
+ LockResult result = new LockResult(true, 5000L, "lock-value", 5, 5);
+
+ assertEquals(5, result.getSuccessfulNodes());
+ assertEquals(5, result.getTotalNodes());
+ }
+
+ @Test
+ void shouldHandleZeroValidityTime() {
+ LockResult result = new LockResult(false, 0L, "lock-value", 1, 3);
+
+ assertEquals(0L, result.getValidityTimeMs());
+ }
+
+ @Test
+ void shouldHandleNullLockValue() {
+ LockResult result = new LockResult(false, 0L, null, 0, 3);
+
+ assertNull(result.getLockValue());
+ }
+
+ @Test
+ void toStringShouldContainAllFields() {
+ LockResult result = new LockResult(true, 5000L, "lock-value", 2, 3);
+ String str = result.toString();
+
+ assertTrue(str.contains("acquired=true"));
+ assertTrue(str.contains("validityTimeMs=5000"));
+ assertTrue(str.contains("successfulNodes=2"));
+ assertTrue(str.contains("totalNodes=3"));
+ }
+
+ @Test
+ void toStringShouldWorkForFailedAcquisition() {
+ LockResult result = new LockResult(false, 0L, "lock-value", 1, 3);
+ String str = result.toString();
+
+ assertTrue(str.contains("acquired=false"));
+ assertTrue(str.contains("validityTimeMs=0"));
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/MultiLockTest.java b/src/test/java/org/codarama/redlock4j/MultiLockTest.java
new file mode 100644
index 0000000..49a4052
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/MultiLockTest.java
@@ -0,0 +1,145 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j;
+
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.codarama.redlock4j.driver.RedisDriver;
+import org.codarama.redlock4j.driver.RedisDriverException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for MultiLock using Mockito mocks.
+ */
+@ExtendWith(MockitoExtension.class)
+public class MultiLockTest {
+
+ @Mock
+ private RedisDriver mockDriver1;
+
+ @Mock
+ private RedisDriver mockDriver2;
+
+ @Mock
+ private RedisDriver mockDriver3;
+
+ private RedlockConfiguration testConfig;
+ private List drivers;
+
+ @BeforeEach
+ void setUp() {
+ testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380)
+ .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10))
+ .build();
+
+ drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3);
+
+ lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379");
+ lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380");
+ lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381");
+ }
+
+ // ========== Validation ==========
+
+ @Test
+ void shouldRejectNullKeyList() {
+ assertThrows(IllegalArgumentException.class, () -> new MultiLock(null, drivers, testConfig));
+ }
+
+ @Test
+ void shouldRejectEmptyKeyList() {
+ assertThrows(IllegalArgumentException.class, () -> new MultiLock(Arrays.asList(), drivers, testConfig));
+ }
+
+ // ========== Acquisition ==========
+
+ @Test
+ void shouldAcquireAllLocksWhenQuorumSucceeds() throws RedisDriverException, InterruptedException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+
+ MultiLock lock = new MultiLock(Arrays.asList("key1", "key2", "key3"), drivers, testConfig);
+ boolean acquired = lock.tryLock(1, TimeUnit.SECONDS);
+
+ assertTrue(acquired);
+
+ // Verify all keys were locked on all drivers
+ verify(mockDriver1, times(3)).setIfNotExists(anyString(), anyString(), eq(30000L));
+ verify(mockDriver2, times(3)).setIfNotExists(anyString(), anyString(), eq(30000L));
+ }
+
+ @Test
+ void shouldRollbackOnPartialFailure() throws RedisDriverException, InterruptedException {
+ // First key succeeds, second fails on driver1
+ when(mockDriver1.setIfNotExists(eq("key1"), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver1.setIfNotExists(eq("key2"), anyString(), anyLong())).thenReturn(false);
+ // All fail on driver2 and driver3 for simplicity
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false);
+
+ MultiLock lock = new MultiLock(Arrays.asList("key1", "key2"), drivers, testConfig);
+ boolean acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
+
+ assertFalse(acquired);
+
+ // Verify rollback was attempted for key1 on driver1
+ verify(mockDriver1, atLeastOnce()).deleteIfValueMatches(eq("key1"), anyString());
+ }
+
+ // ========== Key Ordering ==========
+
+ @Test
+ void shouldSortKeysToPreventDeadlock() throws RedisDriverException, InterruptedException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+
+ // Pass keys in reverse order
+ MultiLock lock = new MultiLock(Arrays.asList("z", "a", "m"), drivers, testConfig);
+ lock.tryLock(1, TimeUnit.SECONDS);
+
+ // Verify keys are acquired in sorted order: a, m, z
+ org.mockito.InOrder order = inOrder(mockDriver1);
+ order.verify(mockDriver1).setIfNotExists(eq("a"), anyString(), anyLong());
+ order.verify(mockDriver1).setIfNotExists(eq("m"), anyString(), anyLong());
+ order.verify(mockDriver1).setIfNotExists(eq("z"), anyString(), anyLong());
+ }
+
+ @Test
+ void shouldDeduplicateKeys() throws RedisDriverException, InterruptedException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+
+ // Pass duplicate keys
+ MultiLock lock = new MultiLock(Arrays.asList("key1", "key1", "key2"), drivers, testConfig);
+ lock.tryLock(1, TimeUnit.SECONDS);
+
+ // Should only lock 2 unique keys
+ verify(mockDriver1, times(2)).setIfNotExists(anyString(), anyString(), anyLong());
+ }
+
+ // ========== Utility ==========
+
+ @Test
+ void shouldThrowOnNewCondition() {
+ MultiLock lock = new MultiLock(Arrays.asList("key1"), drivers, testConfig);
+ assertThrows(UnsupportedOperationException.class, lock::newCondition);
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java b/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java
new file mode 100644
index 0000000..4f73721
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java
@@ -0,0 +1,165 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j;
+
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.codarama.redlock4j.driver.RedisDriver;
+import org.codarama.redlock4j.driver.RedisDriverException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for RedlockCountDownLatch using Mockito mocks.
+ */
+@ExtendWith(MockitoExtension.class)
+public class RedlockCountDownLatchTest {
+
+ @Mock
+ private RedisDriver mockDriver1;
+
+ @Mock
+ private RedisDriver mockDriver2;
+
+ @Mock
+ private RedisDriver mockDriver3;
+
+ private RedlockConfiguration testConfig;
+ private List drivers;
+
+ @BeforeEach
+ void setUp() throws RedisDriverException {
+ testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380)
+ .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10))
+ .build();
+
+ drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3);
+
+ lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379");
+ lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380");
+ lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381");
+
+ // Default setex for initialization (void method - use doNothing)
+ lenient().doNothing().when(mockDriver1).setex(anyString(), anyString(), anyLong());
+ lenient().doNothing().when(mockDriver2).setex(anyString(), anyString(), anyLong());
+ lenient().doNothing().when(mockDriver3).setex(anyString(), anyString(), anyLong());
+ }
+
+ // ========== Validation ==========
+
+ @Test
+ void shouldRejectNegativeCount() {
+ assertThrows(IllegalArgumentException.class, () -> new RedlockCountDownLatch("test", -1, drivers, testConfig));
+ }
+
+ @Test
+ void shouldAllowZeroCount() throws RedisDriverException {
+ // Zero is allowed
+ RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 0, drivers, testConfig);
+ assertNotNull(latch);
+ }
+
+ // ========== getCount ==========
+
+ @Test
+ void shouldReturnCountFromQuorum() throws RedisDriverException, InterruptedException {
+ when(mockDriver1.get(anyString())).thenReturn("5");
+ when(mockDriver2.get(anyString())).thenReturn("5");
+ when(mockDriver3.get(anyString())).thenReturn("5");
+
+ RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig);
+ long count = latch.getCount();
+
+ assertEquals(5, count);
+ }
+
+ // ========== countDown ==========
+
+ @Test
+ void shouldCallDecrAndPublishOnAllNodes() throws RedisDriverException, InterruptedException {
+ when(mockDriver1.decrAndPublishIfZero(anyString(), anyString(), anyString())).thenReturn(4L);
+ when(mockDriver2.decrAndPublishIfZero(anyString(), anyString(), anyString())).thenReturn(4L);
+ when(mockDriver3.decrAndPublishIfZero(anyString(), anyString(), anyString())).thenReturn(4L);
+
+ RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig);
+ latch.countDown();
+
+ // Give async operations time to complete
+ Thread.sleep(100);
+
+ verify(mockDriver1, atLeastOnce()).decrAndPublishIfZero(eq("test"), eq("test:channel"), eq("zero"));
+ }
+
+ // ========== await ==========
+
+ @Test
+ void shouldReturnImmediatelyWhenCountIsZero() throws RedisDriverException, InterruptedException {
+ // Use lenient stubs since get() may or may not be called depending on implementation
+ lenient().when(mockDriver1.get(anyString())).thenReturn("0");
+ lenient().when(mockDriver2.get(anyString())).thenReturn("0");
+ lenient().when(mockDriver3.get(anyString())).thenReturn("0");
+
+ RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 0, drivers, testConfig);
+
+ long start = System.currentTimeMillis();
+ boolean result = latch.await(5, TimeUnit.SECONDS);
+ long elapsed = System.currentTimeMillis() - start;
+
+ assertTrue(result);
+ assertTrue(elapsed < 1000, "Should return quickly when count is zero");
+ }
+
+ @Test
+ void shouldTimeoutWhenCountNeverReachesZero() throws RedisDriverException, InterruptedException {
+ when(mockDriver1.get(anyString())).thenReturn("5");
+ when(mockDriver2.get(anyString())).thenReturn("5");
+ when(mockDriver3.get(anyString())).thenReturn("5");
+
+ RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig);
+
+ long start = System.currentTimeMillis();
+ boolean result = latch.await(200, TimeUnit.MILLISECONDS);
+ long elapsed = System.currentTimeMillis() - start;
+
+ assertFalse(result);
+ assertTrue(elapsed >= 150 && elapsed < 1000, "Should timeout after ~200ms");
+ }
+
+ // ========== reset ==========
+
+ @Test
+ void shouldResetLatchCount() throws RedisDriverException {
+ RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig);
+ latch.reset();
+
+ // Verify del was called
+ verify(mockDriver1, atLeastOnce()).del(eq("test"));
+ }
+
+ // ========== hasQueuedThreads ==========
+
+ @Test
+ void shouldReturnTrueWhenCountPositive() throws RedisDriverException {
+ // Use lenient since quorum logic may not call all drivers
+ lenient().when(mockDriver1.get(anyString())).thenReturn("3");
+ lenient().when(mockDriver2.get(anyString())).thenReturn("3");
+ lenient().when(mockDriver3.get(anyString())).thenReturn("3");
+
+ RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 3, drivers, testConfig);
+ assertTrue(latch.hasQueuedThreads());
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/RedlockExceptionTest.java b/src/test/java/org/codarama/redlock4j/RedlockExceptionTest.java
new file mode 100644
index 0000000..cabaf2d
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/RedlockExceptionTest.java
@@ -0,0 +1,80 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Unit tests for RedlockException.
+ */
+public class RedlockExceptionTest {
+
+ @Test
+ void shouldCreateExceptionWithMessage() {
+ RedlockException exception = new RedlockException("Lock acquisition failed");
+
+ assertEquals("Lock acquisition failed", exception.getMessage());
+ assertNull(exception.getCause());
+ }
+
+ @Test
+ void shouldCreateExceptionWithMessageAndCause() {
+ Throwable cause = new RuntimeException("Connection timeout");
+ RedlockException exception = new RedlockException("Lock acquisition failed", cause);
+
+ assertEquals("Lock acquisition failed", exception.getMessage());
+ assertSame(cause, exception.getCause());
+ }
+
+ @Test
+ void shouldCreateExceptionWithCauseOnly() {
+ Throwable cause = new RuntimeException("Connection refused");
+ RedlockException exception = new RedlockException(cause);
+
+ assertEquals("java.lang.RuntimeException: Connection refused", exception.getMessage());
+ assertSame(cause, exception.getCause());
+ }
+
+ @Test
+ void shouldBeRuntimeException() {
+ RedlockException exception = new RedlockException("test");
+
+ assertTrue(exception instanceof RuntimeException);
+ }
+
+ @Test
+ void shouldPreserveStackTrace() {
+ RedlockException exception = new RedlockException("test");
+
+ assertNotNull(exception.getStackTrace());
+ assertTrue(exception.getStackTrace().length > 0);
+ }
+
+ @Test
+ void shouldSupportNestedCauses() {
+ Throwable rootCause = new IllegalStateException("Redis not available");
+ Throwable intermediateCause = new RuntimeException("Connection failed", rootCause);
+ RedlockException exception = new RedlockException("Lock failed", intermediateCause);
+
+ assertSame(intermediateCause, exception.getCause());
+ assertSame(rootCause, exception.getCause().getCause());
+ }
+
+ @Test
+ void shouldHandleNullMessage() {
+ RedlockException exception = new RedlockException((String) null);
+
+ assertNull(exception.getMessage());
+ }
+
+ @Test
+ void shouldHandleEmptyMessage() {
+ RedlockException exception = new RedlockException("");
+
+ assertEquals("", exception.getMessage());
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/RedlockReadWriteLockTest.java b/src/test/java/org/codarama/redlock4j/RedlockReadWriteLockTest.java
new file mode 100644
index 0000000..4dabda6
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/RedlockReadWriteLockTest.java
@@ -0,0 +1,159 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j;
+
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.codarama.redlock4j.driver.RedisDriver;
+import org.codarama.redlock4j.driver.RedisDriverException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for RedlockReadWriteLock using Mockito mocks.
+ */
+@ExtendWith(MockitoExtension.class)
+public class RedlockReadWriteLockTest {
+
+ @Mock
+ private RedisDriver mockDriver1;
+
+ @Mock
+ private RedisDriver mockDriver2;
+
+ @Mock
+ private RedisDriver mockDriver3;
+
+ private RedlockConfiguration testConfig;
+ private List drivers;
+
+ @BeforeEach
+ void setUp() {
+ testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380)
+ .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10))
+ .build();
+
+ drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3);
+
+ lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379");
+ lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380");
+ lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381");
+ }
+
+ // ========== Read Lock ==========
+
+ @Test
+ void shouldAcquireReadLockWhenNoWriter() throws RedisDriverException, InterruptedException {
+ // No write lock exists
+ when(mockDriver1.get(contains(":write"))).thenReturn(null);
+ when(mockDriver2.get(contains(":write"))).thenReturn(null);
+ when(mockDriver3.get(contains(":write"))).thenReturn(null);
+
+ // Increment succeeds
+ when(mockDriver1.incr(anyString())).thenReturn(1L);
+ when(mockDriver2.incr(anyString())).thenReturn(1L);
+ when(mockDriver3.incr(anyString())).thenReturn(1L);
+
+ RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig);
+ Lock readLock = rwLock.readLock();
+
+ boolean acquired = readLock.tryLock(1, TimeUnit.SECONDS);
+
+ assertTrue(acquired);
+ verify(mockDriver1, atLeastOnce()).incr(contains(":readers"));
+ }
+
+ @Test
+ void shouldFailReadLockWhenWriterHoldsLock() throws RedisDriverException, InterruptedException {
+ // Write lock exists on quorum
+ when(mockDriver1.get(contains(":write"))).thenReturn("some-lock-value");
+ when(mockDriver2.get(contains(":write"))).thenReturn("some-lock-value");
+ when(mockDriver3.get(contains(":write"))).thenReturn(null);
+
+ RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig);
+ Lock readLock = rwLock.readLock();
+
+ boolean acquired = readLock.tryLock(100, TimeUnit.MILLISECONDS);
+
+ assertFalse(acquired);
+ }
+
+ // ========== Write Lock ==========
+
+ @Test
+ void shouldAcquireWriteLockWhenNoReaders() throws RedisDriverException, InterruptedException {
+ // No readers
+ when(mockDriver1.get(contains(":readers"))).thenReturn(null);
+ when(mockDriver2.get(contains(":readers"))).thenReturn(null);
+ when(mockDriver3.get(contains(":readers"))).thenReturn(null);
+
+ // Lock acquisition succeeds
+ when(mockDriver1.setIfNotExists(contains(":write"), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(contains(":write"), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(contains(":write"), anyString(), anyLong())).thenReturn(true);
+
+ RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig);
+ Lock writeLock = rwLock.writeLock();
+
+ boolean acquired = writeLock.tryLock(1, TimeUnit.SECONDS);
+
+ assertTrue(acquired);
+ }
+
+ @Test
+ void shouldFailWriteLockWhenReadersExist() throws RedisDriverException, InterruptedException {
+ // Active readers on quorum
+ when(mockDriver1.get(contains(":readers"))).thenReturn("2");
+ when(mockDriver2.get(contains(":readers"))).thenReturn("2");
+ when(mockDriver3.get(contains(":readers"))).thenReturn(null);
+
+ RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig);
+ Lock writeLock = rwLock.writeLock();
+
+ boolean acquired = writeLock.tryLock(100, TimeUnit.MILLISECONDS);
+
+ assertFalse(acquired);
+ }
+
+ // ========== Utility ==========
+
+ @Test
+ void shouldReturnSameLockInstances() {
+ RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig);
+
+ Lock read1 = rwLock.readLock();
+ Lock read2 = rwLock.readLock();
+ Lock write1 = rwLock.writeLock();
+ Lock write2 = rwLock.writeLock();
+
+ assertSame(read1, read2);
+ assertSame(write1, write2);
+ }
+
+ @Test
+ void shouldThrowOnNewConditionForReadLock() {
+ RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig);
+ assertThrows(UnsupportedOperationException.class, () -> rwLock.readLock().newCondition());
+ }
+
+ @Test
+ void shouldThrowOnNewConditionForWriteLock() {
+ RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig);
+ assertThrows(UnsupportedOperationException.class, () -> rwLock.writeLock().newCondition());
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/RedlockSemaphoreTest.java b/src/test/java/org/codarama/redlock4j/RedlockSemaphoreTest.java
new file mode 100644
index 0000000..95076fa
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/RedlockSemaphoreTest.java
@@ -0,0 +1,161 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j;
+
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.codarama.redlock4j.driver.RedisDriver;
+import org.codarama.redlock4j.driver.RedisDriverException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for RedlockSemaphore using Mockito mocks.
+ */
+@ExtendWith(MockitoExtension.class)
+public class RedlockSemaphoreTest {
+
+ @Mock
+ private RedisDriver mockDriver1;
+
+ @Mock
+ private RedisDriver mockDriver2;
+
+ @Mock
+ private RedisDriver mockDriver3;
+
+ private RedlockConfiguration testConfig;
+ private List drivers;
+
+ @BeforeEach
+ void setUp() {
+ testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380)
+ .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10))
+ .build();
+
+ drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3);
+
+ lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379");
+ lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380");
+ lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381");
+ }
+
+ // ========== Validation ==========
+
+ @Test
+ void shouldRejectZeroPermits() {
+ assertThrows(IllegalArgumentException.class, () -> new RedlockSemaphore("test", 0, drivers, testConfig));
+ }
+
+ @Test
+ void shouldRejectNegativePermits() {
+ assertThrows(IllegalArgumentException.class, () -> new RedlockSemaphore("test", -1, drivers, testConfig));
+ }
+
+ @Test
+ void shouldRejectAcquiringMoreThanMaxPermits() {
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 3, drivers, testConfig);
+ assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(4, 1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void shouldRejectAcquiringZeroPermits() {
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 3, drivers, testConfig);
+ assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(0, 1, TimeUnit.SECONDS));
+ }
+
+ // ========== Acquisition ==========
+
+ @Test
+ void shouldAcquirePermitWhenQuorumSucceeds() throws RedisDriverException, InterruptedException {
+ // setIfNotExists succeeds on all nodes
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig);
+ boolean acquired = semaphore.tryAcquire(1, TimeUnit.SECONDS);
+
+ assertTrue(acquired);
+ verify(mockDriver1, atLeastOnce()).setIfNotExists(contains(":permit:"), anyString(), anyLong());
+ }
+
+ @Test
+ void shouldFailWhenQuorumNotReached() throws RedisDriverException, InterruptedException {
+ // Only 1 node succeeds (need quorum of 2)
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false);
+
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig);
+ boolean acquired = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
+
+ assertFalse(acquired);
+ }
+
+ // ========== Release ==========
+
+ @Test
+ void shouldReleasePermitAfterAcquisition() throws RedisDriverException, InterruptedException {
+ setupSuccessfulAcquisition();
+
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig);
+ semaphore.tryAcquire(1, TimeUnit.SECONDS);
+ semaphore.release();
+
+ // Verify deleteIfValueMatches was called to release permit
+ verify(mockDriver1, atLeastOnce()).deleteIfValueMatches(contains(":permit:"), anyString());
+ }
+
+ @Test
+ void shouldHandleReleaseWithoutAcquisition() {
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig);
+ // Should not throw
+ semaphore.release();
+ }
+
+ // ========== Available Permits ==========
+
+ @Test
+ void shouldReportMaxPermitsWhenNoStateTracked() {
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig);
+ // Current implementation returns maxPermits
+ assertEquals(5, semaphore.availablePermits());
+ }
+
+ // ========== Multiple Permits ==========
+
+ @Test
+ void shouldAcquireMultiplePermits() throws RedisDriverException, InterruptedException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+
+ RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig);
+ boolean acquired = semaphore.tryAcquire(3, 1, TimeUnit.SECONDS);
+
+ assertTrue(acquired);
+ // 3 permits, each on 3 nodes = at least 3 setIfNotExists calls per driver
+ verify(mockDriver1, atLeast(3)).setIfNotExists(contains(":permit:"), anyString(), anyLong());
+ }
+
+ private void setupSuccessfulAcquisition() throws RedisDriverException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/async/RxRedlockTest.java b/src/test/java/org/codarama/redlock4j/async/RxRedlockTest.java
new file mode 100644
index 0000000..bc79264
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/async/RxRedlockTest.java
@@ -0,0 +1,159 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j.async;
+
+import io.reactivex.rxjava3.observers.TestObserver;
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.codarama.redlock4j.driver.RedisDriver;
+import org.codarama.redlock4j.driver.RedisDriverException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for RxRedlock reactive API.
+ */
+@ExtendWith(MockitoExtension.class)
+public class RxRedlockTest {
+
+ @Mock
+ private RedisDriver mockDriver1;
+
+ @Mock
+ private RedisDriver mockDriver2;
+
+ @Mock
+ private RedisDriver mockDriver3;
+
+ private RedlockConfiguration testConfig;
+ private List drivers;
+ private ScheduledExecutorService scheduler;
+
+ @BeforeEach
+ void setUp() {
+ testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380)
+ .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(5))
+ .build();
+
+ drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3);
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379");
+ lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380");
+ lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381");
+ }
+
+ // ========== tryLockRx ==========
+
+ @Test
+ void tryLockRxShouldEmitTrueOnSuccess() throws RedisDriverException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+
+ RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ TestObserver observer = rxLock.tryLockRx().test();
+ observer.awaitDone(5, TimeUnit.SECONDS);
+
+ observer.assertValue(true);
+ observer.assertComplete();
+ }
+
+ @Test
+ void tryLockRxShouldEmitFalseOnFailure() throws RedisDriverException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false);
+
+ RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ TestObserver observer = rxLock.tryLockRx().test();
+ observer.awaitDone(5, TimeUnit.SECONDS);
+
+ observer.assertValue(false);
+ observer.assertComplete();
+ }
+
+ // ========== lockStateObservable ==========
+
+ @Test
+ void lockStateObservableShouldEmitInitialState() {
+ RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ TestObserver observer = rxLock.lockStateObservable().test();
+
+ observer.assertValue(RxRedlock.LockState.RELEASED);
+ }
+
+ // ========== Utility Methods ==========
+
+ @Test
+ void shouldReturnLockKey() {
+ RxRedlock rxLock = new AsyncRedlockImpl("my-lock-key", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ assertEquals("my-lock-key", rxLock.getLockKey());
+ }
+
+ @Test
+ void shouldReturnZeroValidityTimeWhenNotHeld() {
+ RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ assertEquals(0, rxLock.getRemainingValidityTime());
+ }
+
+ @Test
+ void shouldReturnFalseForIsHeldWhenNotAcquired() {
+ RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ assertFalse(rxLock.isHeldByCurrentThread());
+ }
+
+ @Test
+ void shouldReturnZeroHoldCountWhenNotHeld() {
+ RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ assertEquals(0, rxLock.getHoldCount());
+ }
+
+ // ========== tryLockRx with timeout ==========
+
+ @Test
+ void tryLockRxWithTimeoutShouldWork() throws RedisDriverException {
+ when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+ when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true);
+
+ RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(),
+ scheduler);
+
+ TestObserver observer = rxLock.tryLockRx(Duration.ofSeconds(1)).test();
+ observer.awaitDone(5, TimeUnit.SECONDS);
+
+ observer.assertValue(true);
+ observer.assertComplete();
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/integration/FairLockIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/FairLockIntegrationTest.java
new file mode 100644
index 0000000..2c2f85b
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/integration/FairLockIntegrationTest.java
@@ -0,0 +1,302 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j.integration;
+
+import org.codarama.redlock4j.FairLock;
+import org.codarama.redlock4j.RedlockManager;
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Integration tests for {@link FairLock}.
+ */
+@Testcontainers
+public class FairLockIntegrationTest {
+
+ @Container
+ static GenericContainer> redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ private static RedlockConfiguration testConfiguration;
+
+ @BeforeAll
+ static void setUp() {
+ testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379))
+ .addRedisNode("localhost", redis2.getMappedPort(6379))
+ .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30))
+ .lockAcquisitionTimeout(Duration.ofSeconds(10)).retryDelay(Duration.ofMillis(50)).maxRetryAttempts(100)
+ .build();
+ }
+
+ // ========== Basic Functionality ==========
+
+ @Test
+ void shouldAcquireAndReleaseFairLock() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("basic-fair");
+ assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS));
+ assertTrue(fairLock.isHeldByCurrentThread());
+ fairLock.unlock();
+ assertFalse(fairLock.isHeldByCurrentThread());
+ }
+ }
+
+ @Test
+ void shouldBlockSecondAcquirer() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("block-second");
+ CountDownLatch firstAcquired = new CountDownLatch(1);
+ CountDownLatch secondAttempted = new CountDownLatch(1);
+ AtomicInteger secondResult = new AtomicInteger(-1);
+
+ // First thread acquires
+ assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS));
+ firstAcquired.countDown();
+
+ // Second thread tries
+ Thread t = new Thread(() -> {
+ try {
+ firstAcquired.await();
+ boolean acquired = fairLock.tryLock(500, TimeUnit.MILLISECONDS);
+ secondResult.set(acquired ? 1 : 0);
+ secondAttempted.countDown();
+ if (acquired)
+ fairLock.unlock();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ t.start();
+
+ assertTrue(secondAttempted.await(5, TimeUnit.SECONDS));
+ assertEquals(0, secondResult.get(), "Second thread should be blocked");
+ fairLock.unlock();
+ }
+ }
+
+ // ========== Reentrancy ==========
+
+ @Test
+ void shouldSupportReentrantLocking() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("reentrant-fair");
+
+ assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS));
+ assertEquals(1, fairLock.getHoldCount());
+
+ assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS));
+ assertEquals(2, fairLock.getHoldCount());
+
+ fairLock.unlock();
+ assertEquals(1, fairLock.getHoldCount());
+ assertTrue(fairLock.isHeldByCurrentThread());
+
+ fairLock.unlock();
+ assertEquals(0, fairLock.getHoldCount());
+ assertFalse(fairLock.isHeldByCurrentThread());
+ }
+ }
+
+ // ========== FIFO Ordering ==========
+
+ @Test
+ void shouldAcquireInFIFOOrder() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("fifo-order");
+ List acquisitionOrder = Collections.synchronizedList(new ArrayList<>());
+ int threadCount = 3;
+ CountDownLatch firstHeld = new CountDownLatch(1);
+ CountDownLatch allQueued = new CountDownLatch(threadCount - 1);
+ CountDownLatch allDone = new CountDownLatch(threadCount);
+
+ // First thread acquires and holds
+ new Thread(() -> {
+ try {
+ if (fairLock.tryLock(10, TimeUnit.SECONDS)) {
+ acquisitionOrder.add(0);
+ firstHeld.countDown();
+ allQueued.await(); // Wait for others to queue
+ Thread.sleep(200); // Hold briefly
+ fairLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ allDone.countDown();
+ }
+ }).start();
+
+ assertTrue(firstHeld.await(10, TimeUnit.SECONDS));
+
+ // Other threads queue up in order
+ for (int i = 1; i < threadCount; i++) {
+ final int idx = i;
+ Thread.sleep(100); // Stagger to ensure ordering
+ new Thread(() -> {
+ try {
+ allQueued.countDown();
+ if (fairLock.tryLock(30, TimeUnit.SECONDS)) {
+ acquisitionOrder.add(idx);
+ fairLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ allDone.countDown();
+ }
+ }).start();
+ }
+
+ assertTrue(allDone.await(60, TimeUnit.SECONDS));
+ assertEquals(threadCount, acquisitionOrder.size(), "All threads should acquire");
+ }
+ }
+
+ // ========== Timeout ==========
+
+ @Test
+ void shouldTimeoutWhenLockNotAvailable() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("timeout-test");
+
+ assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS));
+
+ long start = System.currentTimeMillis();
+ Thread t = new Thread(() -> {
+ try {
+ assertFalse(fairLock.tryLock(500, TimeUnit.MILLISECONDS));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ t.start();
+ t.join();
+ long elapsed = System.currentTimeMillis() - start;
+
+ assertTrue(elapsed >= 400 && elapsed < 2000, "Should timeout after ~500ms");
+ fairLock.unlock();
+ }
+ }
+
+ // ========== Concurrent Access ==========
+
+ @Test
+ void shouldHandleConcurrentAcquisitions() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("concurrent-fair");
+ int threadCount = 5;
+ AtomicInteger successCount = new AtomicInteger(0);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ new Thread(() -> {
+ try {
+ startSignal.await();
+ if (fairLock.tryLock(30, TimeUnit.SECONDS)) {
+ try {
+ successCount.incrementAndGet();
+ Thread.sleep(50);
+ } finally {
+ fairLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+
+ startSignal.countDown();
+ assertTrue(done.await(60, TimeUnit.SECONDS));
+ assertEquals(threadCount, successCount.get(), "All threads should eventually acquire");
+ }
+ }
+
+ @Test
+ void shouldHandleRapidLockUnlockCycles() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("rapid-cycles");
+ int cycles = 5;
+ int threadCount = 3;
+ AtomicInteger totalAcquisitions = new AtomicInteger(0);
+ CountDownLatch done = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ new Thread(() -> {
+ try {
+ for (int j = 0; j < cycles; j++) {
+ if (fairLock.tryLock(10, TimeUnit.SECONDS)) {
+ totalAcquisitions.incrementAndGet();
+ Thread.sleep(20);
+ fairLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+
+ assertTrue(done.await(120, TimeUnit.SECONDS));
+ assertTrue(totalAcquisitions.get() > 0, "Should have successful acquisitions");
+ }
+ }
+
+ // ========== Utility Methods ==========
+
+ @Test
+ void shouldReportValidityTime() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("validity-time");
+
+ assertEquals(0, fairLock.getRemainingValidityTime());
+
+ assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS));
+ assertTrue(fairLock.getRemainingValidityTime() > 0);
+
+ fairLock.unlock();
+ }
+ }
+
+ // ========== Lettuce Driver ==========
+
+ @Test
+ void shouldWorkWithLettuceDriver() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) {
+ FairLock fairLock = (FairLock) manager.createFairLock("lettuce-fair");
+ assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS));
+ assertTrue(fairLock.isHeldByCurrentThread());
+ fairLock.unlock();
+ }
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/integration/MultiLockIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/MultiLockIntegrationTest.java
new file mode 100644
index 0000000..9bcf03e
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/integration/MultiLockIntegrationTest.java
@@ -0,0 +1,301 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j.integration;
+
+import org.codarama.redlock4j.MultiLock;
+import org.codarama.redlock4j.RedlockManager;
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Integration tests for {@link MultiLock}.
+ */
+@Testcontainers
+public class MultiLockIntegrationTest {
+
+ @Container
+ static GenericContainer> redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ private static RedlockConfiguration testConfiguration;
+
+ @BeforeAll
+ static void setUp() {
+ testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379))
+ .addRedisNode("localhost", redis2.getMappedPort(6379))
+ .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30))
+ .lockAcquisitionTimeout(Duration.ofSeconds(10)).retryDelay(Duration.ofMillis(50)).maxRetryAttempts(100)
+ .build();
+ }
+
+ // ========== Basic Functionality ==========
+
+ @Test
+ void shouldAcquireAndReleaseMultipleLocks() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ List keys = Arrays.asList("account:1", "account:2", "account:3");
+ Lock multiLock = manager.createMultiLock(keys);
+
+ assertTrue(multiLock.tryLock(5, TimeUnit.SECONDS));
+ multiLock.unlock();
+ }
+ }
+
+ @Test
+ void shouldRejectEmptyKeyList() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ assertThrows(IllegalArgumentException.class, () -> manager.createMultiLock(Arrays.asList()));
+ }
+ }
+
+ @Test
+ void shouldRejectNullKeyList() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ assertThrows(IllegalArgumentException.class, () -> manager.createMultiLock(null));
+ }
+ }
+
+ // ========== All-or-Nothing Semantics ==========
+
+ @Test
+ void shouldBlockIfAnyResourceIsLocked() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ // First, acquire a single lock on one resource
+ Lock singleLock = manager.createLock("resource:B");
+ assertTrue(singleLock.tryLock(5, TimeUnit.SECONDS));
+
+ // Try to acquire multi-lock that includes resource:B
+ List keys = Arrays.asList("resource:A", "resource:B", "resource:C");
+ Lock multiLock = manager.createMultiLock(keys);
+
+ // Should fail with short timeout since resource:B is already held
+ assertFalse(multiLock.tryLock(500, TimeUnit.MILLISECONDS));
+
+ singleLock.unlock();
+ }
+ }
+
+ @Test
+ void shouldAcquireAfterConflictingLockReleased() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ Lock singleLock = manager.createLock("resource:X");
+ assertTrue(singleLock.tryLock(5, TimeUnit.SECONDS));
+
+ List keys = Arrays.asList("resource:W", "resource:X", "resource:Y");
+ Lock multiLock = manager.createMultiLock(keys);
+ CountDownLatch released = new CountDownLatch(1);
+ AtomicBoolean multiAcquired = new AtomicBoolean(false);
+
+ Thread t = new Thread(() -> {
+ try {
+ released.await();
+ if (multiLock.tryLock(10, TimeUnit.SECONDS)) {
+ multiAcquired.set(true);
+ multiLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ t.start();
+
+ Thread.sleep(100);
+ singleLock.unlock();
+ released.countDown();
+
+ t.join(15000);
+ assertTrue(multiAcquired.get(), "Multi-lock should acquire after single lock released");
+ }
+ }
+
+ // ========== Exclusive Access ==========
+
+ @Test
+ void twoMultiLocksWithOverlapShouldNotCoexist() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ List keys1 = Arrays.asList("shared:1", "shared:2");
+ List keys2 = Arrays.asList("shared:2", "shared:3");
+
+ Lock multiLock1 = manager.createMultiLock(keys1);
+ Lock multiLock2 = manager.createMultiLock(keys2);
+
+ assertTrue(multiLock1.tryLock(5, TimeUnit.SECONDS));
+ // multiLock2 overlaps on shared:2, should fail
+ assertFalse(multiLock2.tryLock(500, TimeUnit.MILLISECONDS));
+
+ multiLock1.unlock();
+ // Now multiLock2 should succeed
+ assertTrue(multiLock2.tryLock(5, TimeUnit.SECONDS));
+ multiLock2.unlock();
+ }
+ }
+
+ // ========== Concurrent Access ==========
+
+ @Test
+ void shouldHandleConcurrentMultiLockAcquisitions() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int threadCount = 5;
+ AtomicInteger successCount = new AtomicInteger(0);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ final int idx = i;
+ new Thread(() -> {
+ try {
+ // Each thread locks different resources
+ List keys = Arrays.asList("t" + idx + ":a", "t" + idx + ":b");
+ Lock multiLock = manager.createMultiLock(keys);
+ startSignal.await();
+ if (multiLock.tryLock(10, TimeUnit.SECONDS)) {
+ try {
+ successCount.incrementAndGet();
+ Thread.sleep(50);
+ } finally {
+ multiLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+
+ startSignal.countDown();
+ assertTrue(done.await(30, TimeUnit.SECONDS));
+ assertEquals(threadCount, successCount.get(), "All threads should acquire non-overlapping locks");
+ }
+ }
+
+ @Test
+ void shouldHandleConcurrentAccessToSameResources() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int threadCount = 4;
+ List keys = Arrays.asList("contested:A", "contested:B");
+ AtomicInteger successCount = new AtomicInteger(0);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ new Thread(() -> {
+ try {
+ Lock multiLock = manager.createMultiLock(keys);
+ startSignal.await();
+ if (multiLock.tryLock(30, TimeUnit.SECONDS)) {
+ try {
+ successCount.incrementAndGet();
+ Thread.sleep(50);
+ } finally {
+ multiLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+
+ startSignal.countDown();
+ assertTrue(done.await(60, TimeUnit.SECONDS));
+ assertEquals(threadCount, successCount.get(), "All threads should eventually acquire");
+ }
+ }
+
+ // ========== Key Ordering (Deadlock Prevention) ==========
+
+ @Test
+ void shouldPreventDeadlockWithDifferentKeyOrders() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ // Thread 1 requests A,B,C; Thread 2 requests C,B,A
+ // Internal sorting should make both request A,B,C
+ List keys1 = Arrays.asList("deadlock:A", "deadlock:B", "deadlock:C");
+ List keys2 = Arrays.asList("deadlock:C", "deadlock:B", "deadlock:A");
+
+ AtomicInteger success1 = new AtomicInteger(0);
+ AtomicInteger success2 = new AtomicInteger(0);
+ CountDownLatch start = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(2);
+
+ new Thread(() -> {
+ try {
+ Lock ml = manager.createMultiLock(keys1);
+ start.await();
+ if (ml.tryLock(30, TimeUnit.SECONDS)) {
+ success1.incrementAndGet();
+ Thread.sleep(100);
+ ml.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+
+ new Thread(() -> {
+ try {
+ Lock ml = manager.createMultiLock(keys2);
+ start.await();
+ if (ml.tryLock(30, TimeUnit.SECONDS)) {
+ success2.incrementAndGet();
+ Thread.sleep(100);
+ ml.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+
+ start.countDown();
+ assertTrue(done.await(60, TimeUnit.SECONDS));
+ assertEquals(1, success1.get());
+ assertEquals(1, success2.get());
+ }
+ }
+
+ // ========== Lettuce Driver ==========
+
+ @Test
+ void shouldWorkWithLettuceDriver() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) {
+ List keys = Arrays.asList("lettuce:1", "lettuce:2");
+ Lock multiLock = manager.createMultiLock(keys);
+
+ assertTrue(multiLock.tryLock(5, TimeUnit.SECONDS));
+ multiLock.unlock();
+ }
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/integration/RedlockCountDownLatchIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/RedlockCountDownLatchIntegrationTest.java
new file mode 100644
index 0000000..108344d
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/integration/RedlockCountDownLatchIntegrationTest.java
@@ -0,0 +1,504 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j.integration;
+
+import org.codarama.redlock4j.RedlockCountDownLatch;
+import org.codarama.redlock4j.RedlockManager;
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Comprehensive integration tests for {@link RedlockCountDownLatch}. Verifies the distributed countdown latch honors
+ * its contract.
+ */
+@Testcontainers
+public class RedlockCountDownLatchIntegrationTest {
+
+ @Container
+ static GenericContainer> redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ private static RedlockConfiguration testConfiguration;
+
+ @BeforeAll
+ static void setUp() {
+ testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379))
+ .addRedisNode("localhost", redis2.getMappedPort(6379))
+ .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(100)).maxRetryAttempts(5).lockAcquisitionTimeout(Duration.ofSeconds(10))
+ .build();
+ }
+
+ // ========== Contract: Initialization ==========
+
+ @Test
+ void shouldInitializeWithCorrectCount() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("init-test", 5);
+ assertEquals(5, latch.getCount(), "Latch should initialize with specified count");
+ }
+ }
+
+ @Test
+ void shouldRejectNegativeCount() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ assertThrows(IllegalArgumentException.class, () -> {
+ manager.createCountDownLatch("negative-test", -1);
+ }, "Should reject negative count");
+ }
+ }
+
+ @Test
+ void shouldAllowZeroCount() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("zero-test", 0);
+ assertEquals(0, latch.getCount(), "Latch should allow zero count");
+ }
+ }
+
+ // ========== Contract: countDown() ==========
+
+ @Test
+ void countDownShouldDecrementCount() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("countdown-decr", 3);
+
+ latch.countDown();
+ assertEquals(2, latch.getCount());
+
+ latch.countDown();
+ assertEquals(1, latch.getCount());
+
+ latch.countDown();
+ assertEquals(0, latch.getCount());
+ }
+ }
+
+ @Test
+ void countDownBelowZeroShouldNotGoNegative() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("countdown-below-zero", 1);
+
+ latch.countDown();
+ latch.countDown();
+ latch.countDown();
+
+ assertTrue(latch.getCount() <= 0, "Count should be at most 0");
+ }
+ }
+
+ // ========== Contract: await() ==========
+
+ @Test
+ void awaitShouldReturnImmediatelyWhenCountIsZero() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("await-zero", 0);
+
+ long start = System.currentTimeMillis();
+ boolean result = latch.await(5, TimeUnit.SECONDS);
+ long elapsed = System.currentTimeMillis() - start;
+
+ assertTrue(result, "await() should return true when count is 0");
+ assertTrue(elapsed < 1000, "await() should return immediately when count is 0");
+ }
+ }
+
+ @Test
+ void awaitShouldBlockUntilCountReachesZero() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("await-block", 2);
+ AtomicBoolean awaitCompleted = new AtomicBoolean(false);
+ CountDownLatch threadStarted = new CountDownLatch(1);
+
+ Thread waiter = new Thread(() -> {
+ try {
+ threadStarted.countDown();
+ latch.await();
+ awaitCompleted.set(true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ waiter.start();
+
+ threadStarted.await(2, TimeUnit.SECONDS);
+ Thread.sleep(200);
+
+ assertFalse(awaitCompleted.get(), "await() should still be blocking");
+
+ latch.countDown();
+ Thread.sleep(100);
+ assertFalse(awaitCompleted.get(), "await() should still block (count=1)");
+
+ latch.countDown();
+ waiter.join(5000);
+
+ assertTrue(awaitCompleted.get(), "await() should complete when count reaches 0");
+ }
+ }
+
+ @Test
+ void awaitWithTimeoutShouldReturnFalseOnTimeout() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("await-timeout", 5);
+
+ long start = System.currentTimeMillis();
+ boolean result = latch.await(1, TimeUnit.SECONDS);
+ long elapsed = System.currentTimeMillis() - start;
+
+ assertFalse(result, "await() should return false on timeout");
+ assertTrue(elapsed >= 900 && elapsed < 2000, "Should wait approximately 1 second");
+ }
+ }
+
+ // ========== Contract: Distributed Coordination ==========
+
+ @Test
+ void shouldCoordinateMultipleWorkerThreads() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int workerCount = 5;
+ RedlockCountDownLatch latch = manager.createCountDownLatch("coord-workers", workerCount);
+ AtomicInteger completedWorkers = new AtomicInteger(0);
+ CountDownLatch allWorkersStarted = new CountDownLatch(workerCount);
+ AtomicBoolean coordinatorCompleted = new AtomicBoolean(false);
+
+ Thread coordinator = new Thread(() -> {
+ try {
+ latch.await();
+ coordinatorCompleted.set(true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ coordinator.start();
+
+ ExecutorService executor = Executors.newFixedThreadPool(workerCount);
+ for (int i = 0; i < workerCount; i++) {
+ final int workerId = i;
+ executor.submit(() -> {
+ try {
+ allWorkersStarted.countDown();
+ Thread.sleep(100 + workerId * 50);
+ completedWorkers.incrementAndGet();
+ latch.countDown();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ }
+
+ assertTrue(allWorkersStarted.await(5, TimeUnit.SECONDS));
+ Thread.sleep(100);
+ assertFalse(coordinatorCompleted.get(), "Coordinator should wait for workers");
+
+ coordinator.join(10000);
+
+ assertTrue(coordinatorCompleted.get(), "Coordinator should complete after all workers");
+ assertEquals(workerCount, completedWorkers.get(), "All workers should complete");
+
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ void shouldWorkAcrossMultipleRedlockManagerInstances() throws InterruptedException {
+ AtomicBoolean waiterCompleted = new AtomicBoolean(false);
+ CountDownLatch waiterStarted = new CountDownLatch(1);
+
+ Thread waiter = new Thread(() -> {
+ try (RedlockManager manager1 = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch1 = manager1.createCountDownLatch("cross-process", 2);
+ waiterStarted.countDown();
+ latch1.await();
+ waiterCompleted.set(true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ waiter.start();
+
+ waiterStarted.await(2, TimeUnit.SECONDS);
+ Thread.sleep(500);
+
+ try (RedlockManager manager2 = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch2 = manager2.createCountDownLatch("cross-process", 2);
+
+ latch2.countDown();
+ Thread.sleep(100);
+ assertFalse(waiterCompleted.get(), "Waiter should still be waiting");
+
+ latch2.countDown();
+ }
+
+ waiter.join(5000);
+ assertTrue(waiterCompleted.get(), "Waiter should complete after cross-process countdown");
+ }
+
+ // ========== Contract: reset() ==========
+
+ @Test
+ void resetShouldRestoreInitialCount() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("reset-test", 3);
+
+ latch.countDown();
+ latch.countDown();
+ assertEquals(1, latch.getCount());
+
+ latch.reset();
+ assertEquals(3, latch.getCount(), "reset() should restore initial count");
+ }
+ }
+
+ @Test
+ void resetShouldAllowReuseOfLatch() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("reset-reuse", 2);
+
+ latch.countDown();
+ latch.countDown();
+ assertTrue(latch.await(1, TimeUnit.SECONDS), "First await should complete");
+
+ latch.reset();
+ assertEquals(2, latch.getCount());
+
+ latch.countDown();
+ latch.countDown();
+ assertTrue(latch.await(1, TimeUnit.SECONDS), "Second await should complete after reset");
+ }
+ }
+
+ // ========== Contract: Lettuce Driver ==========
+
+ @Test
+ void shouldWorkWithLettuceDriver() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("lettuce-test", 3);
+
+ assertEquals(3, latch.getCount());
+
+ latch.countDown();
+ assertEquals(2, latch.getCount());
+
+ latch.countDown();
+ latch.countDown();
+
+ assertTrue(latch.await(1, TimeUnit.SECONDS), "Lettuce latch should work correctly");
+ }
+ }
+
+ // ========== Contract: toString() ==========
+
+ @Test
+ void toStringShouldContainLatchInfo() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("tostring-test", 5);
+
+ String str = latch.toString();
+ assertTrue(str.contains("tostring-test"), "toString should contain latch key");
+ assertTrue(str.contains("5"), "toString should contain count");
+ }
+ }
+
+ // ========== Contract: hasQueuedThreads() ==========
+
+ @Test
+ void hasQueuedThreadsShouldReturnTrueWhenCountPositive() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockCountDownLatch latch = manager.createCountDownLatch("queued-test", 2);
+
+ assertTrue(latch.hasQueuedThreads(), "hasQueuedThreads should return true when count > 0");
+
+ latch.countDown();
+ latch.countDown();
+
+ assertFalse(latch.hasQueuedThreads(), "hasQueuedThreads should return false when count = 0");
+ }
+ }
+
+ // ========== Contract: Concurrent Access ==========
+
+ @Test
+ void shouldHandleConcurrentCountDownCalls() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int totalCountDowns = 10;
+ RedlockCountDownLatch latch = manager.createCountDownLatch("concurrent-countdown", totalCountDowns);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch doneSignal = new CountDownLatch(totalCountDowns);
+
+ // Create threads that all call countDown() simultaneously
+ for (int i = 0; i < totalCountDowns; i++) {
+ new Thread(() -> {
+ try {
+ startSignal.await(); // Wait for signal to start together
+ latch.countDown();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneSignal.countDown();
+ }
+ }).start();
+ }
+
+ // Release all threads at once
+ startSignal.countDown();
+
+ // Wait for all threads to complete
+ assertTrue(doneSignal.await(10, TimeUnit.SECONDS), "All threads should complete");
+
+ // Count should be 0 after all concurrent countdowns
+ assertEquals(0, latch.getCount(), "Count should be 0 after concurrent countdowns");
+ }
+ }
+
+ @Test
+ void shouldHandleMultipleConcurrentAwaiters() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int awaiterCount = 5;
+ RedlockCountDownLatch latch = manager.createCountDownLatch("concurrent-await", 1);
+ AtomicInteger completedAwaiters = new AtomicInteger(0);
+ CountDownLatch allStarted = new CountDownLatch(awaiterCount);
+ CountDownLatch allDone = new CountDownLatch(awaiterCount);
+
+ // Start multiple threads that all await on the same latch
+ for (int i = 0; i < awaiterCount; i++) {
+ new Thread(() -> {
+ try {
+ allStarted.countDown();
+ latch.await();
+ completedAwaiters.incrementAndGet();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ allDone.countDown();
+ }
+ }).start();
+ }
+
+ // Wait for all awaiters to start
+ assertTrue(allStarted.await(5, TimeUnit.SECONDS));
+ Thread.sleep(300); // Let them all block
+
+ // No awaiters should have completed yet
+ assertEquals(0, completedAwaiters.get(), "No awaiter should complete before countdown");
+
+ // Single countdown should release ALL awaiters
+ latch.countDown();
+
+ // All awaiters should complete
+ assertTrue(allDone.await(10, TimeUnit.SECONDS), "All awaiters should complete");
+ assertEquals(awaiterCount, completedAwaiters.get(), "All awaiters should have completed");
+ }
+ }
+
+ @Test
+ void shouldMaintainCorrectCountUnderConcurrentAccess() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int initialCount = 100;
+ int threadCount = 20;
+ int countDownsPerThread = 5; // 20 * 5 = 100 total
+
+ RedlockCountDownLatch latch = manager.createCountDownLatch("stress-test", initialCount);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch doneSignal = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ new Thread(() -> {
+ try {
+ startSignal.await();
+ for (int j = 0; j < countDownsPerThread; j++) {
+ latch.countDown();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneSignal.countDown();
+ }
+ }).start();
+ }
+
+ // Release all threads simultaneously
+ startSignal.countDown();
+
+ // Wait for completion
+ assertTrue(doneSignal.await(30, TimeUnit.SECONDS), "All threads should complete");
+
+ // Verify final count is 0
+ assertEquals(0, latch.getCount(), "Count should be 0 after all countdowns");
+ }
+ }
+
+ @Test
+ void shouldHandleConcurrentCountDownAndGetCount() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int initialCount = 50;
+ RedlockCountDownLatch latch = manager.createCountDownLatch("mixed-ops", initialCount);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ AtomicInteger countDownsDone = new AtomicInteger(0);
+ AtomicBoolean invalidCountSeen = new AtomicBoolean(false);
+
+ // Threads doing countDown
+ for (int i = 0; i < initialCount; i++) {
+ new Thread(() -> {
+ try {
+ startSignal.await();
+ latch.countDown();
+ countDownsDone.incrementAndGet();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }).start();
+ }
+
+ // Threads doing getCount (verifying count is always valid)
+ for (int i = 0; i < 10; i++) {
+ new Thread(() -> {
+ try {
+ startSignal.await();
+ for (int j = 0; j < 20; j++) {
+ long count = latch.getCount();
+ if (count < 0 || count > initialCount) {
+ invalidCountSeen.set(true);
+ }
+ Thread.sleep(10);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }).start();
+ }
+
+ startSignal.countDown();
+ Thread.sleep(3000); // Let operations complete
+
+ assertFalse(invalidCountSeen.get(), "Count should always be in valid range");
+ assertEquals(initialCount, countDownsDone.get(), "All countdowns should complete");
+ }
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/integration/RedlockReadWriteLockIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/RedlockReadWriteLockIntegrationTest.java
new file mode 100644
index 0000000..ca7e4dc
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/integration/RedlockReadWriteLockIntegrationTest.java
@@ -0,0 +1,332 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j.integration;
+
+import org.codarama.redlock4j.RedlockManager;
+import org.codarama.redlock4j.RedlockReadWriteLock;
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Integration tests for {@link RedlockReadWriteLock}.
+ */
+@Testcontainers
+public class RedlockReadWriteLockIntegrationTest {
+
+ @Container
+ static GenericContainer> redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ private static RedlockConfiguration testConfiguration;
+
+ @BeforeAll
+ static void setUp() {
+ testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379))
+ .addRedisNode("localhost", redis2.getMappedPort(6379))
+ .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30))
+ .lockAcquisitionTimeout(Duration.ofSeconds(10)).retryDelay(Duration.ofMillis(50)).maxRetryAttempts(100)
+ .build();
+ }
+
+ // ========== Basic Functionality ==========
+
+ @Test
+ void shouldAcquireAndReleaseReadLock() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("basic-read");
+ Lock readLock = rwLock.readLock();
+
+ assertTrue(readLock.tryLock(5, TimeUnit.SECONDS));
+ readLock.unlock();
+ }
+ }
+
+ @Test
+ void shouldAcquireAndReleaseWriteLock() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("basic-write");
+ Lock writeLock = rwLock.writeLock();
+
+ assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS));
+ writeLock.unlock();
+ }
+ }
+
+ // ========== Multiple Readers ==========
+
+ @Test
+ void shouldAllowMultipleConcurrentReaders() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("multi-readers");
+ int readerCount = 5;
+ AtomicInteger acquired = new AtomicInteger(0);
+ CountDownLatch allAcquired = new CountDownLatch(readerCount);
+ CountDownLatch releaseSignal = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(readerCount);
+
+ for (int i = 0; i < readerCount; i++) {
+ new Thread(() -> {
+ try {
+ Lock readLock = rwLock.readLock();
+ if (readLock.tryLock(10, TimeUnit.SECONDS)) {
+ try {
+ acquired.incrementAndGet();
+ allAcquired.countDown();
+ releaseSignal.await();
+ } finally {
+ readLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+
+ assertTrue(allAcquired.await(15, TimeUnit.SECONDS), "All readers should acquire");
+ assertEquals(readerCount, acquired.get(), "All readers should hold the lock simultaneously");
+
+ releaseSignal.countDown();
+ assertTrue(done.await(5, TimeUnit.SECONDS));
+ }
+ }
+
+ // ========== Writer Exclusivity ==========
+
+ @Test
+ void writerShouldHaveExclusiveAccess() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("writer-exclusive");
+ Lock writeLock = rwLock.writeLock();
+ AtomicBoolean writerHoldsLock = new AtomicBoolean(false);
+ AtomicBoolean secondWriterBlocked = new AtomicBoolean(false);
+ CountDownLatch writerAcquired = new CountDownLatch(1);
+ CountDownLatch secondAttempted = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(1);
+
+ // First writer acquires
+ assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS));
+ writerHoldsLock.set(true);
+ writerAcquired.countDown();
+
+ // Second writer tries to acquire (should fail quickly)
+ new Thread(() -> {
+ try {
+ writerAcquired.await();
+ Lock secondWrite = rwLock.writeLock();
+ boolean acquired = secondWrite.tryLock(500, TimeUnit.MILLISECONDS);
+ secondWriterBlocked.set(!acquired);
+ secondAttempted.countDown();
+ if (acquired) {
+ secondWrite.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+
+ assertTrue(secondAttempted.await(5, TimeUnit.SECONDS));
+ assertTrue(secondWriterBlocked.get(), "Second writer should be blocked");
+
+ writeLock.unlock();
+ assertTrue(done.await(5, TimeUnit.SECONDS));
+ }
+ }
+
+ // ========== Reader/Writer Blocking ==========
+
+ @Test
+ void readerShouldBeBlockedByWriter() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("reader-blocked-by-writer");
+ Lock writeLock = rwLock.writeLock();
+ AtomicBoolean readerBlocked = new AtomicBoolean(false);
+ CountDownLatch writerAcquired = new CountDownLatch(1);
+ CountDownLatch readerAttempted = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(1);
+
+ // Writer acquires first
+ assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS));
+ writerAcquired.countDown();
+
+ // Reader tries to acquire (should fail with short timeout)
+ new Thread(() -> {
+ try {
+ writerAcquired.await();
+ Lock readLock = rwLock.readLock();
+ boolean acquired = readLock.tryLock(500, TimeUnit.MILLISECONDS);
+ readerBlocked.set(!acquired);
+ readerAttempted.countDown();
+ if (acquired) {
+ readLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+
+ assertTrue(readerAttempted.await(5, TimeUnit.SECONDS));
+ assertTrue(readerBlocked.get(), "Reader should be blocked by writer");
+
+ writeLock.unlock();
+ assertTrue(done.await(5, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ void writerShouldWaitForReadersToFinish() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("writer-waits-readers");
+ Lock readLock = rwLock.readLock();
+ AtomicBoolean writerAcquiredAfterReaderRelease = new AtomicBoolean(false);
+ CountDownLatch readerAcquired = new CountDownLatch(1);
+ CountDownLatch writerStarted = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(1);
+
+ // Reader acquires first
+ assertTrue(readLock.tryLock(5, TimeUnit.SECONDS));
+ readerAcquired.countDown();
+
+ // Writer tries to acquire in background
+ new Thread(() -> {
+ try {
+ readerAcquired.await();
+ writerStarted.countDown();
+ Lock writeLock = rwLock.writeLock();
+ boolean acquired = writeLock.tryLock(10, TimeUnit.SECONDS);
+ writerAcquiredAfterReaderRelease.set(acquired);
+ if (acquired) {
+ writeLock.unlock();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+
+ assertTrue(writerStarted.await(5, TimeUnit.SECONDS));
+ Thread.sleep(200); // Give writer time to attempt
+
+ // Release read lock - writer should then succeed
+ readLock.unlock();
+
+ assertTrue(done.await(15, TimeUnit.SECONDS));
+ assertTrue(writerAcquiredAfterReaderRelease.get(), "Writer should acquire after reader releases");
+ }
+ }
+
+ // ========== Concurrent Access ==========
+
+ @Test
+ void shouldHandleConcurrentReadersAndWriters() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("concurrent-rw");
+ int readerCount = 5;
+ int writerCount = 3;
+ int totalThreads = readerCount + writerCount;
+ AtomicInteger readAcquisitions = new AtomicInteger(0);
+ AtomicInteger writeAcquisitions = new AtomicInteger(0);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch done = new CountDownLatch(totalThreads);
+
+ // Start readers
+ for (int i = 0; i < readerCount; i++) {
+ new Thread(() -> {
+ try {
+ startSignal.await();
+ Lock readLock = rwLock.readLock();
+ if (readLock.tryLock(10, TimeUnit.SECONDS)) {
+ try {
+ readAcquisitions.incrementAndGet();
+ Thread.sleep(50);
+ } finally {
+ readLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+
+ // Start writers
+ for (int i = 0; i < writerCount; i++) {
+ new Thread(() -> {
+ try {
+ startSignal.await();
+ Lock writeLock = rwLock.writeLock();
+ if (writeLock.tryLock(10, TimeUnit.SECONDS)) {
+ try {
+ writeAcquisitions.incrementAndGet();
+ Thread.sleep(50);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ }).start();
+ }
+
+ startSignal.countDown();
+ assertTrue(done.await(60, TimeUnit.SECONDS));
+ assertTrue(readAcquisitions.get() > 0, "Should have successful read acquisitions");
+ assertTrue(writeAcquisitions.get() > 0, "Should have successful write acquisitions");
+ }
+ }
+
+ // ========== Lettuce Driver ==========
+
+ @Test
+ void shouldWorkWithLettuceDriver() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) {
+ RedlockReadWriteLock rwLock = manager.createReadWriteLock("lettuce-rwlock");
+
+ Lock readLock = rwLock.readLock();
+ assertTrue(readLock.tryLock(5, TimeUnit.SECONDS));
+ readLock.unlock();
+
+ Lock writeLock = rwLock.writeLock();
+ assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS));
+ writeLock.unlock();
+ }
+ }
+}
diff --git a/src/test/java/org/codarama/redlock4j/integration/RedlockSemaphoreIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/RedlockSemaphoreIntegrationTest.java
new file mode 100644
index 0000000..3e37a4a
--- /dev/null
+++ b/src/test/java/org/codarama/redlock4j/integration/RedlockSemaphoreIntegrationTest.java
@@ -0,0 +1,307 @@
+/*
+ * SPDX-License-Identifier: MIT
+ * Copyright (c) 2025 Codarama
+ */
+package org.codarama.redlock4j.integration;
+
+import org.codarama.redlock4j.RedlockManager;
+import org.codarama.redlock4j.RedlockSemaphore;
+import org.codarama.redlock4j.configuration.RedlockConfiguration;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Integration tests for {@link RedlockSemaphore}.
+ */
+@Testcontainers
+public class RedlockSemaphoreIntegrationTest {
+
+ @Container
+ static GenericContainer> redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ @Container
+ static GenericContainer> redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine"))
+ .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes");
+
+ private static RedlockConfiguration testConfiguration;
+
+ @BeforeAll
+ static void setUp() {
+ testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379))
+ .addRedisNode("localhost", redis2.getMappedPort(6379))
+ .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30))
+ .retryDelay(Duration.ofMillis(50)).maxRetryAttempts(10).lockAcquisitionTimeout(Duration.ofSeconds(10))
+ .build();
+ }
+
+ // ========== Initialization ==========
+
+ @Test
+ void shouldCreateSemaphoreWithPositivePermits() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("init-test", 5);
+ assertNotNull(semaphore);
+ }
+ }
+
+ @Test
+ void shouldRejectZeroPermits() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ assertThrows(IllegalArgumentException.class, () -> manager.createSemaphore("zero-permits", 0));
+ }
+ }
+
+ @Test
+ void shouldRejectNegativePermits() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ assertThrows(IllegalArgumentException.class, () -> manager.createSemaphore("negative-permits", -1));
+ }
+ }
+
+ // ========== acquire() / release() ==========
+
+ @Test
+ void shouldAcquireAndReleaseSinglePermit() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("single-permit", 3);
+ semaphore.acquire();
+ semaphore.release();
+ }
+ }
+
+ @Test
+ void shouldAcquireMultiplePermits() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("multi-permit", 5);
+ semaphore.acquire(3);
+ semaphore.release(3);
+ }
+ }
+
+ // ========== tryAcquire() ==========
+
+ @Test
+ void tryAcquireShouldReturnTrueWhenPermitAvailable() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("try-acquire", 2);
+ assertTrue(semaphore.tryAcquire(1, TimeUnit.SECONDS));
+ semaphore.release();
+ }
+ }
+
+ @Test
+ void sameThreadCannotAcquireMultipleTimes() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("same-thread", 5);
+ assertTrue(semaphore.tryAcquire(1, TimeUnit.SECONDS));
+ // Same thread should fail to acquire again while holding
+ assertFalse(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS));
+ semaphore.release();
+ // After release, can acquire again
+ assertTrue(semaphore.tryAcquire(1, TimeUnit.SECONDS));
+ semaphore.release();
+ }
+ }
+
+ @Test
+ void tryAcquireWithZeroTimeoutShouldReturnImmediately() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("zero-timeout", 1);
+ assertTrue(semaphore.tryAcquire());
+ semaphore.release();
+ }
+ }
+
+ // ========== Permit Validation ==========
+
+ @Test
+ void shouldRejectAcquiringMorePermitsThanMax() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("exceed-max", 3);
+ assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(5, 1, TimeUnit.SECONDS));
+ }
+ }
+
+ @Test
+ void shouldRejectAcquiringZeroPermits() {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("zero-acquire", 3);
+ assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(0, 1, TimeUnit.SECONDS));
+ }
+ }
+
+ // ========== Concurrent Access ==========
+
+ @Test
+ void shouldAllowMultipleThreadsToAcquirePermits() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int threadCount = 5;
+ RedlockSemaphore semaphore = manager.createSemaphore("multi-thread-acq", 10);
+
+ AtomicInteger successCount = new AtomicInteger(0);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch doneSignal = new CountDownLatch(threadCount);
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ startSignal.await();
+ if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
+ try {
+ successCount.incrementAndGet();
+ Thread.sleep(100);
+ } finally {
+ semaphore.release();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ doneSignal.countDown();
+ }
+ });
+ }
+
+ startSignal.countDown();
+ assertTrue(doneSignal.await(30, TimeUnit.SECONDS), "All threads should complete");
+ assertEquals(threadCount, successCount.get(), "All threads should acquire");
+ executor.shutdown();
+ }
+ }
+
+ @Test
+ void shouldHandleRapidAcquireReleaseCycles() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int cycles = 10;
+ int threadCount = 3;
+ RedlockSemaphore semaphore = manager.createSemaphore("rapid-cycles", 5);
+
+ AtomicInteger totalAcquisitions = new AtomicInteger(0);
+ CountDownLatch done = new CountDownLatch(threadCount);
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ for (int j = 0; j < cycles; j++) {
+ if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
+ totalAcquisitions.incrementAndGet();
+ Thread.sleep(20);
+ semaphore.release();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ assertTrue(done.await(60, TimeUnit.SECONDS), "All threads should complete");
+ assertTrue(totalAcquisitions.get() > 0, "Should have successful acquisitions");
+ executor.shutdown();
+ }
+ }
+
+ @Test
+ void shouldAllowConcurrentAcquisitionsFromDifferentThreads() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) {
+ int threadCount = 4;
+ RedlockSemaphore semaphore = manager.createSemaphore("concurrent-diff-threads", 10);
+
+ AtomicInteger acquired = new AtomicInteger(0);
+ CountDownLatch allAcquired = new CountDownLatch(threadCount);
+ CountDownLatch releaseSignal = new CountDownLatch(1);
+ CountDownLatch allDone = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ new Thread(() -> {
+ try {
+ if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
+ acquired.incrementAndGet();
+ allAcquired.countDown();
+ releaseSignal.await();
+ semaphore.release();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ allDone.countDown();
+ }
+ }).start();
+ }
+
+ assertTrue(allAcquired.await(10, TimeUnit.SECONDS), "All should acquire");
+ assertEquals(threadCount, acquired.get(), "All threads should hold permits");
+
+ releaseSignal.countDown();
+ assertTrue(allDone.await(5, TimeUnit.SECONDS));
+ }
+ }
+
+ // ========== Lettuce Driver ==========
+
+ @Test
+ void shouldWorkWithLettuceDriver() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) {
+ RedlockSemaphore semaphore = manager.createSemaphore("lettuce-sem", 3);
+ assertTrue(semaphore.tryAcquire(2, TimeUnit.SECONDS));
+ semaphore.release();
+ }
+ }
+
+ @Test
+ void shouldHandleConcurrencyWithLettuce() throws InterruptedException {
+ try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) {
+ int threadCount = 4;
+ RedlockSemaphore semaphore = manager.createSemaphore("lettuce-concurrent", 10);
+
+ AtomicInteger successCount = new AtomicInteger(0);
+ CountDownLatch done = new CountDownLatch(threadCount);
+
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
+ try {
+ successCount.incrementAndGet();
+ Thread.sleep(100);
+ } finally {
+ semaphore.release();
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+
+ assertTrue(done.await(30, TimeUnit.SECONDS));
+ assertEquals(threadCount, successCount.get(), "All threads should acquire with Lettuce");
+ executor.shutdown();
+ }
+ }
+}