diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 9d0c10802546f..25a6ae9516bb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -406,6 +406,8 @@ private static final class MessagePublishContext implements PublishContext, Runn private long entryTimestamp; + private MessageMetadata messageMetadata; + @Override public long getLedgerId() { return ledgerId; @@ -686,6 +688,17 @@ protected MessagePublishContext newObject(Handle handle) } }; + @Override + public MessageMetadata peekMessageMetadata(ByteBuf entryData) { + if (messageMetadata == null) { + entryData.markReaderIndex(); + messageMetadata = new MessageMetadata(); + Commands.parseMessageMetadata(entryData, messageMetadata); + entryData.resetReaderIndex(); + } + return messageMetadata; + } + public void recycle() { producer = null; sequenceId = -1L; @@ -702,6 +715,10 @@ public void recycle() { if (propertyMap != null) { propertyMap.clear(); } + if (messageMetadata != null) { + messageMetadata.clear(); + messageMetadata = null; + } recyclerHandle.recycle(this); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 8f66d9c0e3e0f..32cb206d48afc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -130,6 +131,10 @@ default void setEntryTimestamp(long entryTimestamp) { default boolean supportsReplDedupByLidAndEid() { return false; } + + default MessageMetadata peekMessageMetadata(ByteBuf entryData) { + return null; + } } CompletableFuture initialize(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2f0255f4f2aae..17f5db7a3412f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -187,6 +187,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; @@ -294,6 +295,11 @@ protected TopicStatsHelper initialValue() { @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); + /** + * Counter for counting delayed messages exceed TTL time. + */ + private final Rate ttlExceededDelayedMessagesRate = new Rate(); + private volatile PersistentTopicAttributes persistentTopicAttributes = null; private static final AtomicReferenceFieldUpdater PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( @@ -638,7 +644,7 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont decrementPendingWriteOpsAndCheck(); return; } - if (isExceedMaximumDeliveryDelay(headersAndPayload)) { + if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) { publishContext.completed( new NotAllowedException( String.format("Exceeds max allowed delivery delay of %s milliseconds", @@ -646,6 +652,8 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont decrementPendingWriteOpsAndCheck(); return; } + // Count exceed ttl delayed messages. + checkDelayedMessageExceededTTL(headersAndPayload, publishContext); MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload); @@ -2752,6 +2760,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats this.addEntryLatencyStatsUsec.refresh(); NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket); this.addEntryLatencyStatsUsec.reset(); + this.ttlExceededDelayedMessagesRate.calculateRate(); } public double getLastUpdatedAvgPublishRateInMsg() { @@ -2825,6 +2834,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount(); stats.abortedTxnCount = txnBuffer.getAbortedTxnCount(); stats.committedTxnCount = txnBuffer.getCommittedTxnCount(); + stats.ttlExceededDelayedMessages = getTtlExceededDelayedMessages(); replicators.forEach((cluster, replicator) -> { ReplicatorStatsImpl replicatorStats = replicator.computeStats(); @@ -4544,7 +4554,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon decrementPendingWriteOpsAndCheck(); return; } - if (isExceedMaximumDeliveryDelay(headersAndPayload)) { + if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) { publishContext.completed( new NotAllowedException( String.format("Exceeds max allowed delivery delay of %s milliseconds", @@ -4552,6 +4562,8 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon decrementPendingWriteOpsAndCheck(); return; } + // Count exceed ttl delayed messages. + checkDelayedMessageExceededTTL(headersAndPayload, publishContext); MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload); @@ -4784,20 +4796,34 @@ public Optional getShadowSourceTopic() { return Optional.ofNullable(shadowSourceTopic); } - protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { + protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload, PublishContext publishContext) { if (isDelayedDeliveryEnabled()) { long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis(); if (maxDeliveryDelayInMs > 0) { - headersAndPayload.markReaderIndex(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.resetReaderIndex(); - return msgMetadata.hasDeliverAtTime() - && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; + MessageMetadata msgMetadata = publishContext.peekMessageMetadata(headersAndPayload); + if (msgMetadata == null || !msgMetadata.hasDeliverAtTime()) { + return false; + } + return msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } } return false; } + private void checkDelayedMessageExceededTTL(ByteBuf headersAndPayload, PublishContext publishContext) { + if (isDelayedDeliveryEnabled()) { + Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); + if (messageTTLInSeconds != null && messageTTLInSeconds > 0) { + MessageMetadata msgMetadata = publishContext.peekMessageMetadata(headersAndPayload); + if (msgMetadata != null && msgMetadata.hasDeliverAtTime()) { + if (msgMetadata.getDeliverAtTime() >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { + this.incrementTtlExceededDelayedMessages(); + } + } + } + } + } + @Override public PersistentTopicAttributes getTopicAttributes() { if (persistentTopicAttributes != null) { @@ -4867,4 +4893,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } + + public void incrementTtlExceededDelayedMessages() { + this.ttlExceededDelayedMessagesRate.recordEvent(); + } + + public long getTtlExceededDelayedMessages() { + return this.ttlExceededDelayedMessagesRate.getTotalCount(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index 523ec5ac87d97..e5232fa7dc4f8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -186,6 +186,9 @@ public class TopicStatsImpl implements TopicStats { /** The last publish timestamp in epoch milliseconds. */ public long lastPublishTimeStamp; + /** The number of delay messages that exceed TTL delay. */ + public long ttlExceededDelayedMessages; + public List getPublishers() { return Stream.concat(publishers.stream().sorted( Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))), @@ -264,6 +267,7 @@ public void reset() { this.oldestBacklogMessageSubscriptionName = null; this.topicCreationTimeStamp = 0; this.lastPublishTimeStamp = 0; + this.ttlExceededDelayedMessages = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -295,6 +299,7 @@ public TopicStatsImpl add(TopicStats ts) { this.committedTxnCount = stats.committedTxnCount; this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime; this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize; + this.ttlExceededDelayedMessages += stats.ttlExceededDelayedMessages; if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) { this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds; this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName;