Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,8 @@ private static final class MessagePublishContext implements PublishContext, Runn

private long entryTimestamp;

private MessageMetadata messageMetadata;

@Override
public long getLedgerId() {
return ledgerId;
Expand Down Expand Up @@ -686,6 +688,17 @@ protected MessagePublishContext newObject(Handle<MessagePublishContext> handle)
}
};

@Override
public MessageMetadata peekMessageMetadata(ByteBuf entryData) {
if (messageMetadata == null) {
entryData.markReaderIndex();
messageMetadata = new MessageMetadata();
Commands.parseMessageMetadata(entryData, messageMetadata);
entryData.resetReaderIndex();
}
return messageMetadata;
}

public void recycle() {
producer = null;
sequenceId = -1L;
Expand All @@ -702,6 +715,10 @@ public void recycle() {
if (propertyMap != null) {
propertyMap.clear();
}
if (messageMetadata != null) {
messageMetadata.clear();
messageMetadata = null;
}
recyclerHandle.recycle(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.EntryFilters;
Expand Down Expand Up @@ -130,6 +131,10 @@ default void setEntryTimestamp(long entryTimestamp) {
default boolean supportsReplDedupByLidAndEid() {
return false;
}

default MessageMetadata peekMessageMetadata(ByteBuf entryData) {
return null;
}
}

CompletableFuture<Void> initialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -294,6 +295,11 @@ protected TopicStatsHelper initialValue() {
@Getter
private final PersistentTopicMetrics persistentTopicMetrics = new PersistentTopicMetrics();

/**
* Counter for counting delayed messages exceed TTL time.
*/
private final Rate ttlExceededDelayedMessagesRate = new Rate();

private volatile PersistentTopicAttributes persistentTopicAttributes = null;
private static final AtomicReferenceFieldUpdater<PersistentTopic, PersistentTopicAttributes>
PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
Expand Down Expand Up @@ -638,14 +644,16 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
getDelayedDeliveryMaxDelayInMillis())), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
// Count exceed ttl delayed messages.
checkDelayedMessageExceededTTL(headersAndPayload, publishContext);

MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
Expand Down Expand Up @@ -2752,6 +2760,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
this.addEntryLatencyStatsUsec.reset();
this.ttlExceededDelayedMessagesRate.calculateRate();
}

public double getLastUpdatedAvgPublishRateInMsg() {
Expand Down Expand Up @@ -2825,6 +2834,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.ongoingTxnCount = txnBuffer.getOngoingTxnCount();
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();
stats.ttlExceededDelayedMessages = getTtlExceededDelayedMessages();

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.computeStats();
Expand Down Expand Up @@ -4544,14 +4554,16 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
getDelayedDeliveryMaxDelayInMillis())), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
// Count exceed ttl delayed messages.
checkDelayedMessageExceededTTL(headersAndPayload, publishContext);

MessageDeduplication.MessageDupStatus status =
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
Expand Down Expand Up @@ -4784,20 +4796,34 @@ public Optional<TopicName> getShadowSourceTopic() {
return Optional.ofNullable(shadowSourceTopic);
}

protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload, PublishContext publishContext) {
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
if (maxDeliveryDelayInMs > 0) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
return msgMetadata.hasDeliverAtTime()
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
MessageMetadata msgMetadata = publishContext.peekMessageMetadata(headersAndPayload);
if (msgMetadata == null || !msgMetadata.hasDeliverAtTime()) {
return false;
}
return msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
}
}
return false;
}

private void checkDelayedMessageExceededTTL(ByteBuf headersAndPayload, PublishContext publishContext) {
if (isDelayedDeliveryEnabled()) {
Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
if (messageTTLInSeconds != null && messageTTLInSeconds > 0) {
MessageMetadata msgMetadata = publishContext.peekMessageMetadata(headersAndPayload);
if (msgMetadata != null && msgMetadata.hasDeliverAtTime()) {
if (msgMetadata.getDeliverAtTime() >= (messageTTLInSeconds * 1000L) + System.currentTimeMillis()) {
this.incrementTtlExceededDelayedMessages();
}
}
}
}
}

@Override
public PersistentTopicAttributes getTopicAttributes() {
if (persistentTopicAttributes != null) {
Expand Down Expand Up @@ -4867,4 +4893,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

return future;
}

public void incrementTtlExceededDelayedMessages() {
this.ttlExceededDelayedMessagesRate.recordEvent();
}

public long getTtlExceededDelayedMessages() {
return this.ttlExceededDelayedMessagesRate.getTotalCount();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ public class TopicStatsImpl implements TopicStats {
/** The last publish timestamp in epoch milliseconds. */
public long lastPublishTimeStamp;

/** The number of delay messages that exceed TTL delay. */
public long ttlExceededDelayedMessages;

public List<? extends PublisherStats> getPublishers() {
return Stream.concat(publishers.stream().sorted(
Comparator.comparing(PublisherStatsImpl::getProducerName, nullsLast(naturalOrder()))),
Expand Down Expand Up @@ -264,6 +267,7 @@ public void reset() {
this.oldestBacklogMessageSubscriptionName = null;
this.topicCreationTimeStamp = 0;
this.lastPublishTimeStamp = 0;
this.ttlExceededDelayedMessages = 0;
}

// if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
Expand Down Expand Up @@ -295,6 +299,7 @@ public TopicStatsImpl add(TopicStats ts) {
this.committedTxnCount = stats.committedTxnCount;
this.backlogQuotaLimitTime = stats.backlogQuotaLimitTime;
this.backlogQuotaLimitSize = stats.backlogQuotaLimitSize;
this.ttlExceededDelayedMessages += stats.ttlExceededDelayedMessages;
if (stats.oldestBacklogMessageAgeSeconds > this.oldestBacklogMessageAgeSeconds) {
this.oldestBacklogMessageAgeSeconds = stats.oldestBacklogMessageAgeSeconds;
this.oldestBacklogMessageSubscriptionName = stats.oldestBacklogMessageSubscriptionName;
Expand Down