Skip to content

Commit f31e825

Browse files
committed
Feature updates - dotnet parity
1 parent 8799185 commit f31e825

File tree

5 files changed

+871
-34
lines changed

5 files changed

+871
-34
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
ADDED
1111

1212
- Added `durabletask.testing` module with `InMemoryOrchestrationBackend` for testing orchestrations without a sidecar process
13+
- Improved distributed tracing support with full span coverage for orchestrations, activities, sub-orchestrations, timers, and events
1314

1415
FIXED:
1516

@@ -20,7 +21,6 @@ FIXED:
2021
ADDED
2122

2223
- Allow entities with custom names
23-
- Support linking orchestrations to activities/entities/sub-orchestrators for distributed tracing with `opentelemetry-api` as an optional requirement.
2424

2525
CHANGED
2626

durabletask/client.py

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -170,21 +170,26 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
170170
version: Optional[str] = None) -> str:
171171

172172
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
173+
resolved_instance_id = instance_id if instance_id else uuid.uuid4().hex
174+
resolved_version = version if version else self.default_version
175+
176+
with tracing.start_create_orchestration_span(
177+
name, resolved_instance_id, version=resolved_version,
178+
):
179+
req = pb.CreateInstanceRequest(
180+
name=name,
181+
instanceId=resolved_instance_id,
182+
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
183+
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
184+
version=helpers.get_string_value(resolved_version),
185+
orchestrationIdReusePolicy=reuse_id_policy,
186+
tags=tags,
187+
parentTraceContext=tracing.get_current_trace_context(),
188+
)
173189

174-
req = pb.CreateInstanceRequest(
175-
name=name,
176-
instanceId=instance_id if instance_id else uuid.uuid4().hex,
177-
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
178-
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
179-
version=helpers.get_string_value(version if version else self.default_version),
180-
orchestrationIdReusePolicy=reuse_id_policy,
181-
tags=tags,
182-
parentTraceContext=tracing.get_current_trace_context(),
183-
)
184-
185-
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
186-
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
187-
return res.instanceId
190+
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
191+
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
192+
return res.instanceId
188193

189194
def get_orchestration_state(self, instance_id: str, *, fetch_payloads: bool = True) -> Optional[OrchestrationState]:
190195
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
@@ -271,14 +276,15 @@ def wait_for_orchestration_completion(self, instance_id: str, *,
271276

272277
def raise_orchestration_event(self, instance_id: str, event_name: str, *,
273278
data: Optional[Any] = None):
274-
req = pb.RaiseEventRequest(
275-
instanceId=instance_id,
276-
name=event_name,
277-
input=helpers.get_string_value(shared.to_json(data) if data is not None else None)
278-
)
279+
with tracing.start_raise_event_span(event_name, instance_id):
280+
req = pb.RaiseEventRequest(
281+
instanceId=instance_id,
282+
name=event_name,
283+
input=helpers.get_string_value(shared.to_json(data) if data is not None else None)
284+
)
279285

280-
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
281-
self._stub.RaiseEvent(req)
286+
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
287+
self._stub.RaiseEvent(req)
282288

283289
def terminate_orchestration(self, instance_id: str, *,
284290
output: Optional[Any] = None,

0 commit comments

Comments
 (0)