Skip to content

Commit 06f3064

Browse files
committed
Add distributed tracing using opentelemetry
1 parent b5ae902 commit 06f3064

File tree

7 files changed

+646
-33
lines changed

7 files changed

+646
-33
lines changed

dev-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
grpcio-tools
2+
opentelemetry-api
3+
opentelemetry-sdk
24
pymarkdownlnt

durabletask/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import durabletask.internal.orchestrator_service_pb2 as pb
1717
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
1818
import durabletask.internal.shared as shared
19+
import durabletask.internal.tracing as tracing
1920
from durabletask import task
2021
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
2122

@@ -177,7 +178,8 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
177178
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
178179
version=helpers.get_string_value(version if version else self.default_version),
179180
orchestrationIdReusePolicy=reuse_id_policy,
180-
tags=tags
181+
tags=tags,
182+
parentTraceContext=tracing.get_current_trace_context(),
181183
)
182184

183185
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
@@ -355,7 +357,7 @@ def signal_entity(self,
355357
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
356358
requestId=str(uuid.uuid4()),
357359
scheduledTime=None,
358-
parentTraceContext=None,
360+
parentTraceContext=tracing.get_current_trace_context(),
359361
requestTime=helpers.new_timestamp(datetime.now(timezone.utc))
360362
)
361363
self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")

durabletask/internal/helpers.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,13 @@ def new_create_timer_action(id: int, fire_at: datetime) -> pb.OrchestratorAction
219219

220220

221221
def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
222-
tags: Optional[dict[str, str]]) -> pb.OrchestratorAction:
222+
tags: Optional[dict[str, str]],
223+
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
223224
return pb.OrchestratorAction(id=id, scheduleTask=pb.ScheduleTaskAction(
224225
name=name,
225226
input=get_string_value(encoded_input),
226-
tags=tags
227+
tags=tags,
228+
parentTraceContext=parent_trace_context,
227229
))
228230

229231

@@ -298,12 +300,14 @@ def new_create_sub_orchestration_action(
298300
name: str,
299301
instance_id: Optional[str],
300302
encoded_input: Optional[str],
301-
version: Optional[str]) -> pb.OrchestratorAction:
303+
version: Optional[str],
304+
parent_trace_context: Optional[pb.TraceContext] = None) -> pb.OrchestratorAction:
302305
return pb.OrchestratorAction(id=id, createSubOrchestration=pb.CreateSubOrchestrationAction(
303306
name=name,
304307
instanceId=instance_id,
305308
input=get_string_value(encoded_input),
306-
version=get_string_value(version)
309+
version=get_string_value(version),
310+
parentTraceContext=parent_trace_context,
307311
))
308312

309313

durabletask/internal/tracing.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
"""OpenTelemetry distributed tracing utilities for the Durable Task SDK.
5+
6+
This module provides helpers for propagating W3C Trace Context between
7+
orchestrations, activities, sub-orchestrations, and entities via the
8+
``TraceContext`` protobuf message carried over gRPC.
9+
10+
OpenTelemetry is an **optional** dependency. When the ``opentelemetry-api``
11+
package is not installed every helper gracefully degrades to a no-op so
12+
that the rest of the SDK continues to work without any tracing overhead.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
import logging
18+
from contextlib import contextmanager
19+
from typing import Optional
20+
21+
from google.protobuf import wrappers_pb2
22+
23+
import durabletask.internal.orchestrator_service_pb2 as pb
24+
25+
logger = logging.getLogger("durabletask-tracing")
26+
27+
# ---------------------------------------------------------------------------
28+
# Lazy / optional OpenTelemetry imports
29+
# ---------------------------------------------------------------------------
30+
try:
31+
from opentelemetry import context as otel_context
32+
from opentelemetry import trace
33+
from opentelemetry.trace import (
34+
SpanKind,
35+
StatusCode,
36+
)
37+
from opentelemetry.trace.propagation.tracecontext import (
38+
TraceContextTextMapPropagator,
39+
)
40+
41+
_OTEL_AVAILABLE = True
42+
except ImportError: # pragma: no cover
43+
_OTEL_AVAILABLE = False
44+
45+
# Re-export so callers can check without importing opentelemetry themselves.
46+
OTEL_AVAILABLE = _OTEL_AVAILABLE
47+
48+
# The instrumentation scope name used when creating spans.
49+
_TRACER_NAME = "durabletask"
50+
51+
52+
# ---------------------------------------------------------------------------
53+
# Public helpers – extracting / injecting trace context
54+
# ---------------------------------------------------------------------------
55+
56+
def get_current_trace_context() -> Optional[pb.TraceContext]:
57+
"""Capture the current OpenTelemetry span context as a protobuf ``TraceContext``.
58+
59+
Returns ``None`` when OpenTelemetry is not installed or there is no
60+
active span.
61+
"""
62+
if not _OTEL_AVAILABLE:
63+
return None
64+
65+
propagator = TraceContextTextMapPropagator()
66+
carrier: dict[str, str] = {}
67+
propagator.inject(carrier)
68+
69+
traceparent = carrier.get("traceparent")
70+
if not traceparent:
71+
return None
72+
73+
tracestate = carrier.get("tracestate")
74+
75+
# Extract the span ID from the traceparent header.
76+
# Format: 00-<trace-id>-<span-id>-<flags>
77+
parts = traceparent.split("-")
78+
span_id = parts[2] if len(parts) >= 4 else ""
79+
80+
return pb.TraceContext(
81+
traceParent=traceparent,
82+
spanID=span_id,
83+
traceState=wrappers_pb2.StringValue(value=tracestate) if tracestate else None,
84+
)
85+
86+
87+
def extract_trace_context(proto_ctx: Optional[pb.TraceContext]) -> Optional[object]:
88+
"""Convert a protobuf ``TraceContext`` into an OpenTelemetry ``Context``.
89+
90+
Returns ``None`` when OpenTelemetry is not installed or the supplied
91+
context is empty / ``None``.
92+
"""
93+
if not _OTEL_AVAILABLE or proto_ctx is None:
94+
return None
95+
96+
traceparent = proto_ctx.traceParent
97+
if not traceparent:
98+
return None
99+
100+
carrier: dict[str, str] = {"traceparent": traceparent}
101+
if proto_ctx.HasField("traceState") and proto_ctx.traceState.value:
102+
carrier["tracestate"] = proto_ctx.traceState.value
103+
104+
propagator = TraceContextTextMapPropagator()
105+
ctx = propagator.extract(carrier)
106+
return ctx
107+
108+
109+
@contextmanager
110+
def start_span(
111+
name: str,
112+
trace_context: Optional[pb.TraceContext] = None,
113+
kind: Optional[object] = None,
114+
attributes: Optional[dict[str, str]] = None,
115+
):
116+
"""Context manager that starts an OpenTelemetry span linked to a parent trace context.
117+
118+
If OpenTelemetry is not installed, the block executes without tracing.
119+
120+
Parameters
121+
----------
122+
name:
123+
Human-readable span name (e.g. ``"activity:say_hello"``).
124+
trace_context:
125+
The protobuf ``TraceContext`` received from the sidecar. When
126+
provided the new span will be created as a **child** of this
127+
context.
128+
kind:
129+
The ``SpanKind`` for the new span. Defaults to ``SpanKind.INTERNAL``.
130+
attributes:
131+
Optional dictionary of span attributes.
132+
"""
133+
if not _OTEL_AVAILABLE:
134+
yield None
135+
return
136+
137+
parent_ctx = extract_trace_context(trace_context)
138+
139+
if kind is None:
140+
kind = SpanKind.INTERNAL
141+
142+
tracer = trace.get_tracer(_TRACER_NAME)
143+
144+
if parent_ctx is not None:
145+
token = otel_context.attach(parent_ctx)
146+
try:
147+
with tracer.start_as_current_span(
148+
name, kind=kind, attributes=attributes
149+
) as span:
150+
yield span
151+
finally:
152+
otel_context.detach(token)
153+
else:
154+
with tracer.start_as_current_span(
155+
name, kind=kind, attributes=attributes
156+
) as span:
157+
yield span
158+
159+
160+
def set_span_error(span, ex: Exception) -> None:
161+
"""Record an exception on the given span (if tracing is available)."""
162+
if not _OTEL_AVAILABLE or span is None:
163+
return
164+
span.set_status(StatusCode.ERROR, str(ex))
165+
span.record_exception(ex)

durabletask/worker.py

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import durabletask.internal.orchestrator_service_pb2 as pb
3333
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
3434
import durabletask.internal.shared as shared
35+
import durabletask.internal.tracing as tracing
3536
from durabletask import task
3637
from durabletask.internal.grpc_interceptor import DefaultClientInterceptorImpl
3738

@@ -641,6 +642,7 @@ def _execute_orchestrator(
641642
actions=result.actions,
642643
customStatus=ph.get_string_value(result.encoded_custom_status),
643644
completionToken=completionToken,
645+
orchestrationTraceContext=req.orchestrationTraceContext,
644646
)
645647
except pe.AbandonOrchestrationError:
646648
self._logger.info(
@@ -697,9 +699,20 @@ def _execute_activity(
697699
instance_id = req.orchestrationInstance.instanceId
698700
try:
699701
executor = _ActivityExecutor(self._registry, self._logger)
700-
result = executor.execute(
701-
instance_id, req.name, req.taskId, req.input.value
702-
)
702+
with tracing.start_span(
703+
f"activity:{req.name}",
704+
trace_context=req.parentTraceContext,
705+
attributes={"durabletask.task.instance_id": instance_id,
706+
"durabletask.task.name": req.name,
707+
"durabletask.task.task_id": str(req.taskId)},
708+
) as span:
709+
try:
710+
result = executor.execute(
711+
instance_id, req.name, req.taskId, req.input.value
712+
)
713+
except Exception as ex:
714+
tracing.set_span_error(span, ex)
715+
raise
703716
res = pb.ActivityResponse(
704717
instanceId=instance_id,
705718
taskId=req.taskId,
@@ -759,30 +772,41 @@ def _execute_entity_batch(
759772

760773
operation_result = None
761774

762-
try:
763-
entity_result = executor.execute(
764-
instance_id, entity_instance_id, operation.operation, entity_state, operation.input.value
765-
)
766-
767-
entity_result = ph.get_string_value_or_empty(entity_result)
768-
operation_result = pb.OperationResult(success=pb.OperationResultSuccess(
769-
result=entity_result,
770-
startTimeUtc=new_timestamp(start_time),
771-
endTimeUtc=new_timestamp(datetime.now(timezone.utc))
772-
))
773-
results.append(operation_result)
775+
# Get the trace context for this operation, if available
776+
op_trace_ctx = operation.traceContext if operation.HasField("traceContext") else None
774777

775-
entity_state.commit()
776-
except Exception as ex:
777-
self._logger.exception(ex)
778-
operation_result = pb.OperationResult(failure=pb.OperationResultFailure(
779-
failureDetails=ph.new_failure_details(ex),
780-
startTimeUtc=new_timestamp(start_time),
781-
endTimeUtc=new_timestamp(datetime.now(timezone.utc))
782-
))
783-
results.append(operation_result)
778+
with tracing.start_span(
779+
f"entity:{entity_instance_id.entity}:{operation.operation}",
780+
trace_context=op_trace_ctx,
781+
attributes={"durabletask.entity.instance_id": instance_id,
782+
"durabletask.entity.name": entity_instance_id.entity,
783+
"durabletask.entity.operation": operation.operation},
784+
) as span:
785+
try:
786+
entity_result = executor.execute(
787+
instance_id, entity_instance_id, operation.operation, entity_state, operation.input.value
788+
)
784789

785-
entity_state.rollback()
790+
entity_result = ph.get_string_value_or_empty(entity_result)
791+
operation_result = pb.OperationResult(success=pb.OperationResultSuccess(
792+
result=entity_result,
793+
startTimeUtc=new_timestamp(start_time),
794+
endTimeUtc=new_timestamp(datetime.now(timezone.utc))
795+
))
796+
results.append(operation_result)
797+
798+
entity_state.commit()
799+
except Exception as ex:
800+
tracing.set_span_error(span, ex)
801+
self._logger.exception(ex)
802+
operation_result = pb.OperationResult(failure=pb.OperationResultFailure(
803+
failureDetails=ph.new_failure_details(ex),
804+
startTimeUtc=new_timestamp(start_time),
805+
endTimeUtc=new_timestamp(datetime.now(timezone.utc))
806+
))
807+
results.append(operation_result)
808+
809+
entity_state.rollback()
786810

787811
batch_result = pb.EntityBatchResult(
788812
results=results,
@@ -847,6 +871,7 @@ def __init__(self, instance_id: str, registry: _Registry):
847871
self._new_input: Optional[Any] = None
848872
self._save_events = False
849873
self._encoded_custom_status: Optional[str] = None
874+
self._parent_trace_context: Optional[pb.TraceContext] = None
850875

851876
def run(self, generator: Generator[task.Task, Any, Any]):
852877
self._generator = generator
@@ -1136,15 +1161,18 @@ def call_activity_function_helper(
11361161
if isinstance(activity_function, str)
11371162
else task.get_name(activity_function)
11381163
)
1139-
action = ph.new_schedule_task_action(id, name, encoded_input, tags)
1164+
action = ph.new_schedule_task_action(
1165+
id, name, encoded_input, tags,
1166+
parent_trace_context=self._parent_trace_context)
11401167
else:
11411168
if instance_id is None:
11421169
# Create a deteministic instance ID based on the parent instance ID
11431170
instance_id = f"{self.instance_id}:{id:04x}"
11441171
if not isinstance(activity_function, str):
11451172
raise ValueError("Orchestrator function name must be a string")
11461173
action = ph.new_create_sub_orchestration_action(
1147-
id, activity_function, instance_id, encoded_input, version
1174+
id, activity_function, instance_id, encoded_input, version,
1175+
parent_trace_context=self._parent_trace_context
11481176
)
11491177
self._pending_actions[id] = action
11501178

@@ -1397,6 +1425,10 @@ def process_event(
13971425
if event.executionStarted.version:
13981426
ctx._version = event.executionStarted.version.value
13991427

1428+
# Store the parent trace context for propagation to child tasks
1429+
if event.executionStarted.HasField("parentTraceContext"):
1430+
ctx._parent_trace_context = event.executionStarted.parentTraceContext
1431+
14001432
if self._registry.versioning:
14011433
version_failure = self.evaluate_orchestration_versioning(
14021434
self._registry.versioning,

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ dependencies = [
3131
"packaging"
3232
]
3333

34+
[project.optional-dependencies]
35+
opentelemetry = [
36+
"opentelemetry-api>=1.0.0"
37+
]
38+
3439
[project.urls]
3540
repository = "https://github.com/microsoft/durabletask-python"
3641
changelog = "https://github.com/microsoft/durabletask-python/blob/main/CHANGELOG.md"

0 commit comments

Comments
 (0)