From 6feae82652d26374667cd64982b7a7af5c9610b5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 9 Jul 2025 09:54:26 +0800 Subject: [PATCH 1/4] 1 --- .../mledger/impl/AckSetPositionImpl.java | 11 + .../bookkeeper/mledger/impl/AckSetState.java | 12 + .../mledger/impl/AckSetStateUtil.java | 30 +++ .../mledger/impl/ManagedCursorImpl.java | 37 ++- .../service/AbstractBaseDispatcher.java | 2 +- .../pulsar/broker/service/Consumer.java | 120 +++------ .../pulsar/broker/service/PendingAcksMap.java | 8 +- .../pulsar/broker/service/Subscription.java | 9 +- .../NonPersistentSubscription.java | 3 +- ...PersistentDispatcherMultipleConsumers.java | 8 +- .../persistent/PersistentSubscription.java | 159 +++++++++++- .../PulsarCompactorSubscription.java | 4 +- .../ReplicatedSubscriptionsController.java | 4 +- .../pendingack/impl/PendingAckHandleImpl.java | 6 +- ...ckAndDisableBrokerBatchAckClassicTest.java | 51 ++++ ...nsumerAckAndDisableBrokerBatchAckTest.java | 50 ++++ .../service/ConsumerAckClassicTest.java | 52 ++++ .../broker/service/ConsumerAckTest.java | 234 ++++++++++++++++++ .../service/MessageCumulativeAckTest.java | 8 +- .../broker/service/PersistentTopicTest.java | 2 +- .../service/TransactionMarkerDeleteTest.java | 2 +- .../PersistentSubscriptionTest.java | 4 +- .../PendingAckInMemoryDeleteTest.java | 2 +- .../pulsar/compaction/CompactionTest.java | 2 +- 24 files changed, 699 insertions(+), 121 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java index 22a99eb3607eb..786cf5a2ff4a0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.mledger.impl; import java.util.Optional; +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; @@ -31,6 +33,11 @@ public class AckSetPositionImpl implements Position, AckSetState { protected final long ledgerId; protected final long entryId; protected volatile long[] ackSet; + @Getter + @Setter + private volatile int batchMessagesAckedCount; + @Getter + private volatile boolean positionRemovedFromCursor; public AckSetPositionImpl(long ledgerId, long entryId, long[] ackSet) { this.ledgerId = ledgerId; @@ -63,6 +70,10 @@ public Position getNext() { } } + public void markPositionRemovedFromCursor() { + this.positionRemovedFromCursor = true; + } + @Override public String toString() { return ledgerId + ":" + entryId + " (ackSet " + (ackSet == null ? "is null" : diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java index 03ff50c1c7fe5..f488de5b93711 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java @@ -26,6 +26,10 @@ * ackSet state and to extract the state. */ public interface AckSetState { + + int BATCH_MESSAGE_ACKED_AT_ONCE = -1; + int BATCH_MESSAGE_ACKED_FIRST_PART = -2; + /** * Get the ackSet bitset information encoded as a long array. * @return the ackSet @@ -38,6 +42,14 @@ public interface AckSetState { */ void setAckSet(long[] ackSet); + void setBatchMessagesAckedCount(int messagesCountAcked); + + int getBatchMessagesAckedCount(); + + void markPositionRemovedFromCursor(); + + boolean isPositionRemovedFromCursor(); + /** * Check if the ackSet is set. * @return true if the ackSet is set, false otherwise diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java index 11ab520b68e92..70ec40fbe4779 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java @@ -56,6 +56,36 @@ public static long[] getAckSetArrayOrNull(Position position) { return maybeGetAckSetState(position).map(AckSetState::getAckSet).orElse(null); } + public static void setBatchMessagesAckedCount(Position position, int messagesCountAcked) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + ackSetState.get().setBatchMessagesAckedCount(messagesCountAcked); + } + } + + public static int getBatchMessagesAckedCount(Position position) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + return ackSetState.get().getBatchMessagesAckedCount(); + } + return 0; + } + + public static void markPositionRemovedFromCursor(Position position) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + ackSetState.get().markPositionRemovedFromCursor(); + } + } + + public static boolean isPositionRemovedFromCursor(Position position) { + Optional ackSetState = maybeGetAckSetState(position); + if (ackSetState.isPresent()) { + return ackSetState.get().isPositionRemovedFromCursor(); + } + return true; + } + /** * Get the AckSetState instance from the position. * @param position position which contains the AckSetState diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index b72b392a18dfb..cb649864f90f2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART; import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; @@ -103,6 +105,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder; import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; @@ -2362,6 +2365,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb lock.writeLock().lock(); boolean skipMarkDeleteBecauseAckedNothing = false; + final MutableBoolean cbHasExecuted = new MutableBoolean(false); try { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}", @@ -2391,8 +2395,12 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); if (ackSet == null || ackSet.length == 0) { - if (batchDeletedIndexes != null) { - batchDeletedIndexes.remove(position); + AckSetStateUtil.markPositionRemovedFromCursor(position); + BitSet bitSet; + if (batchDeletedIndexes == null || (bitSet = batchDeletedIndexes.remove(position)) == null) { + AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_AT_ONCE); + } else { + AckSetStateUtil.setBatchMessagesAckedCount(position, bitSet.cardinality()); } // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will // make the RangeSet recognize the "continuity" between adjacent Positions. @@ -2416,9 +2424,15 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb final var givenBitSet = BitSet.valueOf(ackSet); final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet); if (givenBitSet != bitSet) { + int unAckedBefore = bitSet.cardinality(); bitSet.and(givenBitSet); + int unAckedAfter = bitSet.cardinality(); + AckSetStateUtil.setBatchMessagesAckedCount(position, unAckedBefore - unAckedAfter); + } else { + AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_FIRST_PART); } if (bitSet.isEmpty()) { + AckSetStateUtil.markPositionRemovedFromCursor(position); Position previousPosition = ledger.getPreviousPosition(position); individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), previousPosition.getEntryId(), @@ -2479,6 +2493,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb lock.writeLock().unlock(); if (skipMarkDeleteBecauseAckedNothing) { callback.deleteComplete(ctx); + cbHasExecuted.setTrue(); } } @@ -2486,7 +2501,9 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null); - callback.deleteComplete(ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteComplete(ctx); + } return; } @@ -2497,12 +2514,18 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { - callback.deleteComplete(ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteComplete(ctx); + cbHasExecuted.setTrue(); + } } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - callback.deleteFailed(exception, ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteFailed(exception, ctx); + cbHasExecuted.setTrue(); + } } }, ctx); @@ -2513,7 +2536,9 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}", ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); } - callback.deleteFailed(new ManagedLedgerException(e), ctx); + if (!cbHasExecuted.booleanValue()) { + callback.deleteFailed(new ManagedLedgerException(e), ctx); + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f074f234b873f..10b4e39b5f275 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -326,7 +326,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i private void individualAcknowledgeMessageIfNeeded(List positions, Map properties) { if (!(subscription instanceof PulsarCompactorSubscription)) { - subscription.acknowledgeMessage(positions, AckType.Individual, properties); + subscription.acknowledgeMessage(positions, AckType.Individual, properties, null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 3edf6e2d68140..da840525b2ad4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -50,7 +50,6 @@ import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -549,7 +548,7 @@ public CompletableFuture messageAcked(CommandAck ack) { .thenApply(unused -> 1L); } else { List positionsAcked = Collections.singletonList(position); - subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties); + subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties, this); future = CompletableFuture.completedFuture(1L); } } else { @@ -562,87 +561,58 @@ public CompletableFuture messageAcked(CommandAck ack) { return future .thenApply(v -> { - this.messageAckRate.recordEvent(v); - this.messageAckCounter.add(v); + // The case that is typed individual ack without transaction will deal metrics after a callback + // that after cursor deleting positions, so we may receive a 0 value here. + + ackMetricRecord(v); return null; }); } + public void ackMetricRecord(long messageCountInRequest) { + if (messageCountInRequest > 0) { + this.messageAckRate.recordEvent(messageCountInRequest); + this.messageAckCounter.add(messageCountInRequest); + } + } + //this method is for individual ack not carry the transaction private CompletableFuture individualAckNormal(CommandAck ack, Map properties) { - List> positionsAcked = new ArrayList<>(); - long totalAckCount = 0; + List positionsAcked = new ArrayList<>(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); Position position; - ObjectIntPair ackOwnerConsumerAndBatchSize = - getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId()); - Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - long ackedCount; - int batchSize = ackOwnerConsumerAndBatchSize.rightInt(); if (msgId.getAckSetsCount() > 0) { long[] ackSets = new long[msgId.getAckSetsCount()]; for (int j = 0; j < msgId.getAckSetsCount(); j++) { ackSets[j] = msgId.getAckSetAt(j); } position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), ackSets); - ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer); if (isTransactionEnabled()) { //sync the batch position bit set point, in order to delete the position in pending acks if (Subscription.isIndividualAckMode(subType)) { - ((PersistentSubscription) subscription) - .syncBatchPositionBitSetForPendingAck(position); + ((PersistentSubscription) subscription).syncBatchPositionBitSetForPendingAck(position); } } - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); } else { - position = PositionFactory.create(msgId.getLedgerId(), msgId.getEntryId()); - ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer); - if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) { - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - updateBlockedConsumerOnUnackedMsgs(ackOwnerConsumer); - } + position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); } - - positionsAcked.add(Pair.of(ackOwnerConsumer, position)); + positionsAcked.add(position); checkAckValidationError(ack, position); - - totalAckCount += ackedCount; - } - subscription.acknowledgeMessage(positionsAcked.stream() - .map(Pair::getRight) - .collect(Collectors.toList()), AckType.Individual, properties); - CompletableFuture completableFuture = new CompletableFuture<>(); - completableFuture.complete(totalAckCount); - if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> { - Consumer ackOwnerConsumer = positionPair.getLeft(); - Position position = positionPair.getRight(); - //check if the position can remove from the consumer pending acks. - // the bit set is empty in pending ack handle. - if (AckSetStateUtil.hasAckSet(position)) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck(position)) { - removePendingAcks(ackOwnerConsumer, position); - } - } - })); } - return completableFuture; + subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties, this); + return CompletableFuture.completedFuture(0L); } //this method is for individual ack carry the transaction private CompletableFuture individualAckWithTransaction(CommandAck ack) { - // Individual ack - List>> positionsAcked = new ArrayList<>(); if (!isTransactionEnabled()) { return FutureUtil.failedFuture( new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); } - - LongAdder totalAckCount = new LongAdder(); + List> positionsAcked = new ArrayList<>(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); Position position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); @@ -653,20 +623,13 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { consumerId, position); continue; } - Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); - // acked count at least one - long ackedCount; int batchSize; if (msgId.hasBatchSize()) { - batchSize = msgId.getBatchSize(); - // ack batch messages set ackeCount = batchSize - ackedCount = msgId.getBatchSize(); - positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, msgId.getBatchSize()))); + positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); } else { // ack no batch message set ackedCount = 1 batchSize = 0; - ackedCount = 1; - positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, (int) batchSize))); + positionsAcked.add(new MutablePair<>(position, batchSize)); } if (msgId.getAckSetsCount() > 0) { @@ -675,34 +638,13 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { ackSets[j] = msgId.getAckSetAt(j); } AckSetStateUtil.getAckSetState(position).setAckSet(ackSets); - ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); } - addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); - - checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId); - checkAckValidationError(ack, position); - - totalAckCount.add(ackedCount); } - CompletableFuture completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(), - ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList())); - if (Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> - positionsAcked.forEach(positionPair -> { - Consumer ackOwnerConsumer = positionPair.getLeft(); - MutablePair positionLongMutablePair = positionPair.getRight(); - if (AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) { - removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); - } - } - })); - } - return completableFuture.thenApply(__ -> totalAckCount.sum()); + return transactionIndividualAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked) + .thenApply(__ -> 0L); } private long getAckedCountForMsgIdNoAckSets(int batchSize, Position position, Consumer consumer) { @@ -745,6 +687,7 @@ private long getAckedCountForTransactionAck(int batchSize, long[] ackSets) { } private long getUnAckedCountForBatchIndexLevelEnabled(Position position, int batchSize) { + // TODO compare with the cursor. long unAckedCount = batchSize; if (isAcknowledgmentAtBatchIndexLevelEnabled) { long[] cursorAckSet = getCursorAckSet(position); @@ -1194,11 +1137,24 @@ public Subscription getSubscription() { return subscription; } - private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { + public int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { int unackedMsgs = 0; if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) { subscription.addUnAckedMessages(ackedMessages); unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); + if (log.isDebugEnabled()) { + if (ackedMessages > 0) { + log.debug("[{}][{}]{}-{}-{} delivered out {} messages, un-ack-msg: {}", + topicName, consumer.subscription.getName(), + consumer.cnx(), consumer.consumerId(), consumer.consumerName(), + ackedMessages, consumer.getUnackedMessages()); + } else { + log.debug("[{}][{}]{}-{}-{} acknowledged/redelivered {} messages, un-ack-msg: {}", + topicName, consumer.subscription.getName(), + consumer.cnx(), consumer.consumerId(), consumer.consumerName(), + -ackedMessages, consumer.getUnackedMessages()); + } + } } if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) { negativeUnackedMsgsTimestamp = System.currentTimeMillis(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java index 7a728a037dc62..c716db55e2d79 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PendingAcksMap.java @@ -304,11 +304,15 @@ public boolean remove(long ledgerId, long entryId, int batchSize, int stickyKeyH * @return true if the pending ack was removed, false otherwise */ public boolean remove(long ledgerId, long entryId) { + return removeAndReturn(ledgerId, entryId) != null; + } + + public IntIntPair removeAndReturn(long ledgerId, long entryId) { try { writeLock.lock(); Long2ObjectSortedMap ledgerMap = pendingAcks.get(ledgerId); if (ledgerMap == null) { - return false; + return null; } IntIntPair removedEntry = ledgerMap.remove(entryId); boolean removed = removedEntry != null; @@ -319,7 +323,7 @@ public boolean remove(long ledgerId, long entryId) { if (removed && ledgerMap.isEmpty()) { pendingAcks.remove(ledgerId); } - return removed; + return removedEntry; } finally { writeLock.unlock(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 452c30b45febb..ba08b2cb67940 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; +import org.jspecify.annotations.Nullable; public interface Subscription extends MessageExpirer { @@ -48,7 +49,13 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException { void consumerFlow(Consumer consumer, int additionalNumberOfMessages); - void acknowledgeMessage(List positions, AckType ackType, Map properties); + /** + * @param ackFrom It can be null, and it will always be null if {@param ackType} is {@link AckType#Cumulative}. + * The performance will be improved, if this param is the owner consumer that received the messages + * who are being acked when {@param ackType} is {@link AckType#Individual}. + */ + void acknowledgeMessage(List positions, AckType ackType, Map properties, + @Nullable Consumer ackFrom); String getTopicName(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index d469ce1daa1a8..1ec38c8c3e8a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -197,7 +197,8 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { } @Override - public void acknowledgeMessage(List position, AckType ackType, Map properties) { + public void acknowledgeMessage(List position, AckType ackType, Map properties, + Consumer ackFrom) { // No-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c35d802f43d54..dd5e577642a8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -362,10 +362,6 @@ public synchronized void readMoreEntries() { Position markDeletePosition = cursor.getMarkDeletedPosition(); if (lastMarkDeletePositionBeforeReadMoreEntries != markDeletePosition) { redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - for (Consumer consumer : consumerList) { - consumer.getPendingAcks() - .removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); - } lastMarkDeletePositionBeforeReadMoreEntries = markDeletePosition; } @@ -779,9 +775,7 @@ protected final synchronized boolean sendMessagesToConsumers(ReadType readType, * if you need to change it. */ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { - if (needTrimAckedMessages()) { - cursor.trimDeletedEntries(entries); - } + cursor.trimDeletedEntries(entries); lastNumberOfEntriesProcessed = 0; int entriesToDispatch = entries.size(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 44a3a13022a82..6e2cf2ed6885a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -18,11 +18,14 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE; +import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART; import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import io.netty.buffer.ByteBuf; +import it.unimi.dsi.fastutil.ints.IntIntPair; import java.io.IOException; import java.util.Collections; import java.util.LinkedHashMap; @@ -52,7 +55,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.ScanOutcome; +import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -93,6 +98,8 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -415,7 +422,8 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { } @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public void acknowledgeMessage(List positions, AckType ackType, Map properties, + @Nullable Consumer ackFrom) { cursor.updateLastActive(); Position previousMarkDeletePosition = cursor.getMarkDeletedPosition(); @@ -437,7 +445,8 @@ public void acknowledgeMessage(List positions, AckType ackType, Map { if ((cursor.isMessageDeleted(position))) { @@ -517,15 +526,24 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { private final DeleteCallback deleteCallback = new DeleteCallback() { @Override public void deleteComplete(Object context) { + ImmutableTriple, Position> ctx = + (ImmutableTriple, Position>) context; + Consumer ackFrom = ctx.getLeft(); + List positions = ctx.getMiddle(); + Position previousMarkDeletePosition = ctx.getRight(); if (log.isDebugEnabled()) { // The value of the param "context" is a position. - log.debug("[{}][{}] Deleted message at {}", topicName, subName, context); + log.debug("[{}][{}] Deleted message at {}", topicName, subName, previousMarkDeletePosition); } - // Signal the dispatchers to give chance to take extra actions + // Update pendingAcks, un-ack-messages, consumer.metrics. + if (Subscription.isIndividualAckMode(getType())) { + PersistentSubscription.this.updatePendingAckMessagesAfterAcknowledged(ackFrom, positions); + } + // Signal the dispatcher. if (dispatcher != null) { dispatcher.afterAckMessages(null, context); } - notifyTheMarkDeletePositionMoveForwardIfNeeded((Position) context); + notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDeletePosition); } @Override @@ -538,6 +556,137 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { } }; + private void updatePendingAckMessagesAfterAcknowledged(Consumer ackFrom, List positions) { + Dispatcher dispatcher0 = getDispatcher(); + if (dispatcher0 != null) { + /* + * There is a race condition which leads us to add this "synchronized" block. + * - consumer-1 received msg-A + * - consumption task is in-progress + * - topic was unloaded + * - reset messages to consumer-2 + * At this moment, race-condition occurs: + * - consumer-1 is acknowledging msg-A + * - dispatcher is delivering msg-A to consumer-2 + * Issue: broker received the acknowledging of msg-A, but no consumer has pending acknowledge that relate + * to msg-A so broker can not know how many single messages in the batched message. + * Solve: to get a precise messages number, this "synchronized" block is needed. + */ + synchronized (dispatcher0) { + updatePendingAckMessagesAfterAcknowledged0(ackFrom, positions); + } + } else { + updatePendingAckMessagesAfterAcknowledged0(ackFrom, positions); + } + } + + private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List positions) { + int attemptAckMsgs = 0; + for (Position position : positions) { + final long ledgerId = position.getLedgerId(); + final long entryId = position.getEntryId(); + final int batchMessagesAckedCount = AckSetStateUtil.getBatchMessagesAckedCount(position); + // TODO Transaction ack + // 并且 broker 没开启 batch index 的时候,positionRemovedFromCursor 会是 0。 + // 和 transaction 有没有关系? + final boolean positionRemovedFromCursor = AckSetStateUtil.isPositionRemovedFromCursor(position); + if (batchMessagesAckedCount == 0) { + // All messages were skipped. + // Since we can not get how many msgs that were attempted to ack, just plus 1 into the + // "attemptAckMsgs". + attemptAckMsgs++; + log.info("[{}][{}]{}-{}-{} is acknowledging {}:{}, which has been acked before. consumer_size: {}." + + " It may caused by a repeatedly consumption", + topicName, subName, + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + ledgerId, entryId, getConsumers().size()); + continue; + } + // Find the messages' owner and update un-acknowledged messages. + Consumer owner = null; + IntIntPair batchSizeAndHashPair = ackFrom == null ? null + : positionRemovedFromCursor + ? ackFrom.getPendingAcks().removeAndReturn(ledgerId, entryId) + : ackFrom.getPendingAcks().get(ledgerId, entryId); + if (batchSizeAndHashPair != null) { + owner = ackFrom; + } else { + for (Consumer consumer : getConsumers()) { + if (consumer == ackFrom) { + continue; + } + batchSizeAndHashPair = positionRemovedFromCursor + ? consumer.getPendingAcks().removeAndReturn(ledgerId, entryId) + : consumer.getPendingAcks().get(ledgerId, entryId); + if (batchSizeAndHashPair != null) { + // Continue find the owner + owner = consumer; + break; + } + } + } + if (owner == null) { + // Since we can not get how many msgs that were attempted to ack, just plus 1 into the + // "attemptAckMsgs". + attemptAckMsgs++; + log.info("[{}][{}]{}-{}-{} skipped to reduce un-ack-msgs for {}:{}, because could not find the" + + " message's owner. consumer size: {}. It may caused by a concurrency acknowledging" + + " and reconnection", + topicName, subName, + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + ledgerId, entryId, getConsumers().size()); + continue; + } + // Calculate messages actually acked in batch. + int actualAcked = 0; + if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_AT_ONCE) { + // All messages in batch were acked at once. + actualAcked = Math.max(batchSizeAndHashPair.firstInt(), 1); + } else if (batchMessagesAckedCount == BATCH_MESSAGE_ACKED_FIRST_PART) { + // First part of batch message acked. + // Regarding this case, only consumer knows how many messages in batch were acked, because + // the cursor do not know how many messages in the batch, only "consumer.pendingAcks" knows. + long[] ackSetWords = AckSetStateUtil.getAckSetArrayOrNull(position); + if (ackSetWords != null) { + BitSetRecyclable ackSet = BitSetRecyclable.create().resetWords(ackSetWords); + actualAcked = Math.max(batchSizeAndHashPair.firstInt() - ackSet.cardinality(), 0); + ackSet.recycle(); + } + } else { + // Following part of batch message acked. + // Regarding this case, only cursor know how many messages in batch were acked, because + // "consumer.pendingAcks" does not know how many messages were acked count before, only "cursor" + // knows. + actualAcked = batchMessagesAckedCount; + } + attemptAckMsgs += actualAcked; + // Reduce un-acknowledged messages. + owner.addAndGetUnAckedMsgs(owner, -actualAcked); + owner.updateBlockedConsumerOnUnackedMsgs(owner); + if (log.isDebugEnabled()) { + log.debug("[{}][{}] {}-{}-{} {}-{}-{} acknowledged {} messages, un-ack-msg: {}, position: {}:{}" + + " batch messages acked: {}, position {}:{} was deleted: {}", + topicName, subName, owner.cnx(), owner.consumerId(), owner.consumerName(), + ackFrom == null ? "null" : ackFrom.cnx(), + ackFrom == null ? "null" : ackFrom.consumerId(), + ackFrom == null ? "null" : ackFrom.consumerName(), + actualAcked, owner.getUnackedMessages(), ledgerId, entryId, + batchMessagesAckedCount >= 0 ? "batch_particularly_ack " + batchMessagesAckedCount + : batchMessagesAckedCount == BATCH_MESSAGE_ACKED_AT_ONCE + ? "ack_all_messages_at_once & batch_size " + batchSizeAndHashPair.firstInt() + : "first_part_ack", ledgerId, entryId, positionRemovedFromCursor); + } + } + // Consumer metrics. + if (ackFrom != null) { + ackFrom.ackMetricRecord(attemptAckMsgs); + } + } + private void notifyTheMarkDeletePositionMoveForwardIfNeeded(Position oldPosition) { Position oldMD = oldPosition; Position newMD = cursor.getMarkDeletedPosition(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java index fe13aeb572e2e..76374c4d1eb59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java @@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.compaction.CompactedTopic; import org.apache.pulsar.compaction.CompactedTopicContext; @@ -60,7 +61,8 @@ public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic compact } @Override - public void acknowledgeMessage(List positions, AckType ackType, Map properties) { + public void acknowledgeMessage(List positions, AckType ackType, Map properties, + Consumer ackFrom) { checkArgument(ackType == AckType.Cumulative); checkArgument(positions.size() == 1); checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index b21fe7acfdb6f..ac30ffdbbccca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -204,7 +204,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName()); if (sub != null) { - sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap()); + sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap(), null); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster. log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", @@ -213,7 +213,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { true /* replicateSubscriptionState */, Collections.emptyMap()) .thenAccept(subscriptionCreated -> { subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), - AckType.Cumulative, Collections.emptyMap()); + AckType.Cumulative, Collections.emptyMap(), null); }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 591842927f35b..7430cf85c6e42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -465,7 +465,7 @@ private void internalCommitTxn(TxnID txnID, Map properties, long l } persistentSubscription.acknowledgeMessage( Collections.singletonList(cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties); + AckType.Cumulative, properties, null); cumulativeAckOfTransaction = null; commitFuture.complete(null); }).exceptionally(e -> { @@ -755,7 +755,7 @@ protected void handleCommit(TxnID txnID, AckType ackType, Map prop if (this.cumulativeAckOfTransaction != null) { persistentSubscription.acknowledgeMessage( Collections.singletonList(this.cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties); + AckType.Cumulative, properties, null); } this.cumulativeAckOfTransaction = null; } else { @@ -774,7 +774,7 @@ private void individualAckCommitCommon(TxnID txnID, Map properties) { if (currentTxn != null) { persistentSubscription.acknowledgeMessage(new ArrayList<>(currentTxn.values()), - AckType.Individual, properties); + AckType.Individual, properties, null); individualAckOfTransaction.remove(txnID); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java new file mode 100644 index 0000000000000..108a022f165a1 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckAndDisableBrokerBatchAckClassicTest extends ConsumerAckTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); + conf.setSubscriptionSharedUseClassicPersistentImplementation(true); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java new file mode 100644 index 0000000000000..f6fdaa750a9be --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckAndDisableBrokerBatchAckTest extends ConsumerAckTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java new file mode 100644 index 0000000000000..d86b3327894f6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckClassicTest extends ConsumerAckTest { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + conf.setSubscriptionSharedUseClassicPersistentImplementation(true); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, + enableBatchIndexAcknowledgment2); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java new file mode 100644 index 0000000000000..98f705b2830bf --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class ConsumerAckTest extends BrokerTestBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); + super.baseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider + public Object[][] argsOfTestAcknowledgeConcurrently() { + // enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 + return new Object[][] { + {true, true, true}, + {true, false, false}, + {true, true, false}, + {true, false, true}, + {false, true, true}, + {false, false, false}, + {false, true, false}, + {false, false, true}, + }; + } + + /** + * Test: one message may be acknowledged by two consumers at the same time. + * Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + * 1. Consumer-1 received messages. + * 2. Unload the topic. + * 3. The message may be sent to consumer-2, but the consumption of consumer-1 is still in-progress now. + * 4. Consumer-1 and consumer-2 acknowledge the message concurrently. + */ + @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") + public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, + boolean enableBatchIndexAcknowledgment2) + throws Exception { + PulsarClientImpl client1 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + PulsarClientImpl client2 = + (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); + final int msgCount = 6; + final CountDownLatch acknowledgeSignal1 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal2 = new CountDownLatch(1); + final CountDownLatch acknowledgeSignal3 = new CountDownLatch(1); + final CountDownLatch acknowledgeFinishedSignal = new CountDownLatch(3); + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + final String subscription1 = "s1"; + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createSubscription(topic, subscription1, MessageId.earliest); + Producer producer = client1.newProducer(Schema.STRING).topic(topic) + .enableBatching(enableBatchSend).batchingMaxMessages(4).create(); + for (int i = 0; i < msgCount; i++) { + producer.sendAsync(i + ""); + } + + // Consumer-1 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer1 = (ConsumerImpl) client1.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment1) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c1") + .subscriptionType(SubscriptionType.Shared).subscribe(); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer1.numMessagesInQueue(), msgCount); + }); + consumer1.pause(); + List msgReceivedC11 = new ArrayList<>(); + List msgReceivedC12 = new ArrayList<>(); + List msgReceivedC13 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer1.receive(); + assertNotNull(msg); + if (i % 4 == 0) { + msgReceivedC11.add(msg.getMessageId()); + } else if (i % 3 == 0) { + msgReceivedC12.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC13.add(msg.getMessageId()); + } else { + msgReceivedC13.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer1.acknowledge(msgReceivedC11); + acknowledgeSignal2.await(); + consumer1.acknowledge(msgReceivedC12); + acknowledgeSignal3.await(); + consumer1.acknowledge(msgReceivedC13); + consumer1.resume(); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-1 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // After a topic unloading, the messages will be resent to consumer-2. + // Consumer-2 using user threads to consume asynchronously, it will not acknowledge messages one by one. + ConsumerImpl consumer2 = (ConsumerImpl) client2.newConsumer(Schema.STRING) + .topic(topic).subscriptionName(subscription1) + .enableBatchIndexAcknowledgment(enableBatchIndexAcknowledgment2) + .acknowledgmentGroupTime(1, TimeUnit.MILLISECONDS).consumerName("c2") + .subscriptionType(SubscriptionType.Shared).subscribe(); + admin.topics().unload(topic); + Awaitility.await().untilAsserted(() -> { + assertEquals(consumer2.numMessagesInQueue(), msgCount); + }); + List msgReceivedC21 = new ArrayList<>(); + List msgReceivedC22 = new ArrayList<>(); + List msgReceivedC23 = new ArrayList<>(); + for (int i = 0; i < msgCount; i++) { + Message msg = consumer2.receive(); + assertNotNull(msg); + if (i % 4 == 0) { + msgReceivedC21.add(msg.getMessageId()); + } else if (i % 3 == 0) { + msgReceivedC22.add(msg.getMessageId()); + } else if (i % 2 == 0) { + msgReceivedC23.add(msg.getMessageId()); + } else { + msgReceivedC23.add(msg.getMessageId()); + } + } + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + // Start another thread to mock a consumption repeatedly. + new Thread(() -> { + try { + acknowledgeSignal1.await(); + consumer2.acknowledge(msgReceivedC21); + acknowledgeSignal2.await(); + consumer2.acknowledge(msgReceivedC22); + acknowledgeSignal3.await(); + consumer2.acknowledge(msgReceivedC23); + acknowledgeFinishedSignal.countDown(); + } catch (Exception e) { + log.error("consumer-2 acknowledge failure", e); + throw new RuntimeException(e); + } + }).start(); + + // Trigger concurrently acknowledge. + acknowledgeSignal1.countDown(); + Thread.sleep(1000); + acknowledgeSignal2.countDown(); + Thread.sleep(1000); + acknowledgeSignal3.countDown(); + + // Verify: the metric "unackedMessages" should be "0" after acknowledged all messages. + acknowledgeFinishedSignal.await(); + Awaitility.await().untilAsserted(() -> { + SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscription1); + log.info("backlog: {}, unack: {}", stats.getMsgBacklog(), stats.getUnackedMessages()); + assertEquals(stats.getMsgBacklog(), 0); + assertEquals(stats.getUnackedMessages(), 0); + for (ConsumerStats consumerStats : stats.getConsumers()) { + assertEquals(consumerStats.getUnackedMessages(), 0); + } + }); + + // cleanup. + consumer1.close(); + consumer2.close(); + producer.close(); + client1.close(); + client2.close(); + admin.topics().delete(topic, false); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 31a9b7f95d676..9ad0814bca89b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -84,7 +84,7 @@ public void setup() throws Exception { doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", cursor, false)); - doNothing().when(sub).acknowledgeMessage(any(), any(), any()); + doNothing().when(sub).acknowledgeMessage(any(), any(), any(), any()); } @AfterMethod(alwaysRun = true) @@ -124,7 +124,7 @@ public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throw commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any()); } @Test(timeOut = 5000, dataProvider = "notIndividualAckModes") @@ -139,7 +139,7 @@ public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) th commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, times(1)).acknowledgeMessage(any(), any(), any()); + verify(sub, times(1)).acknowledgeMessage(any(), any(), any(), any()); } @Test(timeOut = 5000) @@ -155,6 +155,6 @@ public void testAckWithMoreThanNoneMessageIds() throws Exception { commandAck.addMessageId().setEntryId(0L).setLedgerId(2L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index e36a940b4ea5c..2a23920d84e88 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1776,7 +1776,7 @@ public void testCompactorSubscription() { Position position = PositionFactory.create(1, 1); long ledgerId = 0xc0bfefeL; sub.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, - Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId)); + Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId), null); verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index b945f5abcbc4d..2c76f0b22a986 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -86,7 +86,7 @@ public void testMarkerDeleteTimes() throws Exception { spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false); Position position = managedLedger.addEntry("test".getBytes()); persistentSubscription.acknowledgeMessage(Collections.singletonList(position), - AckType.Individual, Collections.emptyMap()); + AckType.Individual, Collections.emptyMap(), null); verify(managedLedger, times(0)).asyncReadEntry(any(), any(), any()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 360be2e435ab1..3d0482f9e3712 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -191,7 +191,7 @@ public void testCanAcknowledgeAndAbortForTransaction() throws Exception { positionList.add(PositionFactory.create(3, 5)); // Acknowledge from normal consumer will succeed ignoring message acked by ongoing transaction. - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null); //Abort txn. persistentSubscription.endTxn(txnID1.getMostSigBits(), txnID2.getLeastSigBits(), TxnAction.ABORT_VALUE, -1); @@ -223,7 +223,7 @@ public void testAcknowledgeUpdateCursorLastActive() throws Exception { positionList.add(PositionFactory.create(1, 1)); long beforeAcknowledgeTimestamp = System.currentTimeMillis(); Thread.sleep(1); - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap()); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null); // `acknowledgeMessage` should update cursor last active assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java index 4f773e8a124b0..b9e9d532a2945 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckInMemoryDeleteTest.java @@ -246,7 +246,7 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception { // and it won't clear the last message in cursor batch index ack set consumer.acknowledgeAsync(messageIds[1], commitTwice).get(); assertEquals(batchDeletedIndexes.size(), 1); - assertEquals(testPersistentSubscription.getConsumers().get(0).getPendingAcks().size(), 0); + assertEquals(testPersistentSubscription.getConsumers().get(0).getPendingAcks().size(), 1); // the messages has been produced were all acked, the memory in broker for the messages has been cleared. commitTwice.commit().get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index f43e2a1c672c7..b322369f1b013 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -2136,7 +2136,7 @@ public void testDeleteCompactedLedgerWithSlowAck() throws Exception { Thread.sleep(500); } }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq( - CommandAck.AckType.Cumulative), Mockito.any()); + CommandAck.AckType.Cumulative), Mockito.any(), any()); // trigger compaction admin.topics().triggerCompaction(topicName); From 48f4651fb5977a38f4ce30804bd90de5298f6c80 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 9 Jul 2025 22:25:39 +0800 Subject: [PATCH 2/4] [fix] [broker] Fix negative subscription/consumer's unack-messages --- .../bookkeeper/mledger/AsyncCallbacks.java | 21 ++++ .../mledger/impl/AckSetPositionImpl.java | 11 -- .../bookkeeper/mledger/impl/AckSetState.java | 12 -- .../mledger/impl/AckSetStateUtil.java | 30 ----- .../mledger/impl/ManagedCursorImpl.java | 84 ++++++------- .../service/AbstractBaseDispatcher.java | 2 +- .../pulsar/broker/service/Consumer.java | 55 ++++++-- .../pulsar/broker/service/Subscription.java | 2 +- .../NonPersistentSubscription.java | 2 +- .../persistent/PersistentSubscription.java | 119 ++++++++++-------- .../PulsarCompactorSubscription.java | 2 +- .../ReplicatedSubscriptionsController.java | 4 +- .../pendingack/impl/PendingAckHandleImpl.java | 6 +- ...ckAndDisableBrokerBatchAckClassicTest.java | 51 -------- ...nsumerAckAndDisableBrokerBatchAckTest.java | 50 -------- .../service/ConsumerAckClassicTest.java | 52 -------- ...t.java => ConsumerConcurrencyAckTest.java} | 45 +++++-- .../service/MessageCumulativeAckTest.java | 9 +- .../broker/service/PersistentTopicTest.java | 2 +- .../service/TransactionMarkerDeleteTest.java | 2 +- .../PersistentSubscriptionTest.java | 4 +- .../pulsar/compaction/CompactionTest.java | 3 +- 22 files changed, 225 insertions(+), 343 deletions(-) delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java delete mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java rename pulsar-broker/src/test/java/org/apache/pulsar/broker/service/{ConsumerAckTest.java => ConsumerConcurrencyAckTest.java} (85%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index 70db427afce4f..11a7f6b95186b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import lombok.AllArgsConstructor; +import lombok.Data; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -118,6 +120,25 @@ interface DeleteCallback { void deleteFailed(ManagedLedgerException exception, Object ctx); } + interface CursorDeleteCallback extends DeleteCallback { + void deleteComplete(Object ctx, List positionAckStates); + } + + @AllArgsConstructor + @Data + class PositionAckState { + Position position; + BatchMsgAckResType batchMsgAckResType; + int batchMessageAckCount; + } + + enum BatchMsgAckResType { + AckAllAtOnce, + FirstPartialAck, + PartialAck, + LatestPartialAck; + } + interface TerminateCallback { void terminateComplete(Position lastCommittedPosition, Object ctx); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java index 786cf5a2ff4a0..22a99eb3607eb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetPositionImpl.java @@ -19,8 +19,6 @@ package org.apache.bookkeeper.mledger.impl; import java.util.Optional; -import lombok.Getter; -import lombok.Setter; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; @@ -33,11 +31,6 @@ public class AckSetPositionImpl implements Position, AckSetState { protected final long ledgerId; protected final long entryId; protected volatile long[] ackSet; - @Getter - @Setter - private volatile int batchMessagesAckedCount; - @Getter - private volatile boolean positionRemovedFromCursor; public AckSetPositionImpl(long ledgerId, long entryId, long[] ackSet) { this.ledgerId = ledgerId; @@ -70,10 +63,6 @@ public Position getNext() { } } - public void markPositionRemovedFromCursor() { - this.positionRemovedFromCursor = true; - } - @Override public String toString() { return ledgerId + ":" + entryId + " (ackSet " + (ackSet == null ? "is null" : diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java index f488de5b93711..03ff50c1c7fe5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetState.java @@ -26,10 +26,6 @@ * ackSet state and to extract the state. */ public interface AckSetState { - - int BATCH_MESSAGE_ACKED_AT_ONCE = -1; - int BATCH_MESSAGE_ACKED_FIRST_PART = -2; - /** * Get the ackSet bitset information encoded as a long array. * @return the ackSet @@ -42,14 +38,6 @@ public interface AckSetState { */ void setAckSet(long[] ackSet); - void setBatchMessagesAckedCount(int messagesCountAcked); - - int getBatchMessagesAckedCount(); - - void markPositionRemovedFromCursor(); - - boolean isPositionRemovedFromCursor(); - /** * Check if the ackSet is set. * @return true if the ackSet is set, false otherwise diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java index 70ec40fbe4779..11ab520b68e92 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/AckSetStateUtil.java @@ -56,36 +56,6 @@ public static long[] getAckSetArrayOrNull(Position position) { return maybeGetAckSetState(position).map(AckSetState::getAckSet).orElse(null); } - public static void setBatchMessagesAckedCount(Position position, int messagesCountAcked) { - Optional ackSetState = maybeGetAckSetState(position); - if (ackSetState.isPresent()) { - ackSetState.get().setBatchMessagesAckedCount(messagesCountAcked); - } - } - - public static int getBatchMessagesAckedCount(Position position) { - Optional ackSetState = maybeGetAckSetState(position); - if (ackSetState.isPresent()) { - return ackSetState.get().getBatchMessagesAckedCount(); - } - return 0; - } - - public static void markPositionRemovedFromCursor(Position position) { - Optional ackSetState = maybeGetAckSetState(position); - if (ackSetState.isPresent()) { - ackSetState.get().markPositionRemovedFromCursor(); - } - } - - public static boolean isPositionRemovedFromCursor(Position position) { - Optional ackSetState = maybeGetAckSetState(position); - if (ackSetState.isPresent()) { - return ackSetState.get().isPositionRemovedFromCursor(); - } - return true; - } - /** * Get the AckSetState instance from the position. * @param position position which contains the AckSetState diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index cb649864f90f2..bb730b592e006 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -21,13 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; -import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE; -import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART; import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.BatchMsgAckResType; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.PositionAckState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.Collections2; @@ -105,7 +106,6 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder; import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; -import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats; @@ -2354,7 +2354,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { @Override - public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallback callback, Object ctx) { + public void asyncDelete(Iterable positions, DeleteCallback callback, Object ctx) { if (isClosed()) { callback.deleteFailed(new ManagedLedgerException .CursorAlreadyClosedException("Cursor was already closed"), ctx); @@ -2365,13 +2365,12 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb lock.writeLock().lock(); boolean skipMarkDeleteBecauseAckedNothing = false; - final MutableBoolean cbHasExecuted = new MutableBoolean(false); + List positionAckNotices = new ArrayList<>(); try { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}", ledger.getName(), name, positions, individualDeletedMessages, markDeletePosition); } - for (Position pos : positions) { Position position = requireNonNull(pos); if (ledger.getLastConfirmedEntry().compareTo(position) < 0) { @@ -2395,12 +2394,12 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } long[] ackSet = AckSetStateUtil.getAckSetArrayOrNull(position); if (ackSet == null || ackSet.length == 0) { - AckSetStateUtil.markPositionRemovedFromCursor(position); BitSet bitSet; if (batchDeletedIndexes == null || (bitSet = batchDeletedIndexes.remove(position)) == null) { - AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_AT_ONCE); + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.AckAllAtOnce, -1)); } else { - AckSetStateUtil.setBatchMessagesAckedCount(position, bitSet.cardinality()); + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.LatestPartialAck, + bitSet.cardinality())); } // Add a range (prev, pos] to the set. Adding the previous entry as an open limit to the range will // make the RangeSet recognize the "continuity" between adjacent Positions. @@ -2421,24 +2420,27 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb individualDeletedMessages); } } else if (batchDeletedIndexes != null) { - final var givenBitSet = BitSet.valueOf(ackSet); - final var bitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> givenBitSet); - if (givenBitSet != bitSet) { - int unAckedBefore = bitSet.cardinality(); - bitSet.and(givenBitSet); - int unAckedAfter = bitSet.cardinality(); - AckSetStateUtil.setBatchMessagesAckedCount(position, unAckedBefore - unAckedAfter); + final var ackingBitSet = BitSet.valueOf(ackSet); + final var combinedBitSet = batchDeletedIndexes.computeIfAbsent(position, __ -> ackingBitSet); + if (ackingBitSet != combinedBitSet) { + int unAckedBefore = combinedBitSet.cardinality(); + combinedBitSet.and(ackingBitSet); + int unAckedAfter = combinedBitSet.cardinality(); + if (combinedBitSet.isEmpty()) { + Position previousPosition = ledger.getPreviousPosition(position); + individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), + previousPosition.getEntryId(), + position.getLedgerId(), position.getEntryId()); + MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); + batchDeletedIndexes.remove(position); + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.LatestPartialAck, + unAckedBefore - unAckedAfter)); + } else { + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.PartialAck, + unAckedBefore - unAckedAfter)); + } } else { - AckSetStateUtil.setBatchMessagesAckedCount(position, BATCH_MESSAGE_ACKED_FIRST_PART); - } - if (bitSet.isEmpty()) { - AckSetStateUtil.markPositionRemovedFromCursor(position); - Position previousPosition = ledger.getPreviousPosition(position); - individualDeletedMessages.addOpenClosed(previousPosition.getLedgerId(), - previousPosition.getEntryId(), - position.getLedgerId(), position.getEntryId()); - MSG_CONSUMED_COUNTER_UPDATER.incrementAndGet(this); - batchDeletedIndexes.remove(position); + positionAckNotices.add(new PositionAckState(position, BatchMsgAckResType.FirstPartialAck, -1)); } } } @@ -2492,8 +2494,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb } finally { lock.writeLock().unlock(); if (skipMarkDeleteBecauseAckedNothing) { - callback.deleteComplete(ctx); - cbHasExecuted.setTrue(); + completeDeleteCallback(callback, ctx, positionAckNotices); } } @@ -2501,9 +2502,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) { isDirty = true; updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null); - if (!cbHasExecuted.booleanValue()) { - callback.deleteComplete(ctx); - } + completeDeleteCallback(callback, ctx, positionAckNotices); return; } @@ -2514,18 +2513,12 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb internalAsyncMarkDelete(newMarkDeletePosition, properties, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { - if (!cbHasExecuted.booleanValue()) { - callback.deleteComplete(ctx); - cbHasExecuted.setTrue(); - } + completeDeleteCallback(callback, ctx, positionAckNotices); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - if (!cbHasExecuted.booleanValue()) { - callback.deleteFailed(exception, ctx); - cbHasExecuted.setTrue(); - } + callback.deleteFailed(exception, ctx); } }, ctx); @@ -2536,9 +2529,16 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}", ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); } - if (!cbHasExecuted.booleanValue()) { - callback.deleteFailed(new ManagedLedgerException(e), ctx); - } + callback.deleteFailed(new ManagedLedgerException(e), ctx); + } + } + + private void completeDeleteCallback(DeleteCallback deleteCallback, Object ctx, + List positionAckStates) { + if (deleteCallback instanceof AsyncCallbacks.CursorDeleteCallback cursorDeleteCallback) { + cursorDeleteCallback.deleteComplete(ctx, positionAckStates); + } else { + deleteCallback.deleteComplete(ctx); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 10b4e39b5f275..c092a6990a90f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -326,7 +326,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i private void individualAcknowledgeMessageIfNeeded(List positions, Map properties) { if (!(subscription instanceof PulsarCompactorSubscription)) { - subscription.acknowledgeMessage(positions, AckType.Individual, properties, null); + subscription.acknowledgeMessage(positions, AckType.Individual, properties, null, false); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index da840525b2ad4..123ff104661fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -50,6 +50,7 @@ import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -548,7 +549,7 @@ public CompletableFuture messageAcked(CommandAck ack) { .thenApply(unused -> 1L); } else { List positionsAcked = Collections.singletonList(position); - subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties, this); + subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties, this, false); future = CompletableFuture.completedFuture(1L); } } else { @@ -563,16 +564,15 @@ public CompletableFuture messageAcked(CommandAck ack) { .thenApply(v -> { // The case that is typed individual ack without transaction will deal metrics after a callback // that after cursor deleting positions, so we may receive a 0 value here. - ackMetricRecord(v); return null; }); } - public void ackMetricRecord(long messageCountInRequest) { - if (messageCountInRequest > 0) { - this.messageAckRate.recordEvent(messageCountInRequest); - this.messageAckCounter.add(messageCountInRequest); + public void ackMetricRecord(long msgCountAcked) { + if (msgCountAcked > 0) { + this.messageAckRate.recordEvent(msgCountAcked); + this.messageAckCounter.add(msgCountAcked); } } @@ -601,18 +601,21 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map individualAckWithTransaction(CommandAck ack) { + // Individual ack + List>> positionsAcked = new ArrayList<>(); if (!isTransactionEnabled()) { return FutureUtil.failedFuture( new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); } - List> positionsAcked = new ArrayList<>(); + + LongAdder totalAckCount = new LongAdder(); for (int i = 0; i < ack.getMessageIdsCount(); i++) { MessageIdData msgId = ack.getMessageIdAt(i); Position position = AckSetStateUtil.createPositionWithAckSet(msgId.getLedgerId(), msgId.getEntryId(), null); @@ -623,13 +626,20 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { consumerId, position); continue; } + Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.left(); + // acked count at least one + long ackedCount; int batchSize; if (msgId.hasBatchSize()) { - positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); + batchSize = msgId.getBatchSize(); + // ack batch messages set ackeCount = batchSize + ackedCount = msgId.getBatchSize(); + positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, msgId.getBatchSize()))); } else { // ack no batch message set ackedCount = 1 batchSize = 0; - positionsAcked.add(new MutablePair<>(position, batchSize)); + ackedCount = 1; + positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, (int) batchSize))); } if (msgId.getAckSetsCount() > 0) { @@ -638,13 +648,34 @@ private CompletableFuture individualAckWithTransaction(CommandAck ack) { ackSets[j] = msgId.getAckSetAt(j); } AckSetStateUtil.getAckSetState(position).setAckSet(ackSets); + ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); } + addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); + + checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId); + checkAckValidationError(ack, position); + + totalAckCount.add(ackedCount); } - return transactionIndividualAcknowledge(ack.getTxnidMostBits(), ack.getTxnidLeastBits(), positionsAcked) - .thenApply(__ -> 0L); + CompletableFuture completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(), + ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList())); + if (Subscription.isIndividualAckMode(subType)) { + completableFuture.whenComplete((v, e) -> + positionsAcked.forEach(positionPair -> { + Consumer ackOwnerConsumer = positionPair.getLeft(); + MutablePair positionLongMutablePair = positionPair.getRight(); + if (AckSetStateUtil.hasAckSet(positionLongMutablePair.getLeft())) { + if (((PersistentSubscription) subscription) + .checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) { + removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left); + } + } + })); + } + return completableFuture.thenApply(__ -> totalAckCount.sum()); } private long getAckedCountForMsgIdNoAckSets(int batchSize, Position position, Consumer consumer) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index ba08b2cb67940..2ab042c956bdb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -55,7 +55,7 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException { * who are being acked when {@param ackType} is {@link AckType#Individual}. */ void acknowledgeMessage(List positions, AckType ackType, Map properties, - @Nullable Consumer ackFrom); + @Nullable Consumer ackFrom, boolean triggeredByTxnCommit); String getTopicName(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 1ec38c8c3e8a2..d984f58915015 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -198,7 +198,7 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { @Override public void acknowledgeMessage(List position, AckType ackType, Map properties, - Consumer ackFrom) { + Consumer ackFrom, boolean triggerByTxnCommit) { // No-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 6e2cf2ed6885a..8ca4f6cfed083 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.service.persistent; -import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_AT_ONCE; -import static org.apache.bookkeeper.mledger.impl.AckSetState.BATCH_MESSAGE_ACKED_FIRST_PART; import static org.apache.pulsar.broker.service.AbstractBaseDispatcher.checkAndApplyReachedEndOfTopicOrTopicMigration; import static org.apache.pulsar.common.naming.SystemTopicNames.isEventSystemTopic; import com.google.common.annotations.VisibleForTesting; @@ -42,9 +40,12 @@ import java.util.stream.Collectors; import lombok.Getter; import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.AsyncCallbacks.BatchMsgAckResType; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.CursorDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.PositionAckState; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -57,7 +58,7 @@ import org.apache.bookkeeper.mledger.ScanOutcome; import org.apache.bookkeeper.mledger.impl.AckSetStateUtil; import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; @@ -423,7 +424,7 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { @Override public void acknowledgeMessage(List positions, AckType ackType, Map properties, - @Nullable Consumer ackFrom) { + @Nullable Consumer ackFrom, boolean triggeredByTxnCommit) { cursor.updateLastActive(); Position previousMarkDeletePosition = cursor.getMarkDeletedPosition(); @@ -445,8 +446,12 @@ public void acknowledgeMessage(List positions, AckType ackType, Map { if ((cursor.isMessageDeleted(position))) { @@ -526,10 +531,32 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { private final DeleteCallback deleteCallback = new DeleteCallback() { @Override public void deleteComplete(Object context) { - ImmutableTriple, Position> ctx = - (ImmutableTriple, Position>) context; + if (log.isDebugEnabled()) { + // The value of the param "context" is a position. + log.debug("[{}][{}] Deleted message at {}", topicName, subName, context); + } + // Signal the dispatchers to give chance to take extra actions + if (dispatcher != null) { + dispatcher.afterAckMessages(null, context); + } + notifyTheMarkDeletePositionMoveForwardIfNeeded((Position) context); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception); + // Signal the dispatchers to give chance to take extra actions + if (dispatcher != null) { + dispatcher.afterAckMessages(exception, ctx); + } + } + }; + + private final DeleteCallback deleteCallbackWithHandlingAckState = new CursorDeleteCallback() { + @Override + public void deleteComplete(Object context, List positionAckStates) { + ImmutablePair ctx = (ImmutablePair) context; Consumer ackFrom = ctx.getLeft(); - List positions = ctx.getMiddle(); Position previousMarkDeletePosition = ctx.getRight(); if (log.isDebugEnabled()) { // The value of the param "context" is a position. @@ -537,7 +564,7 @@ public void deleteComplete(Object context) { } // Update pendingAcks, un-ack-messages, consumer.metrics. if (Subscription.isIndividualAckMode(getType())) { - PersistentSubscription.this.updatePendingAckMessagesAfterAcknowledged(ackFrom, positions); + PersistentSubscription.this.updatePendingAckMessagesAfterAcknowledged(ackFrom, positionAckStates); } // Signal the dispatcher. if (dispatcher != null) { @@ -546,17 +573,18 @@ public void deleteComplete(Object context) { notifyTheMarkDeletePositionMoveForwardIfNeeded(previousMarkDeletePosition); } + @Override + public void deleteComplete(Object context) { + deleteCallback.deleteComplete(context); + } + @Override public void deleteFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}][{}] Failed to delete message at {}: {}", topicName, subName, ctx, exception); - // Signal the dispatchers to give chance to take extra actions - if (dispatcher != null) { - dispatcher.afterAckMessages(exception, ctx); - } + deleteCallback.deleteFailed(exception, ctx); } }; - private void updatePendingAckMessagesAfterAcknowledged(Consumer ackFrom, List positions) { + private void updatePendingAckMessagesAfterAcknowledged(Consumer ackFrom, List ackedPositions) { Dispatcher dispatcher0 = getDispatcher(); if (dispatcher0 != null) { /* @@ -573,43 +601,28 @@ private void updatePendingAckMessagesAfterAcknowledged(Consumer ackFrom, List positions) { - int attemptAckMsgs = 0; - for (Position position : positions) { + private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List ackedPositions) { + int totalMsgAcked = 0; + for (PositionAckState ackState : ackedPositions) { + Position position = ackState.getPosition(); final long ledgerId = position.getLedgerId(); final long entryId = position.getEntryId(); - final int batchMessagesAckedCount = AckSetStateUtil.getBatchMessagesAckedCount(position); - // TODO Transaction ack - // 并且 broker 没开启 batch index 的时候,positionRemovedFromCursor 会是 0。 - // 和 transaction 有没有关系? - final boolean positionRemovedFromCursor = AckSetStateUtil.isPositionRemovedFromCursor(position); - if (batchMessagesAckedCount == 0) { - // All messages were skipped. - // Since we can not get how many msgs that were attempted to ack, just plus 1 into the - // "attemptAckMsgs". - attemptAckMsgs++; - log.info("[{}][{}]{}-{}-{} is acknowledging {}:{}, which has been acked before. consumer_size: {}." - + " It may caused by a repeatedly consumption", - topicName, subName, - ackFrom == null ? "null" : ackFrom.cnx(), - ackFrom == null ? "null" : ackFrom.consumerId(), - ackFrom == null ? "null" : ackFrom.consumerName(), - ledgerId, entryId, getConsumers().size()); - continue; - } + final boolean positionRemovedFromCursor = + ackState.getBatchMsgAckResType() == BatchMsgAckResType.AckAllAtOnce + || ackState.getBatchMsgAckResType() == BatchMsgAckResType.LatestPartialAck; // Find the messages' owner and update un-acknowledged messages. Consumer owner = null; IntIntPair batchSizeAndHashPair = ackFrom == null ? null - : positionRemovedFromCursor + : (positionRemovedFromCursor ? ackFrom.getPendingAcks().removeAndReturn(ledgerId, entryId) - : ackFrom.getPendingAcks().get(ledgerId, entryId); + : ackFrom.getPendingAcks().get(ledgerId, entryId)); if (batchSizeAndHashPair != null) { owner = ackFrom; } else { @@ -630,7 +643,7 @@ private void updatePendingAckMessagesAfterAcknowledged0(Consumer ackFrom, List

= 0 ? "batch_particularly_ack " + batchMessagesAckedCount - : batchMessagesAckedCount == BATCH_MESSAGE_ACKED_AT_ONCE - ? "ack_all_messages_at_once & batch_size " + batchSizeAndHashPair.firstInt() - : "first_part_ack", ledgerId, entryId, positionRemovedFromCursor); + actualAcked, owner.getUnackedMessages(), ledgerId, entryId, ackState); } } // Consumer metrics. if (ackFrom != null) { - ackFrom.ackMetricRecord(attemptAckMsgs); + ackFrom.ackMetricRecord(totalMsgAcked); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java index 76374c4d1eb59..4237f74a74ccc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PulsarCompactorSubscription.java @@ -62,7 +62,7 @@ public PulsarCompactorSubscription(PersistentTopic topic, CompactedTopic compact @Override public void acknowledgeMessage(List positions, AckType ackType, Map properties, - Consumer ackFrom) { + Consumer ackFrom, boolean triggerByTxnCommit) { checkArgument(ackType == AckType.Cumulative); checkArgument(positions.size() == 1); checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index ac30ffdbbccca..0b93de0e0303f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -204,7 +204,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName()); if (sub != null) { - sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap(), null); + sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap(), null, false); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster. log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", @@ -213,7 +213,7 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { true /* replicateSubscriptionState */, Collections.emptyMap()) .thenAccept(subscriptionCreated -> { subscriptionCreated.acknowledgeMessage(Collections.singletonList(pos), - AckType.Cumulative, Collections.emptyMap(), null); + AckType.Cumulative, Collections.emptyMap(), null, false); }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 7430cf85c6e42..5a3af0a2c750f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -465,7 +465,7 @@ private void internalCommitTxn(TxnID txnID, Map properties, long l } persistentSubscription.acknowledgeMessage( Collections.singletonList(cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties, null); + AckType.Cumulative, properties, null, true); cumulativeAckOfTransaction = null; commitFuture.complete(null); }).exceptionally(e -> { @@ -755,7 +755,7 @@ protected void handleCommit(TxnID txnID, AckType ackType, Map prop if (this.cumulativeAckOfTransaction != null) { persistentSubscription.acknowledgeMessage( Collections.singletonList(this.cumulativeAckOfTransaction.getValue()), - AckType.Cumulative, properties, null); + AckType.Cumulative, properties, null, true); } this.cumulativeAckOfTransaction = null; } else { @@ -774,7 +774,7 @@ private void individualAckCommitCommon(TxnID txnID, Map properties) { if (currentTxn != null) { persistentSubscription.acknowledgeMessage(new ArrayList<>(currentTxn.values()), - AckType.Individual, properties, null); + AckType.Individual, properties, null, true); individualAckOfTransaction.remove(txnID); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java deleted file mode 100644 index 108a022f165a1..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckClassicTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.service; - -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Slf4j -@Test(groups = "broker") -public class ConsumerAckAndDisableBrokerBatchAckClassicTest extends ConsumerAckTest { - - @BeforeClass - @Override - protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); - conf.setSubscriptionSharedUseClassicPersistentImplementation(true); - super.baseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @Override - @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") - public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, - boolean enableBatchIndexAcknowledgment2) - throws Exception { - super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java deleted file mode 100644 index f6fdaa750a9be..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckAndDisableBrokerBatchAckTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.service; - -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Slf4j -@Test(groups = "broker") -public class ConsumerAckAndDisableBrokerBatchAckTest extends ConsumerAckTest { - - @BeforeClass - @Override - protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(false); - super.baseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @Override - @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") - public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, - boolean enableBatchIndexAcknowledgment2) - throws Exception { - super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java deleted file mode 100644 index d86b3327894f6..0000000000000 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckClassicTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.service; - -import lombok.extern.slf4j.Slf4j; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -@Slf4j -@Test(groups = "broker") -public class ConsumerAckClassicTest extends ConsumerAckTest { - - @BeforeClass - @Override - protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); - conf.setSubscriptionSharedUseClassicPersistentImplementation(true); - super.baseSetup(); - } - - @AfterClass(alwaysRun = true) - @Override - protected void cleanup() throws Exception { - super.internalCleanup(); - } - - @Override - @Test(timeOut = 60_000, dataProvider = "argsOfTestAcknowledgeConcurrently") - public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, - boolean enableBatchIndexAcknowledgment2) - throws Exception { - super.testAcknowledgeConcurrently(enableBatchSend, enableBatchIndexAcknowledgment1, - enableBatchIndexAcknowledgment2); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerConcurrencyAckTest.java similarity index 85% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerConcurrencyAckTest.java index 98f705b2830bf..8e8dd3c64b153 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerConcurrencyAckTest.java @@ -40,16 +40,35 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; import org.testng.annotations.Test; @Slf4j @Test(groups = "broker") -public class ConsumerAckTest extends BrokerTestBase { +public class ConsumerConcurrencyAckTest extends BrokerTestBase { + + @DataProvider(name = "brokerParams") + public static Object[][] brokerParams() { + return new Object[][]{ + // subscriptionSharedUseClassicPersistentImplementation, acknowledgmentAtBatchIndexLevelEnabled + {false, true}, + {false, false}, + {true, true}, + {true, false} + }; + } + + @Factory(dataProvider = "brokerParams") + public ConsumerConcurrencyAckTest(boolean subscriptionSharedUseClassicPersistentImplementation, + boolean acknowledgmentAtBatchIndexLevelEnabled) { + conf.setSubscriptionSharedUseClassicPersistentImplementation( + subscriptionSharedUseClassicPersistentImplementation); + conf.setAcknowledgmentAtBatchIndexLevelEnabled(acknowledgmentAtBatchIndexLevelEnabled); + } @BeforeClass @Override protected void setup() throws Exception { - conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); super.baseSetup(); } @@ -63,14 +82,14 @@ protected void cleanup() throws Exception { public Object[][] argsOfTestAcknowledgeConcurrently() { // enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 return new Object[][] { - {true, true, true}, - {true, false, false}, - {true, true, false}, - {true, false, true}, - {false, true, true}, - {false, false, false}, - {false, true, false}, - {false, false, true}, + {true, true, true}, + {true, false, false}, + {true, true, false}, + {true, false, true}, + {false, true, true}, + {false, false, false}, + {false, true, false}, + {false, false, true}, }; } @@ -86,6 +105,12 @@ public Object[][] argsOfTestAcknowledgeConcurrently() { public void testAcknowledgeConcurrently(boolean enableBatchSend, boolean enableBatchIndexAcknowledgment1, boolean enableBatchIndexAcknowledgment2) throws Exception { + log.info("start test. classic dispatcher: {}, broker enabled batch ack: {}, enableBatchSend: {}," + + " enableBatchIndexAcknowledgment1: {}, enableBatchIndexAcknowledgment2: {}", + pulsar.getConfig().isSubscriptionSharedUseClassicPersistentImplementation(), + pulsar.getConfig().isAcknowledgmentAtBatchIndexLevelEnabled(), + enableBatchSend, enableBatchIndexAcknowledgment1, enableBatchIndexAcknowledgment2 + ); PulsarClientImpl client1 = (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); PulsarClientImpl client2 = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index 9ad0814bca89b..ee6a7173a22c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -26,6 +26,7 @@ import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -84,7 +85,7 @@ public void setup() throws Exception { doReturn(Codec.encode("sub-1")).when(cursor).getName(); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", cursor, false)); - doNothing().when(sub).acknowledgeMessage(any(), any(), any(), any()); + doNothing().when(sub).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } @AfterMethod(alwaysRun = true) @@ -124,7 +125,7 @@ public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throw commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } @Test(timeOut = 5000, dataProvider = "notIndividualAckModes") @@ -139,7 +140,7 @@ public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) th commandAck.addMessageId().setEntryId(0L).setLedgerId(1L); consumer.messageAcked(commandAck).get(); - verify(sub, times(1)).acknowledgeMessage(any(), any(), any(), any()); + verify(sub, times(1)).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } @Test(timeOut = 5000) @@ -155,6 +156,6 @@ public void testAckWithMoreThanNoneMessageIds() throws Exception { commandAck.addMessageId().setEntryId(0L).setLedgerId(2L); consumer.messageAcked(commandAck).get(); - verify(sub, never()).acknowledgeMessage(any(), any(), any(), any()); + verify(sub, never()).acknowledgeMessage(any(), any(), any(), any(), anyBoolean()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 2a23920d84e88..a4f873339c6a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1776,7 +1776,7 @@ public void testCompactorSubscription() { Position position = PositionFactory.create(1, 1); long ledgerId = 0xc0bfefeL; sub.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, - Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId), null); + Map.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId), null, false); verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index 2c76f0b22a986..7bf4621d055f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -86,7 +86,7 @@ public void testMarkerDeleteTimes() throws Exception { spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false); Position position = managedLedger.addEntry("test".getBytes()); persistentSubscription.acknowledgeMessage(Collections.singletonList(position), - AckType.Individual, Collections.emptyMap(), null); + AckType.Individual, Collections.emptyMap(), null, false); verify(managedLedger, times(0)).asyncReadEntry(any(), any(), any()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 3d0482f9e3712..09c78f227c73e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -191,7 +191,7 @@ public void testCanAcknowledgeAndAbortForTransaction() throws Exception { positionList.add(PositionFactory.create(3, 5)); // Acknowledge from normal consumer will succeed ignoring message acked by ongoing transaction. - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null, false); //Abort txn. persistentSubscription.endTxn(txnID1.getMostSigBits(), txnID2.getLeastSigBits(), TxnAction.ABORT_VALUE, -1); @@ -223,7 +223,7 @@ public void testAcknowledgeUpdateCursorLastActive() throws Exception { positionList.add(PositionFactory.create(1, 1)); long beforeAcknowledgeTimestamp = System.currentTimeMillis(); Thread.sleep(1); - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null, true); // `acknowledgeMessage` should update cursor last active assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index b322369f1b013..ecd94a452c382 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.BrokerTestUtil.spyWithoutRecordingInvocations; import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.doAnswer; @@ -2136,7 +2137,7 @@ public void testDeleteCompactedLedgerWithSlowAck() throws Exception { Thread.sleep(500); } }).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq( - CommandAck.AckType.Cumulative), Mockito.any(), any()); + CommandAck.AckType.Cumulative), Mockito.any(), any(), anyBoolean()); // trigger compaction admin.topics().triggerCompaction(topicName); From 79c82be884a66876e61784363554e07b0c4591af Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 1 Aug 2025 10:35:13 +0800 Subject: [PATCH 3/4] fix checkstyle issue --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 +++--- .../persistent/ReplicatedSubscriptionsController.java | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index bb730b592e006..c2881c150f515 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -20,15 +20,15 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.BatchMsgAckResType; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; +import static org.apache.bookkeeper.mledger.AsyncCallbacks.PositionAckState; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; import static org.apache.bookkeeper.mledger.impl.EntryCountEstimator.estimateEntryCountByBytesSize; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; -import static org.apache.bookkeeper.mledger.AsyncCallbacks.BatchMsgAckResType; -import static org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; -import static org.apache.bookkeeper.mledger.AsyncCallbacks.PositionAckState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.Collections2; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java index 0b93de0e0303f..33c46ca8542d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java @@ -204,7 +204,8 @@ private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) { PersistentSubscription sub = topic.getSubscription(update.getSubscriptionName()); if (sub != null) { - sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, Collections.emptyMap(), null, false); + sub.acknowledgeMessage(Collections.singletonList(pos), AckType.Cumulative, + Collections.emptyMap(), null, false); } else { // Subscription doesn't exist. We need to force the creation of the subscription in this cluster. log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subscription", From 722cf46d8f8f5505ebb7070e7042783e06abde60 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 1 Aug 2025 11:38:55 +0800 Subject: [PATCH 4/4] fix checkstyle --- .../broker/service/persistent/PersistentSubscriptionTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index 09c78f227c73e..058d54847c06f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -191,7 +191,8 @@ public void testCanAcknowledgeAndAbortForTransaction() throws Exception { positionList.add(PositionFactory.create(3, 5)); // Acknowledge from normal consumer will succeed ignoring message acked by ongoing transaction. - persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, Collections.emptyMap(), null, false); + persistentSubscription.acknowledgeMessage(positionList, AckType.Individual, + Collections.emptyMap(), null, false); //Abort txn. persistentSubscription.endTxn(txnID1.getMostSigBits(), txnID2.getLeastSigBits(), TxnAction.ABORT_VALUE, -1);