@@ -85,10 +85,10 @@ class TaskHubGrpcWorker:
8585 _response_stream : Optional [grpc .Future ] = None
8686
8787 def __init__ (self , * ,
88- host_address : Union [str , None ] = None ,
89- metadata : Union [List [Tuple [str , str ]], None ] = None ,
90- log_handler = None ,
91- log_formatter : Union [logging .Formatter , None ] = None ,
88+ host_address : Optional [str ] = None ,
89+ metadata : Optional [List [Tuple [str , str ]]] = None ,
90+ log_handler = None ,
91+ log_formatter : Optional [logging .Formatter ] = None ,
9292 secure_channel : bool = False ):
9393 self ._registry = _Registry ()
9494 self ._host_address = host_address if host_address else shared .get_default_host_address ()
@@ -259,22 +259,20 @@ def resume(self):
259259 # has reached a completed state. The only time this won't be the
260260 # case is if the user yielded on a WhenAll task and there are still
261261 # outstanding child tasks that need to be completed.
262- if self ._previous_task is not None :
262+ while self ._previous_task is not None and self ._previous_task .is_complete :
263+ next_task = None
263264 if self ._previous_task .is_failed :
264- # Raise the failure as an exception to the generator. The orchestrator can then either
265- # handle the exception or allow it to fail the orchestration.
266- self ._generator .throw (self ._previous_task .get_exception ())
267- elif self ._previous_task .is_complete :
268- while True :
269- # Resume the generator. This will either return a Task or raise StopIteration if it's done.
270- # CONSIDER: Should we check for possible infinite loops here?
271- next_task = self ._generator .send (self ._previous_task .get_result ())
272- if not isinstance (next_task , task .Task ):
273- raise TypeError ("The orchestrator generator yielded a non-Task object" )
274- self ._previous_task = next_task
275- # If a completed task was returned, then we can keep running the generator function.
276- if not self ._previous_task .is_complete :
277- break
265+ # Raise the failure as an exception to the generator.
266+ # The orchestrator can then either handle the exception or allow it to fail the orchestration.
267+ next_task = self ._generator .throw (self ._previous_task .get_exception ())
268+ else :
269+ # Resume the generator with the previous result.
270+ # This will either return a Task or raise StopIteration if it's done.
271+ next_task = self ._generator .send (self ._previous_task .get_result ())
272+
273+ if not isinstance (next_task , task .Task ):
274+ raise TypeError ("The orchestrator generator yielded a non-Task object" )
275+ self ._previous_task = next_task
278276
279277 def set_complete (self , result : Any , status : pb .OrchestrationStatus , is_result_encoded : bool = False ):
280278 if self ._is_complete :
@@ -359,9 +357,9 @@ def current_utc_datetime(self, value: datetime):
359357
360358 def create_timer (self , fire_at : Union [datetime , timedelta ]) -> task .Task :
361359 return self .create_timer_internal (fire_at )
362-
360+
363361 def create_timer_internal (self , fire_at : Union [datetime , timedelta ],
364- retryable_task : Optional [task .RetryableTask ] = None ) -> task .Task :
362+ retryable_task : Optional [task .RetryableTask ] = None ) -> task .Task :
365363 id = self .next_sequence_number ()
366364 if isinstance (fire_at , timedelta ):
367365 fire_at = self .current_utc_datetime + fire_at
@@ -390,9 +388,9 @@ def call_sub_orchestrator(self, orchestrator: task.Orchestrator[TInput, TOutput]
390388 id = self .next_sequence_number ()
391389 orchestrator_name = task .get_name (orchestrator )
392390 self .call_activity_function_helper (id , orchestrator_name , input = input , retry_policy = retry_policy ,
393- is_sub_orch = True , instance_id = instance_id )
391+ is_sub_orch = True , instance_id = instance_id )
394392 return self ._pending_tasks .get (id , task .CompletableTask ())
395-
393+
396394 def call_activity_function_helper (self , id : Optional [int ],
397395 activity_function : Union [task .Activity [TInput , TOutput ], str ], * ,
398396 input : Optional [TInput ] = None ,
@@ -402,14 +400,14 @@ def call_activity_function_helper(self, id: Optional[int],
402400 fn_task : Optional [task .CompletableTask [TOutput ]] = None ):
403401 if id is None :
404402 id = self .next_sequence_number ()
405-
403+
406404 if fn_task is None :
407405 encoded_input = shared .to_json (input ) if input is not None else None
408406 else :
409407 # Here, we don't need to convert the input to JSON because it is already converted.
410408 # We just need to take string representation of it.
411409 encoded_input = str (input )
412- if is_sub_orch == False :
410+ if not is_sub_orch :
413411 name = activity_function if isinstance (activity_function , str ) else task .get_name (activity_function )
414412 action = ph .new_schedule_task_action (id , name , encoded_input )
415413 else :
@@ -495,7 +493,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
495493 if not ctx ._is_complete :
496494 task_count = len (ctx ._pending_tasks )
497495 event_count = len (ctx ._pending_events )
498- self ._logger .info (f"{ instance_id } : Waiting for { task_count } task(s) and { event_count } event(s)." )
496+ self ._logger .info (f"{ instance_id } : Orchestrator yielded with { task_count } task(s) and { event_count } event(s) outstanding ." )
499497 elif ctx ._completion_status and ctx ._completion_status is not pb .ORCHESTRATION_STATUS_CONTINUED_AS_NEW :
500498 completion_status_str = pbh .get_orchestration_status_str (ctx ._completion_status )
501499 self ._logger .info (f"{ instance_id } : Orchestration completed with status: { completion_status_str } " )
@@ -556,8 +554,8 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
556554 timer_task .complete (None )
557555 if timer_task ._retryable_parent is not None :
558556 activity_action = timer_task ._retryable_parent ._action
559-
560- if timer_task ._retryable_parent ._is_sub_orch == False :
557+
558+ if not timer_task ._retryable_parent ._is_sub_orch :
561559 cur_task = activity_action .scheduleTask
562560 instance_id = None
563561 else :
@@ -612,11 +610,11 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
612610 self ._logger .warning (
613611 f"{ ctx .instance_id } : Ignoring unexpected taskFailed event with ID = { task_id } ." )
614612 return
615-
613+
616614 if isinstance (activity_task , task .RetryableTask ):
617615 if activity_task ._retry_policy is not None :
618616 next_delay = activity_task .compute_next_delay ()
619- if next_delay == None :
617+ if next_delay is None :
620618 activity_task .fail (
621619 f"{ ctx .instance_id } : Activity task #{ task_id } failed: { event .taskFailed .failureDetails .errorMessage } " ,
622620 event .taskFailed .failureDetails )
@@ -674,7 +672,7 @@ def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEven
674672 if isinstance (sub_orch_task , task .RetryableTask ):
675673 if sub_orch_task ._retry_policy is not None :
676674 next_delay = sub_orch_task .compute_next_delay ()
677- if next_delay == None :
675+ if next_delay is None :
678676 sub_orch_task .fail (
679677 f"Sub-orchestration task #{ task_id } failed: { failedEvent .failureDetails .errorMessage } " ,
680678 failedEvent .failureDetails )
0 commit comments