Skip to content

Conversation

@pperanich
Copy link
Collaborator

@pperanich pperanich commented Jan 15, 2026

There are many times where you may have a subscriber that does something slow, but doesn't need every message; for example visualization which may take a while for a particular message, but we don't need to render EVERY one of them. It would be ideal if these subscribers do NOT cause backpressure, and instead drop older messages.

This PR implements that feature using a few keyword arguments for InputStreams, as well as Subscribers.

  • leaky = False: If True, drop oldest messages when queue is full.
  • max_queue = None: Maximum queue size (ignored if leaky=False). If leaky = True, max_queue defaults to 1.

There is also a new example and tests to verify the functionality.

@griffinmilsap
Copy link
Collaborator

Love this addition!

Currently Subscribers/InputStreams take TWO parameters to dictate leaky behavior; one boolean to set it as a leaky queue and another to dictate how many messages should be queued before leaking occurs. This feels like it should just be one parameter, and I'd argue for just the leaky boolean.

  1. max_queue is bounded by num_buffers for a particular Publisher/channel connection
  2. Any particular subscriber can receive messages from multiple Publisher/channel connections that may have different num_buffers
  3. Pausing due to backpressure is handled by a Publisher's perception of backpressure. In order for a leaky subscriber to NOT cause backpressure, max_queue must be set to a MAXIMUM value of num_buffers - 1

I believe it's unexpected behavior when a leaky subscriber still causes backpressure because its max_queue size is too large, so I'd like to just set max_queue = 1 when its a leaky queue. This is morally equivalent to double buffering.

One very important note -- because of this, we actually end up with a weird edge case when someone sets num_buffers = 1 enforcing a "soft-realtime" condition with minimal latency -- leaky subscribers will also only buffer one message, but the publisher will still see this buffering and cause backpressure. While this is unexpected behavior, I do not believe there's any way to handle this edge case in the expected way. As such, it might be useful to update the documentation (and maybe issue a warning) when users run any publisher with num_buffers = 1 to indicate that backpressure will be enforced regardless of subscriber leakiness.

@griffinmilsap
Copy link
Collaborator

Alright, so I took another pass over this and got rid of an isinstance check in the hot path, and added some additional debug information for the confusing circumstance where leaky subscribers can still cause backpressure. I decided to leave the two parameters as there are possible use cases for setting max_queue to numbers other than 1. I think this is ready to go. Aside, Preston found that every subscriber is actually using recv_zero_copy and ... has been for a while now. See #209. This PR affects the code around that regression, and the fix for it will reference/rely on this PR being merged. @cboulay or @KonradPilch, lmk if you want to review, otherwise I'll merge this on my own sometime tomorrow.

@griffinmilsap griffinmilsap merged commit e524eaa into dev Jan 21, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants