From 1cfa3c450ebdc9f8ddb3d16acfd74712a7a8c2e0 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Fri, 21 Nov 2025 00:17:05 +0800 Subject: [PATCH 01/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../org/apache/pulsar/broker/PulsarService.java | 13 +++++++++++++ .../apache/pulsar/broker/service/ServerCnx.java | 7 +++++++ .../org/apache/pulsar/broker/service/Topic.java | 15 +++++++++++++++ .../service/persistent/PersistentTopic.java | 15 +++++++++++++++ .../policies/data/stats/TopicStatsImpl.java | 5 +++++ 5 files changed, 55 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e61fbfac56622..b189bee595784 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -2252,4 +2252,17 @@ public HealthChecker getHealthChecker() { } return healthChecker; } + + /** + * Check if message delay time exceeds TTL + * + * @param topic + * @param deliverAtTime + * @return true if message delay time exceeds TTL, false otherwise + */ + public boolean isMessageDelayTimeExceedTTL(Topic topic, long deliverAtTime) { + return deliverAtTime + >= topic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get() * 1000 + System.currentTimeMillis(); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 55cd0fe7e3ddb..6085c4363aef5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2042,6 +2042,13 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } + + // count delayed message times exceeding the ttl policy time + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + if (msgMetadata.hasDeliverAtTime() && service.getPulsar() + .isMessageDelayTimeExceedTTL(producer.getTopic(), msgMetadata.getDeliverAtTime())) { + producer.getTopic().incrementExceedTTLDelayMessages(); + } } private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 8f66d9c0e3e0f..c65d8b90c7525 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -391,4 +391,19 @@ default boolean isSystemTopic() { * @return */ TopicAttributes getTopicAttributes(); + + /** + * Increment exceed TTL delay message + */ + default void incrementExceedTTLDelayMessages() { + } + + /** + * Get exceed TTL delay messages number + * + * @return + */ + default long getExceedTTLDelayMessages() { + return 0; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2f0255f4f2aae..f54f6f2a63d10 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -187,6 +187,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; @@ -294,6 +295,8 @@ protected TopicStatsHelper initialValue() { @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); + private final Rate exceedTTLDelayMessage = new Rate(); + private volatile PersistentTopicAttributes persistentTopicAttributes = null; private static final AtomicReferenceFieldUpdater PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( @@ -2752,6 +2755,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats this.addEntryLatencyStatsUsec.refresh(); NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket); this.addEntryLatencyStatsUsec.reset(); + this.exceedTTLDelayMessage.calculateRate(); } public double getLastUpdatedAvgPublishRateInMsg() { @@ -2825,6 +2829,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount(); stats.abortedTxnCount = txnBuffer.getAbortedTxnCount(); stats.committedTxnCount = txnBuffer.getCommittedTxnCount(); + stats.exceedTTLDelayMessages = getExceedTTLDelayMessages(); replicators.forEach((cluster, replicator) -> { ReplicatorStatsImpl replicatorStats = replicator.computeStats(); @@ -4867,4 +4872,14 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } + + @Override + public void incrementExceedTTLDelayMessages() { + this.exceedTTLDelayMessage.recordEvent(); + } + + @Override + public long getExceedTTLDelayMessages() { + return this.exceedTTLDelayMessage.getCount(); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index 523ec5ac87d97..e6868aad01b49 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -186,6 +186,9 @@ public class TopicStatsImpl implements TopicStats { /** The last publish timestamp in epoch milliseconds. */ public long lastPublishTimeStamp; + /** The number of messages that exceed TTL delay. */ + public long exceedTTLDelayMessages; + public List getPublishers() { return Stream.concat(publishers.stream().sorted( Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))), @@ -264,6 +267,7 @@ public void reset() { this.oldestBacklogMessageSubscriptionName = null; this.topicCreationTimeStamp = 0; this.lastPublishTimeStamp = 0; + this.exceedTTLDelayMessages = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -295,6 +299,7 @@ public TopicStatsImpl add(TopicStats ts) { this.committedTxnCount = stats.committedTxnCount; this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime; this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize; + this.exceedTTLDelayMessages += stats.exceedTTLDelayMessages; if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) { this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds; this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName; From 29ccb033ff88401aeb97f32c4b4b529eef6c0c63 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Fri, 21 Nov 2025 00:19:51 +0800 Subject: [PATCH 02/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../src/main/java/org/apache/pulsar/broker/service/Topic.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index c65d8b90c7525..f8e1ee961f87c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -400,7 +400,6 @@ default void incrementExceedTTLDelayMessages() { /** * Get exceed TTL delay messages number - * * @return */ default long getExceedTTLDelayMessages() { From 4148cc44a3752aad707c2a14e940a9b2a4da7766 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Mon, 24 Nov 2025 16:44:21 +0800 Subject: [PATCH 03/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../src/main/java/org/apache/pulsar/broker/service/Topic.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index f8e1ee961f87c..c65d8b90c7525 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -400,6 +400,7 @@ default void incrementExceedTTLDelayMessages() { /** * Get exceed TTL delay messages number + * * @return */ default long getExceedTTLDelayMessages() { From 9f386532a22e389371b298e08aca6247804ad025 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Mon, 24 Nov 2025 16:52:31 +0800 Subject: [PATCH 04/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../src/main/java/org/apache/pulsar/broker/service/Topic.java | 2 +- .../pulsar/common/policies/data/stats/TopicStatsImpl.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index c65d8b90c7525..e07115522df50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -393,7 +393,7 @@ default boolean isSystemTopic() { TopicAttributes getTopicAttributes(); /** - * Increment exceed TTL delay message + * Increment exceed TTL delay message number. */ default void incrementExceedTTLDelayMessages() { } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index e6868aad01b49..7dc7dd68f40f9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -186,7 +186,7 @@ public class TopicStatsImpl implements TopicStats { /** The last publish timestamp in epoch milliseconds. */ public long lastPublishTimeStamp; - /** The number of messages that exceed TTL delay. */ + /** The number of delay messages that exceed TTL delay. */ public long exceedTTLDelayMessages; public List getPublishers() { From c7a0fe701306efafef89c0abfa16d68dc9e01e98 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Mon, 24 Nov 2025 17:36:54 +0800 Subject: [PATCH 05/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../src/main/java/org/apache/pulsar/broker/service/Topic.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index e07115522df50..7fa01d08c9c56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -400,7 +400,6 @@ default void incrementExceedTTLDelayMessages() { /** * Get exceed TTL delay messages number - * * @return */ default long getExceedTTLDelayMessages() { From 9de09ab198f154e818bf1dd62e191c807940683f Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 23 Dec 2025 22:23:17 +0800 Subject: [PATCH 06/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../apache/pulsar/broker/PulsarService.java | 12 ----- .../pulsar/broker/service/ServerCnx.java | 6 --- .../apache/pulsar/broker/service/Topic.java | 13 ------ .../service/persistent/PersistentTopic.java | 44 ++++++++++++++----- .../policies/data/stats/TopicStatsImpl.java | 6 +-- 5 files changed, 35 insertions(+), 46 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b189bee595784..9ef7f4e576600 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -2253,16 +2253,4 @@ public HealthChecker getHealthChecker() { return healthChecker; } - /** - * Check if message delay time exceeds TTL - * - * @param topic - * @param deliverAtTime - * @return true if message delay time exceeds TTL, false otherwise - */ - public boolean isMessageDelayTimeExceedTTL(Topic topic, long deliverAtTime) { - return deliverAtTime - >= topic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get() * 1000 + System.currentTimeMillis(); - } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 6085c4363aef5..31939fe8fa156 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2043,12 +2043,6 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } - // count delayed message times exceeding the ttl policy time - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - if (msgMetadata.hasDeliverAtTime() && service.getPulsar() - .isMessageDelayTimeExceedTTL(producer.getTopic(), msgMetadata.getDeliverAtTime())) { - producer.getTopic().incrementExceedTTLDelayMessages(); - } } private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 7fa01d08c9c56..56480cfd6c664 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -392,17 +392,4 @@ default boolean isSystemTopic() { */ TopicAttributes getTopicAttributes(); - /** - * Increment exceed TTL delay message number. - */ - default void incrementExceedTTLDelayMessages() { - } - - /** - * Get exceed TTL delay messages number - * @return - */ - default long getExceedTTLDelayMessages() { - return 0; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f54f6f2a63d10..6e49b06e8d746 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -295,7 +295,7 @@ protected TopicStatsHelper initialValue() { @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); - private final Rate exceedTTLDelayMessage = new Rate(); + private final Rate exceedTTLDelayedMessage = new Rate(); private volatile PersistentTopicAttributes persistentTopicAttributes = null; private static final AtomicReferenceFieldUpdater @@ -2755,7 +2755,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats this.addEntryLatencyStatsUsec.refresh(); NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket); this.addEntryLatencyStatsUsec.reset(); - this.exceedTTLDelayMessage.calculateRate(); + this.exceedTTLDelayedMessage.calculateRate(); } public double getLastUpdatedAvgPublishRateInMsg() { @@ -2829,7 +2829,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount(); stats.abortedTxnCount = txnBuffer.getAbortedTxnCount(); stats.committedTxnCount = txnBuffer.getCommittedTxnCount(); - stats.exceedTTLDelayMessages = getExceedTTLDelayMessages(); + stats.exceedTTLDelayedMessages = getExceedTTLDelayedMessages(); replicators.forEach((cluster, replicator) -> { ReplicatorStatsImpl replicatorStats = replicator.computeStats(); @@ -4792,10 +4792,14 @@ public Optional getShadowSourceTopic() { protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { if (isDelayedDeliveryEnabled()) { long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis(); + headersAndPayload.markReaderIndex(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.resetReaderIndex(); + // count exceed ttl delayed messages + if (isExceedMessageTTL(msgMetadata)) { + this.incrementExceedTTLDelayedMessages(); + } if (maxDeliveryDelayInMs > 0) { - headersAndPayload.markReaderIndex(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.resetReaderIndex(); return msgMetadata.hasDeliverAtTime() && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } @@ -4873,13 +4877,29 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } - @Override - public void incrementExceedTTLDelayMessages() { - this.exceedTTLDelayMessage.recordEvent(); + /** + * Check if the message deliver time is expired by message TTL. + * + * @param msgMetadata the message metadata + * @return true if the message deliver time is expired by message TTL, false otherwise + */ + protected boolean isExceedMessageTTL(MessageMetadata msgMetadata) { + Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); + if (messageTTLInSeconds == null || messageTTLInSeconds <= 0) { + return false; + } + if (!msgMetadata.hasDeliverAtTime()) { + return false; + } + return msgMetadata.getDeliverAtTime() >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis(); } - @Override - public long getExceedTTLDelayMessages() { - return this.exceedTTLDelayMessage.getCount(); + + public void incrementExceedTTLDelayedMessages() { + this.exceedTTLDelayedMessage.recordEvent(); + } + + public long getExceedTTLDelayedMessages() { + return this.exceedTTLDelayedMessage.getCount(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index 7dc7dd68f40f9..b39638df249b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -187,7 +187,7 @@ public class TopicStatsImpl implements TopicStats { public long lastPublishTimeStamp; /** The number of delay messages that exceed TTL delay. */ - public long exceedTTLDelayMessages; + public long exceedTTLDelayedMessages; public List getPublishers() { return Stream.concat(publishers.stream().sorted( @@ -267,7 +267,7 @@ public void reset() { this.oldestBacklogMessageSubscriptionName = null; this.topicCreationTimeStamp = 0; this.lastPublishTimeStamp = 0; - this.exceedTTLDelayMessages = 0; + this.exceedTTLDelayedMessages = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -299,7 +299,7 @@ public TopicStatsImpl add(TopicStats ts) { this.committedTxnCount = stats.committedTxnCount; this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime; this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize; - this.exceedTTLDelayMessages += stats.exceedTTLDelayMessages; + this.exceedTTLDelayedMessages += stats.exceedTTLDelayedMessages; if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) { this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds; this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName; From 506a84de6ed45fbb42534a5b3a663c2ce18486ef Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 23 Dec 2025 22:25:23 +0800 Subject: [PATCH 07/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 6e49b06e8d746..dce70d99436bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4900,6 +4900,6 @@ public void incrementExceedTTLDelayedMessages() { } public long getExceedTTLDelayedMessages() { - return this.exceedTTLDelayedMessage.getCount(); + return this.exceedTTLDelayedMessage.getTotalCount(); } } From 89c645e14457d6c62aef423827cd8a8c7d7a5377 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 23 Dec 2025 22:28:06 +0800 Subject: [PATCH 08/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../src/main/java/org/apache/pulsar/broker/PulsarService.java | 1 - .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 1 - .../src/main/java/org/apache/pulsar/broker/service/Topic.java | 1 - 3 files changed, 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 9ef7f4e576600..e61fbfac56622 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -2252,5 +2252,4 @@ public HealthChecker getHealthChecker() { } return healthChecker; } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 31939fe8fa156..55cd0fe7e3ddb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2042,7 +2042,6 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } - } private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 56480cfd6c664..8f66d9c0e3e0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -391,5 +391,4 @@ default boolean isSystemTopic() { * @return */ TopicAttributes getTopicAttributes(); - } From b1afd8a99c89696e0a8cd07e0219eb25b351a97d Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Wed, 24 Dec 2025 21:25:24 +0800 Subject: [PATCH 09/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../service/persistent/PersistentTopic.java | 33 ++++++------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dce70d99436bd..306d1d72b650e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4792,17 +4792,22 @@ public Optional getShadowSourceTopic() { protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { if (isDelayedDeliveryEnabled()) { long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis(); + Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); + if (maxDeliveryDelayInMs <= 0 && (messageTTLInSeconds == null || messageTTLInSeconds <= 0)) { + return false; + } headersAndPayload.markReaderIndex(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); headersAndPayload.resetReaderIndex(); + if (!msgMetadata.hasDeliverAtTime()) { + return false; + } + long deliverAtTime = msgMetadata.getDeliverAtTime(); // count exceed ttl delayed messages - if (isExceedMessageTTL(msgMetadata)) { + if (deliverAtTime >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { this.incrementExceedTTLDelayedMessages(); } - if (maxDeliveryDelayInMs > 0) { - return msgMetadata.hasDeliverAtTime() - && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; - } + return deliverAtTime - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } return false; } @@ -4877,24 +4882,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } - /** - * Check if the message deliver time is expired by message TTL. - * - * @param msgMetadata the message metadata - * @return true if the message deliver time is expired by message TTL, false otherwise - */ - protected boolean isExceedMessageTTL(MessageMetadata msgMetadata) { - Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); - if (messageTTLInSeconds == null || messageTTLInSeconds <= 0) { - return false; - } - if (!msgMetadata.hasDeliverAtTime()) { - return false; - } - return msgMetadata.getDeliverAtTime() >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis(); - } - - public void incrementExceedTTLDelayedMessages() { this.exceedTTLDelayedMessage.recordEvent(); } From e3d0a46dc57c88784171158d79da85e559c47075 Mon Sep 17 00:00:00 2001 From: wangZhenjiang Date: Tue, 13 Jan 2026 16:30:24 +0800 Subject: [PATCH 10/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../service/persistent/PersistentTopic.java | 22 +++++++++++-------- .../policies/data/stats/TopicStatsImpl.java | 6 ++--- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 306d1d72b650e..9fdcd2fcbfd50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -295,7 +295,10 @@ protected TopicStatsHelper initialValue() { @Getter private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); - private final Rate exceedTTLDelayedMessage = new Rate(); + /** + * Counter for counting delayed messages exceed TTL time + */ + private final Rate ttlExceededDelayedMessagesRate = new Rate(); private volatile PersistentTopicAttributes persistentTopicAttributes = null; private static final AtomicReferenceFieldUpdater @@ -2755,7 +2758,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats this.addEntryLatencyStatsUsec.refresh(); NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket); this.addEntryLatencyStatsUsec.reset(); - this.exceedTTLDelayedMessage.calculateRate(); + this.ttlExceededDelayedMessagesRate.calculateRate(); } public double getLastUpdatedAvgPublishRateInMsg() { @@ -2829,7 +2832,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount(); stats.abortedTxnCount = txnBuffer.getAbortedTxnCount(); stats.committedTxnCount = txnBuffer.getCommittedTxnCount(); - stats.exceedTTLDelayedMessages = getExceedTTLDelayedMessages(); + stats.ttlExceededDelayedMessages = getTtlExceededDelayedMessages(); replicators.forEach((cluster, replicator) -> { ReplicatorStatsImpl replicatorStats = replicator.computeStats(); @@ -4804,8 +4807,9 @@ protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { } long deliverAtTime = msgMetadata.getDeliverAtTime(); // count exceed ttl delayed messages - if (deliverAtTime >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { - this.incrementExceedTTLDelayedMessages(); + if (messageTTLInSeconds != null && messageTTLInSeconds > 0 + && deliverAtTime >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { + this.incrementTtlExceededDelayedMessages(); } return deliverAtTime - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } @@ -4882,11 +4886,11 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { return future; } - public void incrementExceedTTLDelayedMessages() { - this.exceedTTLDelayedMessage.recordEvent(); + public void incrementTtlExceededDelayedMessages() { + this.ttlExceededDelayedMessagesRate.recordEvent(); } - public long getExceedTTLDelayedMessages() { - return this.exceedTTLDelayedMessage.getTotalCount(); + public long getTtlExceededDelayedMessages() { + return this.ttlExceededDelayedMessagesRate.getTotalCount(); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index b39638df249b8..e5232fa7dc4f8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -187,7 +187,7 @@ public class TopicStatsImpl implements TopicStats { public long lastPublishTimeStamp; /** The number of delay messages that exceed TTL delay. */ - public long exceedTTLDelayedMessages; + public long ttlExceededDelayedMessages; public List getPublishers() { return Stream.concat(publishers.stream().sorted( @@ -267,7 +267,7 @@ public void reset() { this.oldestBacklogMessageSubscriptionName = null; this.topicCreationTimeStamp = 0; this.lastPublishTimeStamp = 0; - this.exceedTTLDelayedMessages = 0; + this.ttlExceededDelayedMessages = 0; } // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current @@ -299,7 +299,7 @@ public TopicStatsImpl add(TopicStats ts) { this.committedTxnCount = stats.committedTxnCount; this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime; this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize; - this.exceedTTLDelayedMessages += stats.exceedTTLDelayedMessages; + this.ttlExceededDelayedMessages += stats.ttlExceededDelayedMessages; if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) { this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds; this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName; From 54cccac019762ba48724620cf77ed4bd60c6f52e Mon Sep 17 00:00:00 2001 From: wangZhenjiang Date: Tue, 13 Jan 2026 16:35:27 +0800 Subject: [PATCH 11/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../pulsar/broker/service/persistent/PersistentTopic.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9fdcd2fcbfd50..1049b875acf4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -296,7 +296,7 @@ protected TopicStatsHelper initialValue() { private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics(); /** - * Counter for counting delayed messages exceed TTL time + * Counter for counting delayed messages exceed TTL time. */ private final Rate ttlExceededDelayedMessagesRate = new Rate(); From be0a169bdb4c81489bc54f660a018c32e1ad3fa7 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Tue, 20 Jan 2026 17:08:17 +0800 Subject: [PATCH 12/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../pulsar/broker/service/persistent/PersistentTopic.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1049b875acf4e..723fa5c77b950 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4795,10 +4795,10 @@ public Optional getShadowSourceTopic() { protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { if (isDelayedDeliveryEnabled()) { long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis(); - Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); - if (maxDeliveryDelayInMs <= 0 && (messageTTLInSeconds == null || messageTTLInSeconds <= 0)) { + if (maxDeliveryDelayInMs <= 0) { return false; } + Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); headersAndPayload.markReaderIndex(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); headersAndPayload.resetReaderIndex(); From 98a80961d4acb29146ea7852fff495a279cdf8dd Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Tue, 20 Jan 2026 17:10:05 +0800 Subject: [PATCH 13/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 723fa5c77b950..4e0c3cb72e174 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -4798,7 +4798,6 @@ protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { if (maxDeliveryDelayInMs <= 0) { return false; } - Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); headersAndPayload.markReaderIndex(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); headersAndPayload.resetReaderIndex(); @@ -4807,10 +4806,12 @@ protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { } long deliverAtTime = msgMetadata.getDeliverAtTime(); // count exceed ttl delayed messages + Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); if (messageTTLInSeconds != null && messageTTLInSeconds > 0 && deliverAtTime >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { this.incrementTtlExceededDelayedMessages(); } + return deliverAtTime - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } return false; From d85fc651abd828c9b56eb45d3ffb5c4b03d637f4 Mon Sep 17 00:00:00 2001 From: WangZhenjiang Date: Thu, 22 Jan 2026 16:17:42 +0800 Subject: [PATCH 14/14] [improve][broker]Add an indicator for the count of expired messages within a cycle --- .../pulsar/broker/service/Producer.java | 17 +++++++ .../apache/pulsar/broker/service/Topic.java | 5 +++ .../service/persistent/PersistentTopic.java | 45 +++++++++++-------- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 9d0c10802546f..25a6ae9516bb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -406,6 +406,8 @@ private static final class MessagePublishContext implements PublishContext, Runn private long entryTimestamp; + private MessageMetadata messageMetadata; + @Override public long getLedgerId() { return ledgerId; @@ -686,6 +688,17 @@ protected MessagePublishContext newObject(Handle handle) } }; + @Override + public MessageMetadata peekMessageMetadata(ByteBuf entryData) { + if (messageMetadata == null) { + entryData.markReaderIndex(); + messageMetadata = new MessageMetadata(); + Commands.parseMessageMetadata(entryData, messageMetadata); + entryData.resetReaderIndex(); + } + return messageMetadata; + } + public void recycle() { producer = null; sequenceId = -1L; @@ -702,6 +715,10 @@ public void recycle() { if (propertyMap != null) { propertyMap.clear(); } + if (messageMetadata != null) { + messageMetadata.clear(); + messageMetadata = null; + } recyclerHandle.recycle(this); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 8f66d9c0e3e0f..32cb206d48afc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -130,6 +131,10 @@ default void setEntryTimestamp(long entryTimestamp) { default boolean supportsReplDedupByLidAndEid() { return false; } + + default MessageMetadata peekMessageMetadata(ByteBuf entryData) { + return null; + } } CompletableFuture initialize(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4e0c3cb72e174..17f5db7a3412f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -644,7 +644,7 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont decrementPendingWriteOpsAndCheck(); return; } - if (isExceedMaximumDeliveryDelay(headersAndPayload)) { + if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) { publishContext.completed( new NotAllowedException( String.format("Exceeds max allowed delivery delay of %s milliseconds", @@ -652,6 +652,8 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont decrementPendingWriteOpsAndCheck(); return; } + // Count exceed ttl delayed messages. + checkDelayedMessageExceededTTL(headersAndPayload, publishContext); MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload); @@ -4552,7 +4554,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon decrementPendingWriteOpsAndCheck(); return; } - if (isExceedMaximumDeliveryDelay(headersAndPayload)) { + if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) { publishContext.completed( new NotAllowedException( String.format("Exceeds max allowed delivery delay of %s milliseconds", @@ -4560,6 +4562,8 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon decrementPendingWriteOpsAndCheck(); return; } + // Count exceed ttl delayed messages. + checkDelayedMessageExceededTTL(headersAndPayload, publishContext); MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload); @@ -4792,29 +4796,32 @@ public Optional getShadowSourceTopic() { return Optional.ofNullable(shadowSourceTopic); } - protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { + protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload, PublishContext publishContext) { if (isDelayedDeliveryEnabled()) { long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis(); - if (maxDeliveryDelayInMs <= 0) { - return false; - } - headersAndPayload.markReaderIndex(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.resetReaderIndex(); - if (!msgMetadata.hasDeliverAtTime()) { - return false; + if (maxDeliveryDelayInMs > 0) { + MessageMetadata msgMetadata = publishContext.peekMessageMetadata(headersAndPayload); + if (msgMetadata == null || !msgMetadata.hasDeliverAtTime()) { + return false; + } + return msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } - long deliverAtTime = msgMetadata.getDeliverAtTime(); - // count exceed ttl delayed messages + } + return false; + } + + private void checkDelayedMessageExceededTTL(ByteBuf headersAndPayload, PublishContext publishContext) { + if (isDelayedDeliveryEnabled()) { Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get(); - if (messageTTLInSeconds != null && messageTTLInSeconds > 0 - && deliverAtTime >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { - this.incrementTtlExceededDelayedMessages(); + if (messageTTLInSeconds != null && messageTTLInSeconds > 0) { + MessageMetadata msgMetadata = publishContext.peekMessageMetadata(headersAndPayload); + if (msgMetadata != null && msgMetadata.hasDeliverAtTime()) { + if (msgMetadata.getDeliverAtTime() >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) { + this.incrementTtlExceededDelayedMessages(); + } + } } - - return deliverAtTime - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; } - return false; } @Override