From 53bf4a250341962b084474a013f97250cbd6df9a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 16 Aug 2024 01:35:04 +0800 Subject: [PATCH 1/8] Support BK batch read. --- .../mledger/ManagedLedgerConfig.java | 9 +++ .../impl/cache/EntryCacheDisabled.java | 46 +++++++------- .../impl/cache/RangeEntryCacheImpl.java | 60 ++++++++++--------- .../mledger/impl/OffloadPrefixReadTest.java | 6 ++ .../pulsar/broker/service/BrokerService.java | 1 + .../client/PulsarMockLedgerHandle.java | 6 ++ .../client/PulsarMockReadHandle.java | 6 ++ .../impl/FileStoreBackedReadHandleImpl.java | 6 ++ .../impl/BlobStoreBackedReadHandleImpl.java | 6 ++ .../impl/BlobStoreBackedReadHandleImplV2.java | 6 ++ .../BlockAwareSegmentInputStreamTest.java | 6 ++ 11 files changed, 109 insertions(+), 49 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 03439f93ccad8..03d059cb597a7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -86,6 +86,7 @@ public class ManagedLedgerConfig { private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; private boolean triggerOffloadOnTopicLoad = false; + private boolean enableBookkeeperBatchRead = false; @Getter @Setter @@ -769,5 +770,13 @@ public String getShadowSource() { return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); } + public void setEnableBookkeeperBatchRead(boolean enableBookkeeperBatchRead) { + this.enableBookkeeperBatchRead = enableBookkeeperBatchRead; + } + + public boolean isEnableBookkeeperBatchRead() { + return enableBookkeeperBatchRead; + } + public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE"; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index 4f8f70bc81bab..52a8f432a8e7c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -39,10 +41,12 @@ public class EntryCacheDisabled implements EntryCache { private final ManagedLedgerImpl ml; private final ManagedLedgerInterceptor interceptor; + private final boolean enableBookkeeperBatchRead; public EntryCacheDisabled(ManagedLedgerImpl ml) { this.ml = ml; this.interceptor = ml.getManagedLedgerInterceptor(); + this.enableBookkeeperBatchRead = ml.getConfig().isEnableBookkeeperBatchRead(); } @Override @@ -79,26 +83,28 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) { @Override public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - lh.readAsync(firstEntry, lastEntry).thenAcceptAsync( - ledgerEntries -> { - List entries = new ArrayList<>(); - long totalSize = 0; - try { - for (LedgerEntry e : ledgerEntries) { - // Insert the entries at the end of the list (they will be unsorted for now) - EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); - entries.add(entry); - totalSize += entry.getLength(); - } - } finally { - ledgerEntries.close(); - } - ml.getMbean().recordReadEntriesOpsCacheMisses(entries.size(), totalSize); - ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize); - ml.getMbean().addReadEntriesSample(entries.size(), totalSize); - - callback.readEntriesComplete(entries, ctx); - }, ml.getExecutor()).exceptionally(exception -> { + final int entriesToRead = (int) (lastEntry - firstEntry + 1); + CompletableFuture f = enableBookkeeperBatchRead ? + lh.batchReadAsync(firstEntry, entriesToRead, 0) : lh.readAsync(firstEntry, lastEntry); + f.thenAcceptAsync(ledgerEntries -> { + List entries = new ArrayList<>(); + long totalSize = 0; + try { + for (LedgerEntry e : ledgerEntries) { + // Insert the entries at the end of the list (they will be unsorted for now) + EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); + entries.add(entry); + totalSize += entry.getLength(); + } + } finally { + ledgerEntries.close(); + } + ml.getMbean().recordReadEntriesOpsCacheMisses(entries.size(), totalSize); + ml.getFactory().getMbean().recordCacheMiss(entries.size(), totalSize); + ml.getMbean().addReadEntriesSample(entries.size(), totalSize); + + callback.readEntriesComplete(entries, ctx); + }, ml.getExecutor()).exceptionally(exception -> { callback.readEntriesFailed(createManagedLedgerException(exception), ctx); return null; }); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 254a517786a55..4b864368604d0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.api.BKException; +import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -65,6 +66,7 @@ public class RangeEntryCacheImpl implements EntryCache { private final RangeCache entries; private final boolean copyEntries; private final PendingReadsManager pendingReadsManager; + private final boolean enableBookkeeperBatchRead; private volatile long estimatedEntrySize = 10 * 1024; @@ -80,6 +82,7 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); this.copyEntries = copyEntries; + this.enableBookkeeperBatchRead = ml.getConfig().isEnableBookkeeperBatchRead(); if (log.isDebugEnabled()) { log.debug("[{}] Initialized managed-ledger entry cache", ml.getName()); @@ -429,37 +432,36 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = lh.readAsync(firstEntry, lastEntry) - .thenApply( - ledgerEntries -> { - requireNonNull(ml.getName()); - requireNonNull(ml.getExecutor()); - - try { - // We got the entries, we need to transform them to a List<> type - long totalSize = 0; - final List entriesToReturn = - Lists.newArrayListWithExpectedSize(entriesToRead); - for (LedgerEntry e : ledgerEntries) { - EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); - entriesToReturn.add(entry); - totalSize += entry.getLength(); - if (shouldCacheEntry) { - EntryImpl cacheEntry = EntryImpl.create(entry); - insert(cacheEntry); - cacheEntry.release(); - } - } + CompletableFuture f = enableBookkeeperBatchRead ? + lh.batchReadAsync(firstEntry, entriesToRead, 0) : lh.readAsync(firstEntry, lastEntry); + CompletableFuture> readResult = f.thenApply(ledgerEntries -> { + requireNonNull(ml.getName()); + requireNonNull(ml.getExecutor()); + try { + // We got the entries, we need to transform them to a List<> type + long totalSize = 0; + final List entriesToReturn = + Lists.newArrayListWithExpectedSize(entriesToRead); + for (LedgerEntry e : ledgerEntries) { + EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor); + entriesToReturn.add(entry); + totalSize += entry.getLength(); + if (shouldCacheEntry) { + EntryImpl cacheEntry = EntryImpl.create(entry); + insert(cacheEntry); + cacheEntry.release(); + } + } - ml.getMbean().recordReadEntriesOpsCacheMisses(entriesToReturn.size(), totalSize); - manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); - ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize); + ml.getMbean().recordReadEntriesOpsCacheMisses(entriesToReturn.size(), totalSize); + manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize); + ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize); - return entriesToReturn; - } finally { - ledgerEntries.close(); - } - }); + return entriesToReturn; + } finally { + ledgerEntries.close(); + } + }); // handle LH invalidation readResult.exceptionally(exception -> { if (exception instanceof BKException diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 29138145d1505..d29aee805293d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -312,6 +312,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return CompletableFuture.completedFuture(LedgerEntriesImpl.create(readEntries)); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return unsupported(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 5dec15fc19b89..10c0748ce40fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1987,6 +1987,7 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); + managedLedgerConfig.setEnableBookkeeperBatchRead(serviceConfig.isBookkeeperUseV2WireProtocol()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index aa61e541d0d6b..529f8e8a5e92a 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -240,6 +240,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return readHandle.readAsync(firstEntry, lastEntry); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readHandle.readUnconfirmedAsync(firstEntry, lastEntry); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index a4361f62254e4..ee07f818a77da 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -64,6 +64,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr }); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readAsync(firstEntry, lastEntry); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index 91e7e902eab8a..33a6554426656 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -175,6 +175,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return promise; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readAsync(firstEntry, lastEntry); diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index e050d74a332bc..1a38c8b245d3d 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -207,6 +207,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return promise; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + private void seekToEntry(long nextExpectedId) throws IOException { Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId); if (knownOffset != null) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index 502f475174cee..707254b5b0dc4 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -239,6 +239,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return promise; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + private List getGroupedReader(long firstEntry, long lastEntry) throws Exception { List groupedReaders = new LinkedList<>(); for (int i = indices.size() - 1; i >= 0 && firstEntry <= lastEntry; i--) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 5ca4d6da20bee..4592ebb52b6f5 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -158,6 +158,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return future; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readAsync(firstEntry, lastEntry); From 9c093430140ae083ba92284efd892b0693547b6c Mon Sep 17 00:00:00 2001 From: dao-jun Date: Fri, 16 Aug 2024 01:48:46 +0800 Subject: [PATCH 2/8] Resolve conflicts --- .../impl/cache/EntryCacheDisabled.java | 19 +++++++++++-------- .../impl/cache/RangeEntryCacheImpl.java | 16 ++++++++++------ .../mledger/impl/cache/ReadEntryUtils.java | 8 ++++++-- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index 92541a7a72578..3e108ec4fdb14 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -39,10 +39,12 @@ public class EntryCacheDisabled implements EntryCache { private final ManagedLedgerImpl ml; private final ManagedLedgerInterceptor interceptor; + private final boolean enableBookkeeperBatchRead; public EntryCacheDisabled(ManagedLedgerImpl ml) { this.ml = ml; this.interceptor = ml.getManagedLedgerInterceptor(); + this.enableBookkeeperBatchRead = ml.getConfig().isEnableBookkeeperBatchRead(); } @Override @@ -79,8 +81,8 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) { @Override public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync( - ledgerEntries -> { + ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, enableBookkeeperBatchRead) + .thenAcceptAsync(ledgerEntries -> { List entries = new ArrayList<>(); long totalSize = 0; try { @@ -98,17 +100,18 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole ml.getMbean().addReadEntriesSample(entries.size(), totalSize); callback.readEntriesComplete(entries, ctx); - }, ml.getExecutor()).exceptionally(exception -> { - callback.readEntriesFailed(createManagedLedgerException(exception), ctx); - return null; - }); + }, ml.getExecutor()) + .exceptionally(exception -> { + callback.readEntriesFailed(createManagedLedgerException(exception), ctx); + return null; + }); } @Override public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync( - (ledgerEntries, exception) -> { + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), enableBookkeeperBatchRead) + .whenCompleteAsync((ledgerEntries, exception) -> { if (exception != null) { ml.invalidateLedgerHandle(lh); callback.readEntryFailed(createManagedLedgerException(exception), ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index cb006a5f0cea9..42ea4ea99d5e2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -65,6 +65,7 @@ public class RangeEntryCacheImpl implements EntryCache { private final RangeCache entries; private final boolean copyEntries; private final PendingReadsManager pendingReadsManager; + private final boolean enableBookkeeperBatchRead; private volatile long estimatedEntrySize = 10 * 1024; @@ -80,6 +81,7 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); this.copyEntries = copyEntries; + this.enableBookkeeperBatchRead = ml.getConfig().isEnableBookkeeperBatchRead(); if (log.isDebugEnabled()) { log.debug("[{}] Initialized managed-ledger entry cache", ml.getName()); @@ -249,8 +251,8 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength()); callback.readEntryComplete(cachedEntry, ctx); } else { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync( - ledgerEntries -> { + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), enableBookkeeperBatchRead) + .thenAcceptAsync(ledgerEntries -> { try { Iterator iterator = ledgerEntries.iterator(); if (iterator.hasNext()) { @@ -264,17 +266,18 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa } else { // got an empty sequence callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), - ctx); + ctx); } } finally { ledgerEntries.close(); } - }, ml.getExecutor()).exceptionally(exception -> { + }, ml.getExecutor()) + .exceptionally(exception -> { ml.invalidateLedgerHandle(lh); pendingReadsManager.invalidateLedger(lh.getId()); callback.readEntryFailed(createManagedLedgerException(exception), ctx); return null; - }); + }); } } @@ -429,7 +432,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) + CompletableFuture> readResult = ReadEntryUtils + .readAsync(ml, lh, firstEntry, lastEntry, enableBookkeeperBatchRead) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 5cf5f053f0ce7..acf10f461e4fd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -27,12 +27,16 @@ class ReadEntryUtils { static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, - long lastEntry) { + long lastEntry, boolean enableBatchRead) { if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a // ReadOnlyManagedLedgerImpl object. - return handle.readAsync(firstEntry, lastEntry); + int entriesToRead = (int) (lastEntry - firstEntry + 1); + if (entriesToRead <= 1 || !enableBatchRead) { + return handle.readAsync(firstEntry, lastEntry); + } + return handle.batchReadAsync(firstEntry, entriesToRead, 0); } // Compare the entry range with the lastConfirmedEntry maintained by the managed ledger because the entry cache // of `ShadowManagedLedgerImpl` reads entries via `ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed` From c58109ae93f26be913335b8741cc21f86f793fea Mon Sep 17 00:00:00 2001 From: dao-jun Date: Sun, 18 Aug 2024 01:46:06 +0800 Subject: [PATCH 3/8] code improve --- .../mledger/ManagedLedgerConfig.java | 10 +++++----- .../impl/cache/EntryCacheDisabled.java | 8 ++++---- .../impl/cache/RangeEntryCacheImpl.java | 8 ++++---- .../mledger/impl/cache/ReadEntryUtils.java | 8 ++++++-- .../pulsar/broker/ServiceConfiguration.java | 6 ++++++ .../broker/BookKeeperClientFactoryImpl.java | 1 + .../pulsar/broker/service/BrokerService.java | 2 +- .../BookKeeperClientFactoryImplTest.java | 20 +++++++++++++++++++ .../client/PulsarMockLedgerHandle.java | 3 +-- .../client/PulsarMockReadHandle.java | 20 +++++++++++++++++-- 10 files changed, 66 insertions(+), 20 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 03d059cb597a7..16dee7370e795 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -86,7 +86,7 @@ public class ManagedLedgerConfig { private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; private boolean triggerOffloadOnTopicLoad = false; - private boolean enableBookkeeperBatchRead = false; + private boolean isUseBookkeeperV2WireProtocol = false; @Getter @Setter @@ -770,12 +770,12 @@ public String getShadowSource() { return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); } - public void setEnableBookkeeperBatchRead(boolean enableBookkeeperBatchRead) { - this.enableBookkeeperBatchRead = enableBookkeeperBatchRead; + public void setUseBookkeeperV2WireProtocol(boolean isUseBookkeeperV2WireProtocol) { + this.isUseBookkeeperV2WireProtocol = isUseBookkeeperV2WireProtocol; } - public boolean isEnableBookkeeperBatchRead() { - return enableBookkeeperBatchRead; + public boolean isUseBookkeeperV2WireProtocol() { + return isUseBookkeeperV2WireProtocol; } public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE"; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index 3e108ec4fdb14..1c67ac16acb55 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -39,12 +39,12 @@ public class EntryCacheDisabled implements EntryCache { private final ManagedLedgerImpl ml; private final ManagedLedgerInterceptor interceptor; - private final boolean enableBookkeeperBatchRead; + private final boolean useBookkeeperV2WireProtocol; public EntryCacheDisabled(ManagedLedgerImpl ml) { this.ml = ml; this.interceptor = ml.getManagedLedgerInterceptor(); - this.enableBookkeeperBatchRead = ml.getConfig().isEnableBookkeeperBatchRead(); + this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol(); } @Override @@ -81,7 +81,7 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) { @Override public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, enableBookkeeperBatchRead) + ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol) .thenAcceptAsync(ledgerEntries -> { List entries = new ArrayList<>(); long totalSize = 0; @@ -110,7 +110,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole @Override public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), enableBookkeeperBatchRead) + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol) .whenCompleteAsync((ledgerEntries, exception) -> { if (exception != null) { ml.invalidateLedgerHandle(lh); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 42ea4ea99d5e2..57e59b1d6a1a2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -65,7 +65,7 @@ public class RangeEntryCacheImpl implements EntryCache { private final RangeCache entries; private final boolean copyEntries; private final PendingReadsManager pendingReadsManager; - private final boolean enableBookkeeperBatchRead; + private final boolean useBookkeeperV2WireProtocol; private volatile long estimatedEntrySize = 10 * 1024; @@ -81,7 +81,7 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); this.copyEntries = copyEntries; - this.enableBookkeeperBatchRead = ml.getConfig().isEnableBookkeeperBatchRead(); + this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol(); if (log.isDebugEnabled()) { log.debug("[{}] Initialized managed-ledger entry cache", ml.getName()); @@ -251,7 +251,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength()); callback.readEntryComplete(cachedEntry, ctx); } else { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), enableBookkeeperBatchRead) + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol) .thenAcceptAsync(ledgerEntries -> { try { Iterator iterator = ledgerEntries.iterator(); @@ -433,7 +433,7 @@ CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; CompletableFuture> readResult = ReadEntryUtils - .readAsync(ml, lh, firstEntry, lastEntry, enableBookkeeperBatchRead) + .readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index acf10f461e4fd..75ebcbe14f3b6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -27,13 +28,16 @@ class ReadEntryUtils { static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, - long lastEntry, boolean enableBatchRead) { + long lastEntry, boolean useBookkeeperV2WireProtocol) { if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a // ReadOnlyManagedLedgerImpl object. int entriesToRead = (int) (lastEntry - firstEntry + 1); - if (entriesToRead <= 1 || !enableBatchRead) { + // Batch read is not supported for striped ledgers. + LedgerMetadata m = handle.getLedgerMetadata(); + boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize(); + if (entriesToRead <= 1 || !useBookkeeperV2WireProtocol || isStriped) { return handle.readAsync(firstEntry, lastEntry); } return handle.batchReadAsync(firstEntry, entriesToRead, 0); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 26b2f99abf545..b9b0edfa937b2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1931,6 +1931,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean bookkeeperClientSeparatedIoThreadsEnabled = false; + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Enable Bookkeeper client to read entries in batch mode. Default is false." + ) + private boolean bookkeeperEnableBatchRead = false; + /**** --- Managed Ledger. --- ****/ @FieldContext( minValue = 1, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 45299d9ed05d5..f4255d1a42d6e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -137,6 +137,7 @@ ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, Ser bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled()); bkConf.setMetadataServiceUri(conf.getBookkeeperMetadataStoreUrl()); bkConf.setLimitStatsLogging(conf.isBookkeeperClientLimitStatsLogging()); + bkConf.setBatchReadEnabled(conf.isBookkeeperEnableBatchRead()); if (!conf.isBookkeeperMetadataStoreSeparated()) { // If we're connecting to the same metadata service, with same config, then diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b79874d421236..449486f9c3b81 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1987,7 +1987,7 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); - managedLedgerConfig.setEnableBookkeeperBatchRead(serviceConfig.isBookkeeperUseV2WireProtocol()); + managedLedgerConfig.setUseBookkeeperV2WireProtocol(serviceConfig.isBookkeeperUseV2WireProtocol()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java index 3c0e4d0c409df..cfc1c5b525e0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -340,4 +340,24 @@ public void testBookKeeperLimitStatsLoggingConfiguration() throws Exception { (ClientConfiguration) FieldUtils.readField(builder, "conf", true); assertFalse(clientConfiguration.getLimitStatsLogging()); } + + @Test + public void testBookkeeperBatchReadConfig() throws Exception { + BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl(); + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setBookkeeperEnableBatchRead(true); + EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); + BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class), + factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)); + ClientConfiguration clientConfiguration = + (ClientConfiguration) FieldUtils.readField(builder, "conf", true); + assertTrue(clientConfiguration.isBatchReadEnabled()); + + conf.setBookkeeperEnableBatchRead(false); + builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class), + factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)); + clientConfiguration = + (ClientConfiguration) FieldUtils.readField(builder, "conf", true); + assertFalse(clientConfiguration.isBatchReadEnabled()); + } } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 529f8e8a5e92a..8c8af14fc6482 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -242,8 +242,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr @Override public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { - long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); - return readAsync(startEntry, lastEntry); + return readHandle.batchReadAsync(startEntry, maxCount, maxSize); } @Override diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index ee07f818a77da..9f520f16ecc02 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -30,6 +30,7 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.commons.lang3.mutable.MutableInt; /** * Mock implementation of ReadHandle. @@ -40,6 +41,7 @@ class PulsarMockReadHandle implements ReadHandle { private final long ledgerId; private final LedgerMetadata metadata; private final List entries; + private final int maxFrameSize = 5 * 1024 * 1024; PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, List entries) { @@ -65,9 +67,23 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } @Override - public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long _maxSize) { + long maxSize = _maxSize > 0 ? _maxSize : maxFrameSize; long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); - return readAsync(startEntry, lastEntry); + MutableInt size = new MutableInt(0); + return bk.getProgrammedFailure().thenComposeAsync(res -> { + List seq = new ArrayList<>(); + long entryId = startEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + LedgerEntry entry = entries.get((int) entryId++).duplicate(); + if (size.addAndGet(entry.getLength()) > maxSize) { + break; + } + seq.add(entry); + } + log.debug("Entries batch read: {}", seq); + return FutureUtils.value(LedgerEntriesImpl.create(seq)); + }); } @Override From 6eea5b194f00c120b6d0906c850f7d7859187b5a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 21 Aug 2024 11:18:21 +0800 Subject: [PATCH 4/8] Add tests --- .../mledger/impl/cache/ReadEntryUtils.java | 13 +++-- .../mledger/impl/ManagedLedgerTest.java | 49 ++++++++++++++----- .../FileSystemManagedLedgerOffloaderTest.java | 17 ++++++- ...reManagedLedgerOffloaderStreamingTest.java | 17 ++++++- .../BlobStoreManagedLedgerOffloaderTest.java | 29 +++++++++++ 5 files changed, 108 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 75ebcbe14f3b6..c143881b29c28 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; @@ -25,9 +26,11 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; -class ReadEntryUtils { +@VisibleForTesting +public class ReadEntryUtils { - static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, + @VisibleForTesting + public static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, long lastEntry, boolean useBookkeeperV2WireProtocol) { if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with @@ -37,7 +40,7 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h // Batch read is not supported for striped ledgers. LedgerMetadata m = handle.getLedgerMetadata(); boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize(); - if (entriesToRead <= 1 || !useBookkeeperV2WireProtocol || isStriped) { + if (!useBatchRead(entriesToRead, useBookkeeperV2WireProtocol, isStriped)) { return handle.readAsync(firstEntry, lastEntry); } return handle.batchReadAsync(firstEntry, entriesToRead, 0); @@ -59,4 +62,8 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h } return handle.readUnconfirmedAsync(firstEntry, lastEntry); } + + private static boolean useBatchRead(int entriesToRead, boolean useBookkeeperV2WireProtocol, boolean isStriped) { + return entriesToRead > 1 && useBookkeeperV2WireProtocol && !isStriped; + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e3b272babb7bb..f569b426922f2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -19,11 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -85,15 +81,9 @@ import lombok.Cleanup; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.*; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.apache.bookkeeper.client.EnsemblePlacementPolicy; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.PulsarMockBookKeeper; -import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; @@ -128,6 +118,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.bookkeeper.mledger.impl.cache.ReadEntryUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; @@ -148,7 +139,9 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.eclipse.jetty.util.BlockingArrayQueue; +import org.mockito.MockedStatic; import org.mockito.Mockito; +import org.mockito.stubbing.OngoingStubbing; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -4372,4 +4365,36 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) assertEquals(ml.currentLedgerEntries, 0); }); } + + @Test + public void testBatchReadExceedMaxFrameLength() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setUseBookkeeperV2WireProtocol(true); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testBatchReadExceedMaxFrameLength", config); + ml = Mockito.spy(ml); + Mockito.doReturn(Optional.empty()).when(ml).getOptionalLedgerInfo(Mockito.anyLong()); + ManagedCursor cursor = ml.openCursor("c1"); + // 1 MB per entry. + byte[] body = new byte[1024 * 1024]; + for (int i = 0; i < 20; i++) { + ml.addEntry(body); + } + + @Cleanup + MockedStatic mockStatic = Mockito.mockStatic(ReadEntryUtils.class); + mockStatic.when(() -> + ReadEntryUtils.readAsync(any(), any(), anyLong(), anyLong(), + anyBoolean())) + .thenAnswer(inv -> { + ReadHandle handle = inv.getArgument(1); + int firstEntry = inv.getArgument(2); + int lastEntry = inv.getArgument(3); + int entriesToRead = lastEntry - firstEntry + 1; + return handle.batchReadAsync(firstEntry, entriesToRead, 0); + }); + + List entries = cursor.readEntries(20); + Assert.assertEquals(entries.size(), 20); + } } diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java index 71fe5ec72193a..67d621a4f3b45 100644 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java @@ -103,37 +103,52 @@ public void tearDown() { } @Test - public void testOffloadAndRead() throws Exception { + public void testOffloadAndRead_BatchRead() throws Exception { LedgerOffloader offloader = fileSystemManagedLedgerOffloader; UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, map).get(); ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get(); assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1); + LedgerEntries batchEntries = toTest.batchRead(0, numberOfEntries, 0); + Iterator batchIter = batchEntries.iterator(); LedgerEntries toWriteEntries = toWrite.read(0,numberOfEntries - 1); Iterator toTestIter = toTestEntries.iterator(); Iterator toWriteIter = toWriteEntries.iterator(); while(toTestIter.hasNext()) { LedgerEntry toWriteEntry = toWriteIter.next(); LedgerEntry toTestEntry = toTestIter.next(); + LedgerEntry batchEntry = batchIter.next(); assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + + assertEquals(toWriteEntry.getLedgerId(), batchEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), batchEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), batchEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), batchEntry.getEntryBuffer()); } toTestEntries = toTest.read(1, numberOfEntries - 1); toWriteEntries = toWrite.read(1,numberOfEntries - 1); + batchEntries = toTest.batchRead(1, numberOfEntries - 1, 0); toTestIter = toTestEntries.iterator(); toWriteIter = toWriteEntries.iterator(); + batchIter = batchEntries.iterator(); while(toTestIter.hasNext()) { LedgerEntry toWriteEntry = toWriteIter.next(); LedgerEntry toTestEntry = toTestIter.next(); + LedgerEntry batchEntry = batchIter.next(); assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + assertEquals(toWriteEntry.getLedgerId(), batchEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), batchEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), batchEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), batchEntry.getEntryBuffer()); } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java index e706e4254cb11..64ddae0ae41e8 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java @@ -26,6 +26,7 @@ import static org.testng.Assert.fail; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Random; @@ -122,7 +123,7 @@ public void testHappyCase() throws Exception { } @Test - public void testReadAndWrite() throws Exception { + public void testRead_BatchRead_AndWrite() throws Exception { LedgerOffloader offloader = getOffloader(new HashMap() {{ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000"); put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880"); @@ -161,6 +162,7 @@ public void testReadAndWrite() throws Exception { final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get(); final LedgerEntries ledgerEntries = readHandle.readAsync(0, 9).get(); + final LedgerEntries batchLedgerEntries = readHandle.batchReadAsync(0, 10, 0).get(); for (LedgerEntry ledgerEntry : ledgerEntries) { final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId()); @@ -168,6 +170,19 @@ public void testReadAndWrite() throws Exception { final byte[] entryBytes = ledgerEntry.getEntryBytes(); assertEquals(storedData, entryBytes); } + + Iterator ledgerEntriesIterator = batchLedgerEntries.iterator(); + Iterator batchLedgerEntriesIterator = batchLedgerEntries.iterator(); + while (ledgerEntriesIterator.hasNext() && batchLedgerEntriesIterator.hasNext()) { + LedgerEntry ledgerEntry = ledgerEntriesIterator.next(); + LedgerEntry batchLedgerEntry = batchLedgerEntriesIterator.next(); + assertEquals(ledgerEntry.getLedgerId(), batchLedgerEntry.getLedgerId()); + assertEquals(ledgerEntry.getEntryId(), batchLedgerEntry.getEntryId()); + assertEquals(ledgerEntry.getLength(), batchLedgerEntry.getLength()); + assertEquals(ledgerEntry.getEntryBuffer(), batchLedgerEntry.getEntryBuffer()); + } + Assert.assertFalse(ledgerEntriesIterator.hasNext()); + Assert.assertFalse(batchLedgerEntriesIterator.hasNext()); } @Test diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index bf6ede896ab28..c1d0f1341d82b 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -165,6 +165,35 @@ public void testOffloadAndRead() throws Exception { } } + @Test + public void testBatchRead() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); + LedgerOffloader offloader = getOffloader(); + + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); + assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); + int entryCount = (int) (toTest.getLastAddConfirmed() + 1); + + try (LedgerEntries entries = toTest.read(0, toWrite.getLastAddConfirmed()); + LedgerEntries batch = toTest.batchRead(0, entryCount, 0)) { + Iterator entryIterator = entries.iterator(); + Iterator batchIterator = batch.iterator(); + while (entryIterator.hasNext() && batchIterator.hasNext()) { + LedgerEntry toWriteEntry = entryIterator.next(); + LedgerEntry toTestEntry = batchIterator.next(); + assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + } + Assert.assertFalse(entryIterator.hasNext()); + Assert.assertFalse(batchIterator.hasNext()); + } + } + @Test(timeOut = 60000) public void testReadHandlerState() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); From 8c4eebfc120624c3285944a2ac1b2e190702d9bc Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 Aug 2024 03:26:52 +0800 Subject: [PATCH 5/8] Improve code --- .../mledger/impl/cache/ReadEntryUtils.java | 48 +++++++++++++++++-- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index c143881b29c28..2c7dd9e3d5386 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -19,28 +19,41 @@ package org.apache.bookkeeper.mledger.impl.cache; import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @VisibleForTesting public class ReadEntryUtils { + private static final Logger log = LoggerFactory.getLogger(ReadEntryUtils.class); + @VisibleForTesting public static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, long lastEntry, boolean useBookkeeperV2WireProtocol) { + int entriesToRead = (int) (lastEntry - firstEntry + 1); + // Batch read is not supported for striped ledgers. + LedgerMetadata m = handle.getLedgerMetadata(); + boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize(); + boolean useBatchRead = useBatchRead(entriesToRead, useBookkeeperV2WireProtocol, isStriped); if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a // ReadOnlyManagedLedgerImpl object. - int entriesToRead = (int) (lastEntry - firstEntry + 1); - // Batch read is not supported for striped ledgers. - LedgerMetadata m = handle.getLedgerMetadata(); - boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize(); - if (!useBatchRead(entriesToRead, useBookkeeperV2WireProtocol, isStriped)) { + if (!useBatchRead) { return handle.readAsync(firstEntry, lastEntry); } return handle.batchReadAsync(firstEntry, entriesToRead, 0); @@ -60,10 +73,35 @@ public static CompletableFuture readAsync(ManagedLedger ml, ReadH return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is " + lastConfirmedEntry + " when reading entry " + lastEntry)); } + + if (useBatchRead && handle instanceof LedgerHandle lh) { + return asyncBatchReadUnconfirmedEntries(lh, firstEntry, entriesToRead); + } return handle.readUnconfirmedAsync(firstEntry, lastEntry); } private static boolean useBatchRead(int entriesToRead, boolean useBookkeeperV2WireProtocol, boolean isStriped) { return entriesToRead > 1 && useBookkeeperV2WireProtocol && !isStriped; } + + private static CompletableFuture asyncBatchReadUnconfirmedEntries(LedgerHandle lh, long firstEntry, + int numEntries) { + CompletableFuture f = new CompletableFuture<>(); + lh.asyncBatchReadUnconfirmedEntries(firstEntry, numEntries, 0, (rc, lh1, seq, ctx) -> { + if (rc != BKException.Code.OK) { + log.error("Failed to batch read entries from ledger {} : {}", lh1.getId(), BKException.getMessage(rc)); + f.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc)); + return; + } + List entries = new ArrayList<>(numEntries); + while (seq.hasMoreElements()) { + LedgerEntry entry = seq.nextElement(); + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(entry.getLedgerId(), entry.getEntryId(), + entry.getLength(), entry.getEntryBuffer()); + entries.add(entryImpl); + } + f.complete(LedgerEntriesImpl.create(entries)); + }, null); + return f; + } } From eaf48a339483a44534f9ebdf2a6a2525f2c496fd Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 Aug 2024 03:30:20 +0800 Subject: [PATCH 6/8] Improve code --- .../bookkeeper/mledger/impl/cache/ReadEntryUtils.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 2c7dd9e3d5386..610468118e2b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.mledger.impl.cache; import com.google.common.annotations.VisibleForTesting; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -45,10 +44,7 @@ public class ReadEntryUtils { public static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, long lastEntry, boolean useBookkeeperV2WireProtocol) { int entriesToRead = (int) (lastEntry - firstEntry + 1); - // Batch read is not supported for striped ledgers. - LedgerMetadata m = handle.getLedgerMetadata(); - boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize(); - boolean useBatchRead = useBatchRead(entriesToRead, useBookkeeperV2WireProtocol, isStriped); + boolean useBatchRead = useBatchRead(entriesToRead, handle, useBookkeeperV2WireProtocol); if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a @@ -80,7 +76,10 @@ public static CompletableFuture readAsync(ManagedLedger ml, ReadH return handle.readUnconfirmedAsync(firstEntry, lastEntry); } - private static boolean useBatchRead(int entriesToRead, boolean useBookkeeperV2WireProtocol, boolean isStriped) { + private static boolean useBatchRead(int entriesToRead, ReadHandle handle, boolean useBookkeeperV2WireProtocol) { + // Batch read is not supported for striped ledgers. + LedgerMetadata m = handle.getLedgerMetadata(); + boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize(); return entriesToRead > 1 && useBookkeeperV2WireProtocol && !isStriped; } From 0c5d78551db9f9659ee3000377bce8d9178021fe Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 Aug 2024 03:45:45 +0800 Subject: [PATCH 7/8] Improve code --- .../pulsar/broker/ServiceConfiguration.java | 4 +++- .../client/PulsarMockLedgerHandle.java | 19 +++++++++++++++++++ .../client/PulsarMockReadHandle.java | 5 +++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e910f7a8bb312..9a7fd6963d46b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1943,7 +1943,9 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( category = CATEGORY_STORAGE_BK, - doc = "Enable Bookkeeper client to read entries in batch mode. Default is false." + doc = "Enable Bookkeeper client to read entries in batch mode. Default is false. Note: " + + "this feature only works when bookkeeperUseV2WireProtocol is enabled and ensemble " + + "size equals to write quorum size." ) private boolean bookkeeperEnableBatchRead = false; diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index 8c8af14fc6482..d52825802752f 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Versioned; +import org.apache.commons.collections4.IteratorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -250,6 +251,24 @@ public CompletableFuture readUnconfirmedAsync(long firstEntry, lo return readHandle.readUnconfirmedAsync(firstEntry, lastEntry); } + @Override + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx) { + PulsarMockReadHandle readHandle = (PulsarMockReadHandle) this.readHandle; + CompletableFuture f = readHandle.batchReadUnconfirmedAsync(startEntry, maxCount, (int) maxSize); + f.whenComplete((entries, exception) -> { + if (exception != null) { + cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception), this, null, ctx); + } else { + List entries0 = new ArrayList<>(maxCount); + for (org.apache.bookkeeper.client.api.LedgerEntry entry : entries) { + entries0.add(new org.apache.bookkeeper.client.LedgerEntry((LedgerEntryImpl) entry)); + } + cb.readComplete(BKException.Code.OK, this, IteratorUtils.asEnumeration(entries0.iterator()), ctx); + } + }); + } + @Override public CompletableFuture readLastAddConfirmedAsync() { return readHandle.readLastAddConfirmedAsync(); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index 9f520f16ecc02..7f68bbf8d9be4 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -91,6 +91,11 @@ public CompletableFuture readUnconfirmedAsync(long firstEntry, lo return readAsync(firstEntry, lastEntry); } + public CompletableFuture batchReadUnconfirmedAsync(long firstEntry, int maxCount, int maxSize) { + int lastEntry = (int) Math.min(firstEntry + maxCount - 1, getLastAddConfirmed()); + return batchReadAsync(firstEntry, lastEntry, maxSize); + } + @Override public CompletableFuture readLastAddConfirmedAsync() { return CompletableFuture.completedFuture(getLastAddConfirmed()); From b0a47f35f78a40d8dbb40de1d027a7c842b8e8fb Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 22 Aug 2024 04:03:08 +0800 Subject: [PATCH 8/8] Improve code --- .../org/apache/bookkeeper/client/PulsarMockReadHandle.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index 7f68bbf8d9be4..a430493345726 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -67,8 +67,8 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } @Override - public CompletableFuture batchReadAsync(long startEntry, int maxCount, long _maxSize) { - long maxSize = _maxSize > 0 ? _maxSize : maxFrameSize; + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize0) { + long maxSize = maxSize0 > 0 ? maxSize0 : maxFrameSize; long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); MutableInt size = new MutableInt(0); return bk.getProgrammedFailure().thenComposeAsync(res -> {