Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions broqer/op/combine_latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ class CombineLatest(MultiOperator):
:param emit_on: publisher or list of publishers - only emitting result when
emit comes from one of this list. If None, emit on any source
publisher.
:param emit_partial: if True, emit even if not all source publishers have a
state. emit_partial should only be used if an emit_on publisher is
defined.
"""
def __init__(self, *publishers: Publisher, map_: Callable[..., Any] = None,
emit_on=None) -> None:
emit_on=None, emit_partial: bool = False) -> None:
MultiOperator.__init__(self, *publishers)

# ._partial_state is a list keeping the latest emitted values from
Expand Down Expand Up @@ -68,6 +71,10 @@ def __init__(self, *publishers: Publisher, map_: Callable[..., Any] = None,
else:
self._emit_on = emit_on

assert emit_partial is False or emit_on is not None, \
'emit_on must be defined if emit_partial is True'
self._emit_partial = emit_partial

self._map = map_

def unsubscribe(self, subscriber: Subscriber) -> None:
Expand Down Expand Up @@ -105,8 +112,10 @@ def emit(self, value: Any, who: Publisher) -> None:
# if emits from publishers are missing or source of this emit
# is not one of emit_on -> don't evaluate and notify subscribers

if self._missing or (self._emit_on is not None and all(
who is not p for p in self._emit_on)):
if not self._emit_partial and (self._missing or
(self._emit_on is not None and all(
who is not p for p in self._emit_on
))):
return None

# evaluate
Expand All @@ -124,17 +133,22 @@ def emit(self, value: Any, who: Publisher) -> None:
return Publisher.notify(self, state)


def build_combine_latest(map_: Callable[..., Any] = None, *, emit_on=None):
def build_combine_latest(map_: Callable[..., Any] = None, *, emit_on=None,
emit_partial: bool = False) -> Callable:
""" Decorator to wrap a function to return a CombineLatest operator.

:param emit_on: publisher or list of publishers - only emitting result when
emit comes from one of this list. If None, emit on any source
publisher.
:param emit_partial: if True, emit even if not all source publishers have a
state. emit_partial should only be used if an emit_on publisher is
defined.
"""
def _build_combine_latest(map_: Callable[..., Any]):
@wraps(map_)
def _wrapper(*publishers) -> CombineLatest:
return CombineLatest(*publishers, map_=map_, emit_on=emit_on)
return CombineLatest(*publishers, map_=map_, emit_on=emit_on,
emit_partial=emit_partial)
return _wrapper

if map_:
Expand Down
Loading