From 0062000cd04c0cf1fa89a246681408f5ffd8b9c5 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Fri, 27 Feb 2026 22:19:35 +0200 Subject: [PATCH 1/2] Extending the test suite --- .../redlock4j/RedlockCountDownLatch.java | 183 ++++--- .../redlock4j/driver/JedisRedisDriver.java | 14 + .../redlock4j/driver/LettuceRedisDriver.java | 14 + .../redlock4j/driver/RedisDriver.java | 16 + .../org/codarama/redlock4j/FairLockTest.java | 147 +++++ .../codarama/redlock4j/LockResultTest.java | 93 ++++ .../org/codarama/redlock4j/MultiLockTest.java | 145 +++++ .../redlock4j/RedlockCountDownLatchTest.java | 164 ++++++ .../redlock4j/RedlockExceptionTest.java | 80 +++ .../redlock4j/RedlockReadWriteLockTest.java | 159 ++++++ .../redlock4j/RedlockSemaphoreTest.java | 161 ++++++ .../redlock4j/async/RxRedlockTest.java | 159 ++++++ .../integration/FairLockIntegrationTest.java | 302 +++++++++++ .../integration/MultiLockIntegrationTest.java | 301 +++++++++++ .../RedlockCountDownLatchIntegrationTest.java | 504 ++++++++++++++++++ .../RedlockReadWriteLockIntegrationTest.java | 332 ++++++++++++ .../RedlockSemaphoreIntegrationTest.java | 307 +++++++++++ 17 files changed, 3002 insertions(+), 79 deletions(-) create mode 100644 src/test/java/org/codarama/redlock4j/FairLockTest.java create mode 100644 src/test/java/org/codarama/redlock4j/LockResultTest.java create mode 100644 src/test/java/org/codarama/redlock4j/MultiLockTest.java create mode 100644 src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java create mode 100644 src/test/java/org/codarama/redlock4j/RedlockExceptionTest.java create mode 100644 src/test/java/org/codarama/redlock4j/RedlockReadWriteLockTest.java create mode 100644 src/test/java/org/codarama/redlock4j/RedlockSemaphoreTest.java create mode 100644 src/test/java/org/codarama/redlock4j/async/RxRedlockTest.java create mode 100644 src/test/java/org/codarama/redlock4j/integration/FairLockIntegrationTest.java create mode 100644 src/test/java/org/codarama/redlock4j/integration/MultiLockIntegrationTest.java create mode 100644 src/test/java/org/codarama/redlock4j/integration/RedlockCountDownLatchIntegrationTest.java create mode 100644 src/test/java/org/codarama/redlock4j/integration/RedlockReadWriteLockIntegrationTest.java create mode 100644 src/test/java/org/codarama/redlock4j/integration/RedlockSemaphoreIntegrationTest.java diff --git a/src/main/java/org/codarama/redlock4j/RedlockCountDownLatch.java b/src/main/java/org/codarama/redlock4j/RedlockCountDownLatch.java index 968a06f..3ef1683 100644 --- a/src/main/java/org/codarama/redlock4j/RedlockCountDownLatch.java +++ b/src/main/java/org/codarama/redlock4j/RedlockCountDownLatch.java @@ -9,10 +9,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * A distributed countdown latch that allows one or more threads to wait until a set of operations being performed in @@ -169,77 +173,96 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { /** * Decrements the count of the latch, releasing all waiting threads if the count reaches zero. - * + * *

* If the current count is greater than zero then it is decremented. If the new count is zero then all waiting * threads are re-enabled for thread scheduling purposes. *

- * + * *

* If the current count equals zero then nothing happens. *

*/ public void countDown() { - // Decrement the count atomically using Redis DECR - int successfulNodes = 0; - long newCount = -1; + int quorum = config.getQuorum(); + CountDownLatch quorumLatch = new CountDownLatch(1); + AtomicInteger successCount = new AtomicInteger(0); + // Execute atomic decrement + conditional publish on all nodes in parallel for (RedisDriver driver : redisDrivers) { - try { - // Atomically decrement the count - long count = driver.decr(latchKey); - newCount = count; - successfulNodes++; - - logger.debug("Decremented latch {} count to {} on {}", latchKey, count, driver.getIdentifier()); - } catch (Exception e) { - logger.debug("Failed to decrement latch count on {}: {}", driver.getIdentifier(), e.getMessage()); - } + CompletableFuture.runAsync(() -> { + try { + // Atomic: decrement and publish if zero in single Lua script + long count = driver.decrAndPublishIfZero(latchKey, channelKey, "zero"); + logger.debug("Decremented latch {} count to {} on {}", latchKey, count, driver.getIdentifier()); + if (successCount.incrementAndGet() >= quorum) { + quorumLatch.countDown(); // Signal quorum reached + } + } catch (Exception e) { + logger.debug("Failed to decrement latch count on {}: {}", driver.getIdentifier(), e.getMessage()); + } + }); } - if (successfulNodes >= config.getQuorum()) { + // Wait for quorum (not all nodes) + try { + quorumLatch.await(); logger.debug("Successfully decremented latch {} count on quorum", latchKey); - - // If count reached zero, publish notification - if (newCount <= 0) { - publishZeroNotification(); - } - } else { - logger.warn("Failed to decrement latch {} count on quorum of nodes", latchKey); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while decrementing latch {}", latchKey); } } /** * Returns the current count. - * + * *

* This method is typically used for debugging and testing purposes. *

- * + * * @return the current count */ public long getCount() { - // Use Redis GET to retrieve the current count - int successfulReads = 0; - long totalCount = 0; + int quorum = config.getQuorum(); + CountDownLatch quorumLatch = new CountDownLatch(1); + List results = new ArrayList<>(); + // Execute GET on all nodes in parallel for (RedisDriver driver : redisDrivers) { - try { - String countStr = driver.get(latchKey); - if (countStr != null) { - long count = Long.parseLong(countStr); - totalCount += count; - successfulReads++; + CompletableFuture.runAsync(() -> { + try { + String countStr = driver.get(latchKey); + if (countStr != null) { + synchronized (results) { + results.add(Long.parseLong(countStr)); + if (results.size() >= quorum) { + quorumLatch.countDown(); // Signal quorum reached + } + } + } + } catch (Exception e) { + logger.debug("Failed to read latch count from {}: {}", driver.getIdentifier(), e.getMessage()); } - } catch (Exception e) { - logger.debug("Failed to read latch count from {}: {}", driver.getIdentifier(), e.getMessage()); - } + }); } - if (successfulReads >= config.getQuorum()) { - // Return average count (simple approach, could use median for better accuracy) - long avgCount = totalCount / successfulReads; - return Math.max(0, avgCount); // Never return negative + // Wait for quorum (not all nodes) + try { + quorumLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while reading latch {} count", latchKey); + return 0; + } + + synchronized (results) { + if (results.size() >= quorum) { + // Return average count + long totalCount = results.stream().mapToLong(Long::longValue).sum(); + long avgCount = totalCount / results.size(); + return Math.max(0, avgCount); // Never return negative + } } logger.warn("Failed to read latch {} count from quorum of nodes", latchKey); @@ -247,28 +270,41 @@ public long getCount() { } /** - * Initializes the latch count in Redis. + * Initializes the latch count in Redis using parallel operations with early quorum return. */ private void initializeLatch(int count) { String countValue = String.valueOf(count); - int successfulNodes = 0; + long expirationMs = config.getDefaultLockTimeoutMs() * 10; + int quorum = config.getQuorum(); + // Latch to signal when quorum is reached + CountDownLatch quorumLatch = new CountDownLatch(1); + AtomicInteger successCount = new AtomicInteger(0); + + // Execute initialization on all nodes in parallel + List> futures = new ArrayList<>(redisDrivers.size()); for (RedisDriver driver : redisDrivers) { - try { - // Use setex to initialize with a long expiration (10x lock timeout) - driver.setex(latchKey, countValue, config.getDefaultLockTimeoutMs() * 10); - successfulNodes++; - } catch (Exception e) { - logger.warn("Failed to initialize latch on {}: {}", driver.getIdentifier(), e.getMessage()); - } + futures.add(CompletableFuture.supplyAsync(() -> { + try { + driver.setex(latchKey, countValue, expirationMs); + if (successCount.incrementAndGet() >= quorum) { + quorumLatch.countDown(); // Signal quorum reached + } + return true; + } catch (Exception e) { + logger.warn("Failed to initialize latch on {}: {}", driver.getIdentifier(), e.getMessage()); + return false; + } + })); } - if (successfulNodes < config.getQuorum()) { - logger.warn("Failed to initialize latch {} on quorum of nodes (only {} of {} succeeded)", latchKey, - successfulNodes, redisDrivers.size()); - } else { - logger.debug("Successfully initialized latch {} with count {} on {} nodes", latchKey, count, - successfulNodes); + // Wait for quorum (not all nodes) + try { + quorumLatch.await(); + logger.debug("Successfully initialized latch {} with count {} on quorum", latchKey, count); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.warn("Interrupted while initializing latch {}", latchKey); } } @@ -309,29 +345,14 @@ public void onError(Throwable error) { } } - /** - * Publishes a notification that the latch has reached zero. - */ - private void publishZeroNotification() { - for (RedisDriver driver : redisDrivers) { - try { - long subscribers = driver.publish(channelKey, "zero"); - logger.debug("Published zero notification for latch {} to {} subscribers on {}", latchKey, subscribers, - driver.getIdentifier()); - } catch (Exception e) { - logger.debug("Failed to publish zero notification on {}: {}", driver.getIdentifier(), e.getMessage()); - } - } - } - /** * Resets the latch to its initial count. - * + * *

* Warning: This is not part of the standard CountDownLatch API and should be used with caution. It's * provided for scenarios where you need to reuse a latch. *

- * + * *

* This operation is not atomic and may lead to race conditions if called while other threads are waiting or * counting down. @@ -340,14 +361,18 @@ private void publishZeroNotification() { public void reset() { logger.debug("Resetting latch {} to initial count {}", latchKey, initialCount); - // Delete the existing latch using DEL + // Delete the existing latch using parallel DEL + List> futures = new ArrayList<>(redisDrivers.size()); for (RedisDriver driver : redisDrivers) { - try { - driver.del(latchKey); - } catch (Exception e) { - logger.warn("Failed to delete latch on {}: {}", driver.getIdentifier(), e.getMessage()); - } + futures.add(CompletableFuture.runAsync(() -> { + try { + driver.del(latchKey); + } catch (Exception e) { + logger.warn("Failed to delete latch on {}: {}", driver.getIdentifier(), e.getMessage()); + } + })); } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // Reset the local latch localLatch = new CountDownLatch(1); diff --git a/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java b/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java index 28ca816..d77501c 100644 --- a/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java +++ b/src/main/java/org/codarama/redlock4j/driver/JedisRedisDriver.java @@ -39,6 +39,9 @@ public class JedisRedisDriver implements RedisDriver { private static final String SET_IF_VALUE_MATCHES_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('set', KEYS[1], ARGV[2], 'PX', ARGV[3]) " + "else " + " return nil " + "end"; + private static final String DECR_AND_PUBLISH_IF_ZERO_SCRIPT = "local v = redis.call('decr', KEYS[1]); " + + "if v <= 0 then redis.call('publish', KEYS[2], ARGV[1]) end; " + "return v"; + /** * Strategy for CAS/CAD operations. */ @@ -291,6 +294,17 @@ public long decr(String key) throws RedisDriverException { } } + @Override + public long decrAndPublishIfZero(String key, String channel, String message) throws RedisDriverException { + try (Jedis jedis = jedisPool.getResource()) { + Object result = jedis.eval(DECR_AND_PUBLISH_IF_ZERO_SCRIPT, java.util.Arrays.asList(key, channel), + Collections.singletonList(message)); + return result != null ? ((Number) result).longValue() : 0; + } catch (JedisException e) { + throw new RedisDriverException("Failed to execute DECR_AND_PUBLISH script on " + identifier, e); + } + } + @Override public String get(String key) throws RedisDriverException { try (Jedis jedis = jedisPool.getResource()) { diff --git a/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java b/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java index c1cb6ab..466d2c9 100644 --- a/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java +++ b/src/main/java/org/codarama/redlock4j/driver/LettuceRedisDriver.java @@ -41,6 +41,9 @@ public class LettuceRedisDriver implements RedisDriver { private static final String SET_IF_VALUE_MATCHES_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + " return redis.call('set', KEYS[1], ARGV[2], 'PX', ARGV[3]) " + "else " + " return nil " + "end"; + private static final String DECR_AND_PUBLISH_IF_ZERO_SCRIPT = "local v = redis.call('decr', KEYS[1]); " + + "if v <= 0 then redis.call('publish', KEYS[2], ARGV[1]) end; " + "return v"; + /** * Strategy for CAS/CAD operations. */ @@ -315,6 +318,17 @@ public long decr(String key) throws RedisDriverException { } } + @Override + public long decrAndPublishIfZero(String key, String channel, String message) throws RedisDriverException { + try { + Object result = commands.eval(DECR_AND_PUBLISH_IF_ZERO_SCRIPT, io.lettuce.core.ScriptOutputType.INTEGER, + new String[]{key, channel}, message); + return result != null ? ((Number) result).longValue() : 0; + } catch (Exception e) { + throw new RedisDriverException("Failed to execute DECR_AND_PUBLISH script on " + identifier, e); + } + } + @Override public String get(String key) throws RedisDriverException { try { diff --git a/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java b/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java index cc4f50d..426af41 100644 --- a/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java +++ b/src/main/java/org/codarama/redlock4j/driver/RedisDriver.java @@ -205,6 +205,22 @@ boolean setIfValueMatches(String key, String newValue, String expectedCurrentVal */ long decr(String key) throws RedisDriverException; + /** + * Atomically decrements the value of a key and publishes a message to a channel if the new value is zero or less. + * This combines DECR and conditional PUBLISH into a single atomic operation. + * + * @param key + * the key to decrement + * @param channel + * the channel to publish to if count reaches zero + * @param message + * the message to publish + * @return the value after decrementing + * @throws RedisDriverException + * if there's an error communicating with Redis + */ + long decrAndPublishIfZero(String key, String channel, String message) throws RedisDriverException; + /** * Gets the value of a key. * diff --git a/src/test/java/org/codarama/redlock4j/FairLockTest.java b/src/test/java/org/codarama/redlock4j/FairLockTest.java new file mode 100644 index 0000000..2f30274 --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/FairLockTest.java @@ -0,0 +1,147 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j; + +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.codarama.redlock4j.driver.RedisDriver; +import org.codarama.redlock4j.driver.RedisDriverException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for FairLock using Mockito mocks. + */ +@ExtendWith(MockitoExtension.class) +public class FairLockTest { + + @Mock + private RedisDriver mockDriver1; + + @Mock + private RedisDriver mockDriver2; + + @Mock + private RedisDriver mockDriver3; + + private RedlockConfiguration testConfig; + private List drivers; + + @BeforeEach + void setUp() { + testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380) + .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10)) + .build(); + + drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3); + + lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379"); + lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380"); + lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381"); + } + + // ========== Basic Acquisition ========== + + @Test + void shouldAcquireLockWhenAtFrontOfQueueAndQuorumSucceeds() throws RedisDriverException, InterruptedException { + setupSuccessfulAcquisition(); + + FairLock lock = new FairLock("test-fair", drivers, testConfig); + boolean acquired = lock.tryLock(1, TimeUnit.SECONDS); + + assertTrue(acquired); + assertTrue(lock.isHeldByCurrentThread()); + assertTrue(lock.getHoldCount() > 0); + } + + @Test + void shouldFailWhenNotAtFrontOfQueue() throws RedisDriverException, InterruptedException { + // Mock: add to queue succeeds + when(mockDriver1.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true); + when(mockDriver2.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true); + when(mockDriver3.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true); + + // Mock: someone else is at front + when(mockDriver1.zRange(anyString(), eq(0L), eq(0L))).thenReturn(Collections.singletonList("other-token")); + when(mockDriver2.zRange(anyString(), eq(0L), eq(0L))).thenReturn(Collections.singletonList("other-token")); + when(mockDriver3.zRange(anyString(), eq(0L), eq(0L))).thenReturn(Collections.singletonList("other-token")); + + FairLock lock = new FairLock("test-fair", drivers, testConfig); + + boolean acquired = lock.tryLock(100, TimeUnit.MILLISECONDS); + + assertFalse(acquired); + assertFalse(lock.isHeldByCurrentThread()); + } + + // ========== Reentrancy ========== + + @Test + void shouldSupportReentrantAcquisition() throws RedisDriverException, InterruptedException { + setupSuccessfulAcquisition(); + + FairLock lock = new FairLock("test-reentrant", drivers, testConfig); + + assertTrue(lock.tryLock(1, TimeUnit.SECONDS)); + assertEquals(1, lock.getHoldCount()); + + // Reentrant acquisition should succeed without Redis calls + assertTrue(lock.tryLock(1, TimeUnit.SECONDS)); + assertEquals(2, lock.getHoldCount()); + + lock.unlock(); + assertEquals(1, lock.getHoldCount()); + assertTrue(lock.isHeldByCurrentThread()); + + lock.unlock(); + assertEquals(0, lock.getHoldCount()); + assertFalse(lock.isHeldByCurrentThread()); + } + + // ========== Utility Methods ========== + + @Test + void shouldReportZeroValidityTimeWhenNotHeld() { + FairLock lock = new FairLock("test-validity", drivers, testConfig); + assertEquals(0, lock.getRemainingValidityTime()); + } + + @Test + void shouldThrowOnNewCondition() { + FairLock lock = new FairLock("test-condition", drivers, testConfig); + assertThrows(UnsupportedOperationException.class, lock::newCondition); + } + + private void setupSuccessfulAcquisition() throws RedisDriverException { + when(mockDriver1.zAdd(anyString(), anyDouble(), anyString())).thenAnswer(inv -> { + String token = inv.getArgument(2); + lenient().when(mockDriver1.zRange(anyString(), eq(0L), eq(0L))) + .thenReturn(Collections.singletonList(token)); + lenient().when(mockDriver2.zRange(anyString(), eq(0L), eq(0L))) + .thenReturn(Collections.singletonList(token)); + lenient().when(mockDriver3.zRange(anyString(), eq(0L), eq(0L))) + .thenReturn(Collections.singletonList(token)); + return true; + }); + when(mockDriver2.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true); + when(mockDriver3.zAdd(anyString(), anyDouble(), anyString())).thenReturn(true); + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + } +} diff --git a/src/test/java/org/codarama/redlock4j/LockResultTest.java b/src/test/java/org/codarama/redlock4j/LockResultTest.java new file mode 100644 index 0000000..9ecdbd0 --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/LockResultTest.java @@ -0,0 +1,93 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for LockResult value object. + */ +public class LockResultTest { + + @Test + void shouldStoreAcquiredState() { + LockResult result = new LockResult(true, 5000L, "lock-value-123", 2, 3); + + assertTrue(result.isAcquired()); + } + + @Test + void shouldStoreNotAcquiredState() { + LockResult result = new LockResult(false, 0L, "lock-value-123", 1, 3); + + assertFalse(result.isAcquired()); + } + + @Test + void shouldStoreValidityTime() { + LockResult result = new LockResult(true, 29500L, "lock-value-123", 3, 3); + + assertEquals(29500L, result.getValidityTimeMs()); + } + + @Test + void shouldStoreLockValue() { + LockResult result = new LockResult(true, 5000L, "unique-lock-value", 2, 3); + + assertEquals("unique-lock-value", result.getLockValue()); + } + + @Test + void shouldStoreNodeCounts() { + LockResult result = new LockResult(true, 5000L, "lock-value", 2, 3); + + assertEquals(2, result.getSuccessfulNodes()); + assertEquals(3, result.getTotalNodes()); + } + + @Test + void shouldHandleAllNodesSuccess() { + LockResult result = new LockResult(true, 5000L, "lock-value", 5, 5); + + assertEquals(5, result.getSuccessfulNodes()); + assertEquals(5, result.getTotalNodes()); + } + + @Test + void shouldHandleZeroValidityTime() { + LockResult result = new LockResult(false, 0L, "lock-value", 1, 3); + + assertEquals(0L, result.getValidityTimeMs()); + } + + @Test + void shouldHandleNullLockValue() { + LockResult result = new LockResult(false, 0L, null, 0, 3); + + assertNull(result.getLockValue()); + } + + @Test + void toStringShouldContainAllFields() { + LockResult result = new LockResult(true, 5000L, "lock-value", 2, 3); + String str = result.toString(); + + assertTrue(str.contains("acquired=true")); + assertTrue(str.contains("validityTimeMs=5000")); + assertTrue(str.contains("successfulNodes=2")); + assertTrue(str.contains("totalNodes=3")); + } + + @Test + void toStringShouldWorkForFailedAcquisition() { + LockResult result = new LockResult(false, 0L, "lock-value", 1, 3); + String str = result.toString(); + + assertTrue(str.contains("acquired=false")); + assertTrue(str.contains("validityTimeMs=0")); + } +} diff --git a/src/test/java/org/codarama/redlock4j/MultiLockTest.java b/src/test/java/org/codarama/redlock4j/MultiLockTest.java new file mode 100644 index 0000000..49a4052 --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/MultiLockTest.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j; + +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.codarama.redlock4j.driver.RedisDriver; +import org.codarama.redlock4j.driver.RedisDriverException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for MultiLock using Mockito mocks. + */ +@ExtendWith(MockitoExtension.class) +public class MultiLockTest { + + @Mock + private RedisDriver mockDriver1; + + @Mock + private RedisDriver mockDriver2; + + @Mock + private RedisDriver mockDriver3; + + private RedlockConfiguration testConfig; + private List drivers; + + @BeforeEach + void setUp() { + testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380) + .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10)) + .build(); + + drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3); + + lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379"); + lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380"); + lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381"); + } + + // ========== Validation ========== + + @Test + void shouldRejectNullKeyList() { + assertThrows(IllegalArgumentException.class, () -> new MultiLock(null, drivers, testConfig)); + } + + @Test + void shouldRejectEmptyKeyList() { + assertThrows(IllegalArgumentException.class, () -> new MultiLock(Arrays.asList(), drivers, testConfig)); + } + + // ========== Acquisition ========== + + @Test + void shouldAcquireAllLocksWhenQuorumSucceeds() throws RedisDriverException, InterruptedException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + + MultiLock lock = new MultiLock(Arrays.asList("key1", "key2", "key3"), drivers, testConfig); + boolean acquired = lock.tryLock(1, TimeUnit.SECONDS); + + assertTrue(acquired); + + // Verify all keys were locked on all drivers + verify(mockDriver1, times(3)).setIfNotExists(anyString(), anyString(), eq(30000L)); + verify(mockDriver2, times(3)).setIfNotExists(anyString(), anyString(), eq(30000L)); + } + + @Test + void shouldRollbackOnPartialFailure() throws RedisDriverException, InterruptedException { + // First key succeeds, second fails on driver1 + when(mockDriver1.setIfNotExists(eq("key1"), anyString(), anyLong())).thenReturn(true); + when(mockDriver1.setIfNotExists(eq("key2"), anyString(), anyLong())).thenReturn(false); + // All fail on driver2 and driver3 for simplicity + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false); + + MultiLock lock = new MultiLock(Arrays.asList("key1", "key2"), drivers, testConfig); + boolean acquired = lock.tryLock(100, TimeUnit.MILLISECONDS); + + assertFalse(acquired); + + // Verify rollback was attempted for key1 on driver1 + verify(mockDriver1, atLeastOnce()).deleteIfValueMatches(eq("key1"), anyString()); + } + + // ========== Key Ordering ========== + + @Test + void shouldSortKeysToPreventDeadlock() throws RedisDriverException, InterruptedException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + + // Pass keys in reverse order + MultiLock lock = new MultiLock(Arrays.asList("z", "a", "m"), drivers, testConfig); + lock.tryLock(1, TimeUnit.SECONDS); + + // Verify keys are acquired in sorted order: a, m, z + org.mockito.InOrder order = inOrder(mockDriver1); + order.verify(mockDriver1).setIfNotExists(eq("a"), anyString(), anyLong()); + order.verify(mockDriver1).setIfNotExists(eq("m"), anyString(), anyLong()); + order.verify(mockDriver1).setIfNotExists(eq("z"), anyString(), anyLong()); + } + + @Test + void shouldDeduplicateKeys() throws RedisDriverException, InterruptedException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + + // Pass duplicate keys + MultiLock lock = new MultiLock(Arrays.asList("key1", "key1", "key2"), drivers, testConfig); + lock.tryLock(1, TimeUnit.SECONDS); + + // Should only lock 2 unique keys + verify(mockDriver1, times(2)).setIfNotExists(anyString(), anyString(), anyLong()); + } + + // ========== Utility ========== + + @Test + void shouldThrowOnNewCondition() { + MultiLock lock = new MultiLock(Arrays.asList("key1"), drivers, testConfig); + assertThrows(UnsupportedOperationException.class, lock::newCondition); + } +} diff --git a/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java b/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java new file mode 100644 index 0000000..b75e372 --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java @@ -0,0 +1,164 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j; + +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.codarama.redlock4j.driver.RedisDriver; +import org.codarama.redlock4j.driver.RedisDriverException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for RedlockCountDownLatch using Mockito mocks. + */ +@ExtendWith(MockitoExtension.class) +public class RedlockCountDownLatchTest { + + @Mock + private RedisDriver mockDriver1; + + @Mock + private RedisDriver mockDriver2; + + @Mock + private RedisDriver mockDriver3; + + private RedlockConfiguration testConfig; + private List drivers; + + @BeforeEach + void setUp() throws RedisDriverException { + testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380) + .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10)) + .build(); + + drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3); + + lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379"); + lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380"); + lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381"); + + // Default setex for initialization (void method - use doNothing) + lenient().doNothing().when(mockDriver1).setex(anyString(), anyString(), anyLong()); + lenient().doNothing().when(mockDriver2).setex(anyString(), anyString(), anyLong()); + lenient().doNothing().when(mockDriver3).setex(anyString(), anyString(), anyLong()); + } + + // ========== Validation ========== + + @Test + void shouldRejectNegativeCount() { + assertThrows(IllegalArgumentException.class, () -> new RedlockCountDownLatch("test", -1, drivers, testConfig)); + } + + @Test + void shouldAllowZeroCount() throws RedisDriverException { + // Zero is allowed + RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 0, drivers, testConfig); + assertNotNull(latch); + } + + // ========== getCount ========== + + @Test + void shouldReturnCountFromQuorum() throws RedisDriverException, InterruptedException { + when(mockDriver1.get(anyString())).thenReturn("5"); + when(mockDriver2.get(anyString())).thenReturn("5"); + when(mockDriver3.get(anyString())).thenReturn("5"); + + RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig); + long count = latch.getCount(); + + assertEquals(5, count); + } + + // ========== countDown ========== + + @Test + void shouldCallDecrAndPublishOnAllNodes() throws RedisDriverException, InterruptedException { + when(mockDriver1.decrAndPublishIfZero(anyString(), anyString(), anyString())).thenReturn(4L); + when(mockDriver2.decrAndPublishIfZero(anyString(), anyString(), anyString())).thenReturn(4L); + when(mockDriver3.decrAndPublishIfZero(anyString(), anyString(), anyString())).thenReturn(4L); + + RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig); + latch.countDown(); + + // Give async operations time to complete + Thread.sleep(100); + + verify(mockDriver1, atLeastOnce()).decrAndPublishIfZero(eq("test"), eq("test:channel"), eq("zero")); + } + + // ========== await ========== + + @Test + void shouldReturnImmediatelyWhenCountIsZero() throws RedisDriverException, InterruptedException { + // Use lenient stubs since get() may or may not be called depending on implementation + lenient().when(mockDriver1.get(anyString())).thenReturn("0"); + lenient().when(mockDriver2.get(anyString())).thenReturn("0"); + lenient().when(mockDriver3.get(anyString())).thenReturn("0"); + + RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 0, drivers, testConfig); + + long start = System.currentTimeMillis(); + boolean result = latch.await(5, TimeUnit.SECONDS); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(result); + assertTrue(elapsed < 1000, "Should return quickly when count is zero"); + } + + @Test + void shouldTimeoutWhenCountNeverReachesZero() throws RedisDriverException, InterruptedException { + when(mockDriver1.get(anyString())).thenReturn("5"); + when(mockDriver2.get(anyString())).thenReturn("5"); + when(mockDriver3.get(anyString())).thenReturn("5"); + + RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig); + + long start = System.currentTimeMillis(); + boolean result = latch.await(200, TimeUnit.MILLISECONDS); + long elapsed = System.currentTimeMillis() - start; + + assertFalse(result); + assertTrue(elapsed >= 150 && elapsed < 1000, "Should timeout after ~200ms"); + } + + // ========== reset ========== + + @Test + void shouldResetLatchCount() throws RedisDriverException { + RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 5, drivers, testConfig); + latch.reset(); + + // Verify del was called + verify(mockDriver1, atLeastOnce()).del(eq("test")); + } + + // ========== hasQueuedThreads ========== + + @Test + void shouldReturnTrueWhenCountPositive() throws RedisDriverException { + when(mockDriver1.get(anyString())).thenReturn("3"); + when(mockDriver2.get(anyString())).thenReturn("3"); + when(mockDriver3.get(anyString())).thenReturn("3"); + + RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 3, drivers, testConfig); + assertTrue(latch.hasQueuedThreads()); + } +} diff --git a/src/test/java/org/codarama/redlock4j/RedlockExceptionTest.java b/src/test/java/org/codarama/redlock4j/RedlockExceptionTest.java new file mode 100644 index 0000000..cabaf2d --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/RedlockExceptionTest.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for RedlockException. + */ +public class RedlockExceptionTest { + + @Test + void shouldCreateExceptionWithMessage() { + RedlockException exception = new RedlockException("Lock acquisition failed"); + + assertEquals("Lock acquisition failed", exception.getMessage()); + assertNull(exception.getCause()); + } + + @Test + void shouldCreateExceptionWithMessageAndCause() { + Throwable cause = new RuntimeException("Connection timeout"); + RedlockException exception = new RedlockException("Lock acquisition failed", cause); + + assertEquals("Lock acquisition failed", exception.getMessage()); + assertSame(cause, exception.getCause()); + } + + @Test + void shouldCreateExceptionWithCauseOnly() { + Throwable cause = new RuntimeException("Connection refused"); + RedlockException exception = new RedlockException(cause); + + assertEquals("java.lang.RuntimeException: Connection refused", exception.getMessage()); + assertSame(cause, exception.getCause()); + } + + @Test + void shouldBeRuntimeException() { + RedlockException exception = new RedlockException("test"); + + assertTrue(exception instanceof RuntimeException); + } + + @Test + void shouldPreserveStackTrace() { + RedlockException exception = new RedlockException("test"); + + assertNotNull(exception.getStackTrace()); + assertTrue(exception.getStackTrace().length > 0); + } + + @Test + void shouldSupportNestedCauses() { + Throwable rootCause = new IllegalStateException("Redis not available"); + Throwable intermediateCause = new RuntimeException("Connection failed", rootCause); + RedlockException exception = new RedlockException("Lock failed", intermediateCause); + + assertSame(intermediateCause, exception.getCause()); + assertSame(rootCause, exception.getCause().getCause()); + } + + @Test + void shouldHandleNullMessage() { + RedlockException exception = new RedlockException((String) null); + + assertNull(exception.getMessage()); + } + + @Test + void shouldHandleEmptyMessage() { + RedlockException exception = new RedlockException(""); + + assertEquals("", exception.getMessage()); + } +} diff --git a/src/test/java/org/codarama/redlock4j/RedlockReadWriteLockTest.java b/src/test/java/org/codarama/redlock4j/RedlockReadWriteLockTest.java new file mode 100644 index 0000000..4dabda6 --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/RedlockReadWriteLockTest.java @@ -0,0 +1,159 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j; + +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.codarama.redlock4j.driver.RedisDriver; +import org.codarama.redlock4j.driver.RedisDriverException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for RedlockReadWriteLock using Mockito mocks. + */ +@ExtendWith(MockitoExtension.class) +public class RedlockReadWriteLockTest { + + @Mock + private RedisDriver mockDriver1; + + @Mock + private RedisDriver mockDriver2; + + @Mock + private RedisDriver mockDriver3; + + private RedlockConfiguration testConfig; + private List drivers; + + @BeforeEach + void setUp() { + testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380) + .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10)) + .build(); + + drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3); + + lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379"); + lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380"); + lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381"); + } + + // ========== Read Lock ========== + + @Test + void shouldAcquireReadLockWhenNoWriter() throws RedisDriverException, InterruptedException { + // No write lock exists + when(mockDriver1.get(contains(":write"))).thenReturn(null); + when(mockDriver2.get(contains(":write"))).thenReturn(null); + when(mockDriver3.get(contains(":write"))).thenReturn(null); + + // Increment succeeds + when(mockDriver1.incr(anyString())).thenReturn(1L); + when(mockDriver2.incr(anyString())).thenReturn(1L); + when(mockDriver3.incr(anyString())).thenReturn(1L); + + RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig); + Lock readLock = rwLock.readLock(); + + boolean acquired = readLock.tryLock(1, TimeUnit.SECONDS); + + assertTrue(acquired); + verify(mockDriver1, atLeastOnce()).incr(contains(":readers")); + } + + @Test + void shouldFailReadLockWhenWriterHoldsLock() throws RedisDriverException, InterruptedException { + // Write lock exists on quorum + when(mockDriver1.get(contains(":write"))).thenReturn("some-lock-value"); + when(mockDriver2.get(contains(":write"))).thenReturn("some-lock-value"); + when(mockDriver3.get(contains(":write"))).thenReturn(null); + + RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig); + Lock readLock = rwLock.readLock(); + + boolean acquired = readLock.tryLock(100, TimeUnit.MILLISECONDS); + + assertFalse(acquired); + } + + // ========== Write Lock ========== + + @Test + void shouldAcquireWriteLockWhenNoReaders() throws RedisDriverException, InterruptedException { + // No readers + when(mockDriver1.get(contains(":readers"))).thenReturn(null); + when(mockDriver2.get(contains(":readers"))).thenReturn(null); + when(mockDriver3.get(contains(":readers"))).thenReturn(null); + + // Lock acquisition succeeds + when(mockDriver1.setIfNotExists(contains(":write"), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(contains(":write"), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(contains(":write"), anyString(), anyLong())).thenReturn(true); + + RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig); + Lock writeLock = rwLock.writeLock(); + + boolean acquired = writeLock.tryLock(1, TimeUnit.SECONDS); + + assertTrue(acquired); + } + + @Test + void shouldFailWriteLockWhenReadersExist() throws RedisDriverException, InterruptedException { + // Active readers on quorum + when(mockDriver1.get(contains(":readers"))).thenReturn("2"); + when(mockDriver2.get(contains(":readers"))).thenReturn("2"); + when(mockDriver3.get(contains(":readers"))).thenReturn(null); + + RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig); + Lock writeLock = rwLock.writeLock(); + + boolean acquired = writeLock.tryLock(100, TimeUnit.MILLISECONDS); + + assertFalse(acquired); + } + + // ========== Utility ========== + + @Test + void shouldReturnSameLockInstances() { + RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig); + + Lock read1 = rwLock.readLock(); + Lock read2 = rwLock.readLock(); + Lock write1 = rwLock.writeLock(); + Lock write2 = rwLock.writeLock(); + + assertSame(read1, read2); + assertSame(write1, write2); + } + + @Test + void shouldThrowOnNewConditionForReadLock() { + RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig); + assertThrows(UnsupportedOperationException.class, () -> rwLock.readLock().newCondition()); + } + + @Test + void shouldThrowOnNewConditionForWriteLock() { + RedlockReadWriteLock rwLock = new RedlockReadWriteLock("test-rw", drivers, testConfig); + assertThrows(UnsupportedOperationException.class, () -> rwLock.writeLock().newCondition()); + } +} diff --git a/src/test/java/org/codarama/redlock4j/RedlockSemaphoreTest.java b/src/test/java/org/codarama/redlock4j/RedlockSemaphoreTest.java new file mode 100644 index 0000000..95076fa --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/RedlockSemaphoreTest.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j; + +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.codarama.redlock4j.driver.RedisDriver; +import org.codarama.redlock4j.driver.RedisDriverException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for RedlockSemaphore using Mockito mocks. + */ +@ExtendWith(MockitoExtension.class) +public class RedlockSemaphoreTest { + + @Mock + private RedisDriver mockDriver1; + + @Mock + private RedisDriver mockDriver2; + + @Mock + private RedisDriver mockDriver3; + + private RedlockConfiguration testConfig; + private List drivers; + + @BeforeEach + void setUp() { + testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380) + .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(10)) + .build(); + + drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3); + + lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379"); + lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380"); + lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381"); + } + + // ========== Validation ========== + + @Test + void shouldRejectZeroPermits() { + assertThrows(IllegalArgumentException.class, () -> new RedlockSemaphore("test", 0, drivers, testConfig)); + } + + @Test + void shouldRejectNegativePermits() { + assertThrows(IllegalArgumentException.class, () -> new RedlockSemaphore("test", -1, drivers, testConfig)); + } + + @Test + void shouldRejectAcquiringMoreThanMaxPermits() { + RedlockSemaphore semaphore = new RedlockSemaphore("test", 3, drivers, testConfig); + assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(4, 1, TimeUnit.SECONDS)); + } + + @Test + void shouldRejectAcquiringZeroPermits() { + RedlockSemaphore semaphore = new RedlockSemaphore("test", 3, drivers, testConfig); + assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(0, 1, TimeUnit.SECONDS)); + } + + // ========== Acquisition ========== + + @Test + void shouldAcquirePermitWhenQuorumSucceeds() throws RedisDriverException, InterruptedException { + // setIfNotExists succeeds on all nodes + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + + RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig); + boolean acquired = semaphore.tryAcquire(1, TimeUnit.SECONDS); + + assertTrue(acquired); + verify(mockDriver1, atLeastOnce()).setIfNotExists(contains(":permit:"), anyString(), anyLong()); + } + + @Test + void shouldFailWhenQuorumNotReached() throws RedisDriverException, InterruptedException { + // Only 1 node succeeds (need quorum of 2) + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false); + + RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig); + boolean acquired = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS); + + assertFalse(acquired); + } + + // ========== Release ========== + + @Test + void shouldReleasePermitAfterAcquisition() throws RedisDriverException, InterruptedException { + setupSuccessfulAcquisition(); + + RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig); + semaphore.tryAcquire(1, TimeUnit.SECONDS); + semaphore.release(); + + // Verify deleteIfValueMatches was called to release permit + verify(mockDriver1, atLeastOnce()).deleteIfValueMatches(contains(":permit:"), anyString()); + } + + @Test + void shouldHandleReleaseWithoutAcquisition() { + RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig); + // Should not throw + semaphore.release(); + } + + // ========== Available Permits ========== + + @Test + void shouldReportMaxPermitsWhenNoStateTracked() { + RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig); + // Current implementation returns maxPermits + assertEquals(5, semaphore.availablePermits()); + } + + // ========== Multiple Permits ========== + + @Test + void shouldAcquireMultiplePermits() throws RedisDriverException, InterruptedException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + + RedlockSemaphore semaphore = new RedlockSemaphore("test", 5, drivers, testConfig); + boolean acquired = semaphore.tryAcquire(3, 1, TimeUnit.SECONDS); + + assertTrue(acquired); + // 3 permits, each on 3 nodes = at least 3 setIfNotExists calls per driver + verify(mockDriver1, atLeast(3)).setIfNotExists(contains(":permit:"), anyString(), anyLong()); + } + + private void setupSuccessfulAcquisition() throws RedisDriverException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + } +} diff --git a/src/test/java/org/codarama/redlock4j/async/RxRedlockTest.java b/src/test/java/org/codarama/redlock4j/async/RxRedlockTest.java new file mode 100644 index 0000000..bc79264 --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/async/RxRedlockTest.java @@ -0,0 +1,159 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j.async; + +import io.reactivex.rxjava3.observers.TestObserver; +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.codarama.redlock4j.driver.RedisDriver; +import org.codarama.redlock4j.driver.RedisDriverException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for RxRedlock reactive API. + */ +@ExtendWith(MockitoExtension.class) +public class RxRedlockTest { + + @Mock + private RedisDriver mockDriver1; + + @Mock + private RedisDriver mockDriver2; + + @Mock + private RedisDriver mockDriver3; + + private RedlockConfiguration testConfig; + private List drivers; + private ScheduledExecutorService scheduler; + + @BeforeEach + void setUp() { + testConfig = RedlockConfiguration.builder().addRedisNode("localhost", 6379).addRedisNode("localhost", 6380) + .addRedisNode("localhost", 6381).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(10)).maxRetryAttempts(3).lockAcquisitionTimeout(Duration.ofSeconds(5)) + .build(); + + drivers = Arrays.asList(mockDriver1, mockDriver2, mockDriver3); + scheduler = Executors.newSingleThreadScheduledExecutor(); + + lenient().when(mockDriver1.getIdentifier()).thenReturn("redis://localhost:6379"); + lenient().when(mockDriver2.getIdentifier()).thenReturn("redis://localhost:6380"); + lenient().when(mockDriver3.getIdentifier()).thenReturn("redis://localhost:6381"); + } + + // ========== tryLockRx ========== + + @Test + void tryLockRxShouldEmitTrueOnSuccess() throws RedisDriverException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + + RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + TestObserver observer = rxLock.tryLockRx().test(); + observer.awaitDone(5, TimeUnit.SECONDS); + + observer.assertValue(true); + observer.assertComplete(); + } + + @Test + void tryLockRxShouldEmitFalseOnFailure() throws RedisDriverException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(false); + + RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + TestObserver observer = rxLock.tryLockRx().test(); + observer.awaitDone(5, TimeUnit.SECONDS); + + observer.assertValue(false); + observer.assertComplete(); + } + + // ========== lockStateObservable ========== + + @Test + void lockStateObservableShouldEmitInitialState() { + RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + TestObserver observer = rxLock.lockStateObservable().test(); + + observer.assertValue(RxRedlock.LockState.RELEASED); + } + + // ========== Utility Methods ========== + + @Test + void shouldReturnLockKey() { + RxRedlock rxLock = new AsyncRedlockImpl("my-lock-key", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + assertEquals("my-lock-key", rxLock.getLockKey()); + } + + @Test + void shouldReturnZeroValidityTimeWhenNotHeld() { + RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + assertEquals(0, rxLock.getRemainingValidityTime()); + } + + @Test + void shouldReturnFalseForIsHeldWhenNotAcquired() { + RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + assertFalse(rxLock.isHeldByCurrentThread()); + } + + @Test + void shouldReturnZeroHoldCountWhenNotHeld() { + RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + assertEquals(0, rxLock.getHoldCount()); + } + + // ========== tryLockRx with timeout ========== + + @Test + void tryLockRxWithTimeoutShouldWork() throws RedisDriverException { + when(mockDriver1.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver2.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + when(mockDriver3.setIfNotExists(anyString(), anyString(), anyLong())).thenReturn(true); + + RxRedlock rxLock = new AsyncRedlockImpl("test-rx", drivers, testConfig, Executors.newCachedThreadPool(), + scheduler); + + TestObserver observer = rxLock.tryLockRx(Duration.ofSeconds(1)).test(); + observer.awaitDone(5, TimeUnit.SECONDS); + + observer.assertValue(true); + observer.assertComplete(); + } +} diff --git a/src/test/java/org/codarama/redlock4j/integration/FairLockIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/FairLockIntegrationTest.java new file mode 100644 index 0000000..2c2f85b --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/integration/FairLockIntegrationTest.java @@ -0,0 +1,302 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j.integration; + +import org.codarama.redlock4j.FairLock; +import org.codarama.redlock4j.RedlockManager; +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for {@link FairLock}. + */ +@Testcontainers +public class FairLockIntegrationTest { + + @Container + static GenericContainer redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + private static RedlockConfiguration testConfiguration; + + @BeforeAll + static void setUp() { + testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379)) + .addRedisNode("localhost", redis2.getMappedPort(6379)) + .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30)) + .lockAcquisitionTimeout(Duration.ofSeconds(10)).retryDelay(Duration.ofMillis(50)).maxRetryAttempts(100) + .build(); + } + + // ========== Basic Functionality ========== + + @Test + void shouldAcquireAndReleaseFairLock() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("basic-fair"); + assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS)); + assertTrue(fairLock.isHeldByCurrentThread()); + fairLock.unlock(); + assertFalse(fairLock.isHeldByCurrentThread()); + } + } + + @Test + void shouldBlockSecondAcquirer() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("block-second"); + CountDownLatch firstAcquired = new CountDownLatch(1); + CountDownLatch secondAttempted = new CountDownLatch(1); + AtomicInteger secondResult = new AtomicInteger(-1); + + // First thread acquires + assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS)); + firstAcquired.countDown(); + + // Second thread tries + Thread t = new Thread(() -> { + try { + firstAcquired.await(); + boolean acquired = fairLock.tryLock(500, TimeUnit.MILLISECONDS); + secondResult.set(acquired ? 1 : 0); + secondAttempted.countDown(); + if (acquired) + fairLock.unlock(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t.start(); + + assertTrue(secondAttempted.await(5, TimeUnit.SECONDS)); + assertEquals(0, secondResult.get(), "Second thread should be blocked"); + fairLock.unlock(); + } + } + + // ========== Reentrancy ========== + + @Test + void shouldSupportReentrantLocking() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("reentrant-fair"); + + assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS)); + assertEquals(1, fairLock.getHoldCount()); + + assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS)); + assertEquals(2, fairLock.getHoldCount()); + + fairLock.unlock(); + assertEquals(1, fairLock.getHoldCount()); + assertTrue(fairLock.isHeldByCurrentThread()); + + fairLock.unlock(); + assertEquals(0, fairLock.getHoldCount()); + assertFalse(fairLock.isHeldByCurrentThread()); + } + } + + // ========== FIFO Ordering ========== + + @Test + void shouldAcquireInFIFOOrder() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("fifo-order"); + List acquisitionOrder = Collections.synchronizedList(new ArrayList<>()); + int threadCount = 3; + CountDownLatch firstHeld = new CountDownLatch(1); + CountDownLatch allQueued = new CountDownLatch(threadCount - 1); + CountDownLatch allDone = new CountDownLatch(threadCount); + + // First thread acquires and holds + new Thread(() -> { + try { + if (fairLock.tryLock(10, TimeUnit.SECONDS)) { + acquisitionOrder.add(0); + firstHeld.countDown(); + allQueued.await(); // Wait for others to queue + Thread.sleep(200); // Hold briefly + fairLock.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + allDone.countDown(); + } + }).start(); + + assertTrue(firstHeld.await(10, TimeUnit.SECONDS)); + + // Other threads queue up in order + for (int i = 1; i < threadCount; i++) { + final int idx = i; + Thread.sleep(100); // Stagger to ensure ordering + new Thread(() -> { + try { + allQueued.countDown(); + if (fairLock.tryLock(30, TimeUnit.SECONDS)) { + acquisitionOrder.add(idx); + fairLock.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + allDone.countDown(); + } + }).start(); + } + + assertTrue(allDone.await(60, TimeUnit.SECONDS)); + assertEquals(threadCount, acquisitionOrder.size(), "All threads should acquire"); + } + } + + // ========== Timeout ========== + + @Test + void shouldTimeoutWhenLockNotAvailable() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("timeout-test"); + + assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS)); + + long start = System.currentTimeMillis(); + Thread t = new Thread(() -> { + try { + assertFalse(fairLock.tryLock(500, TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t.start(); + t.join(); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(elapsed >= 400 && elapsed < 2000, "Should timeout after ~500ms"); + fairLock.unlock(); + } + } + + // ========== Concurrent Access ========== + + @Test + void shouldHandleConcurrentAcquisitions() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("concurrent-fair"); + int threadCount = 5; + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + new Thread(() -> { + try { + startSignal.await(); + if (fairLock.tryLock(30, TimeUnit.SECONDS)) { + try { + successCount.incrementAndGet(); + Thread.sleep(50); + } finally { + fairLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + } + + startSignal.countDown(); + assertTrue(done.await(60, TimeUnit.SECONDS)); + assertEquals(threadCount, successCount.get(), "All threads should eventually acquire"); + } + } + + @Test + void shouldHandleRapidLockUnlockCycles() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("rapid-cycles"); + int cycles = 5; + int threadCount = 3; + AtomicInteger totalAcquisitions = new AtomicInteger(0); + CountDownLatch done = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + new Thread(() -> { + try { + for (int j = 0; j < cycles; j++) { + if (fairLock.tryLock(10, TimeUnit.SECONDS)) { + totalAcquisitions.incrementAndGet(); + Thread.sleep(20); + fairLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + } + + assertTrue(done.await(120, TimeUnit.SECONDS)); + assertTrue(totalAcquisitions.get() > 0, "Should have successful acquisitions"); + } + } + + // ========== Utility Methods ========== + + @Test + void shouldReportValidityTime() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("validity-time"); + + assertEquals(0, fairLock.getRemainingValidityTime()); + + assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS)); + assertTrue(fairLock.getRemainingValidityTime() > 0); + + fairLock.unlock(); + } + } + + // ========== Lettuce Driver ========== + + @Test + void shouldWorkWithLettuceDriver() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) { + FairLock fairLock = (FairLock) manager.createFairLock("lettuce-fair"); + assertTrue(fairLock.tryLock(5, TimeUnit.SECONDS)); + assertTrue(fairLock.isHeldByCurrentThread()); + fairLock.unlock(); + } + } +} diff --git a/src/test/java/org/codarama/redlock4j/integration/MultiLockIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/MultiLockIntegrationTest.java new file mode 100644 index 0000000..9bcf03e --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/integration/MultiLockIntegrationTest.java @@ -0,0 +1,301 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j.integration; + +import org.codarama.redlock4j.MultiLock; +import org.codarama.redlock4j.RedlockManager; +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for {@link MultiLock}. + */ +@Testcontainers +public class MultiLockIntegrationTest { + + @Container + static GenericContainer redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + private static RedlockConfiguration testConfiguration; + + @BeforeAll + static void setUp() { + testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379)) + .addRedisNode("localhost", redis2.getMappedPort(6379)) + .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30)) + .lockAcquisitionTimeout(Duration.ofSeconds(10)).retryDelay(Duration.ofMillis(50)).maxRetryAttempts(100) + .build(); + } + + // ========== Basic Functionality ========== + + @Test + void shouldAcquireAndReleaseMultipleLocks() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + List keys = Arrays.asList("account:1", "account:2", "account:3"); + Lock multiLock = manager.createMultiLock(keys); + + assertTrue(multiLock.tryLock(5, TimeUnit.SECONDS)); + multiLock.unlock(); + } + } + + @Test + void shouldRejectEmptyKeyList() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + assertThrows(IllegalArgumentException.class, () -> manager.createMultiLock(Arrays.asList())); + } + } + + @Test + void shouldRejectNullKeyList() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + assertThrows(IllegalArgumentException.class, () -> manager.createMultiLock(null)); + } + } + + // ========== All-or-Nothing Semantics ========== + + @Test + void shouldBlockIfAnyResourceIsLocked() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + // First, acquire a single lock on one resource + Lock singleLock = manager.createLock("resource:B"); + assertTrue(singleLock.tryLock(5, TimeUnit.SECONDS)); + + // Try to acquire multi-lock that includes resource:B + List keys = Arrays.asList("resource:A", "resource:B", "resource:C"); + Lock multiLock = manager.createMultiLock(keys); + + // Should fail with short timeout since resource:B is already held + assertFalse(multiLock.tryLock(500, TimeUnit.MILLISECONDS)); + + singleLock.unlock(); + } + } + + @Test + void shouldAcquireAfterConflictingLockReleased() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + Lock singleLock = manager.createLock("resource:X"); + assertTrue(singleLock.tryLock(5, TimeUnit.SECONDS)); + + List keys = Arrays.asList("resource:W", "resource:X", "resource:Y"); + Lock multiLock = manager.createMultiLock(keys); + CountDownLatch released = new CountDownLatch(1); + AtomicBoolean multiAcquired = new AtomicBoolean(false); + + Thread t = new Thread(() -> { + try { + released.await(); + if (multiLock.tryLock(10, TimeUnit.SECONDS)) { + multiAcquired.set(true); + multiLock.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + t.start(); + + Thread.sleep(100); + singleLock.unlock(); + released.countDown(); + + t.join(15000); + assertTrue(multiAcquired.get(), "Multi-lock should acquire after single lock released"); + } + } + + // ========== Exclusive Access ========== + + @Test + void twoMultiLocksWithOverlapShouldNotCoexist() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + List keys1 = Arrays.asList("shared:1", "shared:2"); + List keys2 = Arrays.asList("shared:2", "shared:3"); + + Lock multiLock1 = manager.createMultiLock(keys1); + Lock multiLock2 = manager.createMultiLock(keys2); + + assertTrue(multiLock1.tryLock(5, TimeUnit.SECONDS)); + // multiLock2 overlaps on shared:2, should fail + assertFalse(multiLock2.tryLock(500, TimeUnit.MILLISECONDS)); + + multiLock1.unlock(); + // Now multiLock2 should succeed + assertTrue(multiLock2.tryLock(5, TimeUnit.SECONDS)); + multiLock2.unlock(); + } + } + + // ========== Concurrent Access ========== + + @Test + void shouldHandleConcurrentMultiLockAcquisitions() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int threadCount = 5; + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + final int idx = i; + new Thread(() -> { + try { + // Each thread locks different resources + List keys = Arrays.asList("t" + idx + ":a", "t" + idx + ":b"); + Lock multiLock = manager.createMultiLock(keys); + startSignal.await(); + if (multiLock.tryLock(10, TimeUnit.SECONDS)) { + try { + successCount.incrementAndGet(); + Thread.sleep(50); + } finally { + multiLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + } + + startSignal.countDown(); + assertTrue(done.await(30, TimeUnit.SECONDS)); + assertEquals(threadCount, successCount.get(), "All threads should acquire non-overlapping locks"); + } + } + + @Test + void shouldHandleConcurrentAccessToSameResources() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int threadCount = 4; + List keys = Arrays.asList("contested:A", "contested:B"); + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + new Thread(() -> { + try { + Lock multiLock = manager.createMultiLock(keys); + startSignal.await(); + if (multiLock.tryLock(30, TimeUnit.SECONDS)) { + try { + successCount.incrementAndGet(); + Thread.sleep(50); + } finally { + multiLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + } + + startSignal.countDown(); + assertTrue(done.await(60, TimeUnit.SECONDS)); + assertEquals(threadCount, successCount.get(), "All threads should eventually acquire"); + } + } + + // ========== Key Ordering (Deadlock Prevention) ========== + + @Test + void shouldPreventDeadlockWithDifferentKeyOrders() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + // Thread 1 requests A,B,C; Thread 2 requests C,B,A + // Internal sorting should make both request A,B,C + List keys1 = Arrays.asList("deadlock:A", "deadlock:B", "deadlock:C"); + List keys2 = Arrays.asList("deadlock:C", "deadlock:B", "deadlock:A"); + + AtomicInteger success1 = new AtomicInteger(0); + AtomicInteger success2 = new AtomicInteger(0); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(2); + + new Thread(() -> { + try { + Lock ml = manager.createMultiLock(keys1); + start.await(); + if (ml.tryLock(30, TimeUnit.SECONDS)) { + success1.incrementAndGet(); + Thread.sleep(100); + ml.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + + new Thread(() -> { + try { + Lock ml = manager.createMultiLock(keys2); + start.await(); + if (ml.tryLock(30, TimeUnit.SECONDS)) { + success2.incrementAndGet(); + Thread.sleep(100); + ml.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + + start.countDown(); + assertTrue(done.await(60, TimeUnit.SECONDS)); + assertEquals(1, success1.get()); + assertEquals(1, success2.get()); + } + } + + // ========== Lettuce Driver ========== + + @Test + void shouldWorkWithLettuceDriver() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) { + List keys = Arrays.asList("lettuce:1", "lettuce:2"); + Lock multiLock = manager.createMultiLock(keys); + + assertTrue(multiLock.tryLock(5, TimeUnit.SECONDS)); + multiLock.unlock(); + } + } +} diff --git a/src/test/java/org/codarama/redlock4j/integration/RedlockCountDownLatchIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/RedlockCountDownLatchIntegrationTest.java new file mode 100644 index 0000000..108344d --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/integration/RedlockCountDownLatchIntegrationTest.java @@ -0,0 +1,504 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j.integration; + +import org.codarama.redlock4j.RedlockCountDownLatch; +import org.codarama.redlock4j.RedlockManager; +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Comprehensive integration tests for {@link RedlockCountDownLatch}. Verifies the distributed countdown latch honors + * its contract. + */ +@Testcontainers +public class RedlockCountDownLatchIntegrationTest { + + @Container + static GenericContainer redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + private static RedlockConfiguration testConfiguration; + + @BeforeAll + static void setUp() { + testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379)) + .addRedisNode("localhost", redis2.getMappedPort(6379)) + .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(100)).maxRetryAttempts(5).lockAcquisitionTimeout(Duration.ofSeconds(10)) + .build(); + } + + // ========== Contract: Initialization ========== + + @Test + void shouldInitializeWithCorrectCount() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("init-test", 5); + assertEquals(5, latch.getCount(), "Latch should initialize with specified count"); + } + } + + @Test + void shouldRejectNegativeCount() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + assertThrows(IllegalArgumentException.class, () -> { + manager.createCountDownLatch("negative-test", -1); + }, "Should reject negative count"); + } + } + + @Test + void shouldAllowZeroCount() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("zero-test", 0); + assertEquals(0, latch.getCount(), "Latch should allow zero count"); + } + } + + // ========== Contract: countDown() ========== + + @Test + void countDownShouldDecrementCount() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("countdown-decr", 3); + + latch.countDown(); + assertEquals(2, latch.getCount()); + + latch.countDown(); + assertEquals(1, latch.getCount()); + + latch.countDown(); + assertEquals(0, latch.getCount()); + } + } + + @Test + void countDownBelowZeroShouldNotGoNegative() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("countdown-below-zero", 1); + + latch.countDown(); + latch.countDown(); + latch.countDown(); + + assertTrue(latch.getCount() <= 0, "Count should be at most 0"); + } + } + + // ========== Contract: await() ========== + + @Test + void awaitShouldReturnImmediatelyWhenCountIsZero() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("await-zero", 0); + + long start = System.currentTimeMillis(); + boolean result = latch.await(5, TimeUnit.SECONDS); + long elapsed = System.currentTimeMillis() - start; + + assertTrue(result, "await() should return true when count is 0"); + assertTrue(elapsed < 1000, "await() should return immediately when count is 0"); + } + } + + @Test + void awaitShouldBlockUntilCountReachesZero() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("await-block", 2); + AtomicBoolean awaitCompleted = new AtomicBoolean(false); + CountDownLatch threadStarted = new CountDownLatch(1); + + Thread waiter = new Thread(() -> { + try { + threadStarted.countDown(); + latch.await(); + awaitCompleted.set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + waiter.start(); + + threadStarted.await(2, TimeUnit.SECONDS); + Thread.sleep(200); + + assertFalse(awaitCompleted.get(), "await() should still be blocking"); + + latch.countDown(); + Thread.sleep(100); + assertFalse(awaitCompleted.get(), "await() should still block (count=1)"); + + latch.countDown(); + waiter.join(5000); + + assertTrue(awaitCompleted.get(), "await() should complete when count reaches 0"); + } + } + + @Test + void awaitWithTimeoutShouldReturnFalseOnTimeout() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("await-timeout", 5); + + long start = System.currentTimeMillis(); + boolean result = latch.await(1, TimeUnit.SECONDS); + long elapsed = System.currentTimeMillis() - start; + + assertFalse(result, "await() should return false on timeout"); + assertTrue(elapsed >= 900 && elapsed < 2000, "Should wait approximately 1 second"); + } + } + + // ========== Contract: Distributed Coordination ========== + + @Test + void shouldCoordinateMultipleWorkerThreads() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int workerCount = 5; + RedlockCountDownLatch latch = manager.createCountDownLatch("coord-workers", workerCount); + AtomicInteger completedWorkers = new AtomicInteger(0); + CountDownLatch allWorkersStarted = new CountDownLatch(workerCount); + AtomicBoolean coordinatorCompleted = new AtomicBoolean(false); + + Thread coordinator = new Thread(() -> { + try { + latch.await(); + coordinatorCompleted.set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + coordinator.start(); + + ExecutorService executor = Executors.newFixedThreadPool(workerCount); + for (int i = 0; i < workerCount; i++) { + final int workerId = i; + executor.submit(() -> { + try { + allWorkersStarted.countDown(); + Thread.sleep(100 + workerId * 50); + completedWorkers.incrementAndGet(); + latch.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + assertTrue(allWorkersStarted.await(5, TimeUnit.SECONDS)); + Thread.sleep(100); + assertFalse(coordinatorCompleted.get(), "Coordinator should wait for workers"); + + coordinator.join(10000); + + assertTrue(coordinatorCompleted.get(), "Coordinator should complete after all workers"); + assertEquals(workerCount, completedWorkers.get(), "All workers should complete"); + + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + } + } + + @Test + void shouldWorkAcrossMultipleRedlockManagerInstances() throws InterruptedException { + AtomicBoolean waiterCompleted = new AtomicBoolean(false); + CountDownLatch waiterStarted = new CountDownLatch(1); + + Thread waiter = new Thread(() -> { + try (RedlockManager manager1 = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch1 = manager1.createCountDownLatch("cross-process", 2); + waiterStarted.countDown(); + latch1.await(); + waiterCompleted.set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + waiter.start(); + + waiterStarted.await(2, TimeUnit.SECONDS); + Thread.sleep(500); + + try (RedlockManager manager2 = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch2 = manager2.createCountDownLatch("cross-process", 2); + + latch2.countDown(); + Thread.sleep(100); + assertFalse(waiterCompleted.get(), "Waiter should still be waiting"); + + latch2.countDown(); + } + + waiter.join(5000); + assertTrue(waiterCompleted.get(), "Waiter should complete after cross-process countdown"); + } + + // ========== Contract: reset() ========== + + @Test + void resetShouldRestoreInitialCount() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("reset-test", 3); + + latch.countDown(); + latch.countDown(); + assertEquals(1, latch.getCount()); + + latch.reset(); + assertEquals(3, latch.getCount(), "reset() should restore initial count"); + } + } + + @Test + void resetShouldAllowReuseOfLatch() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("reset-reuse", 2); + + latch.countDown(); + latch.countDown(); + assertTrue(latch.await(1, TimeUnit.SECONDS), "First await should complete"); + + latch.reset(); + assertEquals(2, latch.getCount()); + + latch.countDown(); + latch.countDown(); + assertTrue(latch.await(1, TimeUnit.SECONDS), "Second await should complete after reset"); + } + } + + // ========== Contract: Lettuce Driver ========== + + @Test + void shouldWorkWithLettuceDriver() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("lettuce-test", 3); + + assertEquals(3, latch.getCount()); + + latch.countDown(); + assertEquals(2, latch.getCount()); + + latch.countDown(); + latch.countDown(); + + assertTrue(latch.await(1, TimeUnit.SECONDS), "Lettuce latch should work correctly"); + } + } + + // ========== Contract: toString() ========== + + @Test + void toStringShouldContainLatchInfo() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("tostring-test", 5); + + String str = latch.toString(); + assertTrue(str.contains("tostring-test"), "toString should contain latch key"); + assertTrue(str.contains("5"), "toString should contain count"); + } + } + + // ========== Contract: hasQueuedThreads() ========== + + @Test + void hasQueuedThreadsShouldReturnTrueWhenCountPositive() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockCountDownLatch latch = manager.createCountDownLatch("queued-test", 2); + + assertTrue(latch.hasQueuedThreads(), "hasQueuedThreads should return true when count > 0"); + + latch.countDown(); + latch.countDown(); + + assertFalse(latch.hasQueuedThreads(), "hasQueuedThreads should return false when count = 0"); + } + } + + // ========== Contract: Concurrent Access ========== + + @Test + void shouldHandleConcurrentCountDownCalls() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int totalCountDowns = 10; + RedlockCountDownLatch latch = manager.createCountDownLatch("concurrent-countdown", totalCountDowns); + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(totalCountDowns); + + // Create threads that all call countDown() simultaneously + for (int i = 0; i < totalCountDowns; i++) { + new Thread(() -> { + try { + startSignal.await(); // Wait for signal to start together + latch.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneSignal.countDown(); + } + }).start(); + } + + // Release all threads at once + startSignal.countDown(); + + // Wait for all threads to complete + assertTrue(doneSignal.await(10, TimeUnit.SECONDS), "All threads should complete"); + + // Count should be 0 after all concurrent countdowns + assertEquals(0, latch.getCount(), "Count should be 0 after concurrent countdowns"); + } + } + + @Test + void shouldHandleMultipleConcurrentAwaiters() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int awaiterCount = 5; + RedlockCountDownLatch latch = manager.createCountDownLatch("concurrent-await", 1); + AtomicInteger completedAwaiters = new AtomicInteger(0); + CountDownLatch allStarted = new CountDownLatch(awaiterCount); + CountDownLatch allDone = new CountDownLatch(awaiterCount); + + // Start multiple threads that all await on the same latch + for (int i = 0; i < awaiterCount; i++) { + new Thread(() -> { + try { + allStarted.countDown(); + latch.await(); + completedAwaiters.incrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + allDone.countDown(); + } + }).start(); + } + + // Wait for all awaiters to start + assertTrue(allStarted.await(5, TimeUnit.SECONDS)); + Thread.sleep(300); // Let them all block + + // No awaiters should have completed yet + assertEquals(0, completedAwaiters.get(), "No awaiter should complete before countdown"); + + // Single countdown should release ALL awaiters + latch.countDown(); + + // All awaiters should complete + assertTrue(allDone.await(10, TimeUnit.SECONDS), "All awaiters should complete"); + assertEquals(awaiterCount, completedAwaiters.get(), "All awaiters should have completed"); + } + } + + @Test + void shouldMaintainCorrectCountUnderConcurrentAccess() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int initialCount = 100; + int threadCount = 20; + int countDownsPerThread = 5; // 20 * 5 = 100 total + + RedlockCountDownLatch latch = manager.createCountDownLatch("stress-test", initialCount); + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + new Thread(() -> { + try { + startSignal.await(); + for (int j = 0; j < countDownsPerThread; j++) { + latch.countDown(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneSignal.countDown(); + } + }).start(); + } + + // Release all threads simultaneously + startSignal.countDown(); + + // Wait for completion + assertTrue(doneSignal.await(30, TimeUnit.SECONDS), "All threads should complete"); + + // Verify final count is 0 + assertEquals(0, latch.getCount(), "Count should be 0 after all countdowns"); + } + } + + @Test + void shouldHandleConcurrentCountDownAndGetCount() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int initialCount = 50; + RedlockCountDownLatch latch = manager.createCountDownLatch("mixed-ops", initialCount); + CountDownLatch startSignal = new CountDownLatch(1); + AtomicInteger countDownsDone = new AtomicInteger(0); + AtomicBoolean invalidCountSeen = new AtomicBoolean(false); + + // Threads doing countDown + for (int i = 0; i < initialCount; i++) { + new Thread(() -> { + try { + startSignal.await(); + latch.countDown(); + countDownsDone.incrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); + } + + // Threads doing getCount (verifying count is always valid) + for (int i = 0; i < 10; i++) { + new Thread(() -> { + try { + startSignal.await(); + for (int j = 0; j < 20; j++) { + long count = latch.getCount(); + if (count < 0 || count > initialCount) { + invalidCountSeen.set(true); + } + Thread.sleep(10); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); + } + + startSignal.countDown(); + Thread.sleep(3000); // Let operations complete + + assertFalse(invalidCountSeen.get(), "Count should always be in valid range"); + assertEquals(initialCount, countDownsDone.get(), "All countdowns should complete"); + } + } +} diff --git a/src/test/java/org/codarama/redlock4j/integration/RedlockReadWriteLockIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/RedlockReadWriteLockIntegrationTest.java new file mode 100644 index 0000000..ca7e4dc --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/integration/RedlockReadWriteLockIntegrationTest.java @@ -0,0 +1,332 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j.integration; + +import org.codarama.redlock4j.RedlockManager; +import org.codarama.redlock4j.RedlockReadWriteLock; +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for {@link RedlockReadWriteLock}. + */ +@Testcontainers +public class RedlockReadWriteLockIntegrationTest { + + @Container + static GenericContainer redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + private static RedlockConfiguration testConfiguration; + + @BeforeAll + static void setUp() { + testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379)) + .addRedisNode("localhost", redis2.getMappedPort(6379)) + .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30)) + .lockAcquisitionTimeout(Duration.ofSeconds(10)).retryDelay(Duration.ofMillis(50)).maxRetryAttempts(100) + .build(); + } + + // ========== Basic Functionality ========== + + @Test + void shouldAcquireAndReleaseReadLock() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("basic-read"); + Lock readLock = rwLock.readLock(); + + assertTrue(readLock.tryLock(5, TimeUnit.SECONDS)); + readLock.unlock(); + } + } + + @Test + void shouldAcquireAndReleaseWriteLock() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("basic-write"); + Lock writeLock = rwLock.writeLock(); + + assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS)); + writeLock.unlock(); + } + } + + // ========== Multiple Readers ========== + + @Test + void shouldAllowMultipleConcurrentReaders() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("multi-readers"); + int readerCount = 5; + AtomicInteger acquired = new AtomicInteger(0); + CountDownLatch allAcquired = new CountDownLatch(readerCount); + CountDownLatch releaseSignal = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(readerCount); + + for (int i = 0; i < readerCount; i++) { + new Thread(() -> { + try { + Lock readLock = rwLock.readLock(); + if (readLock.tryLock(10, TimeUnit.SECONDS)) { + try { + acquired.incrementAndGet(); + allAcquired.countDown(); + releaseSignal.await(); + } finally { + readLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + } + + assertTrue(allAcquired.await(15, TimeUnit.SECONDS), "All readers should acquire"); + assertEquals(readerCount, acquired.get(), "All readers should hold the lock simultaneously"); + + releaseSignal.countDown(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + } + } + + // ========== Writer Exclusivity ========== + + @Test + void writerShouldHaveExclusiveAccess() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("writer-exclusive"); + Lock writeLock = rwLock.writeLock(); + AtomicBoolean writerHoldsLock = new AtomicBoolean(false); + AtomicBoolean secondWriterBlocked = new AtomicBoolean(false); + CountDownLatch writerAcquired = new CountDownLatch(1); + CountDownLatch secondAttempted = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(1); + + // First writer acquires + assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS)); + writerHoldsLock.set(true); + writerAcquired.countDown(); + + // Second writer tries to acquire (should fail quickly) + new Thread(() -> { + try { + writerAcquired.await(); + Lock secondWrite = rwLock.writeLock(); + boolean acquired = secondWrite.tryLock(500, TimeUnit.MILLISECONDS); + secondWriterBlocked.set(!acquired); + secondAttempted.countDown(); + if (acquired) { + secondWrite.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + + assertTrue(secondAttempted.await(5, TimeUnit.SECONDS)); + assertTrue(secondWriterBlocked.get(), "Second writer should be blocked"); + + writeLock.unlock(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + } + } + + // ========== Reader/Writer Blocking ========== + + @Test + void readerShouldBeBlockedByWriter() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("reader-blocked-by-writer"); + Lock writeLock = rwLock.writeLock(); + AtomicBoolean readerBlocked = new AtomicBoolean(false); + CountDownLatch writerAcquired = new CountDownLatch(1); + CountDownLatch readerAttempted = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(1); + + // Writer acquires first + assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS)); + writerAcquired.countDown(); + + // Reader tries to acquire (should fail with short timeout) + new Thread(() -> { + try { + writerAcquired.await(); + Lock readLock = rwLock.readLock(); + boolean acquired = readLock.tryLock(500, TimeUnit.MILLISECONDS); + readerBlocked.set(!acquired); + readerAttempted.countDown(); + if (acquired) { + readLock.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + + assertTrue(readerAttempted.await(5, TimeUnit.SECONDS)); + assertTrue(readerBlocked.get(), "Reader should be blocked by writer"); + + writeLock.unlock(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + } + } + + @Test + void writerShouldWaitForReadersToFinish() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("writer-waits-readers"); + Lock readLock = rwLock.readLock(); + AtomicBoolean writerAcquiredAfterReaderRelease = new AtomicBoolean(false); + CountDownLatch readerAcquired = new CountDownLatch(1); + CountDownLatch writerStarted = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(1); + + // Reader acquires first + assertTrue(readLock.tryLock(5, TimeUnit.SECONDS)); + readerAcquired.countDown(); + + // Writer tries to acquire in background + new Thread(() -> { + try { + readerAcquired.await(); + writerStarted.countDown(); + Lock writeLock = rwLock.writeLock(); + boolean acquired = writeLock.tryLock(10, TimeUnit.SECONDS); + writerAcquiredAfterReaderRelease.set(acquired); + if (acquired) { + writeLock.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + + assertTrue(writerStarted.await(5, TimeUnit.SECONDS)); + Thread.sleep(200); // Give writer time to attempt + + // Release read lock - writer should then succeed + readLock.unlock(); + + assertTrue(done.await(15, TimeUnit.SECONDS)); + assertTrue(writerAcquiredAfterReaderRelease.get(), "Writer should acquire after reader releases"); + } + } + + // ========== Concurrent Access ========== + + @Test + void shouldHandleConcurrentReadersAndWriters() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("concurrent-rw"); + int readerCount = 5; + int writerCount = 3; + int totalThreads = readerCount + writerCount; + AtomicInteger readAcquisitions = new AtomicInteger(0); + AtomicInteger writeAcquisitions = new AtomicInteger(0); + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(totalThreads); + + // Start readers + for (int i = 0; i < readerCount; i++) { + new Thread(() -> { + try { + startSignal.await(); + Lock readLock = rwLock.readLock(); + if (readLock.tryLock(10, TimeUnit.SECONDS)) { + try { + readAcquisitions.incrementAndGet(); + Thread.sleep(50); + } finally { + readLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + } + + // Start writers + for (int i = 0; i < writerCount; i++) { + new Thread(() -> { + try { + startSignal.await(); + Lock writeLock = rwLock.writeLock(); + if (writeLock.tryLock(10, TimeUnit.SECONDS)) { + try { + writeAcquisitions.incrementAndGet(); + Thread.sleep(50); + } finally { + writeLock.unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }).start(); + } + + startSignal.countDown(); + assertTrue(done.await(60, TimeUnit.SECONDS)); + assertTrue(readAcquisitions.get() > 0, "Should have successful read acquisitions"); + assertTrue(writeAcquisitions.get() > 0, "Should have successful write acquisitions"); + } + } + + // ========== Lettuce Driver ========== + + @Test + void shouldWorkWithLettuceDriver() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) { + RedlockReadWriteLock rwLock = manager.createReadWriteLock("lettuce-rwlock"); + + Lock readLock = rwLock.readLock(); + assertTrue(readLock.tryLock(5, TimeUnit.SECONDS)); + readLock.unlock(); + + Lock writeLock = rwLock.writeLock(); + assertTrue(writeLock.tryLock(5, TimeUnit.SECONDS)); + writeLock.unlock(); + } + } +} diff --git a/src/test/java/org/codarama/redlock4j/integration/RedlockSemaphoreIntegrationTest.java b/src/test/java/org/codarama/redlock4j/integration/RedlockSemaphoreIntegrationTest.java new file mode 100644 index 0000000..3e37a4a --- /dev/null +++ b/src/test/java/org/codarama/redlock4j/integration/RedlockSemaphoreIntegrationTest.java @@ -0,0 +1,307 @@ +/* + * SPDX-License-Identifier: MIT + * Copyright (c) 2025 Codarama + */ +package org.codarama.redlock4j.integration; + +import org.codarama.redlock4j.RedlockManager; +import org.codarama.redlock4j.RedlockSemaphore; +import org.codarama.redlock4j.configuration.RedlockConfiguration; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Integration tests for {@link RedlockSemaphore}. + */ +@Testcontainers +public class RedlockSemaphoreIntegrationTest { + + @Container + static GenericContainer redis1 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis2 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + @Container + static GenericContainer redis3 = new GenericContainer<>(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379).withCommand("redis-server", "--appendonly", "yes"); + + private static RedlockConfiguration testConfiguration; + + @BeforeAll + static void setUp() { + testConfiguration = RedlockConfiguration.builder().addRedisNode("localhost", redis1.getMappedPort(6379)) + .addRedisNode("localhost", redis2.getMappedPort(6379)) + .addRedisNode("localhost", redis3.getMappedPort(6379)).defaultLockTimeout(Duration.ofSeconds(30)) + .retryDelay(Duration.ofMillis(50)).maxRetryAttempts(10).lockAcquisitionTimeout(Duration.ofSeconds(10)) + .build(); + } + + // ========== Initialization ========== + + @Test + void shouldCreateSemaphoreWithPositivePermits() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("init-test", 5); + assertNotNull(semaphore); + } + } + + @Test + void shouldRejectZeroPermits() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + assertThrows(IllegalArgumentException.class, () -> manager.createSemaphore("zero-permits", 0)); + } + } + + @Test + void shouldRejectNegativePermits() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + assertThrows(IllegalArgumentException.class, () -> manager.createSemaphore("negative-permits", -1)); + } + } + + // ========== acquire() / release() ========== + + @Test + void shouldAcquireAndReleaseSinglePermit() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("single-permit", 3); + semaphore.acquire(); + semaphore.release(); + } + } + + @Test + void shouldAcquireMultiplePermits() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("multi-permit", 5); + semaphore.acquire(3); + semaphore.release(3); + } + } + + // ========== tryAcquire() ========== + + @Test + void tryAcquireShouldReturnTrueWhenPermitAvailable() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("try-acquire", 2); + assertTrue(semaphore.tryAcquire(1, TimeUnit.SECONDS)); + semaphore.release(); + } + } + + @Test + void sameThreadCannotAcquireMultipleTimes() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("same-thread", 5); + assertTrue(semaphore.tryAcquire(1, TimeUnit.SECONDS)); + // Same thread should fail to acquire again while holding + assertFalse(semaphore.tryAcquire(500, TimeUnit.MILLISECONDS)); + semaphore.release(); + // After release, can acquire again + assertTrue(semaphore.tryAcquire(1, TimeUnit.SECONDS)); + semaphore.release(); + } + } + + @Test + void tryAcquireWithZeroTimeoutShouldReturnImmediately() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("zero-timeout", 1); + assertTrue(semaphore.tryAcquire()); + semaphore.release(); + } + } + + // ========== Permit Validation ========== + + @Test + void shouldRejectAcquiringMorePermitsThanMax() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("exceed-max", 3); + assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(5, 1, TimeUnit.SECONDS)); + } + } + + @Test + void shouldRejectAcquiringZeroPermits() { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("zero-acquire", 3); + assertThrows(IllegalArgumentException.class, () -> semaphore.tryAcquire(0, 1, TimeUnit.SECONDS)); + } + } + + // ========== Concurrent Access ========== + + @Test + void shouldAllowMultipleThreadsToAcquirePermits() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int threadCount = 5; + RedlockSemaphore semaphore = manager.createSemaphore("multi-thread-acq", 10); + + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch startSignal = new CountDownLatch(1); + CountDownLatch doneSignal = new CountDownLatch(threadCount); + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + startSignal.await(); + if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) { + try { + successCount.incrementAndGet(); + Thread.sleep(100); + } finally { + semaphore.release(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneSignal.countDown(); + } + }); + } + + startSignal.countDown(); + assertTrue(doneSignal.await(30, TimeUnit.SECONDS), "All threads should complete"); + assertEquals(threadCount, successCount.get(), "All threads should acquire"); + executor.shutdown(); + } + } + + @Test + void shouldHandleRapidAcquireReleaseCycles() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int cycles = 10; + int threadCount = 3; + RedlockSemaphore semaphore = manager.createSemaphore("rapid-cycles", 5); + + AtomicInteger totalAcquisitions = new AtomicInteger(0); + CountDownLatch done = new CountDownLatch(threadCount); + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + for (int j = 0; j < cycles; j++) { + if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) { + totalAcquisitions.incrementAndGet(); + Thread.sleep(20); + semaphore.release(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }); + } + + assertTrue(done.await(60, TimeUnit.SECONDS), "All threads should complete"); + assertTrue(totalAcquisitions.get() > 0, "Should have successful acquisitions"); + executor.shutdown(); + } + } + + @Test + void shouldAllowConcurrentAcquisitionsFromDifferentThreads() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withJedis(testConfiguration)) { + int threadCount = 4; + RedlockSemaphore semaphore = manager.createSemaphore("concurrent-diff-threads", 10); + + AtomicInteger acquired = new AtomicInteger(0); + CountDownLatch allAcquired = new CountDownLatch(threadCount); + CountDownLatch releaseSignal = new CountDownLatch(1); + CountDownLatch allDone = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + new Thread(() -> { + try { + if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) { + acquired.incrementAndGet(); + allAcquired.countDown(); + releaseSignal.await(); + semaphore.release(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + allDone.countDown(); + } + }).start(); + } + + assertTrue(allAcquired.await(10, TimeUnit.SECONDS), "All should acquire"); + assertEquals(threadCount, acquired.get(), "All threads should hold permits"); + + releaseSignal.countDown(); + assertTrue(allDone.await(5, TimeUnit.SECONDS)); + } + } + + // ========== Lettuce Driver ========== + + @Test + void shouldWorkWithLettuceDriver() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) { + RedlockSemaphore semaphore = manager.createSemaphore("lettuce-sem", 3); + assertTrue(semaphore.tryAcquire(2, TimeUnit.SECONDS)); + semaphore.release(); + } + } + + @Test + void shouldHandleConcurrencyWithLettuce() throws InterruptedException { + try (RedlockManager manager = RedlockManager.withLettuce(testConfiguration)) { + int threadCount = 4; + RedlockSemaphore semaphore = manager.createSemaphore("lettuce-concurrent", 10); + + AtomicInteger successCount = new AtomicInteger(0); + CountDownLatch done = new CountDownLatch(threadCount); + + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + executor.submit(() -> { + try { + if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) { + try { + successCount.incrementAndGet(); + Thread.sleep(100); + } finally { + semaphore.release(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }); + } + + assertTrue(done.await(30, TimeUnit.SECONDS)); + assertEquals(threadCount, successCount.get(), "All threads should acquire with Lettuce"); + executor.shutdown(); + } + } +} From 705115b461d432111216d2e7f37642b2a91325ce Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Fri, 27 Feb 2026 22:24:01 +0200 Subject: [PATCH 2/2] The hasQueuedThreads() might use quorum logic and not need all three stubs. Let me fix with lenient: --- .../org/codarama/redlock4j/RedlockCountDownLatchTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java b/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java index b75e372..4f73721 100644 --- a/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java +++ b/src/test/java/org/codarama/redlock4j/RedlockCountDownLatchTest.java @@ -154,9 +154,10 @@ void shouldResetLatchCount() throws RedisDriverException { @Test void shouldReturnTrueWhenCountPositive() throws RedisDriverException { - when(mockDriver1.get(anyString())).thenReturn("3"); - when(mockDriver2.get(anyString())).thenReturn("3"); - when(mockDriver3.get(anyString())).thenReturn("3"); + // Use lenient since quorum logic may not call all drivers + lenient().when(mockDriver1.get(anyString())).thenReturn("3"); + lenient().when(mockDriver2.get(anyString())).thenReturn("3"); + lenient().when(mockDriver3.get(anyString())).thenReturn("3"); RedlockCountDownLatch latch = new RedlockCountDownLatch("test", 3, drivers, testConfig); assertTrue(latch.hasQueuedThreads());