Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public abstract class BatchMutationCall<ReqT, RespT> implements IBatchCall<ReqT, RespT, MutationCallBatcherKey> {
protected final MutationCallBatcherKey batcherKey;
private final IMutationPipeline storePipeline;
private Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks = new ArrayDeque<>();
private Deque<ICallTask<ReqT, RespT, MutationCallBatcherKey>> pendingCallTasks = new ArrayDeque<>();

protected BatchMutationCall(IMutationPipeline storePipeline, MutationCallBatcherKey batcherKey) {
this.batcherKey = batcherKey;
Expand All @@ -51,20 +51,7 @@ protected BatchMutationCall(IMutationPipeline storePipeline, MutationCallBatcher

@Override
public final void add(ICallTask<ReqT, RespT, MutationCallBatcherKey> callTask) {
MutationCallTaskBatch<ReqT, RespT> lastBatchCallTask;
MutationCallBatcherKey batcherKey = callTask.batcherKey();
assert callTask.batcherKey().id.equals(batcherKey.id);
if ((lastBatchCallTask = batchCallTasks.peekLast()) != null) {
if (!lastBatchCallTask.isBatchable(callTask)) {
lastBatchCallTask = newBatch(batcherKey.ver);
batchCallTasks.add(lastBatchCallTask);
}
lastBatchCallTask.add(callTask);
} else {
lastBatchCallTask = newBatch(batcherKey.ver);
lastBatchCallTask.add(callTask);
batchCallTasks.add(lastBatchCallTask);
}
pendingCallTasks.add(callTask);
}

protected MutationCallTaskBatch<ReqT, RespT> newBatch(long ver) {
Expand All @@ -81,25 +68,56 @@ protected abstract void handleOutput(Queue<ICallTask<ReqT, RespT, MutationCallBa
@Override
public void reset(boolean abort) {
if (abort) {
batchCallTasks = new ArrayDeque<>();
pendingCallTasks = new ArrayDeque<>();
}
}

@Override
public CompletableFuture<Void> execute() {
return execute(batchCallTasks);
return executeBatches();
}

private CompletableFuture<Void> execute(Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks) {
private CompletableFuture<Void> executeBatches() {
CompletableFuture<Void> chained = CompletableFuture.completedFuture(null);
MutationCallTaskBatch<ReqT, RespT> batchCallTask;
while ((batchCallTask = batchCallTasks.poll()) != null) {
while ((batchCallTask = buildNextBatch()) != null) {
MutationCallTaskBatch<ReqT, RespT> current = batchCallTask;
chained = chained.thenCompose(v -> fireSingleBatch(current));
}
return chained;
}

private MutationCallTaskBatch<ReqT, RespT> buildNextBatch() {
if (pendingCallTasks.isEmpty()) {
return null;
}
MutationCallTaskBatch<ReqT, RespT> batchCallTask = null;
long batchVer = -1;
int size = pendingCallTasks.size();
for (int i = 0; i < size; i++) {
ICallTask<ReqT, RespT, MutationCallBatcherKey> task = pendingCallTasks.pollFirst();
if (task == null) {
break;
}
if (batchCallTask == null) {
batchVer = task.batcherKey().ver;
batchCallTask = newBatch(batchVer);
batchCallTask.add(task);
continue;
}
if (task.batcherKey().ver != batchVer) {
pendingCallTasks.addLast(task);
continue;
}
if (batchCallTask.isBatchable(task)) {
batchCallTask.add(task);
} else {
pendingCallTasks.addLast(task);
}
}
return batchCallTask;
}

private CompletableFuture<Void> fireSingleBatch(MutationCallTaskBatch<ReqT, RespT> batchCallTask) {
RWCoProcInput input = makeBatch(batchCallTask.batchedTasks);
long reqId = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.protobuf.ByteString;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -77,9 +78,11 @@ public void teardown() {

@Test
public void addToSameBatch() {
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{
put(FULL_BOUNDARY, setting(id, "V1", 0));
}});
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
{
put(FULL_BOUNDARY, setting(id, "V1", 0));
}
});

when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
when(mutationPipeline1.execute(any()))
Expand All @@ -103,7 +106,8 @@ public void addToSameBatch() {
String[] keys = request.getRwCoProc().getRaw().toStringUtf8().split("_");
assertEquals(keys.length, Sets.newSet(keys).size());
}
// the resp order preserved
Collections.sort(reqList);
Collections.sort(respList);
assertEquals(reqList, respList);
}

Expand All @@ -124,19 +128,24 @@ public void addToDifferentBatch() {
int req = ThreadLocalRandom.current().nextInt(1, 1001);
reqList.add(req);
if (req < 500) {
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{
put(FULL_BOUNDARY, setting(id, "V1", 0));
}});
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
{
put(FULL_BOUNDARY, setting(id, "V1", 0));
}
});
} else {
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {{
put(FULL_BOUNDARY, setting(id, "V2", 0));
}});
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
{
put(FULL_BOUNDARY, setting(id, "V2", 0));
}
});
}
futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req)))
.thenAccept((v) -> respList.add(Integer.parseInt(v.toStringUtf8()))));
}
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
// the resp order preserved
Collections.sort(reqList);
Collections.sort(respList);
assertEquals(reqList, respList);
}

Expand Down Expand Up @@ -166,4 +175,82 @@ public void executeManySmallBatchesNoRecursion() {
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
assertEquals(execCount.get(), n);
}

@Test
public void reScanWhenHitNonBatchable() {
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
{
put(FULL_BOUNDARY, setting(id, "V1", 0));
}
});
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
when(mutationPipeline1.execute(any()))
.thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build()));

MutationCallScheduler<ByteString, ByteString, TestBatchMutationCall> scheduler =
new MutationCallScheduler<>(NonBatchableBatchCall::new, Duration.ofMillis(1000).toNanos(), storeClient) {
@Override
protected ByteString rangeKey(ByteString call) {
return call;
}
};
List<CompletableFuture<Void>> futures = new ArrayList<>();
List<ByteString> reqs = List.of(
ByteString.copyFromUtf8("k1"),
ByteString.copyFromUtf8("k_dup"), // will mark non-batchable in first batch
ByteString.copyFromUtf8("k2"));
List<ByteString> resps = new CopyOnWriteArrayList<>();
reqs.forEach(req -> futures.add(scheduler.schedule(req).thenAccept(resps::add)));
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
List<String> reqSorted = reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
List<String> respSorted = resps.stream().map(ByteString::toStringUtf8).sorted().toList();
assertEquals(reqSorted, respSorted);
}

@Test
public void mixDifferentVersions() {
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
when(storeClient.createMutationPipeline("V2")).thenReturn(mutationPipeline2);
when(mutationPipeline1.execute(any()))
.thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build()));
when(mutationPipeline2.execute(any()))
.thenReturn(CompletableFuture.supplyAsync(() -> KVRangeRWReply.newBuilder().build()));
TestMutationCallScheduler scheduler = new TestMutationCallScheduler(storeClient, Duration.ofMillis(1000));
List<CompletableFuture<Void>> futures = new ArrayList<>();
List<ByteString> reqs = new ArrayList<>();
List<ByteString> resps = new CopyOnWriteArrayList<>();
for (int i = 0; i < 20; i++) {
ByteString req = ByteString.copyFromUtf8("k" + i);
reqs.add(req);
if (i % 2 == 0) {
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
{
put(FULL_BOUNDARY, setting(id, "V1", 0));
}
});
} else {
when(storeClient.latestEffectiveRouter()).thenReturn(new TreeMap<>(BoundaryUtil::compare) {
{
put(FULL_BOUNDARY, setting(id, "V2", 1));
}
});
}
futures.add(scheduler.schedule(req).thenAccept(resps::add));
}
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
List<String> reqSorted = reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
List<String> respSorted = resps.stream().map(ByteString::toStringUtf8).sorted().toList();
assertEquals(reqSorted, respSorted);
}

private static class NonBatchableBatchCall extends TestBatchMutationCall {
protected NonBatchableBatchCall(IMutationPipeline pipeline, MutationCallBatcherKey batcherKey) {
super(pipeline, batcherKey);
}

@Override
protected NonBatchableFirstBatch newBatch(long ver) {
return new NonBatchableFirstBatch(ver);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

package org.apache.bifromq.basekv.client.scheduler;

import org.apache.bifromq.basekv.client.IMutationPipeline;
import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
import org.apache.bifromq.basescheduler.ICallTask;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import org.apache.bifromq.basekv.client.IMutationPipeline;
import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
import org.apache.bifromq.basescheduler.ICallTask;

public class TestBatchMutationCall extends BatchMutationCall<ByteString, ByteString> {
protected TestBatchMutationCall(IMutationPipeline pipeline, MutationCallBatcherKey batcherKey) {
Expand Down Expand Up @@ -86,4 +86,28 @@ protected boolean isBatchable(ICallTask<ByteString, ByteString, MutationCallBatc
return !keys.contains(callTask.call());
}
}

static class NonBatchableFirstBatch extends MutationCallTaskBatch<ByteString, ByteString> {
private final Set<ByteString> keys = new HashSet<>();
private boolean sawNonBatchable;

protected NonBatchableFirstBatch(long ver) {
super(ver);
}

@Override
protected void add(ICallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
super.add(callTask);
keys.add(callTask.call());
}

@Override
protected boolean isBatchable(ICallTask<ByteString, ByteString, MutationCallBatcherKey> callTask) {
if (!sawNonBatchable && callTask.call().toStringUtf8().contains("dup")) {
sawNonBatchable = true;
return false;
}
return !keys.contains(callTask.call());
}
}
}
Loading