Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
ADDED

- Added `durabletask.testing` module with `InMemoryOrchestrationBackend` for testing orchestrations without a sidecar process
- Improved distributed tracing support with full span coverage for orchestrations, activities, sub-orchestrations, timers, and events

FIXED:

Expand Down
50 changes: 29 additions & 21 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
import durabletask.internal.shared as shared
import durabletask.internal.tracing as tracing
from durabletask import task
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl

Expand Down Expand Up @@ -169,20 +170,26 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
version: Optional[str] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
resolved_version = version if version else self.default_version

with tracing.start_create_orchestration_span(
name, resolved_instance_id, version=resolved_version,
):
req = pb.CreateInstanceRequest(
name=name,
instanceId=resolved_instance_id,
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=helpers.get_string_value(resolved_version),
orchestrationIdReusePolicy=reuse_id_policy,
tags=tags,
parentTraceContext=tracing.get_current_trace_context(),
)

req = pb.CreateInstanceRequest(
name=name,
instanceId=instance_id if instance_id else uuid.uuid4().hex,
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=helpers.get_string_value(version if version else self.default_version),
orchestrationIdReusePolicy=reuse_id_policy,
tags=tags
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
return res.instanceId

def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
Expand Down Expand Up @@ -269,14 +276,15 @@ def wait_for_orchestration_completion(self, instance_id: str, *,

def raise_orchestration_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
input=helpers.get_string_value(shared.to_json(data) if data is not None else None)
)
with tracing.start_raise_event_span(event_name, instance_id):
req = pb.RaiseEventRequest(
instanceId=instance_id,
name=event_name,
input=helpers.get_string_value(shared.to_json(data) if data is not None else None)
)

self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
self._stub.RaiseEvent(req)
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
self._stub.RaiseEvent(req)

def terminate_orchestration(self, instance_id: str, *,
output: Optional[Any] = None,
Expand Down Expand Up @@ -355,7 +363,7 @@ def signal_entity(self,
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
requestId=str(uuid.uuid4()),
scheduledTime=None,
parentTraceContext=None,
parentTraceContext=tracing.get_current_trace_context(),
requestTime=helpers.new_timestamp(datetime.now(timezone.utc))
)
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
Expand Down
12 changes: 8 additions & 4 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,13 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction


def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
tags: Optional[dict[str, str]]) -> pb.OrchestratorAction:
tags: Optional[dict[str, str]],
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
name=name,
input=get_string_value(encoded_input),
tags=tags
tags=tags,
parentTraceContext=parent_trace_context,
))


Expand Down Expand Up @@ -302,12 +304,14 @@ def new_create_sub_orchestration_action(
name: str,
instance_id: Optional[str],
encoded_input: Optional[str],
version: Optional[str]) -> pb.OrchestratorAction:
version: Optional[str],
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
name=name,
instanceId=instance_id,
input=get_string_value(encoded_input),
version=get_string_value(version)
version=get_string_value(version),
parentTraceContext=parent_trace_context,
))


Expand Down
Loading