diff --git a/broqer/op/combine_latest.py b/broqer/op/combine_latest.py index 627e6e4..cbff705 100644 --- a/broqer/op/combine_latest.py +++ b/broqer/op/combine_latest.py @@ -38,9 +38,12 @@ class CombineLatest(MultiOperator): :param emit_on: publisher or list of publishers - only emitting result when emit comes from one of this list. If None, emit on any source publisher. + :param emit_partial: if True, emit even if not all source publishers have a + state. emit_partial should only be used if an emit_on publisher is + defined. """ def __init__(self, *publishers: Publisher, map_: Callable[..., Any] = None, - emit_on=None) -> None: + emit_on=None, emit_partial: bool = False) -> None: MultiOperator.__init__(self, *publishers) # ._partial_state is a list keeping the latest emitted values from @@ -68,6 +71,10 @@ def __init__(self, *publishers: Publisher, map_: Callable[..., Any] = None, else: self._emit_on = emit_on + assert emit_partial is False or emit_on is not None, \ + 'emit_on must be defined if emit_partial is True' + self._emit_partial = emit_partial + self._map = map_ def unsubscribe(self, subscriber: Subscriber) -> None: @@ -105,8 +112,10 @@ def emit(self, value: Any, who: Publisher) -> None: # if emits from publishers are missing or source of this emit # is not one of emit_on -> don't evaluate and notify subscribers - if self._missing or (self._emit_on is not None and all( - who is not p for p in self._emit_on)): + if not self._emit_partial and (self._missing or + (self._emit_on is not None and all( + who is not p for p in self._emit_on + ))): return None # evaluate @@ -124,17 +133,22 @@ def emit(self, value: Any, who: Publisher) -> None: return Publisher.notify(self, state) -def build_combine_latest(map_: Callable[..., Any] = None, *, emit_on=None): +def build_combine_latest(map_: Callable[..., Any] = None, *, emit_on=None, + emit_partial: bool = False) -> Callable: """ Decorator to wrap a function to return a CombineLatest operator. :param emit_on: publisher or list of publishers - only emitting result when emit comes from one of this list. If None, emit on any source publisher. + :param emit_partial: if True, emit even if not all source publishers have a + state. emit_partial should only be used if an emit_on publisher is + defined. """ def _build_combine_latest(map_: Callable[..., Any]): @wraps(map_) def _wrapper(*publishers) -> CombineLatest: - return CombineLatest(*publishers, map_=map_, emit_on=emit_on) + return CombineLatest(*publishers, map_=map_, emit_on=emit_on, + emit_partial=emit_partial) return _wrapper if map_: