Background
Publishers serialize messages to subscribers via TCP or SHM (and sometimes via a module-level dictionary for "local" mode). The backend process then yields a deepcopy of that message to interested subscribers, which can do... well, just about anything with it.
Subscribers have a feature that lets them receive a "zero-copy" view of the incoming message WITHOUT that deepcopy. All subscribers in that process (and, for SHM channels, all subscribers on the machine) share a (hopefully read-only) view of the same chunk of memory. The result is a FAST transfer, with the danger that if any subscriber mutates the message, all other subscribers will see that mutation, which is most likely REALLY bad and difficult to debug. The idea was to give a higher-performance shortcut to developers who PROMISE not to mutate the object; they OPT IN to this functionality by creating an InputStream with zero_copy = True.
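To make the hazard concrete, here is a minimal sketch using plain Python objects rather than ezmsg's real transport. The deliver and run functions are hypothetical: deliver fans one message out to several subscribers, either as the same shared object (mimicking a zero-copy view) or as an independent copy.deepcopy per subscriber.

```python
import copy
from typing import Any, Callable, List


def deliver(msg: Any, subscribers: List[Callable[[Any], None]], zero_copy: bool) -> None:
    """Hypothetical fan-out: hand one message to every subscriber.

    zero_copy=True passes the very same object to everyone (a shared view);
    zero_copy=False gives each subscriber its own deep copy.
    """
    for sub in subscribers:
        sub(msg if zero_copy else copy.deepcopy(msg))


def run(zero_copy: bool) -> List[int]:
    seen: List[List[int]] = []

    def careless(m: dict) -> None:
        # Breaks the "promise": mutates the incoming message in place.
        m["data"][0] = -1

    def observer(m: dict) -> None:
        # Records what this subscriber actually received.
        seen.append(list(m["data"]))

    original = {"data": [1, 2, 3]}
    deliver(original, [careless, observer], zero_copy=zero_copy)
    return seen[0]


print(run(zero_copy=True))   # [-1, 2, 3]: the mutation leaked to observer
print(run(zero_copy=False))  # [1, 2, 3]: observer got an untouched copy
```

Note that careless runs before observer; with a shared view, the order in which subscribers are called decides who sees corrupted data, which is part of why these bugs are hard to track down.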
But...
In the current dev branch, zero_copy is ignored: BackendProcess just uses recv_zero_copy every time.
ezmsg/src/ezmsg/core/backendprocess.py
Lines 409 to 425 in eaf8068
    while True:
        if not callables:
            sub.close()
            await sub.wait_closed()
            break
        async with sub.recv_zero_copy() as msg:
            try:
                for callable in list(callables):
                    try:
                        await callable(msg)
                    except (Complete, NormalTermination):
                        callables.remove(callable)
            finally:
                del msg
        if len(callables) > 1:
            await asyncio.sleep(0)
This wasn't always the case, as can be seen in this commit from 2022:
ezmsg/ezmsg/core/backendprocess.py
Lines 206 to 213 in 0504747
    while True:
        if getattr(task, ZERO_COPY_ATTR) == False:
            msg = await sub.recv()
            await handle_message(msg)
        else:
            async with sub.recv_zero_copy() as msg:
                await handle_message(msg)
                del msg
Based on the git blame, it looks like I introduced this issue sometime in early 2023, I believe while implementing single-process mode.
What does this mean to me?
This has a few implications:
- ezmsg was running FASTER than it really should have been. Unless your code religiously uses zero-copy (like most of ezmsg-sigproc), the fix for this issue will result in a deepcopy of every message your subscriber receives, which will hurt performance in the name of safety/guardrails.
- Any system that uses ezmsg 3.2.0+ has this issue: any subscriber could mutate an incoming message, corrupting it for all other subscribers.
- A nuance: BackendProcess was still honoring zero_copy = True for a niche deepcopy on publish, which would only be triggered if your coroutine yields the same object that it was given. Since the MessageCache locally transfers the very same object your coroutine publishes, we didn't want that to be a zero-copy view of a message we received from an upstream publisher. BUT subscribers were receiving a zero-copy view without even requesting it, so this deepcopy wasn't being triggered on publish for subscribers that never declared zero_copy = True, even though they were holding zero-copy views too.
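The publish-side guard described in the last point can be sketched roughly as follows. This is a minimal illustration, not BackendProcess's actual code: prepare_for_publish is a hypothetical name, and the identity check (outgoing is received) stands in for "your coroutine yields the same object that it was given".

```python
import copy
from typing import Any


def prepare_for_publish(received: Any, outgoing: Any, zero_copy: bool) -> Any:
    """Hypothetical sketch of the publish-side guard.

    If a zero-copy subscriber yields the very object it was handed, the local
    cache would otherwise re-share the upstream publisher's object, so a deep
    copy is made first. Newly created objects are published as-is.
    """
    if zero_copy and outgoing is received:
        return copy.deepcopy(outgoing)
    return outgoing


upstream = {"data": [1, 2, 3]}
passthrough = prepare_for_publish(upstream, upstream, zero_copy=True)
print(passthrough is upstream)  # False: a fresh copy is published
print(passthrough == upstream)  # True: same contents

# With zero_copy=False the guard never fires. That is only safe if the
# subscriber really received its own deepcopy, which the bug prevented.
print(prepare_for_publish(upstream, upstream, zero_copy=False) is upstream)  # True
```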
What is the fix?
We call recv instead of recv_zero_copy when subscribers do not declare zero_copy = True. Internally, recv handles the deepcopy, so performance metrics will take a hit.
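A rough sketch of the fix, restoring the branch from the 2022 code inside a single delivery step. FakeSubscriber and deliver_one are hypothetical stand-ins: only the recv / recv_zero_copy method names come from the source above, and their behavior here (deepcopy vs. shared object) is mimicked rather than taken from ezmsg's real Subscriber.

```python
import asyncio
import copy
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Awaitable, Callable, List


class FakeSubscriber:
    """Hypothetical stand-in for an ezmsg Subscriber, for illustration only."""

    def __init__(self, messages: List[Any]) -> None:
        self._messages = list(messages)

    async def recv(self) -> Any:
        # Safe path: hand back a private deep copy.
        return copy.deepcopy(self._messages.pop(0))

    @asynccontextmanager
    async def recv_zero_copy(self) -> AsyncIterator[Any]:
        # Fast path: yield the shared object itself.
        yield self._messages.pop(0)


async def deliver_one(
    sub: FakeSubscriber,
    handler: Callable[[Any], Awaitable[None]],
    zero_copy: bool,
) -> None:
    if zero_copy:
        # Opted in: the handler sees the shared object and promises not to mutate it.
        async with sub.recv_zero_copy() as msg:
            await handler(msg)
    else:
        # Default: recv() returns a deepcopy that is safe to mutate.
        msg = await sub.recv()
        await handler(msg)


async def main() -> None:
    shared = {"data": [1, 2, 3]}
    received: List[Any] = []

    async def handler(m: Any) -> None:
        received.append(m)

    await deliver_one(FakeSubscriber([shared]), handler, zero_copy=False)
    await deliver_one(FakeSubscriber([shared]), handler, zero_copy=True)
    print(received[0] is shared)  # False: private copy
    print(received[1] is shared)  # True: shared view


asyncio.run(main())
```

Defaulting to the copying path means a subscriber has to explicitly opt in to the shared view, which matches the intent described under Background.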