diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 03439f93ccad8..16dee7370e795 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -86,6 +86,7 @@ public class ManagedLedgerConfig { private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; private boolean triggerOffloadOnTopicLoad = false; + private boolean isUseBookkeeperV2WireProtocol = false; @Getter @Setter @@ -769,5 +770,13 @@ public String getShadowSource() { return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); } + public void setUseBookkeeperV2WireProtocol(boolean isUseBookkeeperV2WireProtocol) { + this.isUseBookkeeperV2WireProtocol = isUseBookkeeperV2WireProtocol; + } + + public boolean isUseBookkeeperV2WireProtocol() { + return isUseBookkeeperV2WireProtocol; + } + public static final String PROPERTY_SOURCE_TOPIC_KEY = "PULSAR.SHADOW_SOURCE"; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index 92541a7a72578..1c67ac16acb55 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -39,10 +39,12 @@ public class EntryCacheDisabled implements EntryCache { private final ManagedLedgerImpl ml; private final ManagedLedgerInterceptor interceptor; + private final boolean useBookkeeperV2WireProtocol; public EntryCacheDisabled(ManagedLedgerImpl ml) { this.ml = ml; this.interceptor = ml.getManagedLedgerInterceptor(); + this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol(); } @Override @@ -79,8 +81,8 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) { @Override public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync( - ledgerEntries -> { + ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol) + .thenAcceptAsync(ledgerEntries -> { List entries = new ArrayList<>(); long totalSize = 0; try { @@ -98,17 +100,18 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole ml.getMbean().addReadEntriesSample(entries.size(), totalSize); callback.readEntriesComplete(entries, ctx); - }, ml.getExecutor()).exceptionally(exception -> { - callback.readEntriesFailed(createManagedLedgerException(exception), ctx); - return null; - }); + }, ml.getExecutor()) + .exceptionally(exception -> { + callback.readEntriesFailed(createManagedLedgerException(exception), ctx); + return null; + }); } @Override public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync( - (ledgerEntries, exception) -> { + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol) + .whenCompleteAsync((ledgerEntries, exception) -> { if (exception != null) { ml.invalidateLedgerHandle(lh); callback.readEntryFailed(createManagedLedgerException(exception), ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index cb006a5f0cea9..57e59b1d6a1a2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -65,6 +65,7 @@ public class RangeEntryCacheImpl implements EntryCache { private final RangeCache entries; private final boolean copyEntries; private final PendingReadsManager pendingReadsManager; + private final boolean useBookkeeperV2WireProtocol; private volatile long estimatedEntrySize = 10 * 1024; @@ -80,6 +81,7 @@ public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); this.copyEntries = copyEntries; + this.useBookkeeperV2WireProtocol = ml.getConfig().isUseBookkeeperV2WireProtocol(); if (log.isDebugEnabled()) { log.debug("[{}] Initialized managed-ledger entry cache", ml.getName()); @@ -249,8 +251,8 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength()); callback.readEntryComplete(cachedEntry, ctx); } else { - ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync( - ledgerEntries -> { + ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId(), useBookkeeperV2WireProtocol) + .thenAcceptAsync(ledgerEntries -> { try { Iterator iterator = ledgerEntries.iterator(); if (iterator.hasNext()) { @@ -264,17 +266,18 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa } else { // got an empty sequence callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), - ctx); + ctx); } } finally { ledgerEntries.close(); } - }, ml.getExecutor()).exceptionally(exception -> { + }, ml.getExecutor()) + .exceptionally(exception -> { ml.invalidateLedgerHandle(lh); pendingReadsManager.invalidateLedger(lh.getId()); callback.readEntryFailed(createManagedLedgerException(exception), ctx); return null; - }); + }); } } @@ -429,7 +432,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { CompletableFuture> readFromStorage(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry) { final int entriesToRead = (int) (lastEntry - firstEntry) + 1; - CompletableFuture> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry) + CompletableFuture> readResult = ReadEntryUtils + .readAsync(ml, lh, firstEntry, lastEntry, useBookkeeperV2WireProtocol) .thenApply( ledgerEntries -> { requireNonNull(ml.getName()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java index 5cf5f053f0ce7..610468118e2b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/ReadEntryUtils.java @@ -18,21 +18,41 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -class ReadEntryUtils { +@VisibleForTesting +public class ReadEntryUtils { - static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, - long lastEntry) { + private static final Logger log = LoggerFactory.getLogger(ReadEntryUtils.class); + + @VisibleForTesting + public static CompletableFuture readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry, + long lastEntry, boolean useBookkeeperV2WireProtocol) { + int entriesToRead = (int) (lastEntry - firstEntry + 1); + boolean useBatchRead = useBatchRead(entriesToRead, handle, useBookkeeperV2WireProtocol); if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) { // The read handle comes from another managed ledger, in this case, we can only compare the entry range with // the LAC of that read handle. Specifically, it happens when this method is called by a // ReadOnlyManagedLedgerImpl object. - return handle.readAsync(firstEntry, lastEntry); + if (!useBatchRead) { + return handle.readAsync(firstEntry, lastEntry); + } + return handle.batchReadAsync(firstEntry, entriesToRead, 0); } // Compare the entry range with the lastConfirmedEntry maintained by the managed ledger because the entry cache // of `ShadowManagedLedgerImpl` reads entries via `ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed` @@ -49,6 +69,38 @@ static CompletableFuture readAsync(ManagedLedger ml, ReadHandle h return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is " + lastConfirmedEntry + " when reading entry " + lastEntry)); } + + if (useBatchRead && handle instanceof LedgerHandle lh) { + return asyncBatchReadUnconfirmedEntries(lh, firstEntry, entriesToRead); + } return handle.readUnconfirmedAsync(firstEntry, lastEntry); } + + private static boolean useBatchRead(int entriesToRead, ReadHandle handle, boolean useBookkeeperV2WireProtocol) { + // Batch read is not supported for striped ledgers. + LedgerMetadata m = handle.getLedgerMetadata(); + boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize(); + return entriesToRead > 1 && useBookkeeperV2WireProtocol && !isStriped; + } + + private static CompletableFuture asyncBatchReadUnconfirmedEntries(LedgerHandle lh, long firstEntry, + int numEntries) { + CompletableFuture f = new CompletableFuture<>(); + lh.asyncBatchReadUnconfirmedEntries(firstEntry, numEntries, 0, (rc, lh1, seq, ctx) -> { + if (rc != BKException.Code.OK) { + log.error("Failed to batch read entries from ledger {} : {}", lh1.getId(), BKException.getMessage(rc)); + f.completeExceptionally(org.apache.bookkeeper.client.BKException.create(rc)); + return; + } + List entries = new ArrayList<>(numEntries); + while (seq.hasMoreElements()) { + LedgerEntry entry = seq.nextElement(); + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(entry.getLedgerId(), entry.getEntryId(), + entry.getLength(), entry.getEntryBuffer()); + entries.add(entryImpl); + } + f.complete(LedgerEntriesImpl.create(entries)); + }, null); + return f; + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e3b272babb7bb..f569b426922f2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -19,11 +19,7 @@ package org.apache.bookkeeper.mledger.impl; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -85,15 +81,9 @@ import lombok.Cleanup; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.*; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.apache.bookkeeper.client.EnsemblePlacementPolicy; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.PulsarMockBookKeeper; -import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.api.ReadHandle; @@ -128,6 +118,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; +import org.apache.bookkeeper.mledger.impl.cache.ReadEntryUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; @@ -148,7 +139,9 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.eclipse.jetty.util.BlockingArrayQueue; +import org.mockito.MockedStatic; import org.mockito.Mockito; +import org.mockito.stubbing.OngoingStubbing; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -4372,4 +4365,36 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry) assertEquals(ml.currentLedgerEntries, 0); }); } + + @Test + public void testBatchReadExceedMaxFrameLength() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setUseBookkeeperV2WireProtocol(true); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("testBatchReadExceedMaxFrameLength", config); + ml = Mockito.spy(ml); + Mockito.doReturn(Optional.empty()).when(ml).getOptionalLedgerInfo(Mockito.anyLong()); + ManagedCursor cursor = ml.openCursor("c1"); + // 1 MB per entry. + byte[] body = new byte[1024 * 1024]; + for (int i = 0; i < 20; i++) { + ml.addEntry(body); + } + + @Cleanup + MockedStatic mockStatic = Mockito.mockStatic(ReadEntryUtils.class); + mockStatic.when(() -> + ReadEntryUtils.readAsync(any(), any(), anyLong(), anyLong(), + anyBoolean())) + .thenAnswer(inv -> { + ReadHandle handle = inv.getArgument(1); + int firstEntry = inv.getArgument(2); + int lastEntry = inv.getArgument(3); + int entriesToRead = lastEntry - firstEntry + 1; + return handle.batchReadAsync(firstEntry, entriesToRead, 0); + }); + + List entries = cursor.readEntries(20); + Assert.assertEquals(entries.size(), 20); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index 6d8ecba868847..d3a927da3a71b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -312,6 +312,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return CompletableFuture.completedFuture(LedgerEntriesImpl.create(readEntries)); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readAsync(firstEntry, lastEntry); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 20addc3924bf3..9a7fd6963d46b 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1941,6 +1941,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean bookkeeperClientSeparatedIoThreadsEnabled = false; + @FieldContext( + category = CATEGORY_STORAGE_BK, + doc = "Enable Bookkeeper client to read entries in batch mode. Default is false. Note: " + + "this feature only works when bookkeeperUseV2WireProtocol is enabled and ensemble " + + "size equals to write quorum size." + ) + private boolean bookkeeperEnableBatchRead = false; + /**** --- Managed Ledger. --- ****/ @FieldContext( minValue = 1, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 45299d9ed05d5..f4255d1a42d6e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -137,6 +137,7 @@ ClientConfiguration createBkClientConfiguration(MetadataStoreExtended store, Ser bkConf.setDiskWeightBasedPlacementEnabled(conf.isBookkeeperDiskWeightBasedPlacementEnabled()); bkConf.setMetadataServiceUri(conf.getBookkeeperMetadataStoreUrl()); bkConf.setLimitStatsLogging(conf.isBookkeeperClientLimitStatsLogging()); + bkConf.setBatchReadEnabled(conf.isBookkeeperEnableBatchRead()); if (!conf.isBookkeeperMetadataStoreSeparated()) { // If we're connecting to the same metadata service, with same config, then diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 066bfc98cc0cf..c47cf3dd5447b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2010,6 +2010,7 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching()); managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching( serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching()); + managedLedgerConfig.setUseBookkeeperV2WireProtocol(serviceConfig.isBookkeeperUseV2WireProtocol()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java index 3c0e4d0c409df..cfc1c5b525e0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BookKeeperClientFactoryImplTest.java @@ -340,4 +340,24 @@ public void testBookKeeperLimitStatsLoggingConfiguration() throws Exception { (ClientConfiguration) FieldUtils.readField(builder, "conf", true); assertFalse(clientConfiguration.getLimitStatsLogging()); } + + @Test + public void testBookkeeperBatchReadConfig() throws Exception { + BookKeeperClientFactoryImpl factory = new BookKeeperClientFactoryImpl(); + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setBookkeeperEnableBatchRead(true); + EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class); + BookKeeper.Builder builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class), + factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)); + ClientConfiguration clientConfiguration = + (ClientConfiguration) FieldUtils.readField(builder, "conf", true); + assertTrue(clientConfiguration.isBatchReadEnabled()); + + conf.setBookkeeperEnableBatchRead(false); + builder = factory.getBookKeeperBuilder(conf, eventLoopGroup, mock(StatsLogger.class), + factory.createBkClientConfiguration(mock(MetadataStoreExtended.class), conf)); + clientConfiguration = + (ClientConfiguration) FieldUtils.readField(builder, "conf", true); + assertFalse(clientConfiguration.isBatchReadEnabled()); + } } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java index aa61e541d0d6b..d52825802752f 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.versioning.LongVersion; import org.apache.bookkeeper.versioning.Versioned; +import org.apache.commons.collections4.IteratorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -240,11 +241,34 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return readHandle.readAsync(firstEntry, lastEntry); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + return readHandle.batchReadAsync(startEntry, maxCount, maxSize); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readHandle.readUnconfirmedAsync(firstEntry, lastEntry); } + @Override + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx) { + PulsarMockReadHandle readHandle = (PulsarMockReadHandle) this.readHandle; + CompletableFuture f = readHandle.batchReadUnconfirmedAsync(startEntry, maxCount, (int) maxSize); + f.whenComplete((entries, exception) -> { + if (exception != null) { + cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception), this, null, ctx); + } else { + List entries0 = new ArrayList<>(maxCount); + for (org.apache.bookkeeper.client.api.LedgerEntry entry : entries) { + entries0.add(new org.apache.bookkeeper.client.LedgerEntry((LedgerEntryImpl) entry)); + } + cb.readComplete(BKException.Code.OK, this, IteratorUtils.asEnumeration(entries0.iterator()), ctx); + } + }); + } + @Override public CompletableFuture readLastAddConfirmedAsync() { return readHandle.readLastAddConfirmedAsync(); diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java index a4361f62254e4..a430493345726 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockReadHandle.java @@ -30,6 +30,7 @@ import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.commons.lang3.mutable.MutableInt; /** * Mock implementation of ReadHandle. @@ -40,6 +41,7 @@ class PulsarMockReadHandle implements ReadHandle { private final long ledgerId; private final LedgerMetadata metadata; private final List entries; + private final int maxFrameSize = 5 * 1024 * 1024; PulsarMockReadHandle(PulsarMockBookKeeper bk, long ledgerId, LedgerMetadata metadata, List entries) { @@ -64,11 +66,36 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr }); } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize0) { + long maxSize = maxSize0 > 0 ? maxSize0 : maxFrameSize; + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + MutableInt size = new MutableInt(0); + return bk.getProgrammedFailure().thenComposeAsync(res -> { + List seq = new ArrayList<>(); + long entryId = startEntry; + while (entryId <= lastEntry && entryId < entries.size()) { + LedgerEntry entry = entries.get((int) entryId++).duplicate(); + if (size.addAndGet(entry.getLength()) > maxSize) { + break; + } + seq.add(entry); + } + log.debug("Entries batch read: {}", seq); + return FutureUtils.value(LedgerEntriesImpl.create(seq)); + }); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readAsync(firstEntry, lastEntry); } + public CompletableFuture batchReadUnconfirmedAsync(long firstEntry, int maxCount, int maxSize) { + int lastEntry = (int) Math.min(firstEntry + maxCount - 1, getLastAddConfirmed()); + return batchReadAsync(firstEntry, lastEntry, maxSize); + } + @Override public CompletableFuture readLastAddConfirmedAsync() { return CompletableFuture.completedFuture(getLastAddConfirmed()); diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index 91e7e902eab8a..33a6554426656 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -175,6 +175,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return promise; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readAsync(firstEntry, lastEntry); diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java index 71fe5ec72193a..67d621a4f3b45 100644 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java @@ -103,37 +103,52 @@ public void tearDown() { } @Test - public void testOffloadAndRead() throws Exception { + public void testOffloadAndRead_BatchRead() throws Exception { LedgerOffloader offloader = fileSystemManagedLedgerOffloader; UUID uuid = UUID.randomUUID(); offloader.offload(toWrite, uuid, map).get(); ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get(); assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1); + LedgerEntries batchEntries = toTest.batchRead(0, numberOfEntries, 0); + Iterator batchIter = batchEntries.iterator(); LedgerEntries toWriteEntries = toWrite.read(0,numberOfEntries - 1); Iterator toTestIter = toTestEntries.iterator(); Iterator toWriteIter = toWriteEntries.iterator(); while(toTestIter.hasNext()) { LedgerEntry toWriteEntry = toWriteIter.next(); LedgerEntry toTestEntry = toTestIter.next(); + LedgerEntry batchEntry = batchIter.next(); assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + + assertEquals(toWriteEntry.getLedgerId(), batchEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), batchEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), batchEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), batchEntry.getEntryBuffer()); } toTestEntries = toTest.read(1, numberOfEntries - 1); toWriteEntries = toWrite.read(1,numberOfEntries - 1); + batchEntries = toTest.batchRead(1, numberOfEntries - 1, 0); toTestIter = toTestEntries.iterator(); toWriteIter = toWriteEntries.iterator(); + batchIter = batchEntries.iterator(); while(toTestIter.hasNext()) { LedgerEntry toWriteEntry = toWriteIter.next(); LedgerEntry toTestEntry = toTestIter.next(); + LedgerEntry batchEntry = batchIter.next(); assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + assertEquals(toWriteEntry.getLedgerId(), batchEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), batchEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), batchEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), batchEntry.getEntryBuffer()); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index e050d74a332bc..1a38c8b245d3d 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -207,6 +207,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return promise; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + private void seekToEntry(long nextExpectedId) throws IOException { Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId); if (knownOffset != null) { diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index 502f475174cee..707254b5b0dc4 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -239,6 +239,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return promise; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + private List getGroupedReader(long firstEntry, long lastEntry) throws Exception { List groupedReaders = new LinkedList<>(); for (int i = indices.size() - 1; i >= 0 && firstEntry <= lastEntry; i--) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java index e706e4254cb11..64ddae0ae41e8 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java @@ -26,6 +26,7 @@ import static org.testng.Assert.fail; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.Map; import java.util.Random; @@ -122,7 +123,7 @@ public void testHappyCase() throws Exception { } @Test - public void testReadAndWrite() throws Exception { + public void testRead_BatchRead_AndWrite() throws Exception { LedgerOffloader offloader = getOffloader(new HashMap() {{ put(TieredStorageConfiguration.MAX_OFFLOAD_SEGMENT_SIZE_IN_BYTES, "1000"); put(config.getKeys(TieredStorageConfiguration.METADATA_FIELD_MAX_BLOCK_SIZE).get(0), "5242880"); @@ -161,6 +162,7 @@ public void testReadAndWrite() throws Exception { final ReadHandle readHandle = offloader.readOffloaded(0, contextBuilder.build(), driverMeta).get(); final LedgerEntries ledgerEntries = readHandle.readAsync(0, 9).get(); + final LedgerEntries batchLedgerEntries = readHandle.batchReadAsync(0, 10, 0).get(); for (LedgerEntry ledgerEntry : ledgerEntries) { final EntryImpl storedEntry = (EntryImpl) entries.get((int) ledgerEntry.getEntryId()); @@ -168,6 +170,19 @@ public void testReadAndWrite() throws Exception { final byte[] entryBytes = ledgerEntry.getEntryBytes(); assertEquals(storedData, entryBytes); } + + Iterator ledgerEntriesIterator = batchLedgerEntries.iterator(); + Iterator batchLedgerEntriesIterator = batchLedgerEntries.iterator(); + while (ledgerEntriesIterator.hasNext() && batchLedgerEntriesIterator.hasNext()) { + LedgerEntry ledgerEntry = ledgerEntriesIterator.next(); + LedgerEntry batchLedgerEntry = batchLedgerEntriesIterator.next(); + assertEquals(ledgerEntry.getLedgerId(), batchLedgerEntry.getLedgerId()); + assertEquals(ledgerEntry.getEntryId(), batchLedgerEntry.getEntryId()); + assertEquals(ledgerEntry.getLength(), batchLedgerEntry.getLength()); + assertEquals(ledgerEntry.getEntryBuffer(), batchLedgerEntry.getEntryBuffer()); + } + Assert.assertFalse(ledgerEntriesIterator.hasNext()); + Assert.assertFalse(batchLedgerEntriesIterator.hasNext()); } @Test diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index bf6ede896ab28..c1d0f1341d82b 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -165,6 +165,35 @@ public void testOffloadAndRead() throws Exception { } } + @Test + public void testBatchRead() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); + LedgerOffloader offloader = getOffloader(); + + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get(); + assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed()); + int entryCount = (int) (toTest.getLastAddConfirmed() + 1); + + try (LedgerEntries entries = toTest.read(0, toWrite.getLastAddConfirmed()); + LedgerEntries batch = toTest.batchRead(0, entryCount, 0)) { + Iterator entryIterator = entries.iterator(); + Iterator batchIterator = batch.iterator(); + while (entryIterator.hasNext() && batchIterator.hasNext()) { + LedgerEntry toWriteEntry = entryIterator.next(); + LedgerEntry toTestEntry = batchIterator.next(); + assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); + assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); + assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); + assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); + } + Assert.assertFalse(entryIterator.hasNext()); + Assert.assertFalse(batchIterator.hasNext()); + } + } + @Test(timeOut = 60000) public void testReadHandlerState() throws Exception { ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java index 5ca4d6da20bee..4592ebb52b6f5 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamTest.java @@ -158,6 +158,12 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return future; } + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + long lastEntry = Math.min(startEntry + maxCount - 1, getLastAddConfirmed()); + return readAsync(startEntry, lastEntry); + } + @Override public CompletableFuture readUnconfirmedAsync(long firstEntry, long lastEntry) { return readAsync(firstEntry, lastEntry);