diff --git a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java index e5c6dee94..6c9a43ee2 100644 --- a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java +++ b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java @@ -42,7 +42,7 @@ public abstract class BatchMutationCall implements IBatchCall { protected final MutationCallBatcherKey batcherKey; private final IMutationPipeline storePipeline; - private Deque> batchCallTasks = new ArrayDeque<>(); + private Deque> pendingCallTasks = new ArrayDeque<>(); protected BatchMutationCall(IMutationPipeline storePipeline, MutationCallBatcherKey batcherKey) { this.batcherKey = batcherKey; @@ -51,20 +51,7 @@ protected BatchMutationCall(IMutationPipeline storePipeline, MutationCallBatcher @Override public final void add(ICallTask callTask) { - MutationCallTaskBatch lastBatchCallTask; - MutationCallBatcherKey batcherKey = callTask.batcherKey(); - assert callTask.batcherKey().id.equals(batcherKey.id); - if ((lastBatchCallTask = batchCallTasks.peekLast()) != null) { - if (!lastBatchCallTask.isBatchable(callTask)) { - lastBatchCallTask = newBatch(batcherKey.ver); - batchCallTasks.add(lastBatchCallTask); - } - lastBatchCallTask.add(callTask); - } else { - lastBatchCallTask = newBatch(batcherKey.ver); - lastBatchCallTask.add(callTask); - batchCallTasks.add(lastBatchCallTask); - } + pendingCallTasks.add(callTask); } protected MutationCallTaskBatch newBatch(long ver) { @@ -81,25 +68,56 @@ protected abstract void handleOutput(Queue(); + pendingCallTasks = new ArrayDeque<>(); } } @Override public CompletableFuture execute() { - return execute(batchCallTasks); + return executeBatches(); } - private CompletableFuture execute(Deque> batchCallTasks) { + private CompletableFuture executeBatches() { CompletableFuture chained = CompletableFuture.completedFuture(null); MutationCallTaskBatch batchCallTask; - while ((batchCallTask = batchCallTasks.poll()) != null) { + while ((batchCallTask = buildNextBatch()) != null) { MutationCallTaskBatch current = batchCallTask; chained = chained.thenCompose(v -> fireSingleBatch(current)); } return chained; } + private MutationCallTaskBatch buildNextBatch() { + if (pendingCallTasks.isEmpty()) { + return null; + } + MutationCallTaskBatch batchCallTask = null; + long batchVer = -1; + int size = pendingCallTasks.size(); + for (int i = 0; i < size; i++) { + ICallTask task = pendingCallTasks.pollFirst(); + if (task == null) { + break; + } + if (batchCallTask == null) { + batchVer = task.batcherKey().ver; + batchCallTask = newBatch(batchVer); + batchCallTask.add(task); + continue; + } + if (task.batcherKey().ver != batchVer) { + pendingCallTasks.addLast(task); + continue; + } + if (batchCallTask.isBatchable(task)) { + batchCallTask.add(task); + } else { + pendingCallTasks.addLast(task); + } + } + return batchCallTask; + } + private CompletableFuture fireSingleBatch(MutationCallTaskBatch batchCallTask) { RWCoProcInput input = makeBatch(batchCallTask.batchedTasks); long reqId = System.nanoTime(); diff --git a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java index 72b1c3457..f13fb9114 100644 --- a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java +++ b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java @@ -30,6 +30,7 @@ import com.google.protobuf.ByteString; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; @@ -77,9 +78,11 @@ public void teardown() { @Test public void addToSameBatch() { - when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ - put(FULL_BOUNDARY, setting(id, "V1", 0)); - }}); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) { + { + put(FULL_BOUNDARY, setting(id, "V1", 0)); + } + }); when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1); when(mutationPipeline1.execute(any())) @@ -103,7 +106,8 @@ public void addToSameBatch() { String[] keys = request.getRwCoProc().getRaw().toStringUtf8().split("_"); assertEquals(keys.length, Sets.newSet(keys).size()); } - // the resp order preserved + Collections.sort(reqList); + Collections.sort(respList); assertEquals(reqList, respList); } @@ -124,19 +128,24 @@ public void addToDifferentBatch() { int req = ThreadLocalRandom.current().nextInt(1, 1001); reqList.add(req); if (req < 500) { - when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ - put(FULL_BOUNDARY, setting(id, "V1", 0)); - }}); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) { + { + put(FULL_BOUNDARY, setting(id, "V1", 0)); + } + }); } else { - when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{ - put(FULL_BOUNDARY, setting(id, "V2", 0)); - }}); + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) { + { + put(FULL_BOUNDARY, setting(id, "V2", 0)); + } + }); } futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req))) .thenAccept((v) -> respList.add(Integer.parseInt(v.toStringUtf8())))); } CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); - // the resp order preserved + Collections.sort(reqList); + Collections.sort(respList); assertEquals(reqList, respList); } @@ -166,4 +175,82 @@ public void executeManySmallBatchesNoRecursion() { CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); assertEquals(execCount.get(), n); } + + @Test + public void reScanWhenHitNonBatchable() { + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) { + { + put(FULL_BOUNDARY, setting(id, "V1", 0)); + } + }); + when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1); + when(mutationPipeline1.execute(any())) + .thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build())); + + MutationCallScheduler scheduler = + new MutationCallScheduler<>(NonBatchableBatchCall::new, Duration.ofMillis(1000).toNanos(), storeClient) { + @Override + protected ByteString rangeKey(ByteString call) { + return call; + } + }; + List> futures = new ArrayList<>(); + List reqs = List.of( + ByteString.copyFromUtf8("k1"), + ByteString.copyFromUtf8("k_dup"), // will mark non-batchable in first batch + ByteString.copyFromUtf8("k2")); + List resps = new CopyOnWriteArrayList<>(); + reqs.forEach(req -> futures.add(scheduler.schedule(req).thenAccept(resps::add))); + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); + List reqSorted = reqs.stream().map(ByteString::toStringUtf8).sorted().toList(); + List respSorted = resps.stream().map(ByteString::toStringUtf8).sorted().toList(); + assertEquals(reqSorted, respSorted); + } + + @Test + public void mixDifferentVersions() { + when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1); + when(storeClient.createMutationPipeline("V2")).thenReturn(mutationPipeline2); + when(mutationPipeline1.execute(any())) + .thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build())); + when(mutationPipeline2.execute(any())) + .thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build())); + TestMutationCallScheduler scheduler = new TestMutationCallScheduler(storeClient, Duration.ofMillis(1000)); + List> futures = new ArrayList<>(); + List reqs = new ArrayList<>(); + List resps = new CopyOnWriteArrayList<>(); + for (int i = 0; i < 20; i++) { + ByteString req = ByteString.copyFromUtf8("k" + i); + reqs.add(req); + if (i % 2 == 0) { + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) { + { + put(FULL_BOUNDARY, setting(id, "V1", 0)); + } + }); + } else { + when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) { + { + put(FULL_BOUNDARY, setting(id, "V2", 1)); + } + }); + } + futures.add(scheduler.schedule(req).thenAccept(resps::add)); + } + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join(); + List reqSorted = reqs.stream().map(ByteString::toStringUtf8).sorted().toList(); + List respSorted = resps.stream().map(ByteString::toStringUtf8).sorted().toList(); + assertEquals(reqSorted, respSorted); + } + + private static class NonBatchableBatchCall extends TestBatchMutationCall { + protected NonBatchableBatchCall(IMutationPipeline pipeline, MutationCallBatcherKey batcherKey) { + super(pipeline, batcherKey); + } + + @Override + protected NonBatchableFirstBatch newBatch(long ver) { + return new NonBatchableFirstBatch(ver); + } + } } diff --git a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java index 276634b09..cbb06fff6 100644 --- a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java +++ b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java @@ -19,16 +19,16 @@ package org.apache.bifromq.basekv.client.scheduler; -import org.apache.bifromq.basekv.client.IMutationPipeline; -import org.apache.bifromq.basekv.store.proto.RWCoProcInput; -import org.apache.bifromq.basekv.store.proto.RWCoProcOutput; -import org.apache.bifromq.basescheduler.ICallTask; import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; import java.util.HashSet; import java.util.Iterator; import java.util.Queue; import java.util.Set; +import org.apache.bifromq.basekv.client.IMutationPipeline; +import org.apache.bifromq.basekv.store.proto.RWCoProcInput; +import org.apache.bifromq.basekv.store.proto.RWCoProcOutput; +import org.apache.bifromq.basescheduler.ICallTask; public class TestBatchMutationCall extends BatchMutationCall { protected TestBatchMutationCall(IMutationPipeline pipeline, MutationCallBatcherKey batcherKey) { @@ -86,4 +86,28 @@ protected boolean isBatchable(ICallTask { + private final Set keys = new HashSet<>(); + private boolean sawNonBatchable; + + protected NonBatchableFirstBatch(long ver) { + super(ver); + } + + @Override + protected void add(ICallTask callTask) { + super.add(callTask); + keys.add(callTask.call()); + } + + @Override + protected boolean isBatchable(ICallTask callTask) { + if (!sawNonBatchable && callTask.call().toStringUtf8().contains("dup")) { + sawNonBatchable = true; + return false; + } + return !keys.contains(callTask.call()); + } + } }