Description
Search before asking
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
any released version, master branch
Minimal reproduce step
This was found when analysing the source code, so no reproducer yet.
A test application that consumes and acks messages while a topic is being frequently unloaded will reproduce this issue.
What did you expect to see?
When ack receipt is enabled, the broker should respond with a failure when it discards an ack.
What did you see instead?
When the consumer isn't found, the ack is silently discarded. There's debug logging, but no logic that sends an ack receipt response with an error:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1982 to 1988 in 6c300f5
    } else {
        if (log.isDebugEnabled()) {
            log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard"
                    + " this command. consumerId: {}, cnx: {}, messageIdCount: {}", ack.getConsumerId(),
                    this.toString(), ack.getMessageIdsCount());
        }
    }
and
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 1952 to 1960 in 6c300f5
    var pulsar = getBrokerService().getPulsar();
    var ignoredAckCount = ack.getMessageIdsCount();
    var ignoredAckTotalCount = ExtensibleLoadManagerImpl.get(pulsar).getIgnoredAckCount().
            addAndGet(ignoredAckCount);
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Ignoring {} message acks during topic transfer. Total ignored ack count: {}",
                subscription, consumerId, ignoredAckCount, ignoredAckTotalCount);
    }
    return;
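To make the expected behaviour concrete, here is a minimal, self-contained sketch of the decision both discard paths could share. It is not Pulsar code: `Ack`, `AckResponse` and `onDiscardedAck` are hypothetical stand-ins invented for illustration, and the sketch assumes that an ack sent with ack receipt enabled carries a request id that the broker can answer.

```java
import java.util.Optional;

public class AckReceiptSketch {

    /** Hypothetical stand-in for an ack command; requestId is null when no receipt was requested. */
    static final class Ack {
        final long consumerId;
        final Long requestId;
        final int messageIdCount;

        Ack(long consumerId, Long requestId, int messageIdCount) {
            this.consumerId = consumerId;
            this.requestId = requestId;
            this.messageIdCount = messageIdCount;
        }

        boolean hasRequestId() {
            return requestId != null;
        }
    }

    /** Hypothetical stand-in for the error response sent back to the client. */
    static final class AckResponse {
        final long consumerId;
        final long requestId;
        final String error;

        AckResponse(long consumerId, long requestId, String error) {
            this.consumerId = consumerId;
            this.requestId = requestId;
            this.error = error;
        }
    }

    /**
     * Decides what to send back when the broker discards an ack.
     * Returns an error response if the client asked for a receipt, otherwise nothing.
     */
    static Optional<AckResponse> onDiscardedAck(Ack ack, String reason) {
        if (!ack.hasRequestId()) {
            // No receipt requested: the client is not waiting for an answer.
            return Optional.empty();
        }
        // Receipt requested: fail the ack explicitly instead of dropping it silently.
        return Optional.of(new AckResponse(ack.consumerId, ack.requestId, reason));
    }

    public static void main(String[] args) {
        Ack withReceipt = new Ack(7L, 42L, 3);
        Ack withoutReceipt = new Ack(7L, null, 3);

        onDiscardedAck(withReceipt, "Consumer not ready")
                .ifPresent(r -> System.out.println("error for request " + r.requestId + ": " + r.error));
        System.out.println("response without receipt: "
                + onDiscardedAck(withoutReceipt, "Consumer not ready").isPresent());
    }
}
```

In the broker, the same check would sit next to the existing debug logging in both branches, so that a client waiting on a receipt gets a failure and can retry rather than waiting for a response that never comes.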
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!