From c3a120ac5b986a6e6185a0db52249df3e8080588 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 1 Jun 2025 20:05:10 +0800 Subject: [PATCH 1/9] [improve][pip] PIP-423: Add Support for Cancelling Individual Delayed Messages --- pip/pip-423.md | 352 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 pip/pip-423.md diff --git a/pip/pip-423.md b/pip/pip-423.md new file mode 100644 index 0000000000000..6682faa7c10b0 --- /dev/null +++ b/pip/pip-423.md @@ -0,0 +1,352 @@ +# PIP-423: Add Support for Cancelling Individual Delayed Messages + +# Background knowledge + +* **Delayed Messages:** Apache Pulsar allows producers to send messages that are scheduled to be delivered to consumers at a specified future time. This is achieved by setting a `deliverAt` or `deliverAfter` timestamp on the message. The broker then holds these messages and only dispatches them to consumers once their scheduled delivery time is reached. +* **DelayedDeliveryTracker:** For persistent topics, when delayed message delivery is enabled, a `DelayedDeliveryTracker` is used on the broker to manage the schedule of these messages. There are two main implementations: + * `InMemoryDelayedDeliveryTracker`: Keeps all delayed message indexes in memory. Suitable for a smaller number of delayed messages. + * `BucketDelayedDeliveryTracker`: A more scalable solution that uses a hybrid approach. It keeps recent delayed message indexes in a mutable in-memory bucket and spills older/larger sets of indexes into persistent snapshots stored in BookKeeper (via `BookkeeperBucketSnapshotStorage`). This allows handling a large number of delayed messages without exhausting broker memory. When a snapshot is persisted, it contains `DelayedIndex` entries, each representing a delayed message with its original `ledgerId`, `entryId`, and scheduled `timestamp`. +* **Message Identification (LedgerId, EntryId):** In Pulsar, messages are stored in ledgers. Each message within a ledger has a unique `entryId`. The combination of `ledgerId` and `entryId` (often represented as `Position`) uniquely identifies a message within a topic. +* **Current State before this PIP:** Pulsar does not offer a native mechanism to cancel a delayed message once it has been sent by the producer and accepted by the broker. If circumstances change and a scheduled message becomes irrelevant, it would still be delivered at its scheduled time. + +# Motivation + +In modern event-driven architectures, the ability to react to changing conditions is crucial. While Pulsar's delayed message feature is valuable for scheduling future tasks or deliveries, the lack of a cancellation mechanism presents a limitation. +Use cases where cancellation is important include: +* An order is placed with delayed processing, but the user cancels the order before processing begins. +* A scheduled notification becomes obsolete due to a subsequent user action. +* System alerts scheduled for future delivery are resolved before the delivery time. + +Without a cancellation feature, applications might resort to workarounds, such as: +* Consumers receiving and then filtering/ignoring obsolete delayed messages, which wastes processing resources and bandwidth. +* Maintaining complex state management on the client-side to track and ignore messages, which is error-prone and doesn't prevent the message from being delivered by the broker. + +This proposal aims to introduce a robust mechanism to cancel delayed messages directly at the broker level, preventing their delivery if they are no longer needed. + +# Goals + +## In Scope + +* Provide a mechanism for users to request the cancellation of a specific delayed message. +* The cancellation request will target a message identified by its `ledgerId`, `entryId`, and original `deliverAt` timestamp. +* The cancellation will be applied at the broker's `DelayedDeliveryTracker` level. +* Introduce a new Admin API endpoint for cancelling delayed messages. +* Provide a corresponding `pulsar-admin` CLI command. +* Support cancellation for `BucketDelayedDeliveryTracker`. +* Ensure that once a message is successfully cancelled, it will not be delivered to consumers. + +## Out of Scope + +* Client-side (producer/consumer) API for direct cancellation within application code (cancellation is initiated via Admin API). +* Complex wildcard-based cancellation (e.g., cancel all messages for a user). Cancellation is per specific message ID. +* Guaranteed immediate cancellation across geo-replicated clusters (cancellation is primarily a per-cluster operation, though admin command can target partitions). + +# High Level Design + +The proposed solution introduces a mechanism to mark a specific delayed message for cancellation within the broker's `DelayedDeliveryTracker`. + +1. **Cancellation Request:** A user initiates a cancellation request via a new Pulsar Admin API endpoint. The request must specify the target topic, the `ledgerId` and `entryId` of the delayed message to be cancelled, and its original scheduled `deliverAt` timestamp. The `deliverAt` timestamp helps in locating or scheduling the cancellation appropriately if the message is not immediately found in active memory. Optionally, specific subscription names can be provided; otherwise, the cancellation applies to all subscriptions on the topic. + +2. **Dispatcher Involvement:** The Admin API call on the broker handling the request will identify the relevant topic and its subscriptions. For each subscription, it will interact with the `Dispatcher` associated with it. + +3. **DelayedDeliveryTracker Update:** The `Dispatcher` will delegate the cancellation to its `DelayedDeliveryTracker`. + * A new method, `applyDelayOperation(ledgerId, entryId, deliverAt, operationType)`, is introduced in `DelayedDeliveryTracker`. For cancellation, `operationType` will be `CANCEL`. + * **`BucketDelayedDeliveryTracker` Logic:** + * When a `CANCEL` operation is received: + 1. It first checks if the target message (`ledgerId`, `entryId`) is currently tracked (i.e., its index bit is set in the mutable or immutable buckets' bitmaps). If found, its index bit is cleared, effectively removing it from being scheduled, and `numberDelayedMessages` is decremented. A record of this cancellation is stored in a `canceledMessages` set. + 2. If the target message is *not* currently found in the tracker's active indexes (e.g., it's scheduled far in the future and its bucket hasn't been loaded, or it was already delivered/acknowledged), a "cancel command" (a special `DelayedIndex` entry with `DelayedOperationType.CANCEL`) is added to the `lastMutableBucket`. This cancel command is scheduled to be processed approximately `2 * tickTimeMillis` before the original `deliverAt` time of the target message. This ensures the cancel command is loaded and processed before the target message itself would be. + * **Persistence of Cancel Commands:** These cancel commands are persisted as part of bucket snapshots. The `DelayedIndex` proto is extended to include an `DelayedOperationType` field (`DELAY` or `CANCEL`). + * **Recovery:** During recovery from snapshots, `DelayedIndex` entries of type `CANCEL` are loaded into the `canceledMessages` set. Regular `DELAY` type messages are only added to the scheduling queue if they are not present in `canceledMessages`. + * **Dispatch Filtering:** When `getScheduledMessages()` is called to retrieve messages for dispatch, if a message ID from the `sharedBucketPriorityQueue` is found in `canceledMessages`, it is skipped and its index bit removed. + * **Cleanup of `canceledMessages`:** To prevent unbounded growth, entries in `canceledMessages` (and the persisted CANCEL commands) will have an expiry mechanism. A `canceledMessagesExpiryQueue` tracks these entries, and they are cleaned up after a certain period (e.g., `original_deliver_at + 2 * tickTimeMillis`) post their relevance. + +4. **Result:** If the cancellation is successfully processed by the tracker, the target delayed message will not be delivered to any consumer on the specified subscriptions. + +# Detailed Design + +## Design & Implementation Details + +### 1. `DelayedOperationType` Enum +A new enum `DelayedOperationType` is defined in `pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto`: +```protobuf +enum DelayedOperationType { + DELAY = 0; + CANCEL = 1; +} +``` +This enum is used to differentiate between regular delayed messages and cancellation commands within the `DelayedDeliveryTracker` and its persisted snapshots. + +### 2. `DelayedIndex` Proto Modification +The `DelayedIndex` message in `DelayedMessageIndexBucketSegment.proto` is extended: +```protobuf +message DelayedIndex { + required uint64 timestamp = 1; + required uint64 ledger_id = 2; + required uint64 entry_id = 3; + optional DelayedOperationType delayed_operation_type = 4 [default = DELAY]; +} +``` + +### 3. `DelayedDeliveryTracker` Interface +The `DelayedDeliveryTracker` interface is extended with a new method: +```java +public interface DelayedDeliveryTracker extends AutoCloseable { + // ... + boolean applyDelayOperation(long ledgerId, long entryId, long deliverAt, DelayedOperationType operationType); +} +``` +* `ledgerId`, `entryId`: Identify the target delayed message. +* `deliverAt`: The original scheduled delivery timestamp of the target delayed message. +* `operationType`: Either `DELAY` (for adding/tracking) or `CANCEL`. + +### 4. `BucketDelayedDeliveryTracker` Enhancements +* **`applyDelayOperation(ledgerId, entryId, deliverAt, DelayedOperationType.CANCEL)`:** + * Calls `doCancelOperation(ledgerId, entryId, deliverAt)`. +* **`doCancelOperation(long ledgerId, long entryId, long deliverAt)`:** + * Synchronized method. + * Checks if the message `(ledgerId, entryId)` exists using `containsMessage()`. + * If **yes**: + * Calls `removeIndexBit(ledgerId, entryId)` to remove it from tracking bitmaps. + * Adds `(ledgerId, entryId)` to the `canceledMessages` set. + * Adds `(ledgerId, entryId)` to `canceledMessagesExpiryQueue`, scheduled to expire at `deliverAt + 2 * tickTimeMillis` (original delivery time of target message + buffer). + * Decrements `numberDelayedMessages`. + * Returns `true`. + * If **no** (message not currently tracked, e.g., too far in future, or already past): + * Checks if `deliverAt` (original delivery time) is valid (not in the past relative to cutoff). If invalid, return `false`. + * Calculates `cancelTime = Math.max(clock.millis(), deliverAt - (2 * tickTimeMillis))`. This is when the "cancel command" itself should be processed. + * Adds a cancel command to `lastMutableBucket`: `lastMutableBucket.addMessage(ledgerId, entryId, cancelTime, DelayedOperationType.CANCEL)`. Note: `ledgerId` and `entryId` here refer to the *target* message. This adds a marker that will be processed at `cancelTime`. + * Returns `true`. +* **`canceledMessages`:** A `ConcurrentLongPairSet` storing `(ledgerId, entryId)` of messages for which cancellation has been processed or requested. +* **`canceledMessagesExpiryQueue`:** A `TripleLongPriorityQueue` storing `(expiryTimestamp, ledgerId, entryId)` for entries in `canceledMessages`. Used by `canceledMessagesCleanup()`. +* **`addCanceledMessage(long ledgerId, long entryId, long timestamp)`:** Helper method to add to `canceledMessages` and schedule expiry in `canceledMessagesExpiryQueue`. The `timestamp` here is the reference time for expiry calculation (e.g., CANCEL command's processing time or target message's original delivery time). +* **Recovery (`recoverBucketSnapshot`)**: + * When loading `DelayedIndex` entries from a snapshot: + * If `delayed_operation_type` is `CANCEL`, `addCanceledMessage(ledgerId, entryId, timestamp)` is called. + * If `delayed_operation_type` is `DELAY` (or not set), it's added to `sharedBucketPriorityQueue` only if `(ledgerId, entryId)` is *not* in `canceledMessages`. +* **Message Scheduling (`getScheduledMessages`)**: + * When peeking messages from `sharedBucketPriorityQueue`: + * If `(ledgerId, entryId)` is found in `canceledMessages`, the message is popped, `removeIndexBit()` is called, `numberDelayedMessages` is decremented if applicable, and the message is skipped (not returned for dispatch). +* **Cleanup (`run()` and `canceledMessagesCleanup()`):** + * The `run()` method, called periodically by the timer, now also calls `canceledMessagesCleanup()`. + * `canceledMessagesCleanup()`: Iterates `canceledMessagesExpiryQueue`. If an entry's `expiryTimestamp <= clock.millis()`, it's removed from `canceledMessagesExpiryQueue` and `canceledMessages`. This prevents unbounded growth of the `canceledMessages` set. +* **Clearing State (`clear()`, `close()`):** These methods are updated to also clear/close `canceledMessages` and `canceledMessagesExpiryQueue`. + +### 5. `MutableBucket` Enhancements +* **`addMessage(long ledgerId, long entryId, long deliverAt, DelayedOperationType operationType)`:** + * If `operationType` is `CANCEL`: + * Adds `(deliverAt, ledgerId, entryId)` to its internal `priorityQueue`. `deliverAt` here is the scheduled processing time for the CANCEL command. + * Adds `(ledgerId, entryId)` to its `canceledOperations` set (identifies the target message). + * **Crucially, `putIndexBit()` is NOT called for CANCEL operations.** This means CANCEL operations don't contribute to the message count/bitmap of messages to be delivered from this bucket. +* **Sealing and Persisting (`createImmutableBucketAndAsyncPersistent`)**: + * When creating `DelayedIndex` entries for the snapshot from its `priorityQueue`: + * It checks if `(ledgerId, entryId)` is in `canceledOperations`. If so, the `DelayedIndex` is marked with `DelayedOperationType.CANCEL`. Otherwise, it's marked `DELAY`. + * When moving messages from the first segment of a newly created immutable bucket to the `sharedBucketPriorityQueue`, it only adds them if they are not in `canceledOperations`. +* **`clearCanceledOperations()`**: Called after an immutable bucket is successfully created and persisted, to clear the `canceledOperations` set in the (now old) mutable bucket. + +### 6. `Dispatcher` Interface +The `Dispatcher` interface is extended: +```java +public interface Dispatcher { + // ... + default CompletableFuture cancelDelayedMessage(long ledgerId, long entryId, long deliverAt) { + return CompletableFuture.completedFuture(false); // Default for non-supporting dispatchers + } +} +``` + +### 7. `PersistentDispatcherMultipleConsumers` +Implements `cancelDelayedMessage`: +```java +@Override +public CompletableFuture cancelDelayedMessage(long ledgerId, long entryId, long deliverAt) { + synchronized (this) { + if (delayedDeliveryTracker.isPresent()) { + boolean result = delayedDeliveryTracker.get() + .applyDelayOperation(ledgerId, entryId, deliverAt, DelayedOperationType.CANCEL); + // Logging ... + return CompletableFuture.completedFuture(result); + } else { + // Logging ... + return CompletableFuture.completedFuture(false); + } + } +} +``` + +## Public-facing Changes + +### Public API + +A new REST Admin API endpoint is added to `PersistentTopics`: + +* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/cancelDelayedMessage` + +```java + @POST + @Path("/{property}/{cluster}/{namespace}/{topic}/cancelDelayedMessage") + @ApiOperation(hidden = true, value = "Cancel a delayed message on specified subscriptions" + + " (or all if none specified).") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Operation successful"), + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), + @ApiResponse(code = 412, message = "Failed to cancel delayed message or invalid parameters"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void cancelDelayedMessage( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the property (tenant)", required = true) + @PathParam("property") String property, + @ApiParam(value = "Specify the cluster", required = true) + @PathParam("cluster") String cluster, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Ledger ID of the target delayed message", required = true) + @QueryParam("ledgerId") long ledgerId, + @ApiParam(value = "Entry ID of the target delayed message", required = true) + @QueryParam("entryId") long entryId, + @ApiParam(value = "Original deliverAt time of the target delayed message (in milliseconds from epoch)", + required = true) + @QueryParam("deliverAt") long deliverAt, + @ApiParam(value = "List of subscription names to cancel on (comma-separated, empty or null for" + + " all subscriptions)") + @QueryParam("subscriptionNames") List subscriptionNames) { + try { + validateTopicName(property, cluster, namespace, encodedTopic); + if (ledgerId < 0 || entryId < 0 || deliverAt <= 0) { + asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, + "ledgerId, entryId must be non-negative, and deliverAt must be positive.")); + return; + } + List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) + ? null : subscriptionNames; + internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, deliverAt, + finalSubscriptionNames, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } +``` + +* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/cancelDelayedMessage` + +```java + @POST + @Path("/{tenant}/{namespace}/{topic}/cancelDelayedMessage") + @ApiOperation(value = "Cancel a delayed message on specified subscriptions (or all if none specified).") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Operation successful"), + @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), + @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), + @ApiResponse(code = 412, message = "Failed to cancel delayed message or invalid parameters"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void cancelDelayedMessage( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Ledger ID of the target delayed message", required = true) + @QueryParam("ledgerId") long ledgerId, + @ApiParam(value = "Entry ID of the target delayed message", required = true) + @QueryParam("entryId") long entryId, + @ApiParam(value = "Original deliverAt time of the target delayed message (in milliseconds from epoch)", + required = true) + @QueryParam("deliverAt") long deliverAt, + @ApiParam(value = "List of subscription names to cancel on (empty or null for all subscriptions)") + @QueryParam("subscriptionNames") List subscriptionNames) { + try { + validateTopicName(tenant, namespace, encodedTopic); + if (ledgerId < 0 || entryId < 0 || deliverAt <= 0) { + asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, + "ledgerId, entryId, and deliverAt must be positive.")); + return; + } + List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) + ? null : subscriptionNames; + internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, deliverAt, + finalSubscriptionNames, authoritative); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } +``` + +### Binary protocol + +pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto + +```protobuf +message DelayedIndex { + ... + optional DelayedOperationType delayed_operation_type = 4 [default = DELAY]; +} + +enum DelayedOperationType { + DELAY = 0; + CANCEL = 1; +} +``` + +### Configuration + +### CLI + +A new CLI command is added to `pulsar-admin topics`: + +* **Command:** `cancel-delayed-message` +* **Synopsis:** `pulsar-admin topics cancel-delayed-message [options]` +* **Parameters:** + * `topic-name` (String, required): The name of the topic (e.g., `persistent://tenant/namespace/topic`). +* **Options:** + * `-l, --ledgerId ` (required): Ledger ID of the message to cancel. + * `-e, --entryId ` (required): Entry ID of the message to cancel. + * `-t, --deliverAt ` (required): Original scheduled delivery time (timestamp in ms from epoch) of the message to cancel. + * `-s, --subscriptionNames ` (optional): Comma-separated list of subscription names to target. If not specified, applies to all subscriptions. +* **Example:** +```bash + pulsar-admin topics cancel-delayed-message persistent://public/default/my-topic \ + --ledgerId 12345 --entryId 100 --deliverAt 1678886400000 \ + --subscriptionNames sub1,sub2 +``` + +# Backward & Forward Compatibility + +## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations + +* The `cancelDelayedMessage` operation is performed on a per-cluster basis via the Admin API. It directly modifies the state of the `DelayedDeliveryTracker` in the cluster where the command is executed. +* Delayed messages themselves are not typically subject to geo-replication of their "delayed" state; they are produced to a specific cluster. The message data might be replicated if the topic is replicated, but its delayed scheduling is local. +* **Upgrade/Downgrade:** + * If different clusters in a geo-replicated setup are on different versions (one with cancellation, one without), a message cancelled in the upgraded cluster will not be automatically cancelled in the non-upgraded cluster. Cancellation would need to be issued to each cluster's admin endpoint separately if desired. + * No specific issues related to geo-replication data consistency are expected beyond the per-cluster nature of this operation. + +# Alternatives + +1. **Client-Side Filtering (Current Workaround):** Consumers receive all delayed messages and filter out unwanted ones based on application logic. + * *Rejected because:* Inefficient, wastes resources, and doesn't prevent broker-level delivery. + +2. **Mark Message with Special Properties (Initial Idea in PR #23907 description/Patch 1):** Send a regular Pulsar message with special properties indicating it's a "cancel marker" for another target message. The `Dispatcher` would inspect message properties. + * The PR initially included an implementation (Patch 1/6) using this approach by modifying `AbstractBaseDispatcher` to check for properties like `IS_MARK_DELETE_DELAY_MESSAGE` and `DELAY_CANCELED_MESSAGE_POSITION`. + * *Seemingly rejected in favor of current Admin API approach because:* The property-based approach might be less direct and could complicate dispatcher logic. The Admin API provides a more explicit control plane operation. + +# Links + +* Mailing List discussion thread: +* Mailing List voting thread: From b8afb802fd695ba814f07da05ee0622fd3801e47 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Mon, 2 Jun 2025 15:14:24 +0800 Subject: [PATCH 2/9] [improve][pip] PIP-423: Add Support for Cancelling Individual Delayed Messages --- pip/pip-423.md | 57 ++++++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index 6682faa7c10b0..acab099e7c324 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -100,36 +100,33 @@ public interface DelayedDeliveryTracker extends AutoCloseable { * `operationType`: Either `DELAY` (for adding/tracking) or `CANCEL`. ### 4. `BucketDelayedDeliveryTracker` Enhancements -* **`applyDelayOperation(ledgerId, entryId, deliverAt, DelayedOperationType.CANCEL)`:** - * Calls `doCancelOperation(ledgerId, entryId, deliverAt)`. -* **`doCancelOperation(long ledgerId, long entryId, long deliverAt)`:** - * Synchronized method. - * Checks if the message `(ledgerId, entryId)` exists using `containsMessage()`. - * If **yes**: - * Calls `removeIndexBit(ledgerId, entryId)` to remove it from tracking bitmaps. - * Adds `(ledgerId, entryId)` to the `canceledMessages` set. - * Adds `(ledgerId, entryId)` to `canceledMessagesExpiryQueue`, scheduled to expire at `deliverAt + 2 * tickTimeMillis` (original delivery time of target message + buffer). - * Decrements `numberDelayedMessages`. - * Returns `true`. - * If **no** (message not currently tracked, e.g., too far in future, or already past): - * Checks if `deliverAt` (original delivery time) is valid (not in the past relative to cutoff). If invalid, return `false`. - * Calculates `cancelTime = Math.max(clock.millis(), deliverAt - (2 * tickTimeMillis))`. This is when the "cancel command" itself should be processed. - * Adds a cancel command to `lastMutableBucket`: `lastMutableBucket.addMessage(ledgerId, entryId, cancelTime, DelayedOperationType.CANCEL)`. Note: `ledgerId` and `entryId` here refer to the *target* message. This adds a marker that will be processed at `cancelTime`. - * Returns `true`. -* **`canceledMessages`:** A `ConcurrentLongPairSet` storing `(ledgerId, entryId)` of messages for which cancellation has been processed or requested. -* **`canceledMessagesExpiryQueue`:** A `TripleLongPriorityQueue` storing `(expiryTimestamp, ledgerId, entryId)` for entries in `canceledMessages`. Used by `canceledMessagesCleanup()`. -* **`addCanceledMessage(long ledgerId, long entryId, long timestamp)`:** Helper method to add to `canceledMessages` and schedule expiry in `canceledMessagesExpiryQueue`. The `timestamp` here is the reference time for expiry calculation (e.g., CANCEL command's processing time or target message's original delivery time). -* **Recovery (`recoverBucketSnapshot`)**: - * When loading `DelayedIndex` entries from a snapshot: - * If `delayed_operation_type` is `CANCEL`, `addCanceledMessage(ledgerId, entryId, timestamp)` is called. - * If `delayed_operation_type` is `DELAY` (or not set), it's added to `sharedBucketPriorityQueue` only if `(ledgerId, entryId)` is *not* in `canceledMessages`. -* **Message Scheduling (`getScheduledMessages`)**: - * When peeking messages from `sharedBucketPriorityQueue`: - * If `(ledgerId, entryId)` is found in `canceledMessages`, the message is popped, `removeIndexBit()` is called, `numberDelayedMessages` is decremented if applicable, and the message is skipped (not returned for dispatch). -* **Cleanup (`run()` and `canceledMessagesCleanup()`):** - * The `run()` method, called periodically by the timer, now also calls `canceledMessagesCleanup()`. - * `canceledMessagesCleanup()`: Iterates `canceledMessagesExpiryQueue`. If an entry's `expiryTimestamp <= clock.millis()`, it's removed from `canceledMessagesExpiryQueue` and `canceledMessages`. This prevents unbounded growth of the `canceledMessages` set. -* **Clearing State (`clear()`, `close()`):** These methods are updated to also clear/close `canceledMessages` and `canceledMessagesExpiryQueue`. + +The `BucketDelayedDeliveryTracker` is modified to support message cancellation through the `applyDelayOperation` method with `DelayedOperationType.CANCEL`. The core logic resides in `doCancelOperation(ledgerId, entryId, deliverAt)`: + +* **Handling Cancellation Requests (`doCancelOperation`):** + 1. **If Target Message is Tracked:** If `containsMessage(ledgerId, entryId)` is true (message index is loaded), it's immediately removed (`removeIndexBit`), its ID is added to a new `canceledMessages` set, and an expiry for this cancellation record is scheduled in `canceledMessagesExpiryQueue`. `numberDelayedMessages` is decremented. + 2. **If Target Message is Not Tracked:** (e.g., delivery is far in the future, not yet loaded, or already processed). If `deliverAt` is valid, a "cancel command" (a `DelayedIndex` entry of type `CANCEL`) is added to `lastMutableBucket`. This command is scheduled to be processed at `max(now, deliverAt - 2 * tickTimeMillis)`, ensuring it's handled before the original message's delivery time. + +* **Key Data Structures:** + * `canceledMessages` (`ConcurrentLongPairSet`): Stores `(ledgerId, entryId)` of definitively cancelled messages. This set is the primary filter to prevent delivery of cancelled messages. + * `canceledMessagesExpiryQueue` (`TripleLongPriorityQueue`): Manages `(expiryTimestamp, ledgerId, entryId)` for entries in `canceledMessages` to enable cleanup. + +* **Persistence and Recovery:** + * "Cancel commands" are persisted in bucket snapshots by setting `DelayedIndex.delayed_operation_type` to `CANCEL`. + * During snapshot recovery (`recoverBucketSnapshot`): + * `CANCEL` type entries populate the `canceledMessages` set (via `addCanceledMessage`). + * `DELAY` type entries are only added to `sharedBucketPriorityQueue` if not found in `canceledMessages`. + +* **Message Scheduling (`getScheduledMessages`):** + * Messages retrieved from `sharedBucketPriorityQueue` are skipped if their ID is present in `canceledMessages`. The corresponding index bit is also cleared and `numberDelayedMessages` decremented. + +* **Cleanup (`canceledMessagesCleanup()`):** + * Periodically called (via `run()`) to remove expired entries from `canceledMessages` and `canceledMessagesExpiryQueue`, based on their scheduled expiry time, preventing unbounded growth. + +* **State Management (`clear()`, `close()`):** + * These methods now also manage the lifecycle of `canceledMessages` and `canceledMessagesExpiryQueue`. + +This design ensures that cancellations are either applied immediately or are durably recorded and processed before the target message's scheduled delivery, with a mechanism to clean up cancellation markers over time. ### 5. `MutableBucket` Enhancements * **`addMessage(long ledgerId, long entryId, long deliverAt, DelayedOperationType operationType)`:** From 941c6556275f6c96f7e4c4ad98bc3b9e42523445 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 8 Jun 2025 13:43:22 +0800 Subject: [PATCH 3/9] Implement the delayed message cancellation function through acknowledge message. --- pip/pip-423.md | 331 +++++++++++++++++++++++-------------------------- 1 file changed, 157 insertions(+), 174 deletions(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index acab099e7c324..a90efe2585f18 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -2,12 +2,11 @@ # Background knowledge -* **Delayed Messages:** Apache Pulsar allows producers to send messages that are scheduled to be delivered to consumers at a specified future time. This is achieved by setting a `deliverAt` or `deliverAfter` timestamp on the message. The broker then holds these messages and only dispatches them to consumers once their scheduled delivery time is reached. -* **DelayedDeliveryTracker:** For persistent topics, when delayed message delivery is enabled, a `DelayedDeliveryTracker` is used on the broker to manage the schedule of these messages. There are two main implementations: - * `InMemoryDelayedDeliveryTracker`: Keeps all delayed message indexes in memory. Suitable for a smaller number of delayed messages. - * `BucketDelayedDeliveryTracker`: A more scalable solution that uses a hybrid approach. It keeps recent delayed message indexes in a mutable in-memory bucket and spills older/larger sets of indexes into persistent snapshots stored in BookKeeper (via `BookkeeperBucketSnapshotStorage`). This allows handling a large number of delayed messages without exhausting broker memory. When a snapshot is persisted, it contains `DelayedIndex` entries, each representing a delayed message with its original `ledgerId`, `entryId`, and scheduled `timestamp`. -* **Message Identification (LedgerId, EntryId):** In Pulsar, messages are stored in ledgers. Each message within a ledger has a unique `entryId`. The combination of `ledgerId` and `entryId` (often represented as `Position`) uniquely identifies a message within a topic. -* **Current State before this PIP:** Pulsar does not offer a native mechanism to cancel a delayed message once it has been sent by the producer and accepted by the broker. If circumstances change and a scheduled message becomes irrelevant, it would still be delivered at its scheduled time. +* **Delayed Messages:** Apache Pulsar allows producers to send messages that are scheduled to be delivered to consumers at a specified future time. This is achieved by setting a `deliverAt` or `deliverAfter` timestamp on the message. When a topic has delayed message delivery enabled, the broker holds these messages and only dispatches them to a subscription's consumers once their scheduled delivery time is reached. The scheduling logic is managed per subscription. +* **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends. +* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor". A cursor tracks the consumption progress for that subscription. It maintains a `mark-delete position`, indicating that all messages up to that point have been acknowledged. +* **Individual Acknowledgement and Acknowledgement Holes:** Pulsar subscriptions support different acknowledgement modes. While cumulative acknowledgement acknowledges all messages up to a certain point, **individual acknowledgement** allows a consumer to acknowledge a single message, even if it's out of order. When a message is acknowledged individually ahead of the `mark-delete position`, it creates an "acknowledgement hole". The broker's `ManagedCursor` must persistently track these holes to ensure that already-acknowledged messages are not redelivered after a broker restart or cursor re-initialization. +* **Current State before this PIP:** Pulsar does not offer a native API to cancel a specific delayed message once it has been sent. The only way to prevent its delivery is to have a consumer receive it and then discard it, which is inefficient. # Motivation @@ -21,154 +20,90 @@ Without a cancellation feature, applications might resort to workarounds, such a * Consumers receiving and then filtering/ignoring obsolete delayed messages, which wastes processing resources and bandwidth. * Maintaining complex state management on the client-side to track and ignore messages, which is error-prone and doesn't prevent the message from being delivered by the broker. -This proposal aims to introduce a robust mechanism to cancel delayed messages directly at the broker level, preventing their delivery if they are no longer needed. +This proposal aims to introduce a robust mechanism to cancel delayed messages directly at the subscription level, preventing their delivery if they are no longer needed. # Goals ## In Scope -* Provide a mechanism for users to request the cancellation of a specific delayed message. -* The cancellation request will target a message identified by its `ledgerId`, `entryId`, and original `deliverAt` timestamp. -* The cancellation will be applied at the broker's `DelayedDeliveryTracker` level. -* Introduce a new Admin API endpoint for cancelling delayed messages. -* Provide a corresponding `pulsar-admin` CLI command. -* Support cancellation for `BucketDelayedDeliveryTracker`. -* Ensure that once a message is successfully cancelled, it will not be delivered to consumers. +* Provide a new Admin API endpoint and corresponding `pulsar-admin` CLI command to request the cancellation of a single delayed message. +* The cancellation will be targeted at a specific message on a specific subscription (or all subscriptions of a topic) using the message's `ledgerId` and `entryId`. +* The core implementation will leverage Pulsar's existing individual message acknowledgement mechanism (`AckType.Individual`) for persistence and reliability. +* This feature will only be supported for subscriptions that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`). It will not be supported for `Exclusive` or `Failover` subscriptions which use cumulative acknowledgements. +* Ensure that once a message is successfully cancelled for a subscription, it will not be delivered to any consumer on that subscription. ## Out of Scope -* Client-side (producer/consumer) API for direct cancellation within application code (cancellation is initiated via Admin API). -* Complex wildcard-based cancellation (e.g., cancel all messages for a user). Cancellation is per specific message ID. -* Guaranteed immediate cancellation across geo-replicated clusters (cancellation is primarily a per-cluster operation, though admin command can target partitions). +* A new client-facing API for producers or consumers to directly cancel messages. The cancellation is an administrative action. +* Automatic cancellation across geo-replicated clusters. The cancellation command is a per-cluster administrative operation. # High Level Design -The proposed solution introduces a mechanism to mark a specific delayed message for cancellation within the broker's `DelayedDeliveryTracker`. +The proposed solution re-frames the concept of "cancelling" a delayed message as "pre-acknowledging" it for a given subscription. This leverages the broker's robust, existing infrastructure for handling individual acknowledgements. -1. **Cancellation Request:** A user initiates a cancellation request via a new Pulsar Admin API endpoint. The request must specify the target topic, the `ledgerId` and `entryId` of the delayed message to be cancelled, and its original scheduled `deliverAt` timestamp. The `deliverAt` timestamp helps in locating or scheduling the cancellation appropriately if the message is not immediately found in active memory. Optionally, specific subscription names can be provided; otherwise, the cancellation applies to all subscriptions on the topic. +1. **Initiate Cancellation:** A user or operator initiates a cancellation request using a new `pulsar-admin topics cancel-delayed-message` command or its corresponding REST API endpoint. The request must specify the topic, the target `ledgerId` and `entryId` of the message, and optionally, one or more subscription names. If no subscription is specified, the operation will apply to all eligible subscriptions on the topic. -2. **Dispatcher Involvement:** The Admin API call on the broker handling the request will identify the relevant topic and its subscriptions. For each subscription, it will interact with the `Dispatcher` associated with it. +2. **Broker Receives Request:** The Pulsar broker owning the topic receives the admin request. It validates the parameters and permissions. -3. **DelayedDeliveryTracker Update:** The `Dispatcher` will delegate the cancellation to its `DelayedDeliveryTracker`. - * A new method, `applyDelayOperation(ledgerId, entryId, deliverAt, operationType)`, is introduced in `DelayedDeliveryTracker`. For cancellation, `operationType` will be `CANCEL`. - * **`BucketDelayedDeliveryTracker` Logic:** - * When a `CANCEL` operation is received: - 1. It first checks if the target message (`ledgerId`, `entryId`) is currently tracked (i.e., its index bit is set in the mutable or immutable buckets' bitmaps). If found, its index bit is cleared, effectively removing it from being scheduled, and `numberDelayedMessages` is decremented. A record of this cancellation is stored in a `canceledMessages` set. - 2. If the target message is *not* currently found in the tracker's active indexes (e.g., it's scheduled far in the future and its bucket hasn't been loaded, or it was already delivered/acknowledged), a "cancel command" (a special `DelayedIndex` entry with `DelayedOperationType.CANCEL`) is added to the `lastMutableBucket`. This cancel command is scheduled to be processed approximately `2 * tickTimeMillis` before the original `deliverAt` time of the target message. This ensures the cancel command is loaded and processed before the target message itself would be. - * **Persistence of Cancel Commands:** These cancel commands are persisted as part of bucket snapshots. The `DelayedIndex` proto is extended to include an `DelayedOperationType` field (`DELAY` or `CANCEL`). - * **Recovery:** During recovery from snapshots, `DelayedIndex` entries of type `CANCEL` are loaded into the `canceledMessages` set. Regular `DELAY` type messages are only added to the scheduling queue if they are not present in `canceledMessages`. - * **Dispatch Filtering:** When `getScheduledMessages()` is called to retrieve messages for dispatch, if a message ID from the `sharedBucketPriorityQueue` is found in `canceledMessages`, it is skipped and its index bit removed. - * **Cleanup of `canceledMessages`:** To prevent unbounded growth, entries in `canceledMessages` (and the persisted CANCEL commands) will have an expiry mechanism. A `canceledMessagesExpiryQueue` tracks these entries, and they are cleaned up after a certain period (e.g., `original_deliver_at + 2 * tickTimeMillis`) post their relevance. +3. **Delegate to Subscription:** For each specified (or discovered) subscription, the broker invokes a new `cancelDelayedMessage` method on the `PersistentSubscription` object. -4. **Result:** If the cancellation is successfully processed by the tracker, the target delayed message will not be delivered to any consumer on the specified subscriptions. +4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the core logic is executed: + * It verifies that the subscription type supports individual acknowledgements. + * It constructs a `Position` object from the provided `ledgerId` and `entryId`. + * It calls the internal `acknowledgeMessage()` method with `AckType.Individual` for that single position. -# Detailed Design - -## Design & Implementation Details - -### 1. `DelayedOperationType` Enum -A new enum `DelayedOperationType` is defined in `pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto`: -```protobuf -enum DelayedOperationType { - DELAY = 0; - CANCEL = 1; -} -``` -This enum is used to differentiate between regular delayed messages and cancellation commands within the `DelayedDeliveryTracker` and its persisted snapshots. - -### 2. `DelayedIndex` Proto Modification -The `DelayedIndex` message in `DelayedMessageIndexBucketSegment.proto` is extended: -```protobuf -message DelayedIndex { - required uint64 timestamp = 1; - required uint64 ledger_id = 2; - required uint64 entry_id = 3; - optional DelayedOperationType delayed_operation_type = 4 [default = DELAY]; -} -``` - -### 3. `DelayedDeliveryTracker` Interface -The `DelayedDeliveryTracker` interface is extended with a new method: -```java -public interface DelayedDeliveryTracker extends AutoCloseable { - // ... - boolean applyDelayOperation(long ledgerId, long entryId, long deliverAt, DelayedOperationType operationType); -} -``` -* `ledgerId`, `entryId`: Identify the target delayed message. -* `deliverAt`: The original scheduled delivery timestamp of the target delayed message. -* `operationType`: Either `DELAY` (for adding/tracking) or `CANCEL`. - -### 4. `BucketDelayedDeliveryTracker` Enhancements - -The `BucketDelayedDeliveryTracker` is modified to support message cancellation through the `applyDelayOperation` method with `DelayedOperationType.CANCEL`. The core logic resides in `doCancelOperation(ledgerId, entryId, deliverAt)`: +5. **Persistence and Effect:** This action is identical to a consumer acknowledging that specific message. The `ManagedCursor` for the subscription records this individual acknowledgement, creating an "acknowledgement hole". This state is persisted to BookKeeper (or ZooKeeper, depending on configuration) just like any other acknowledgement information. When the `DelayedDeliveryTracker` later attempts to schedule this message for delivery, the dispatcher will read from the topic, but the `ManagedCursor` will see that this specific message has already been acknowledged and will therefore skip it, preventing its delivery to any consumer on that subscription. -* **Handling Cancellation Requests (`doCancelOperation`):** - 1. **If Target Message is Tracked:** If `containsMessage(ledgerId, entryId)` is true (message index is loaded), it's immediately removed (`removeIndexBit`), its ID is added to a new `canceledMessages` set, and an expiry for this cancellation record is scheduled in `canceledMessagesExpiryQueue`. `numberDelayedMessages` is decremented. - 2. **If Target Message is Not Tracked:** (e.g., delivery is far in the future, not yet loaded, or already processed). If `deliverAt` is valid, a "cancel command" (a `DelayedIndex` entry of type `CANCEL`) is added to `lastMutableBucket`. This command is scheduled to be processed at `max(now, deliverAt - 2 * tickTimeMillis)`, ensuring it's handled before the original message's delivery time. +This design is simple, robust, and minimally invasive, as it relies almost entirely on existing, well-tested broker functionality. -* **Key Data Structures:** - * `canceledMessages` (`ConcurrentLongPairSet`): Stores `(ledgerId, entryId)` of definitively cancelled messages. This set is the primary filter to prevent delivery of cancelled messages. - * `canceledMessagesExpiryQueue` (`TripleLongPriorityQueue`): Manages `(expiryTimestamp, ledgerId, entryId)` for entries in `canceledMessages` to enable cleanup. - -* **Persistence and Recovery:** - * "Cancel commands" are persisted in bucket snapshots by setting `DelayedIndex.delayed_operation_type` to `CANCEL`. - * During snapshot recovery (`recoverBucketSnapshot`): - * `CANCEL` type entries populate the `canceledMessages` set (via `addCanceledMessage`). - * `DELAY` type entries are only added to `sharedBucketPriorityQueue` if not found in `canceledMessages`. - -* **Message Scheduling (`getScheduledMessages`):** - * Messages retrieved from `sharedBucketPriorityQueue` are skipped if their ID is present in `canceledMessages`. The corresponding index bit is also cleared and `numberDelayedMessages` decremented. - -* **Cleanup (`canceledMessagesCleanup()`):** - * Periodically called (via `run()`) to remove expired entries from `canceledMessages` and `canceledMessagesExpiryQueue`, based on their scheduled expiry time, preventing unbounded growth. - -* **State Management (`clear()`, `close()`):** - * These methods now also manage the lifecycle of `canceledMessages` and `canceledMessagesExpiryQueue`. +# Detailed Design -This design ensures that cancellations are either applied immediately or are durably recorded and processed before the target message's scheduled delivery, with a mechanism to clean up cancellation markers over time. +## Design & Implementation Details -### 5. `MutableBucket` Enhancements -* **`addMessage(long ledgerId, long entryId, long deliverAt, DelayedOperationType operationType)`:** - * If `operationType` is `CANCEL`: - * Adds `(deliverAt, ledgerId, entryId)` to its internal `priorityQueue`. `deliverAt` here is the scheduled processing time for the CANCEL command. - * Adds `(ledgerId, entryId)` to its `canceledOperations` set (identifies the target message). - * **Crucially, `putIndexBit()` is NOT called for CANCEL operations.** This means CANCEL operations don't contribute to the message count/bitmap of messages to be delivered from this bucket. -* **Sealing and Persisting (`createImmutableBucketAndAsyncPersistent`)**: - * When creating `DelayedIndex` entries for the snapshot from its `priorityQueue`: - * It checks if `(ledgerId, entryId)` is in `canceledOperations`. If so, the `DelayedIndex` is marked with `DelayedOperationType.CANCEL`. Otherwise, it's marked `DELAY`. - * When moving messages from the first segment of a newly created immutable bucket to the `sharedBucketPriorityQueue`, it only adds them if they are not in `canceledOperations`. -* **`clearCanceledOperations()`**: Called after an immutable bucket is successfully created and persisted, to clear the `canceledOperations` set in the (now old) mutable bucket. +The implementation centers on adding a new administrative pathway to trigger an existing broker capability (individual acknowledgement). -### 6. `Dispatcher` Interface -The `Dispatcher` interface is extended: -```java -public interface Dispatcher { - // ... - default CompletableFuture cancelDelayedMessage(long ledgerId, long entryId, long deliverAt) { - return CompletableFuture.completedFuture(false); // Default for non-supporting dispatchers +1. **Subscription Interface Extension:** + The `Subscription` interface is extended with a new default method. + ```java + public interface Subscription { + // ... + default CompletableFuture cancelDelayedMessage(long ledgerId, long entryId) { + return CompletableFuture.completedFuture(false); + } } -} -``` - -### 7. `PersistentDispatcherMultipleConsumers` -Implements `cancelDelayedMessage`: -```java -@Override -public CompletableFuture cancelDelayedMessage(long ledgerId, long entryId, long deliverAt) { - synchronized (this) { - if (delayedDeliveryTracker.isPresent()) { - boolean result = delayedDeliveryTracker.get() - .applyDelayOperation(ledgerId, entryId, deliverAt, DelayedOperationType.CANCEL); - // Logging ... - return CompletableFuture.completedFuture(result); - } else { - // Logging ... + ``` + +2. **PersistentSubscription Implementation:** + The `PersistentSubscription` class provides the concrete implementation. + ```java + // In PersistentSubscription.java + @Override + public CompletableFuture cancelDelayedMessage(long ledgerId, long entryId) { + // This operation is only valid for subscription types that track individual messages. + if (Subscription.isCumulativeAckMode(getType())) { + log.warn("Cannot cancel delayed message on subscription {} of type {} which uses cumulative ack.", + getName(), getType()); return CompletableFuture.completedFuture(false); } + + Position position = PositionFactory.create(ledgerId, entryId); + List positions = Collections.singletonList(position); + Map properties = Collections.emptyMap(); + + // Acknowledge the single message. This will be persisted in the cursor. + acknowledgeMessage(positions, AckType.Individual, properties); + return CompletableFuture.completedFuture(true); } -} -``` + ``` + +3. **Admin API and Topic Logic:** + * A new `internalCancelDelayedMessage` method is added to `PersistentTopicsBase`. + * This method handles the logic for partitioned and non-partitioned topics. For partitioned topics, it fans out the request to each partition. + * For a given topic, it retrieves the `PersistentTopic` object. If `subscriptionNames` is not provided, it gets a list of all subscriptions on the topic. + * It iterates through the target subscriptions and calls the `cancelDelayedMessage(ledgerId, entryId)` method on each `Subscription` object. + +4. **Authorization:** + A new `TopicOperation` enum value, `CANCEL_DELAYED_MESSAGE`, is added. This ensures that the operation can be controlled via Pulsar's role-based access control. A client role must be granted the `cancel_delayed_message` permission on a topic to use this feature. ## Public-facing Changes @@ -208,23 +143,19 @@ A new REST Admin API endpoint is added to `PersistentTopics`: @QueryParam("ledgerId") long ledgerId, @ApiParam(value = "Entry ID of the target delayed message", required = true) @QueryParam("entryId") long entryId, - @ApiParam(value = "Original deliverAt time of the target delayed message (in milliseconds from epoch)", - required = true) - @QueryParam("deliverAt") long deliverAt, @ApiParam(value = "List of subscription names to cancel on (comma-separated, empty or null for" + " all subscriptions)") @QueryParam("subscriptionNames") List subscriptionNames) { try { validateTopicName(property, cluster, namespace, encodedTopic); - if (ledgerId < 0 || entryId < 0 || deliverAt <= 0) { + if (ledgerId < 0 || entryId < 0) { asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, - "ledgerId, entryId must be non-negative, and deliverAt must be positive.")); + "ledgerId, entryId must be non-negative.")); return; } List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) ? null : subscriptionNames; - internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, deliverAt, - finalSubscriptionNames, authoritative); + internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, finalSubscriptionNames, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { @@ -262,21 +193,18 @@ A new REST Admin API endpoint is added to `PersistentTopics`: @QueryParam("ledgerId") long ledgerId, @ApiParam(value = "Entry ID of the target delayed message", required = true) @QueryParam("entryId") long entryId, - @ApiParam(value = "Original deliverAt time of the target delayed message (in milliseconds from epoch)", - required = true) - @QueryParam("deliverAt") long deliverAt, @ApiParam(value = "List of subscription names to cancel on (empty or null for all subscriptions)") @QueryParam("subscriptionNames") List subscriptionNames) { try { validateTopicName(tenant, namespace, encodedTopic); - if (ledgerId < 0 || entryId < 0 || deliverAt <= 0) { + if (ledgerId < 0 || entryId < 0) { asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, - "ledgerId, entryId, and deliverAt must be positive.")); + "ledgerId, entryId must be positive.")); return; } List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) ? null : subscriptionNames; - internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, deliverAt, + internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, finalSubscriptionNames, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); @@ -288,62 +216,117 @@ A new REST Admin API endpoint is added to `PersistentTopics`: ### Binary protocol -pulsar-broker/src/main/proto/DelayedMessageIndexBucketSegment.proto +No changes are made to the Pulsar binary protocol. -```protobuf -message DelayedIndex { - ... - optional DelayedOperationType delayed_operation_type = 4 [default = DELAY]; -} +### Configuration -enum DelayedOperationType { - DELAY = 0; - CANCEL = 1; -} -``` +**No new configuration parameters are introduced. However, this feature's performance and behavior under heavy load are directly influenced by existing `ManagedLedger` configurations that govern the persistence of acknowledgement holes.** -### Configuration +**Administrators should be aware of these settings if they expect a high volume of delayed message cancellations:** + +``` +# Max number of "acknowledgment holes" that are going to be persistently stored. +# When acknowledging out of order, a consumer will leave holes that are supposed +# to be quickly filled by acking all the messages. The information of which +# messages are acknowledged is persisted by compressing in "ranges" of messages +# that were acknowledged. After the max number of ranges is reached, the information +# will only be tracked in memory and messages will be redelivered in case of +# crashes. +managedLedgerMaxUnackedRangesToPersist=10000 + +# Maximum number of partially acknowledged batch messages per subscription that will have their batch +# deleted indexes persisted. Batch deleted index state is handled when acknowledgmentAtBatchIndexLevelEnabled=true. +# When this limit is exceeded, remaining batch message containing the batch deleted indexes will +# only be tracked in memory. In case of broker restarts or load balancing events, the batch +# deleted indexes will be cleared while redelivering the messages to consumers. +managedLedgerMaxBatchDeletedIndexToPersist=10000 + +# When storing acknowledgement state, choose a more compact serialization format that stores +# individual acknowledgements as a bitmap which is serialized to an array of long values. NOTE: This setting requires +# managedLedgerUnackedRangesOpenCacheSetEnabled=true to be effective. +managedLedgerPersistIndividualAckAsLongArray=true + +# When set to true, a BitSet will be used to track acknowledged messages that come after the "mark delete position" +# for each subscription. RoaringBitmap is used as a memory efficient BitSet implementation for the acknowledged +# messages tracking. Unacknowledged ranges are the message ranges excluding the acknowledged messages. +managedLedgerUnackedRangesOpenCacheSetEnabled=true + +# Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher +# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into +# MetadataStore. +managedLedgerMaxUnackedRangesToPersistInMetadataStore=1000 + +# ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). +# If value is NONE, then save the ManagedCursorInfo bytes data directly without compression. +# Using compression reduces the size of persistent cursor (subscription) metadata. This enables using a higher +# managedLedgerMaxUnackedRangesToPersistInMetadataStore value and reduces the overall metadata stored in +# the metadata store such as ZooKeeper. +managedCursorInfoCompressionType=NONE + +# ManagedCursorInfo compression size threshold (bytes), only compress metadata when origin size more then this value. +# 0 means compression will always apply. +managedCursorInfoCompressionThresholdInBytes=16384 + +# ManagedLedgerInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). +# If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly without compression. +# Using compression reduces the size of the persistent topic metadata. When a topic contains a large number of +# individual ledgers in BookKeeper or tiered storage, compression helps prevent the metadata size from exceeding +# the maximum size of a metadata store entry (ZNode in ZooKeeper). This also reduces the overall metadata stored +# in the metadata store such as ZooKeeper. +managedLedgerInfoCompressionType=NONE + +# ManagedLedgerInfo compression size threshold (bytes), only compress metadata when origin size more then this value. +# 0 means compression will always apply. +managedLedgerInfoCompressionThresholdInBytes=16384 +``` ### CLI -A new CLI command is added to `pulsar-admin topics`: +A new command is added to the `pulsar-admin topics` tool. * **Command:** `cancel-delayed-message` * **Synopsis:** `pulsar-admin topics cancel-delayed-message [options]` * **Parameters:** - * `topic-name` (String, required): The name of the topic (e.g., `persistent://tenant/namespace/topic`). + * `topic-name` (String, required): The full name of the topic (e.g., `persistent://tenant/namespace/topic`). * **Options:** * `-l, --ledgerId ` (required): Ledger ID of the message to cancel. * `-e, --entryId ` (required): Entry ID of the message to cancel. - * `-t, --deliverAt ` (required): Original scheduled delivery time (timestamp in ms from epoch) of the message to cancel. - * `-s, --subscriptionNames ` (optional): Comma-separated list of subscription names to target. If not specified, applies to all subscriptions. + * `-s, --subscriptionNames ` (optional): Comma-separated list of subscription names. If not specified, applies to all subscriptions on the topic. + * **Example:** -```bash - pulsar-admin topics cancel-delayed-message persistent://public/default/my-topic \ - --ledgerId 12345 --entryId 100 --deliverAt 1678886400000 \ - --subscriptionNames sub1,sub2 -``` + ```bash + pulsar-admin topics cancel-delayed-message persistent://public/default/my-orders \ + --ledgerId 12345 --entryId 100 --subscriptionNames my-app-subscription + ``` # Backward & Forward Compatibility ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations -* The `cancelDelayedMessage` operation is performed on a per-cluster basis via the Admin API. It directly modifies the state of the `DelayedDeliveryTracker` in the cluster where the command is executed. -* Delayed messages themselves are not typically subject to geo-replication of their "delayed" state; they are produced to a specific cluster. The message data might be replicated if the topic is replicated, but its delayed scheduling is local. -* **Upgrade/Downgrade:** - * If different clusters in a geo-replicated setup are on different versions (one with cancellation, one without), a message cancelled in the upgraded cluster will not be automatically cancelled in the non-upgraded cluster. Cancellation would need to be issued to each cluster's admin endpoint separately if desired. - * No specific issues related to geo-replication data consistency are expected beyond the per-cluster nature of this operation. +The `cancelDelayedMessage` operation is fundamentally a per-cluster operation that modifies a subscription's local cursor. It has no direct interaction with geo-replication. + +* A delayed message's scheduling state is local to the cluster where it is being managed. +* To cancel a delayed message on a replicated topic across multiple clusters, the admin command must be issued to each cluster individually. # Alternatives -1. **Client-Side Filtering (Current Workaround):** Consumers receive all delayed messages and filter out unwanted ones based on application logic. - * *Rejected because:* Inefficient, wastes resources, and doesn't prevent broker-level delivery. +1. **Client-Side Filtering (Current Workaround):** + +* **Description:** Consumers receive all delayed messages and filter out unwanted ones based on application logic. +* **Reason for Rejection:** Inefficient, wastes resources, and doesn't prevent broker-level delivery. + +2. **Mark Message with Special Properties:** + +* **Description:** Send a regular Pulsar message with special properties indicating it's a "cancel marker" for + another target message. The `Dispatcher` would inspect message properties. The PR initially included an implementation (Patch 1/6) using this approach by modifying `AbstractBaseDispatcher` to check for properties like `IS_MARK_DELETE_DELAY_MESSAGE` and `DELAY_CANCELED_MESSAGE_POSITION`. +* **Reason for Rejection:** The property-based approach might be less direct and could complicate dispatcher logic. The Admin API provides a more explicit control plane operation. + +3. **In-Tracker Cancellation Commands:** -2. **Mark Message with Special Properties (Initial Idea in PR #23907 description/Patch 1):** Send a regular Pulsar message with special properties indicating it's a "cancel marker" for another target message. The `Dispatcher` would inspect message properties. - * The PR initially included an implementation (Patch 1/6) using this approach by modifying `AbstractBaseDispatcher` to check for properties like `IS_MARK_DELETE_DELAY_MESSAGE` and `DELAY_CANCELED_MESSAGE_POSITION`. - * *Seemingly rejected in favor of current Admin API approach because:* The property-based approach might be less direct and could complicate dispatcher logic. The Admin API provides a more explicit control plane operation. +* **Description:** This approach involved modifying the `BucketDelayedDeliveryTracker` itself. A new `CANCEL` operation type would be added to the `DelayedIndex` protobuf message. When a cancellation was requested, a new `DelayedIndex` record with this `CANCEL` type would be created and persisted in the bucket snapshots. During dispatch, the tracker would check for these cancel markers and skip delivering the corresponding messages. +* **Reason for Rejection:** This design introduced a critical reliability problem. If a cancellation request was processed and added to the in-memory `lastMutableBucket`, but the broker crashed before this bucket was sealed and persisted to BookKeeper, the cancellation record would be lost forever. The client would have received a success response, but the message would still be delivered after the broker restarted. Guaranteeing the persistence of this cancel command before returning success would introduce unacceptable latency. The chosen approach of using acknowledgements avoids this by leveraging a proven, robust persistence mechanism. # Links -* Mailing List discussion thread: +* Mailing List discussion thread: https://lists.apache.org/thread/lo182ztgrkzlq6mbkytj8krd050yvb9w * Mailing List voting thread: From 8fb5e0b995aa4d2059062a93eade626ad23a6502 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Tue, 10 Jun 2025 19:31:25 +0800 Subject: [PATCH 4/9] expanding the scope of this PIP is not just to achieve cancellation of delayed messages. --- pip/pip-423.md | 196 ++++++++++++++++++++++--------------------------- 1 file changed, 89 insertions(+), 107 deletions(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index a90efe2585f18..ad46a5f65488a 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -1,87 +1,89 @@ -# PIP-423: Add Support for Cancelling Individual Delayed Messages +# PIP-423: Add a new admin API to acknowledge a single message # Background knowledge -* **Delayed Messages:** Apache Pulsar allows producers to send messages that are scheduled to be delivered to consumers at a specified future time. This is achieved by setting a `deliverAt` or `deliverAfter` timestamp on the message. When a topic has delayed message delivery enabled, the broker holds these messages and only dispatches them to a subscription's consumers once their scheduled delivery time is reached. The scheduling logic is managed per subscription. * **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends. -* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor". A cursor tracks the consumption progress for that subscription. It maintains a `mark-delete position`, indicating that all messages up to that point have been acknowledged. -* **Individual Acknowledgement and Acknowledgement Holes:** Pulsar subscriptions support different acknowledgement modes. While cumulative acknowledgement acknowledges all messages up to a certain point, **individual acknowledgement** allows a consumer to acknowledge a single message, even if it's out of order. When a message is acknowledged individually ahead of the `mark-delete position`, it creates an "acknowledgement hole". The broker's `ManagedCursor` must persistently track these holes to ensure that already-acknowledged messages are not redelivered after a broker restart or cursor re-initialization. -* **Current State before this PIP:** Pulsar does not offer a native API to cancel a specific delayed message once it has been sent. The only way to prevent its delivery is to have a consumer receive it and then discard it, which is inefficient. +* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. It maintains a `mark-delete position`, indicating that all messages up to that point have been acknowledged. +* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. While cumulative acknowledgement is the default for some types, `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message, even if it's out of order. When a message is acknowledged individually ahead of the `mark-delete position`, it creates an "acknowledgement hole". The broker's `ManagedCursor` must persistently track these holes to ensure that acknowledged messages are not redelivered after a broker restart. +* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. This is a primary use case that benefits from this proposal, as it allows a scheduled message to be effectively "cancelled" before its delivery time. +* **Existing `skip-messages` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping, but it lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. # Motivation -In modern event-driven architectures, the ability to react to changing conditions is crucial. While Pulsar's delayed message feature is valuable for scheduling future tasks or deliveries, the lack of a cancellation mechanism presents a limitation. -Use cases where cancellation is important include: -* An order is placed with delayed processing, but the user cancels the order before processing begins. -* A scheduled notification becomes obsolete due to a subsequent user action. -* System alerts scheduled for future delivery are resolved before the delivery time. +Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance: -Without a cancellation feature, applications might resort to workarounds, such as: -* Consumers receiving and then filtering/ignoring obsolete delayed messages, which wastes processing resources and bandwidth. -* Maintaining complex state management on the client-side to track and ignore messages, which is error-prone and doesn't prevent the message from being delivered by the broker. +* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely. +* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single message without affecting valid messages around it is a critical operational capability. +* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's queue. -This proposal aims to introduce a robust mechanism to cancel delayed messages directly at the subscription level, preventing their delivery if they are no longer needed. +The existing `skip-messages` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces a more fine-grained administrative tool to acknowledge a single message by its unique `MessageId`. This provides a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. # Goals ## In Scope -* Provide a new Admin API endpoint and corresponding `pulsar-admin` CLI command to request the cancellation of a single delayed message. -* The cancellation will be targeted at a specific message on a specific subscription (or all subscriptions of a topic) using the message's `ledgerId` and `entryId`. -* The core implementation will leverage Pulsar's existing individual message acknowledgement mechanism (`AckType.Individual`) for persistence and reliability. -* This feature will only be supported for subscriptions that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`). It will not be supported for `Exclusive` or `Failover` subscriptions which use cumulative acknowledgements. -* Ensure that once a message is successfully cancelled for a subscription, it will not be delivered to any consumer on that subscription. +* Provide a new Admin API endpoint (`acknowledgeMessage`) and corresponding `pulsar-admin` CLI command to individually acknowledge a single message for a subscription. +* The target message will be identified by its `ledgerId` and `entryId`. +* The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability. +* This feature will only be supported for subscription types that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`). +* Ensure that once a message is successfully acknowledged via this API, it will not be delivered to any consumer on the targeted subscription. ## Out of Scope -* A new client-facing API for producers or consumers to directly cancel messages. The cancellation is an administrative action. -* Automatic cancellation across geo-replicated clusters. The cancellation command is a per-cluster administrative operation. +* Acknowledging a batch of messages by ID in a single command. This proposal focuses on a single message. +* Automatic acknowledgement across geo-replicated clusters. The command is a per-cluster administrative operation. # High Level Design -The proposed solution re-frames the concept of "cancelling" a delayed message as "pre-acknowledging" it for a given subscription. This leverages the broker's robust, existing infrastructure for handling individual acknowledgements. +The proposed solution provides a new administrative pathway to trigger Pulsar's existing individual acknowledgement capability on demand. -1. **Initiate Cancellation:** A user or operator initiates a cancellation request using a new `pulsar-admin topics cancel-delayed-message` command or its corresponding REST API endpoint. The request must specify the topic, the target `ledgerId` and `entryId` of the message, and optionally, one or more subscription names. If no subscription is specified, the operation will apply to all eligible subscriptions on the topic. +1. **Initiate Acknowledgement:** An administrator initiates the action via the new `pulsar-admin topics acknowledgeMessage` command or its corresponding REST API endpoint. The request must specify the topic, the target `ledgerId` and `entryId` of the message, and one or more subscription names. -2. **Broker Receives Request:** The Pulsar broker owning the topic receives the admin request. It validates the parameters and permissions. +2. **Broker Receives Request:** The Pulsar broker that owns the topic partition receives the admin request. It validates the parameters and the administrator's permissions for the specified topic. -3. **Delegate to Subscription:** For each specified (or discovered) subscription, the broker invokes a new `cancelDelayedMessage` method on the `PersistentSubscription` object. +3. **Delegate to Subscription:** For each specified subscription, the broker invokes a new `acknowledgeIndividualMessage` method on the `PersistentSubscription` object. -4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the core logic is executed: - * It verifies that the subscription type supports individual acknowledgements. +4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs: + * It verifies that the subscription's type is compatible with individual acknowledgement. * It constructs a `Position` object from the provided `ledgerId` and `entryId`. - * It calls the internal `acknowledgeMessage()` method with `AckType.Individual` for that single position. + * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for that single position. -5. **Persistence and Effect:** This action is identical to a consumer acknowledging that specific message. The `ManagedCursor` for the subscription records this individual acknowledgement, creating an "acknowledgement hole". This state is persisted to BookKeeper (or ZooKeeper, depending on configuration) just like any other acknowledgement information. When the `DelayedDeliveryTracker` later attempts to schedule this message for delivery, the dispatcher will read from the topic, but the `ManagedCursor` will see that this specific message has already been acknowledged and will therefore skip it, preventing its delivery to any consumer on that subscription. +5. **Persistence and Effect:** This action is functionally identical to a consumer acknowledging the message. The `ManagedCursor` for the subscription records this individual acknowledgement, which is then persisted according to the cluster's configuration. + * For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered. + * For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` even attempts to schedule it. The message is thus effectively **cancelled**. -This design is simple, robust, and minimally invasive, as it relies almost entirely on existing, well-tested broker functionality. +This design is simple, robust, and minimally invasive, as it builds upon the broker's proven message acknowledgement foundation. # Detailed Design ## Design & Implementation Details -The implementation centers on adding a new administrative pathway to trigger an existing broker capability (individual acknowledgement). - 1. **Subscription Interface Extension:** - The `Subscription` interface is extended with a new default method. - ```java + The `Subscription` interface is extended with a new default method for this operation. +```java public interface Subscription { - // ... - default CompletableFuture cancelDelayedMessage(long ledgerId, long entryId) { + /** + * Acknowledge a single message for the subscription. + * + * @param ledgerId The ledger ID of the message to acknowledge. + * @param entryId The entry ID of the message to acknowledge. + * @return A CompletableFuture that completes with true if the request was accepted. + */ + default CompletableFuture acknowledgeIndividualMessage(long ledgerId, long entryId) { return CompletableFuture.completedFuture(false); } } - ``` +``` 2. **PersistentSubscription Implementation:** The `PersistentSubscription` class provides the concrete implementation. - ```java +```java // In PersistentSubscription.java @Override - public CompletableFuture cancelDelayedMessage(long ledgerId, long entryId) { + public CompletableFuture acknowledgeIndividualMessage(long ledgerId, long entryId) { // This operation is only valid for subscription types that track individual messages. if (Subscription.isCumulativeAckMode(getType())) { - log.warn("Cannot cancel delayed message on subscription {} of type {} which uses cumulative ack.", + log.warn("Cannot acknowledge single message on subscription {} of type {} which uses cumulative ack.", getName(), getType()); return CompletableFuture.completedFuture(false); } @@ -94,16 +96,15 @@ The implementation centers on adding a new administrative pathway to trigger an acknowledgeMessage(positions, AckType.Individual, properties); return CompletableFuture.completedFuture(true); } - ``` +``` 3. **Admin API and Topic Logic:** - * A new `internalCancelDelayedMessage` method is added to `PersistentTopicsBase`. - * This method handles the logic for partitioned and non-partitioned topics. For partitioned topics, it fans out the request to each partition. - * For a given topic, it retrieves the `PersistentTopic` object. If `subscriptionNames` is not provided, it gets a list of all subscriptions on the topic. - * It iterates through the target subscriptions and calls the `cancelDelayedMessage(ledgerId, entryId)` method on each `Subscription` object. + * New methods (`internalAcknowledgeMessage`, etc.) are added to `PersistentTopicsBase` to orchestrate the operation. + * The logic handles fanning out requests to all partitions for a partitioned topic. + * For a given topic and subscription, it calls the `acknowledgeMessage(ledgerId, entryId)` method on the corresponding `Subscription` object. 4. **Authorization:** - A new `TopicOperation` enum value, `CANCEL_DELAYED_MESSAGE`, is added. This ensures that the operation can be controlled via Pulsar's role-based access control. A client role must be granted the `cancel_delayed_message` permission on a topic to use this feature. + A new `TopicOperation` enum value, `ACKNOWLEDGE_MESSAGE`, is added. This allows the operation to be secured by Pulsar's role-based access control. A client role must be granted the `acknowledge_message` permission on a topic to use this feature. ## Public-facing Changes @@ -111,23 +112,22 @@ The implementation centers on adding a new administrative pathway to trigger an A new REST Admin API endpoint is added to `PersistentTopics`: -* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/cancelDelayedMessage` +* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/acknowledgeMessage` ```java @POST - @Path("/{property}/{cluster}/{namespace}/{topic}/cancelDelayedMessage") - @ApiOperation(hidden = true, value = "Cancel a delayed message on specified subscriptions" - + " (or all if none specified).") + @Path("/{property}/{cluster}/{namespace}/{topic}/acknowledgeMessage") + @ApiOperation(hidden = true, value = "Acknowledge a single message by its message ID on specified subscriptions (or all if none specified).") @ApiResponses(value = { @ApiResponse(code = 204, message = "Operation successful"), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), - @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), - @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"), - @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), - @ApiResponse(code = 412, message = "Failed to cancel delayed message or invalid parameters"), + @ApiResponse(code = 401, message = "Client is not authorized to perform this operation"), + @ApiResponse(code = 403, message = "Client does not have permission to acknowledge messages on this topic"), + @ApiResponse(code = 404, message = "Namespace, topic, or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on a non-persistent topic"), + @ApiResponse(code = 412, message = "Failed to acknowledge message due to invalid parameters or incompatible subscription type"), @ApiResponse(code = 500, message = "Internal server error")}) - public void cancelDelayedMessage( + public void acknowledgeMessage( @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the property (tenant)", required = true) @PathParam("property") String property, @@ -137,25 +137,25 @@ A new REST Admin API endpoint is added to `PersistentTopics`: @PathParam("namespace") String namespace, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @ApiParam(value = "Whether this broker is the authoritative owner of the topic. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @ApiParam(value = "Ledger ID of the target delayed message", required = true) + @ApiParam(value = "Ledger ID of the message to acknowledge.", required = true) @QueryParam("ledgerId") long ledgerId, - @ApiParam(value = "Entry ID of the target delayed message", required = true) + @ApiParam(value = "Entry ID of the message to acknowledge.", required = true) @QueryParam("entryId") long entryId, - @ApiParam(value = "List of subscription names to cancel on (comma-separated, empty or null for" - + " all subscriptions)") + @ApiParam(value = "List of subscription names to acknowledge on. If empty or not specified, the operation is applied to all subscriptions.") @QueryParam("subscriptionNames") List subscriptionNames) { try { validateTopicName(property, cluster, namespace, encodedTopic); if (ledgerId < 0 || entryId < 0) { asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, - "ledgerId, entryId must be non-negative.")); + "ledgerId and entryId must be non-negative.")); return; } List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) ? null : subscriptionNames; - internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, finalSubscriptionNames, authoritative); + internalAcknowledgeMessage(asyncResponse, ledgerId, entryId, + finalSubscriptionNames, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); } catch (Exception e) { @@ -164,22 +164,22 @@ A new REST Admin API endpoint is added to `PersistentTopics`: } ``` -* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/cancelDelayedMessage` +* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/acknowledgeMessage` ```java @POST - @Path("/{tenant}/{namespace}/{topic}/cancelDelayedMessage") - @ApiOperation(value = "Cancel a delayed message on specified subscriptions (or all if none specified).") + @Path("/{tenant}/{namespace}/{topic}/acknowledgeMessage") + @ApiOperation(value = "Acknowledge a single message by its message ID on specified subscriptions (or all if none specified).") @ApiResponses(value = { @ApiResponse(code = 204, message = "Operation successful"), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), - @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"), - @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Namespace or topic or subscription does not exist"), - @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"), - @ApiResponse(code = 412, message = "Failed to cancel delayed message or invalid parameters"), + @ApiResponse(code = 401, message = "Client is not authorized to perform this operation"), + @ApiResponse(code = 403, message = "Client does not have permission to acknowledge messages on this topic"), + @ApiResponse(code = 404, message = "Namespace, topic, or subscription does not exist"), + @ApiResponse(code = 405, message = "Operation not allowed on a non-persistent topic"), + @ApiResponse(code = 412, message = "Failed to acknowledge message due to invalid parameters or incompatible subscription type"), @ApiResponse(code = 500, message = "Internal server error")}) - public void cancelDelayedMessage( + public void acknowledgeMessage( @Suspended final AsyncResponse asyncResponse, @ApiParam(value = "Specify the tenant", required = true) @PathParam("tenant") String tenant, @@ -187,24 +187,24 @@ A new REST Admin API endpoint is added to `PersistentTopics`: @PathParam("namespace") String namespace, @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.") + @ApiParam(value = "Whether this broker is the authoritative owner of the topic. For internal use.") @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @ApiParam(value = "Ledger ID of the target delayed message", required = true) + @ApiParam(value = "Ledger ID of the message to acknowledge.", required = true) @QueryParam("ledgerId") long ledgerId, - @ApiParam(value = "Entry ID of the target delayed message", required = true) + @ApiParam(value = "Entry ID of the message to acknowledge.", required = true) @QueryParam("entryId") long entryId, - @ApiParam(value = "List of subscription names to cancel on (empty or null for all subscriptions)") + @ApiParam(value = "List of subscription names to acknowledge on. If empty or not specified, the operation is applied to all subscriptions.") @QueryParam("subscriptionNames") List subscriptionNames) { try { validateTopicName(tenant, namespace, encodedTopic); if (ledgerId < 0 || entryId < 0) { asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, - "ledgerId, entryId must be positive.")); + "ledgerId and entryId must be non-negative.")); return; } List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) ? null : subscriptionNames; - internalCancelDelayedMessage(asyncResponse, ledgerId, entryId, + internalAcknowledgeMessage(asyncResponse, ledgerId, entryId, finalSubscriptionNames, authoritative); } catch (WebApplicationException wae) { asyncResponse.resume(wae); @@ -222,7 +222,7 @@ No changes are made to the Pulsar binary protocol. **No new configuration parameters are introduced. However, this feature's performance and behavior under heavy load are directly influenced by existing `ManagedLedger` configurations that govern the persistence of acknowledgement holes.** -**Administrators should be aware of these settings if they expect a high volume of delayed message cancellations:** +**Administrators should be aware of these settings if they expect a high volume of message cancellations:** ``` # Max number of "acknowledgment holes" that are going to be persistently stored. @@ -284,48 +284,30 @@ managedLedgerInfoCompressionThresholdInBytes=16384 A new command is added to the `pulsar-admin topics` tool. -* **Command:** `cancel-delayed-message` -* **Synopsis:** `pulsar-admin topics cancel-delayed-message [options]` +* **Command:** `acknowledgeMessage` +* **Synopsis:** `pulsar-admin topics acknowledgeMessage [options]` * **Parameters:** * `topic-name` (String, required): The full name of the topic (e.g., `persistent://tenant/namespace/topic`). * **Options:** - * `-l, --ledgerId ` (required): Ledger ID of the message to cancel. - * `-e, --entryId ` (required): Entry ID of the message to cancel. - * `-s, --subscriptionNames ` (optional): Comma-separated list of subscription names. If not specified, applies to all subscriptions on the topic. - + * `-l, --ledgerId ` (required): Ledger ID of the message to acknowledge. + * `-e, --entryId ` (required): Entry ID of the message to acknowledge. + * `-s, --subscriptionNames ` (required): Comma-separated list of subscription names. * **Example:** ```bash - pulsar-admin topics cancel-delayed-message persistent://public/default/my-orders \ - --ledgerId 12345 --entryId 100 --subscriptionNames my-app-subscription + # Acknowledge a specific message for two subscriptions + pulsar-admin topics acknowledgeMessage persistent://public/default/my-topic \ + --ledgerId 12345 --entryId 100 \ + -s sub1,sub2 ``` # Backward & Forward Compatibility ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations -The `cancelDelayedMessage` operation is fundamentally a per-cluster operation that modifies a subscription's local cursor. It has no direct interaction with geo-replication. - -* A delayed message's scheduling state is local to the cluster where it is being managed. -* To cancel a delayed message on a replicated topic across multiple clusters, the admin command must be issued to each cluster individually. +This operation is local to a cluster and a subscription's cursor. To acknowledge a message on a replicated topic in multiple clusters, the admin command must be executed against each cluster. Geo-replication state is not affected. # Alternatives -1. **Client-Side Filtering (Current Workaround):** - -* **Description:** Consumers receive all delayed messages and filter out unwanted ones based on application logic. -* **Reason for Rejection:** Inefficient, wastes resources, and doesn't prevent broker-level delivery. - -2. **Mark Message with Special Properties:** - -* **Description:** Send a regular Pulsar message with special properties indicating it's a "cancel marker" for - another target message. The `Dispatcher` would inspect message properties. The PR initially included an implementation (Patch 1/6) using this approach by modifying `AbstractBaseDispatcher` to check for properties like `IS_MARK_DELETE_DELAY_MESSAGE` and `DELAY_CANCELED_MESSAGE_POSITION`. -* **Reason for Rejection:** The property-based approach might be less direct and could complicate dispatcher logic. The Admin API provides a more explicit control plane operation. - -3. **In-Tracker Cancellation Commands:** - -* **Description:** This approach involved modifying the `BucketDelayedDeliveryTracker` itself. A new `CANCEL` operation type would be added to the `DelayedIndex` protobuf message. When a cancellation was requested, a new `DelayedIndex` record with this `CANCEL` type would be created and persisted in the bucket snapshots. During dispatch, the tracker would check for these cancel markers and skip delivering the corresponding messages. -* **Reason for Rejection:** This design introduced a critical reliability problem. If a cancellation request was processed and added to the in-memory `lastMutableBucket`, but the broker crashed before this bucket was sealed and persisted to BookKeeper, the cancellation record would be lost forever. The client would have received a success response, but the message would still be delivered after the broker restarted. Guaranteeing the persistence of this cancel command before returning success would introduce unacceptable latency. The chosen approach of using acknowledgements avoids this by leveraging a proven, robust persistence mechanism. - # Links * Mailing List discussion thread: https://lists.apache.org/thread/lo182ztgrkzlq6mbkytj8krd050yvb9w From fa751bf1d90e83606c63739ec95b70afb421d941 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Thu, 17 Jul 2025 22:10:12 +0800 Subject: [PATCH 5/9] docs(pip): enhance pip-423.md with skipMessages API details --- pip/pip-423.md | 285 ++++++++++++++++++++----------------------------- 1 file changed, 113 insertions(+), 172 deletions(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index ad46a5f65488a..a9c8e7ce860c4 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -3,10 +3,10 @@ # Background knowledge * **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends. -* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. It maintains a `mark-delete position`, indicating that all messages up to that point have been acknowledged. -* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. While cumulative acknowledgement is the default for some types, `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message, even if it's out of order. When a message is acknowledged individually ahead of the `mark-delete position`, it creates an "acknowledgement hole". The broker's `ManagedCursor` must persistently track these holes to ensure that acknowledged messages are not redelivered after a broker restart. -* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. This is a primary use case that benefits from this proposal, as it allows a scheduled message to be effectively "cancelled" before its delivery time. -* **Existing `skip-messages` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping, but it lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. +* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). +* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message. When a message is acknowledged individually ahead of the `mark-delete position`, the broker's `ManagedCursor` persistently tracks this to ensure that acknowledged messages are not redelivered after a broker or consumer restart. +* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. +* **Existing `skipMessages` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal extends this existing API to add the ability to skip messages by their specific `MessageId`. # Motivation @@ -16,204 +16,144 @@ Operators and SREs occasionally need to intervene in a topic's backlog to handle * **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single message without affecting valid messages around it is a critical operational capability. * **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's queue. -The existing `skip-messages` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces a more fine-grained administrative tool to acknowledge a single message by its unique `MessageId`. This provides a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. +The existing `skipMessages(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal enhances the `skipMessages` API to accept specific message IDs, providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. # Goals ## In Scope -* Provide a new Admin API endpoint (`acknowledgeMessage`) and corresponding `pulsar-admin` CLI command to individually acknowledge a single message for a subscription. -* The target message will be identified by its `ledgerId` and `entryId`. +* Enhance the existing Admin API endpoint and `pulsar-admin` CLI to support skipping specific messages for a subscription. +* Introduce a new CLI command `pulsar-admin topics skip-messages` for this purpose. +* The target message(s) will be identified by their `ledgerId` and `entryId`. * The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability. * This feature will only be supported for subscription types that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`). -* Ensure that once a message is successfully acknowledged via this API, it will not be delivered to any consumer on the targeted subscription. +* Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription. ## Out of Scope - -* Acknowledging a batch of messages by ID in a single command. This proposal focuses on a single message. -* Automatic acknowledgement across geo-replicated clusters. The command is a per-cluster administrative operation. +* Adding a new, separate Admin API endpoint. This feature enhances the existing `skip` endpoint. +* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation. # High Level Design -The proposed solution provides a new administrative pathway to trigger Pulsar's existing individual acknowledgement capability on demand. +The proposed solution extends the existing administrative `skipMessages` API to trigger Pulsar's individual acknowledgement capability on demand. -1. **Initiate Acknowledgement:** An administrator initiates the action via the new `pulsar-admin topics acknowledgeMessage` command or its corresponding REST API endpoint. The request must specify the topic, the target `ledgerId` and `entryId` of the message, and one or more subscription names. +1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request must specify the topic, the target subscription, and a map of `ledgerId` to `entryId` for the messages to be skipped. -2. **Broker Receives Request:** The Pulsar broker that owns the topic partition receives the admin request. It validates the parameters and the administrator's permissions for the specified topic. +2. **Broker Receives Request:** The Pulsar broker that owns the topic partition receives the admin request. It validates the parameters and the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` permission. The API call is an overload of the existing skip endpoint, where the number of messages to skip is specified as `0` in the URL path, and the message IDs are passed in the POST body. -3. **Delegate to Subscription:** For each specified subscription, the broker invokes a new `acknowledgeIndividualMessage` method on the `PersistentSubscription` object. +3. **Delegate to Subscription:** The broker invokes a new `skipMessages(Map messageIds)` method on the target `PersistentSubscription` object. 4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs: - * It verifies that the subscription's type is compatible with individual acknowledgement. - * It constructs a `Position` object from the provided `ledgerId` and `entryId`. - * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for that single position. + * It verifies that the subscription's type is compatible with individual acknowledgement (i.e., not cumulative). + * It constructs `Position` objects from the provided ledger and entry IDs. + * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for the specified positions. This is functionally identical to a consumer individually acknowledging the messages. -5. **Persistence and Effect:** This action is functionally identical to a consumer acknowledging the message. The `ManagedCursor` for the subscription records this individual acknowledgement, which is then persisted according to the cluster's configuration. +5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted. * For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered. - * For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` even attempts to schedule it. The message is thus effectively **cancelled**. + * For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` attempts to schedule it. The message is thus effectively **cancelled**. -This design is simple, robust, and minimally invasive, as it builds upon the broker's proven message acknowledgement foundation. +This design is simple and robust as it builds upon the broker's proven message acknowledgement foundation while cleanly extending an existing administrative API. # Detailed Design ## Design & Implementation Details +The core of the implementation involves adding a new method to the `Subscription` interface and implementing it in `PersistentSubscription` to leverage the existing individual acknowledgment mechanism. + 1. **Subscription Interface Extension:** - The `Subscription` interface is extended with a new default method for this operation. + The `Subscription` interface is extended with a new method to handle skipping by message IDs. + ```java - public interface Subscription { - /** - * Acknowledge a single message for the subscription. - * - * @param ledgerId The ledger ID of the message to acknowledge. - * @param entryId The entry ID of the message to acknowledge. - * @return A CompletableFuture that completes with true if the request was accepted. - */ - default CompletableFuture acknowledgeIndividualMessage(long ledgerId, long entryId) { - return CompletableFuture.completedFuture(false); - } - } +// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +public interface Subscription extends MessageExpirer { + // ... existing methods + CompletableFuture skipMessages(int numMessagesToSkip); + + CompletableFuture skipMessages(Map messageIds); + // ... existing methods +} ``` 2. **PersistentSubscription Implementation:** - The `PersistentSubscription` class provides the concrete implementation. -```java - // In PersistentSubscription.java - @Override - public CompletableFuture acknowledgeIndividualMessage(long ledgerId, long entryId) { - // This operation is only valid for subscription types that track individual messages. - if (Subscription.isCumulativeAckMode(getType())) { - log.warn("Cannot acknowledge single message on subscription {} of type {} which uses cumulative ack.", - getName(), getType()); - return CompletableFuture.completedFuture(false); - } - - Position position = PositionFactory.create(ledgerId, entryId); - List positions = Collections.singletonList(position); - Map properties = Collections.emptyMap(); + The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledge flow. - // Acknowledge the single message. This will be persisted in the cursor. - acknowledgeMessage(positions, AckType.Individual, properties); - return CompletableFuture.completedFuture(true); +```java +// in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +@Override +public CompletableFuture skipMessages(Map messageIds) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName, + cursor.getNumberOfEntriesInBacklog(false)); } -``` - -3. **Admin API and Topic Logic:** - * New methods (`internalAcknowledgeMessage`, etc.) are added to `PersistentTopicsBase` to orchestrate the operation. - * The logic handles fanning out requests to all partitions for a partitioned topic. - * For a given topic and subscription, it calls the `acknowledgeMessage(ledgerId, entryId)` method on the corresponding `Subscription` object. - -4. **Authorization:** - A new `TopicOperation` enum value, `ACKNOWLEDGE_MESSAGE`, is added. This allows the operation to be secured by Pulsar's role-based access control. A client role must be granted the `acknowledge_message` permission on a topic to use this feature. - -## Public-facing Changes - -### Public API - -A new REST Admin API endpoint is added to `PersistentTopics`: -* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/acknowledgeMessage` + if (Subscription.isCumulativeAckMode(getType())) { + return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type.")); + } -```java - @POST - @Path("/{property}/{cluster}/{namespace}/{topic}/acknowledgeMessage") - @ApiOperation(hidden = true, value = "Acknowledge a single message by its message ID on specified subscriptions (or all if none specified).") - @ApiResponses(value = { - @ApiResponse(code = 204, message = "Operation successful"), - @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), - @ApiResponse(code = 401, message = "Client is not authorized to perform this operation"), - @ApiResponse(code = 403, message = "Client does not have permission to acknowledge messages on this topic"), - @ApiResponse(code = 404, message = "Namespace, topic, or subscription does not exist"), - @ApiResponse(code = 405, message = "Operation not allowed on a non-persistent topic"), - @ApiResponse(code = 412, message = "Failed to acknowledge message due to invalid parameters or incompatible subscription type"), - @ApiResponse(code = 500, message = "Internal server error")}) - public void acknowledgeMessage( - @Suspended final AsyncResponse asyncResponse, - @ApiParam(value = "Specify the property (tenant)", required = true) - @PathParam("property") String property, - @ApiParam(value = "Specify the cluster", required = true) - @PathParam("cluster") String cluster, - @ApiParam(value = "Specify the namespace", required = true) - @PathParam("namespace") String namespace, - @ApiParam(value = "Specify topic name", required = true) - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Whether this broker is the authoritative owner of the topic. For internal use.") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @ApiParam(value = "Ledger ID of the message to acknowledge.", required = true) - @QueryParam("ledgerId") long ledgerId, - @ApiParam(value = "Entry ID of the message to acknowledge.", required = true) - @QueryParam("entryId") long entryId, - @ApiParam(value = "List of subscription names to acknowledge on. If empty or not specified, the operation is applied to all subscriptions.") - @QueryParam("subscriptionNames") List subscriptionNames) { + List positions = new ArrayList<>(); + for (Map.Entry entry : messageIds.entrySet()) { try { - validateTopicName(property, cluster, namespace, encodedTopic); - if (ledgerId < 0 || entryId < 0) { - asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, - "ledgerId and entryId must be non-negative.")); - return; - } - List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) - ? null : subscriptionNames; - internalAcknowledgeMessage(asyncResponse, ledgerId, entryId, - finalSubscriptionNames, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); + long ledgerId = Long.parseLong(entry.getKey()); + long entryId = Long.parseLong(entry.getValue()); + Position position = PositionFactory.create(ledgerId, entryId); + positions.add(position); } catch (Exception e) { - asyncResponse.resume(new RestException(e)); + return CompletableFuture.failedFuture(new NotAllowedException("Invalid message ID.")); } } + + Map properties = Collections.emptyMap(); + acknowledgeMessage(positions, AckType.Individual, properties); + + return CompletableFuture.completedFuture(null); +} ``` -* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/acknowledgeMessage` +3. **Admin API Logic:** + The `PersistentTopicsBase` class is updated to handle the overloaded `skipMessages` request. When `numMessages` is 0 and the `messageIds` map is not empty, it routes the request to the new `subscription.skipMessages(Map)` method. ```java - @POST - @Path("/{tenant}/{namespace}/{topic}/acknowledgeMessage") - @ApiOperation(value = "Acknowledge a single message by its message ID on specified subscriptions (or all if none specified).") - @ApiResponses(value = { - @ApiResponse(code = 204, message = "Operation successful"), - @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), - @ApiResponse(code = 401, message = "Client is not authorized to perform this operation"), - @ApiResponse(code = 403, message = "Client does not have permission to acknowledge messages on this topic"), - @ApiResponse(code = 404, message = "Namespace, topic, or subscription does not exist"), - @ApiResponse(code = 405, message = "Operation not allowed on a non-persistent topic"), - @ApiResponse(code = 412, message = "Failed to acknowledge message due to invalid parameters or incompatible subscription type"), - @ApiResponse(code = 500, message = "Internal server error")}) - public void acknowledgeMessage( - @Suspended final AsyncResponse asyncResponse, - @ApiParam(value = "Specify the tenant", required = true) - @PathParam("tenant") String tenant, - @ApiParam(value = "Specify the namespace", required = true) - @PathParam("namespace") String namespace, - @ApiParam(value = "Specify topic name", required = true) - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Whether this broker is the authoritative owner of the topic. For internal use.") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, - @ApiParam(value = "Ledger ID of the message to acknowledge.", required = true) - @QueryParam("ledgerId") long ledgerId, - @ApiParam(value = "Entry ID of the message to acknowledge.", required = true) - @QueryParam("entryId") long entryId, - @ApiParam(value = "List of subscription names to acknowledge on. If empty or not specified, the operation is applied to all subscriptions.") - @QueryParam("subscriptionNames") List subscriptionNames) { - try { - validateTopicName(tenant, namespace, encodedTopic); - if (ledgerId < 0 || entryId < 0) { - asyncResponse.resume(new RestException(Response.Status.PRECONDITION_FAILED, - "ledgerId and entryId must be non-negative.")); - return; - } - List finalSubscriptionNames = (subscriptionNames == null || subscriptionNames.isEmpty()) - ? null : subscriptionNames; - internalAcknowledgeMessage(asyncResponse, ledgerId, entryId, - finalSubscriptionNames, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } - } +// in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages, + boolean authoritative, Map messageIds) { + // ... + // In the implementation logic for the topic: + // ... + if (!messageIds.isEmpty() && numMessages == 0) { + return sub.skipMessages(messageIds).thenAccept(unused -> { + log.info("[{}] Skipped messages on {} {}", clientAppId(), topicName, subName); + asyncResponse.resume(Response.noContent().build()); + } + ); + } + return sub.skipMessages(numMessages).thenAccept(unused -> { + log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, + topicName, subName); + // ... + }); + // ... +} ``` +## Public-facing Changes +The existing `skipMessages` API is modified to accept a POST body containing message IDs. + +### Public API +The REST endpoint for skipping messages is updated. To skip by message ID, a client must send a `POST` request with `numMessages` set to `0` in the path and provide a map of message IDs in the JSON body. + +* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}` +* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}` +* **Path Parameters:** + * `numMessages`: Must be set to `0` to enable skipping by message ID. +* **HTTP Body Parameters (JSON):** A map where keys are `ledgerId` as a string and values are `entryId` as a string. + * **Example Body:** `{"12345": "100", "12345": "101"}` +* **Response Codes:** + * `204 No Content`: The operation was successful. + * `400 Bad Request`: Invalid message ID format in the request body. + * `403 Forbidden`: The client is not authorized to perform a `skip` operation on this topic. + * `404 Not Found`: The topic or subscription does not exist. + * `405 Method Not Allowed`: The subscription type does not support individual acknowledgement (e.g., `Exclusive`, `Failover`). + ### Binary protocol No changes are made to the Pulsar binary protocol. @@ -282,23 +222,24 @@ managedLedgerInfoCompressionThresholdInBytes=16384 ### CLI -A new command is added to the `pulsar-admin topics` tool. +A new CLI command is added to `pulsar-admin topics`. -* **Command:** `acknowledgeMessage` -* **Synopsis:** `pulsar-admin topics acknowledgeMessage [options]` -* **Parameters:** - * `topic-name` (String, required): The full name of the topic (e.g., `persistent://tenant/namespace/topic`). +* **Command:** `skip-messages` +* **Description:** Skip some messages for a subscription by their message IDs. +* **Usage:** `pulsar-admin topics skip-messages [options]` * **Options:** - * `-l, --ledgerId ` (required): Ledger ID of the message to acknowledge. - * `-e, --entryId ` (required): Entry ID of the message to acknowledge. - * `-s, --subscriptionNames ` (required): Comma-separated list of subscription names. + * `-s, --subscription ` (required): The subscription to skip messages on. + * `-m, --messageId ` (required): The message ID to skip. This option can be specified multiple times. * **Example:** - ```bash - # Acknowledge a specific message for two subscriptions - pulsar-admin topics acknowledgeMessage persistent://public/default/my-topic \ - --ledgerId 12345 --entryId 100 \ - -s sub1,sub2 - ``` +```bash +# Skip a single message for subscription 'my-sub' +pulsar-admin topics skip-messages persistent://public/default/my-topic \ + -s my-sub -m 12345=100 + +# Skip multiple messages +pulsar-admin topics skip-messages persistent://public/default/my-topic \ + -s my-sub -m 12345=100 -m 12345=101 +``` # Backward & Forward Compatibility From 96962a83b4d9b359512380152cdd5cd8896af5b0 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sat, 2 Aug 2025 19:18:19 +0800 Subject: [PATCH 6/9] docs(pip-423): Separate endpoint for skipping by message ID --- pip/pip-423.md | 89 +++++++++++++++++++++++++------------------------- 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index a9c8e7ce860c4..b2f3a0e213ec6 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -3,56 +3,57 @@ # Background knowledge * **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends. -* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). -* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message. When a message is acknowledged individually ahead of the `mark-delete position`, the broker's `ManagedCursor` persistently tracks this to ensure that acknowledged messages are not redelivered after a broker or consumer restart. -* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. -* **Existing `skipMessages` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal extends this existing API to add the ability to skip messages by their specific `MessageId`. +* **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). The cursor is responsible for ensuring that acknowledged messages are not redelivered. +* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message. When a message is acknowledged individually ahead of the `mark-delete position`, the broker's `ManagedCursor` persistently tracks this to ensure that acknowledged messages are not redelivered after a broker or consumer restart. This proposal leverages this existing, robust mechanism. +* **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. Since the message is marked as consumed by the cursor, the `DelayedDeliveryTracker` will not dispatch it. +* **Existing `skip` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal provides a more precise way to skip messages by their specific `MessageId`. # Motivation Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance: -* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely. -* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single message without affecting valid messages around it is a critical operational capability. -* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's queue. +* **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively. +* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. +* **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue. -The existing `skipMessages(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal enhances the `skipMessages` API to accept specific message IDs, providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. +The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by their specific `MessageId`, providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. # Goals ## In Scope -* Enhance the existing Admin API endpoint and `pulsar-admin` CLI to support skipping specific messages for a subscription. -* Introduce a new CLI command `pulsar-admin topics skip-messages` for this purpose. +* Introduce a new Admin API endpoint and a corresponding `pulsar-admin` CLI command to support skipping specific messages for a subscription. * The target message(s) will be identified by their `ledgerId` and `entryId`. * The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability. * This feature will only be supported for subscription types that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`). * Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription. +* Support for both partitioned and non-partitioned topics. ## Out of Scope -* Adding a new, separate Admin API endpoint. This feature enhances the existing `skip` endpoint. -* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation. + +* Modifying the existing `skip/{numMessages}` endpoint. A new, dedicated endpoint will be created for clarity. +* Automatic skipping of messages across geo-replicated clusters. The command is a per-cluster administrative operation that must be run on each cluster where the skip is needed. # High Level Design -The proposed solution extends the existing administrative `skipMessages` API to trigger Pulsar's individual acknowledgement capability on demand. +The proposed solution introduces a new admin API that triggers Pulsar's individual acknowledgement capability on demand for specific messages. -1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request must specify the topic, the target subscription, and a map of `ledgerId` to `entryId` for the messages to be skipped. +1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request must specify the topic, the target subscription, and a list of message IDs (`ledgerId:entryId`) to be skipped. -2. **Broker Receives Request:** The Pulsar broker that owns the topic partition receives the admin request. It validates the parameters and the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` permission. The API call is an overload of the existing skip endpoint, where the number of messages to skip is specified as `0` in the URL path, and the message IDs are passed in the POST body. +2. **Broker Receives Request:** The Pulsar broker receives the admin request for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It validates the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` authorization rule. -3. **Delegate to Subscription:** The broker invokes a new `skipMessages(Map messageIds)` method on the target `PersistentSubscription` object. +3. **Delegate to Subscription:** The broker, after validating topic ownership and permissions, invokes a new method `skipMessages(Map messageIds)` on the target `PersistentSubscription` object. For partitioned topics, the request is scattered to all partitions, and each partition broker performs this action. 4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs: - * It verifies that the subscription's type is compatible with individual acknowledgement (i.e., not cumulative). + * It verifies that the subscription's type supports individual acknowledgements. * It constructs `Position` objects from the provided ledger and entry IDs. - * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for the specified positions. This is functionally identical to a consumer individually acknowledging the messages. + * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for the specified positions. This is functionally identical to what would happen if a consumer individually acknowledged the message. -5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted. +5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted to metadata storage. * For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered. * For a **delayed message**, it is marked as consumed before the `DelayedDeliveryTracker` attempts to schedule it. The message is thus effectively **cancelled**. -This design is simple and robust as it builds upon the broker's proven message acknowledgement foundation while cleanly extending an existing administrative API. +This design is simple and robust as it builds upon the broker's proven message acknowledgement foundation while providing a clean, dedicated administrative API for this precise operational task. # Detailed Design @@ -75,7 +76,8 @@ public interface Subscription extends MessageExpirer { ``` 2. **PersistentSubscription Implementation:** - The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledge flow. + +The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledge flow. ```java // in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -110,27 +112,22 @@ public CompletableFuture skipMessages(Map messageIds) { ``` 3. **Admin API Logic:** - The `PersistentTopicsBase` class is updated to handle the overloaded `skipMessages` request. When `numMessages` is 0 and the `messageIds` map is not empty, it routes the request to the new `subscription.skipMessages(Map)` method. +The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method which handles the request and calls the `subscription.skipMessages(Map)` method. It also includes logic to handle partitioned topics by fanning out the request to each partition owner. ```java // in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java -protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages, - boolean authoritative, Map messageIds) { - // ... - // In the implementation logic for the topic: +protected void internalSkipByMessageIds(AsyncResponse asyncResponse, String subName, boolean authoritative, + Map messageIds) { + // ... validate operation and ownership ... + + // Logic for a single partition / non-partitioned topic // ... - if (!messageIds.isEmpty() && numMessages == 0) { - return sub.skipMessages(messageIds).thenAccept(unused -> { - log.info("[{}] Skipped messages on {} {}", clientAppId(), topicName, subName); - asyncResponse.resume(Response.noContent().build()); - } - ); - } - return sub.skipMessages(numMessages).thenAccept(unused -> { - log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages, - topicName, subName); - // ... - }); + Subscription sub = topic.getSubscription(subName); + if (sub == null) { + return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topic.getName(), subName))); + } + return sub.skipMessages(messageIds); // ... } ``` @@ -139,20 +136,22 @@ protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, The existing `skipMessages` API is modified to accept a POST body containing message IDs. ### Public API -The REST endpoint for skipping messages is updated. To skip by message ID, a client must send a `POST` request with `numMessages` set to `0` in the path and provide a map of message IDs in the JSON body. -* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}` -* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}` +A new REST endpoint is added for skipping specific messages. + +* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds` +* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds` * **Path Parameters:** - * `numMessages`: Must be set to `0` to enable skipping by message ID. + * All path parameters identify the target subscription. * **HTTP Body Parameters (JSON):** A map where keys are `ledgerId` as a string and values are `entryId` as a string. - * **Example Body:** `{"12345": "100", "12345": "101"}` + * **Example Body:** `{"12345": "100", "12346": "200"}` * **Response Codes:** * `204 No Content`: The operation was successful. - * `400 Bad Request`: Invalid message ID format in the request body. - * `403 Forbidden`: The client is not authorized to perform a `skip` operation on this topic. + * `400 Bad Request`: Invalid message ID format in the request body (e.g., non-numeric ledgerId or entryId). + * `403 Forbidden`: The client is not authorized to perform a `SKIP` operation on this topic. * `404 Not Found`: The topic or subscription does not exist. * `405 Method Not Allowed`: The subscription type does not support individual acknowledgement (e.g., `Exclusive`, `Failover`). + * `500 Internal Server Error`: An unexpected error occurred in the broker. ### Binary protocol From 920aecf9dda3b31a0c04bf302d3f0179857ec287 Mon Sep 17 00:00:00 2001 From: sinan liu Date: Sat, 11 Oct 2025 19:40:25 +0800 Subject: [PATCH 7/9] Update pip-423.md --- pip/pip-423.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index b2f3a0e213ec6..20451c6ca3075 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -251,4 +251,4 @@ This operation is local to a cluster and a subscription's cursor. To acknowledge # Links * Mailing List discussion thread: https://lists.apache.org/thread/lo182ztgrkzlq6mbkytj8krd050yvb9w -* Mailing List voting thread: +* Mailing List voting thread: https://lists.apache.org/thread/7jbc3h42no9whjrpd6q0kmsyw985d7zo From 767b603a362ed0ccc53539fbd6851877b3a50d42 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 9 Nov 2025 17:57:46 +0800 Subject: [PATCH 8/9] docs(pip-423): Enhance pip-423.md with details on skipping messages by MessageId and batch index support --- pip/pip-423.md | 160 ++++++++++++++++++++++++++++--------------------- 1 file changed, 92 insertions(+), 68 deletions(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index 20451c6ca3075..3f8709f1ff266 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -3,8 +3,9 @@ # Background knowledge * **Message Identification (MessageId):** In Pulsar, every message is uniquely identified by its `MessageId`, which encapsulates a `ledgerId`, an `entryId`, and optionally a partition index. The combination of `ledgerId` and `entryId` (often represented as a `Position`) uniquely identifies a message's physical storage location within a topic. A producer receives a `MessageId` for each message it successfully sends. +* **Batch Messages:** To improve throughput, Pulsar producers can batch multiple individual messages into a single entry that is written to BookKeeper. In this case, the `MessageId` also contains a `batchIndex` to identify a specific message within the batch. The entry's metadata stores the total number of messages in the batch. * **Subscriptions and Cursors:** Each subscription on a topic maintains its own "cursor" which tracks consumption progress. For subscription types like `Shared` or `Key_Shared`, the cursor can track individually acknowledged messages, even if they are out of order relative to the main consumption progress marker (`mark-delete position`). The cursor is responsible for ensuring that acknowledged messages are not redelivered. -* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message. When a message is acknowledged individually ahead of the `mark-delete position`, the broker's `ManagedCursor` persistently tracks this to ensure that acknowledged messages are not redelivered after a broker or consumer restart. This proposal leverages this existing, robust mechanism. +* **Individual Acknowledgement:** Pulsar subscriptions support different acknowledgement modes. `Shared` and `Key_Shared` subscriptions heavily rely on **individual acknowledgement**. This allows a consumer to acknowledge a single message, or even a single message within a batch. When a message is acknowledged individually, the broker's `ManagedCursor` persistently tracks this "acknowledgement hole" to ensure that acknowledged messages are not redelivered after a broker or consumer restart. This proposal leverages this existing, robust mechanism. * **Delayed Messages:** Pulsar supports scheduling messages for future delivery. A primary use case for this proposal is to allow a scheduled message to be effectively "cancelled" by acknowledging it before its delivery time. Since the message is marked as consumed by the cursor, the `DelayedDeliveryTracker` will not dispatch it. * **Existing `skip` API:** Pulsar has an admin API to `skip` a specified *number* of messages for a subscription. This is useful for bulk-skipping but lacks the precision to target a single, specific message within a large backlog, especially if its exact sequence is unknown. This proposal provides a more precise way to skip messages by their specific `MessageId`. @@ -13,17 +14,17 @@ Operators and SREs occasionally need to intervene in a topic's backlog to handle problematic messages or adapt to changing business requirements. For instance: * **Cancelling Scheduled Actions:** A delayed message representing a future task (e.g., a scheduled report or a notification) may become obsolete. The most efficient way to handle this is to prevent its delivery entirely by acknowledging it pre-emptively. -* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. +* **Removing Backlogs:** A specific message in a backlog might have a malformed payload that causes consumer applications to crash repeatedly. Removing this single "poison pill" message without affecting valid messages around it is a critical operational capability. This also applies to removing a single bad message from within a larger batch. * **Manual Business Logic Correction:** An event may have been sent that is later determined to be invalid due to external factors. An administrator needs a precise tool to remove this specific event from a subscription's delivery queue. -The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by their specific `MessageId`, providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. +The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these precise, targeted operations. This proposal introduces an administrative API to skip messages by their specific `MessageId` (including `ledgerId`, `entryId`, and optional `batchIndex`), providing a robust and reliable way to remove any individual message—delayed or not—from a subscription's backlog. # Goals ## In Scope * Introduce a new Admin API endpoint and a corresponding `pulsar-admin` CLI command to support skipping specific messages for a subscription. -* The target message(s) will be identified by their `ledgerId` and `entryId`. +* The target message(s) will be identified by their `ledgerId`, `entryId`, and an optional `batchIndex` for messages within a batch. * The implementation will leverage Pulsar's existing, robust `AckType.Individual` mechanism for persistence and reliability. * This feature will only be supported for subscription types that allow individual acknowledgements (e.g., `Shared`, `Key_Shared`). * Ensure that once a message is successfully skipped via this API, it will not be delivered to any consumer on the targeted subscription. @@ -38,16 +39,17 @@ The existing `skip(numMessages)` API is a blunt instrument, ill-suited for these The proposed solution introduces a new admin API that triggers Pulsar's individual acknowledgement capability on demand for specific messages. -1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request must specify the topic, the target subscription, and a list of message IDs (`ledgerId:entryId`) to be skipped. +1. **Initiate Skip-by-ID:** An administrator initiates the action via the new `pulsar-admin topics skip-messages` command or its corresponding REST API call. The request specifies the topic, target subscription, and a list of message identifiers. These identifiers can be provided as a triplet of `ledgerId:entryId:batchIndex` or as Base64-encoded `MessageId` byte arrays. -2. **Broker Receives Request:** The Pulsar broker receives the admin request for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It validates the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` authorization rule. +2. **Broker Receives Request:** The Pulsar broker receives the admin request for the new endpoint `.../subscription/{subName}/skipByMessageIds`. It parses the flexible JSON payload and validates the administrator's permissions for the topic, re-using the existing `TopicOperation.SKIP` authorization rule. -3. **Delegate to Subscription:** The broker, after validating topic ownership and permissions, invokes a new method `skipMessages(Map messageIds)` on the target `PersistentSubscription` object. For partitioned topics, the request is scattered to all partitions, and each partition broker performs this action. +3. **Delegate to Subscription:** The broker, after validating topic ownership and permissions, invokes a new method `skipMessages(List entries)` on the target `PersistentSubscription` object. For partitioned topics, the request is scattered to all partition brokers, and each partition broker performs this action. 4. **Perform Individual Acknowledgement:** Inside the `PersistentSubscription`, the following occurs: * It verifies that the subscription's type supports individual acknowledgements. - * It constructs `Position` objects from the provided ledger and entry IDs. - * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for the specified positions. This is functionally identical to what would happen if a consumer individually acknowledged the message. + * For messages specified without a `batchIndex`, it constructs a `Position` object for the entire entry. + * For messages specified with a `batchIndex`, it first reads the entry from BookKeeper to get the batch metadata (e.g., batch size). It then constructs a `Position` object that includes an "ack set" (a bitset) indicating which messages within the batch are being acknowledged. + * It calls its internal `acknowledgeMessage()` method with `AckType.Individual` for all the constructed `Position` objects. 5. **Persistence and Effect:** The `ManagedCursor` for the subscription records these individual acknowledgements, which are persisted to metadata storage. * For a **regular message** in the backlog, it is marked as consumed for that subscription and will not be delivered. @@ -59,95 +61,101 @@ This design is simple and robust as it builds upon the broker's proven message a ## Design & Implementation Details -The core of the implementation involves adding a new method to the `Subscription` interface and implementing it in `PersistentSubscription` to leverage the existing individual acknowledgment mechanism. +The implementation introduces a new flexible request DTO, extends the `Subscription` interface, and implements the core logic in `PersistentSubscription`. -1. **Subscription Interface Extension:** - The `Subscription` interface is extended with a new method to handle skipping by message IDs. +1. **New Request DTO:** A new class `SkipMessageIdsRequest` is created to handle polymorphic JSON deserialization on the broker. This allows the API to accept multiple formats for specifying message IDs. +2. **Subscription Interface Extension:** The `Subscription` interface is extended with a new method. `SkipEntry` is an internal record holding the `ledgerId`, `entryId`, and an optional list of `batchIndexes`. ```java // in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java public interface Subscription extends MessageExpirer { // ... existing methods CompletableFuture skipMessages(int numMessagesToSkip); - CompletableFuture skipMessages(Map messageIds); + CompletableFuture skipMessages(List entries); // ... existing methods } ``` -2. **PersistentSubscription Implementation:** - -The `PersistentSubscription` class provides the concrete implementation. It converts the message ID map into `Position` objects and uses the standard individual acknowledge flow. +3. **PersistentSubscription Implementation:** The `PersistentSubscription` class provides the concrete implementation. It differentiates between full-entry acknowledgements and partial (batch) acknowledgements. ```java // in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @Override -public CompletableFuture skipMessages(Map messageIds) { - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Skipping messages by messageIds, current backlog {}", topicName, subName, - cursor.getNumberOfEntriesInBacklog(false)); - } - +public CompletableFuture skipMessages(List entries) { if (Subscription.isCumulativeAckMode(getType())) { return CompletableFuture.failedFuture(new NotAllowedException("Unsupported subscription type.")); } - List positions = new ArrayList<>(); - for (Map.Entry entry : messageIds.entrySet()) { - try { - long ledgerId = Long.parseLong(entry.getKey()); - long entryId = Long.parseLong(entry.getValue()); - Position position = PositionFactory.create(ledgerId, entryId); - positions.add(position); - } catch (Exception e) { - return CompletableFuture.failedFuture(new NotAllowedException("Invalid message ID.")); - } + // Separate full-entry acks from partial (batchIndex) acks + List fullEntryPositions = new ArrayList<>(); + Map> partialAckIndexByPos = new HashMap<>(); + // ... logic to populate these collections from 'entries' + + // If there are partial acks, read the corresponding entries to get batch metadata + if (!partialAckIndexByPos.isEmpty()) { + Set positionsToLoad = ...; // positions for entries with batch acks + cursor.asyncReplayEntries(positionsToLoad, new AsyncCallbacks.ReadEntriesCallback() { + @Override + public void readEntriesComplete(List readEntries, Object ctx) { + // ... logic for each entry: + // 1. Parse MessageMetadata to get batch size. + // 2. Validate batch indexes. + // 3. Create a BitSet representing the ack state. + // 4. Create a Position with the ack set using AckSetStateUtil.createPositionWithAckSet(). + // 5. Add this special position to a final list. + + // Finally, acknowledge all positions (full and partial) + acknowledgeMessage(finalPositionsList, AckType.Individual, properties); + result.complete(null); + } + // ... handle failures + }); + } else { + // Only full-entry acks are present, no need to read entries + acknowledgeMessage(fullEntryPositions, AckType.Individual, properties); + return CompletableFuture.completedFuture(null); } - - Map properties = Collections.emptyMap(); - acknowledgeMessage(positions, AckType.Individual, properties); - - return CompletableFuture.completedFuture(null); + return result; } ``` -3. **Admin API Logic:** - -The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method which handles the request and calls the `subscription.skipMessages(Map)` method. It also includes logic to handle partitioned topics by fanning out the request to each partition owner. -```java -// in pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java -protected void internalSkipByMessageIds(AsyncResponse asyncResponse, String subName, boolean authoritative, - Map messageIds) { - // ... validate operation and ownership ... - - // Logic for a single partition / non-partitioned topic - // ... - Subscription sub = topic.getSubscription(subName); - if (sub == null) { - return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topic.getName(), subName))); - } - return sub.skipMessages(messageIds); - // ... -} -``` +4. **Admin API Logic:** The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method that accepts the `SkipMessageIdsRequest` object, aggregates the requests into `SkipEntry` objects, and calls the `subscription.skipMessages` method. It also contains the logic to fan out the request to each partition for partitioned topics. ## Public-facing Changes -The existing `skipMessages` API is modified to accept a POST body containing message IDs. ### Public API A new REST endpoint is added for skipping specific messages. -* **Path (v2):** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds` -* **Path (v1):** `POST /admin/v1/persistent/{property}/{cluster}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds` +* **Path:** `POST /admin/v2/persistent/{tenant}/{namespace}/{topic}/subscription/{subName}/skipByMessageIds` * **Path Parameters:** * All path parameters identify the target subscription. -* **HTTP Body Parameters (JSON):** A map where keys are `ledgerId` as a string and values are `entryId` as a string. - * **Example Body:** `{"12345": "100", "12346": "200"}` +* **HTTP Body Parameters (JSON):** A flexible JSON object that supports multiple formats. + * **Format 1: Structured Message IDs (with batch index support)** + * **Description:** Recommended format, supports batch indexes. + * **Example Body:** + ```json + { + "type": "messageId", + "messageIds": [ + { "ledgerId": 12345, "entryId": 100 }, + { "ledgerId": 12346, "entryId": 200, "batchIndex": 5 } + ] + } + ``` + * **Format 2: Base64 Encoded Message IDs** + * **Description:** A simple list of Base64 encoded byte arrays from `MessageId.toByteArray()`. + * **Example Body:** + ```json + [ + "CLlgEAQwAA==", + "CLlgEAYwAA==" + ] + ``` * **Response Codes:** * `204 No Content`: The operation was successful. - * `400 Bad Request`: Invalid message ID format in the request body (e.g., non-numeric ledgerId or entryId). + * `400 Bad Request`: Invalid JSON payload or invalid message ID format (e.g., non-numeric ledgerId, invalid batchIndex). * `403 Forbidden`: The client is not authorized to perform a `SKIP` operation on this topic. * `404 Not Found`: The topic or subscription does not exist. * `405 Method Not Allowed`: The subscription type does not support individual acknowledgement (e.g., `Exclusive`, `Failover`). @@ -228,23 +236,39 @@ A new CLI command is added to `pulsar-admin topics`. * **Usage:** `pulsar-admin topics skip-messages [options]` * **Options:** * `-s, --subscription ` (required): The subscription to skip messages on. - * `-m, --messageId ` (required): The message ID to skip. This option can be specified multiple times. + * `--messageId-triplet ` (repeatable): The message ID to skip, specified as a triplet. `batchIndex` is optional. + * `--messageId-base64 ` (repeatable): A Base64-encoded `MessageId`. * **Example:** ```bash # Skip a single message for subscription 'my-sub' pulsar-admin topics skip-messages persistent://public/default/my-topic \ - -s my-sub -m 12345=100 + -s my-sub --messageId-triplet 12345:100 + +# Skip a single message within a batch (batchIndex 3) +pulsar-admin topics skip-messages persistent://public/default/my-topic \ + -s my-sub --messageId-triplet 12345:101:3 -# Skip multiple messages +# Skip multiple messages using different options pulsar-admin topics skip-messages persistent://public/default/my-topic \ - -s my-sub -m 12345=100 -m 12345=101 + -s my-sub --messageId-triplet 12345:102 --messageId-base64 "CLlgEAQwAA==" ``` # Backward & Forward Compatibility +## Upgrade + +This feature is purely additive. It introduces a new admin endpoint and CLI command. +* Upgrading brokers will enable this new functionality. Existing clients and operations are unaffected. +* Clients must be upgraded to a version that includes the new `pulsar-admin` command to use this feature. + +## Downgrade / Rollback + +* If brokers are downgraded to a version without this feature, the new admin endpoint will no longer be available, and calls to it will fail. +* No persistent state is changed in a way that would prevent a downgrade. Acknowledgement holes created by this API are stored in the same format as those created by regular consumer acknowledgements and will be understood by older broker versions. + ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations -This operation is local to a cluster and a subscription's cursor. To acknowledge a message on a replicated topic in multiple clusters, the admin command must be executed against each cluster. Geo-replication state is not affected. +This operation is local to a subscription's cursor within a single cluster. It has no direct impact on geo-replication. To skip a message on a replicated topic in multiple clusters, the admin command must be executed against each cluster individually. # Alternatives From c66183685716861ddc768e494c784f5e0db1b582 Mon Sep 17 00:00:00 2001 From: Denovo1998 Date: Sun, 9 Nov 2025 18:29:07 +0800 Subject: [PATCH 9/9] docs(pip-423): Update pip-423.md with details on new SkipEntry model and Subscription interface extension --- pip/pip-423.md | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/pip/pip-423.md b/pip/pip-423.md index 3f8709f1ff266..e8f2c3fc51df7 100644 --- a/pip/pip-423.md +++ b/pip/pip-423.md @@ -61,11 +61,11 @@ This design is simple and robust as it builds upon the broker's proven message a ## Design & Implementation Details -The implementation introduces a new flexible request DTO, extends the `Subscription` interface, and implements the core logic in `PersistentSubscription`. +The implementation introduces a new flexible request DTO, an internal model for skip requests, extends the `Subscription` interface, and implements the core logic in `PersistentSubscription`. 1. **New Request DTO:** A new class `SkipMessageIdsRequest` is created to handle polymorphic JSON deserialization on the broker. This allows the API to accept multiple formats for specifying message IDs. -2. **Subscription Interface Extension:** The `Subscription` interface is extended with a new method. `SkipEntry` is an internal record holding the `ledgerId`, `entryId`, and an optional list of `batchIndexes`. +2. **Subscription Interface Extension:** The `Subscription` interface is extended with a new method that accepts a list of `SkipEntry` objects. ```java // in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java public interface Subscription extends MessageExpirer { @@ -77,7 +77,33 @@ public interface Subscription extends MessageExpirer { } ``` -3. **PersistentSubscription Implementation:** The `PersistentSubscription` class provides the concrete implementation. It differentiates between full-entry acknowledgements and partial (batch) acknowledgements. +3. **Internal Skip Model (`SkipEntry`):** The `SkipEntry` class serves as the internal data transfer object between the admin layer and the subscription logic. It encapsulates all the information needed to skip a full entry or specific messages within a batched entry. +```java +/** + * Internal model for skipping messages by entry, with optional batch indexes. + * If {@code batchIndexes} is null or empty, the whole entry is skipped. + */ +@Getter +public final class SkipEntry { + private final long ledgerId; + private final long entryId; + // null or empty => full entry + private final List batchIndexes; + + public SkipEntry(long ledgerId, long entryId, List batchIndexes) { + this.ledgerId = ledgerId; + this.entryId = entryId; + if (batchIndexes == null || batchIndexes.isEmpty()) { + this.batchIndexes = null; + } else { + // make a defensive copy + this.batchIndexes = List.copyOf(batchIndexes); + } + } +} +``` + +4. **PersistentSubscription Implementation:** The `PersistentSubscription` class provides the concrete implementation. It differentiates between full-entry acknowledgements and partial (batch) acknowledgements. ```java // in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -120,7 +146,7 @@ public CompletableFuture skipMessages(List entries) { } ``` -4. **Admin API Logic:** The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method that accepts the `SkipMessageIdsRequest` object, aggregates the requests into `SkipEntry` objects, and calls the `subscription.skipMessages` method. It also contains the logic to fan out the request to each partition for partitioned topics. +5. **Admin API Logic:** The `PersistentTopicsBase` class is updated with a new `internalSkipByMessageIds` method that accepts the `SkipMessageIdsRequest` object, aggregates the requests into `SkipEntry` objects, and calls the `subscription.skipMessages` method. It also contains the logic to fan out the request to each partition for partitioned topics. ## Public-facing Changes