Skip to content

Commit b25cc70

Browse files
committed
Match .NET behavior exactly
1 parent f31e825 commit b25cc70

File tree

3 files changed

+1067
-256
lines changed

3 files changed

+1067
-256
lines changed

durabletask/internal/tracing.py

Lines changed: 118 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from __future__ import annotations
1616

1717
import logging
18+
import time
1819
from contextlib import contextmanager
1920
from datetime import datetime
2021
from typing import Any, Optional
@@ -32,8 +33,8 @@
3233
from opentelemetry import context as otel_context
3334
from opentelemetry import trace
3435
from opentelemetry.trace import (
35-
SpanKind,
36-
StatusCode,
36+
SpanKind, # type: ignore[no-redef]
37+
StatusCode, # type: ignore[no-redef]
3738
)
3839
from opentelemetry.trace.propagation.tracecontext import (
3940
TraceContextTextMapPropagator,
@@ -46,16 +47,16 @@
4647
# without guarding every reference with OTEL_AVAILABLE checks.
4748

4849
class SpanKind: # type: ignore[no-redef]
49-
INTERNAL = None
50-
CLIENT = None
51-
SERVER = None
52-
PRODUCER = None
53-
CONSUMER = None
50+
INTERNAL: Any = None
51+
CLIENT: Any = None
52+
SERVER: Any = None
53+
PRODUCER: Any = None
54+
CONSUMER: Any = None
5455

5556
class StatusCode: # type: ignore[no-redef]
56-
OK = None
57-
ERROR = None
58-
UNSET = None
57+
OK: Any = None
58+
ERROR: Any = None
59+
UNSET: Any = None
5960

6061
# Re-export so callers can check without importing opentelemetry themselves.
6162
OTEL_AVAILABLE = _OTEL_AVAILABLE
@@ -107,38 +108,46 @@ def create_timer_span_name(orchestration_name: str) -> str:
107108
# Public helpers – extracting / injecting trace context
108109
# ---------------------------------------------------------------------------
109110

110-
def get_current_trace_context() -> Optional[pb.TraceContext]:
111-
"""Capture the current OpenTelemetry span context as a protobuf ``TraceContext``.
112111

113-
Returns ``None`` when OpenTelemetry is not installed or there is no
114-
active span.
115-
"""
116-
if not _OTEL_AVAILABLE:
117-
return None
118-
119-
propagator = TraceContextTextMapPropagator()
120-
carrier: dict[str, str] = {}
121-
propagator.inject(carrier)
112+
def _trace_context_from_carrier(carrier: dict[str, str]) -> Optional[pb.TraceContext]:
113+
"""Build a ``TraceContext`` protobuf from a W3C propagation carrier.
122114
115+
Returns ``None`` when the carrier does not contain a valid
116+
``traceparent`` header.
117+
"""
123118
traceparent = carrier.get("traceparent")
124119
if not traceparent:
125120
return None
126121

127122
tracestate = carrier.get("tracestate")
128-
129-
# Extract the span ID from the traceparent header.
130123
# Format: 00-<trace-id>-<span-id>-<flags>
131124
parts = traceparent.split("-")
132125
span_id = parts[2] if len(parts) >= 4 else ""
133126

134127
return pb.TraceContext(
135128
traceParent=traceparent,
136129
spanID=span_id,
137-
traceState=wrappers_pb2.StringValue(value=tracestate) if tracestate else None,
130+
traceState=wrappers_pb2.StringValue(value=tracestate)
131+
if tracestate else None,
138132
)
139133

140134

141-
def extract_trace_context(proto_ctx: Optional[pb.TraceContext]) -> Optional[object]:
135+
def get_current_trace_context() -> Optional[pb.TraceContext]:
136+
"""Capture the current OpenTelemetry span context as a protobuf ``TraceContext``.
137+
138+
Returns ``None`` when OpenTelemetry is not installed or there is no
139+
active span.
140+
"""
141+
if not _OTEL_AVAILABLE:
142+
return None
143+
144+
propagator = TraceContextTextMapPropagator()
145+
carrier: dict[str, str] = {}
146+
propagator.inject(carrier)
147+
return _trace_context_from_carrier(carrier)
148+
149+
150+
def extract_trace_context(proto_ctx: Optional[pb.TraceContext]) -> Optional[Any]:
142151
"""Convert a protobuf ``TraceContext`` into an OpenTelemetry ``Context``.
143152
144153
Returns ``None`` when OpenTelemetry is not installed or the supplied
@@ -164,7 +173,7 @@ def extract_trace_context(proto_ctx: Optional[pb.TraceContext]) -> Optional[obje
164173
def start_span(
165174
name: str,
166175
trace_context: Optional[pb.TraceContext] = None,
167-
kind: Optional[object] = None,
176+
kind: Any = None,
168177
attributes: Optional[dict[str, str]] = None,
169178
):
170179
"""Context manager that starts an OpenTelemetry span linked to a parent trace context.
@@ -219,13 +228,6 @@ def set_span_error(span: Any, ex: Exception) -> None:
219228
span.record_exception(ex)
220229

221230

222-
def set_span_status_completed(span: Any) -> None:
223-
"""Mark the span with ``durabletask.task.status`` = ``Completed``."""
224-
if not _OTEL_AVAILABLE or span is None:
225-
return
226-
span.set_attribute(ATTR_TASK_STATUS, "Completed")
227-
228-
229231
# ---------------------------------------------------------------------------
230232
# Orchestration-level span helpers
231233
# ---------------------------------------------------------------------------
@@ -262,12 +264,15 @@ def start_orchestration_span(
262264
tracer = trace.get_tracer(_TRACER_NAME)
263265
parent_ctx = extract_trace_context(parent_trace_context)
264266

265-
# Determine start time from orchestration trace context (replay)
267+
# Determine start time: prefer the value persisted in the
268+
# OrchestrationTraceContext (replay / cross-worker), otherwise
269+
# capture "now" so the value can be fed back to the sidecar.
266270
start_time_ns: Optional[int] = None
267-
has_start_time = (orchestration_trace_context is not None and orchestration_trace_context.HasField("spanStartTime"))
268-
if has_start_time:
271+
if orchestration_trace_context is not None and orchestration_trace_context.HasField("spanStartTime"):
269272
start_dt = orchestration_trace_context.spanStartTime.ToDatetime()
270273
start_time_ns = int(start_dt.timestamp() * 1e9)
274+
else:
275+
start_time_ns = time.time_ns()
271276

272277
token = None
273278
if parent_ctx is not None:
@@ -291,6 +296,33 @@ def start_orchestration_span(
291296
return span, (token, span_token), span_id_hex, start_time_ns
292297

293298

299+
def reattach_orchestration_span(span: Any) -> Any:
300+
"""Re-attach a saved orchestration span as the current span.
301+
302+
Returns the context token that must be detached later.
303+
Returns ``None`` when OTel is not available or *span* is ``None``.
304+
"""
305+
if not _OTEL_AVAILABLE or span is None:
306+
return None
307+
308+
ctx_with_span = trace.set_span_in_context(span)
309+
return otel_context.attach(ctx_with_span)
310+
311+
312+
def detach_orchestration_tokens(tokens: Any) -> None:
313+
"""Detach context tokens without ending the span.
314+
315+
Use this on intermediate dispatches where the orchestration is not
316+
yet complete so the span is kept alive for subsequent dispatches.
317+
"""
318+
if tokens is None:
319+
return
320+
parent_token, span_token = tokens
321+
otel_context.detach(span_token)
322+
if parent_token is not None:
323+
otel_context.detach(parent_token)
324+
325+
294326
def end_orchestration_span(
295327
span: Any,
296328
tokens: Any,
@@ -318,39 +350,41 @@ def end_orchestration_span(
318350

319351
span.end()
320352

321-
# Detach context tokens in reverse order
322-
if tokens is not None:
323-
parent_token, span_token = tokens
324-
otel_context.detach(span_token)
325-
if parent_token is not None:
326-
otel_context.detach(parent_token)
353+
detach_orchestration_tokens(tokens)
327354

328355

329356
# ---------------------------------------------------------------------------
330-
# Scheduling-side Client / Producer span helpers (emit-and-close)
357+
# CLIENT span helpers (create / end)
331358
# ---------------------------------------------------------------------------
332359

333-
def emit_activity_schedule_span(
334-
activity_name: str,
360+
361+
def create_client_span_context(
362+
task_type: str,
363+
name: str,
335364
instance_id: str,
336-
task_id: int,
365+
task_id: Optional[int] = None,
337366
version: Optional[str] = None,
338-
) -> None:
339-
"""Emit a Client span for a scheduled activity (emit-and-close pattern).
367+
) -> Optional[tuple[pb.TraceContext, Any]]:
368+
"""Create a CLIENT span and return its trace context for propagation.
340369
341-
Called during orchestration replay when a ``taskCompleted`` or
342-
``taskFailed`` event is processed.
370+
The span is **not** ended here — the caller must keep a reference
371+
and call :func:`end_client_span` when the downstream task completes
372+
so the CLIENT span captures the full scheduling-to-completion duration.
373+
374+
Returns a ``(TraceContext, span)`` tuple, or ``None`` when
375+
OpenTelemetry is not installed.
343376
"""
344377
if not _OTEL_AVAILABLE:
345-
return
378+
return None
346379

347-
span_name = create_span_name("activity", activity_name, version)
380+
span_name = create_span_name(task_type, name, version)
348381
attrs: dict[str, str] = {
349-
ATTR_TASK_TYPE: "activity",
350-
ATTR_TASK_NAME: activity_name,
382+
ATTR_TASK_TYPE: task_type,
383+
ATTR_TASK_NAME: name,
351384
ATTR_TASK_INSTANCE_ID: instance_id,
352-
ATTR_TASK_TASK_ID: str(task_id),
353385
}
386+
if task_id is not None:
387+
attrs[ATTR_TASK_TASK_ID] = str(task_id)
354388
if version:
355389
attrs[ATTR_TASK_VERSION] = version
356390

@@ -360,93 +394,35 @@ def emit_activity_schedule_span(
360394
kind=SpanKind.CLIENT,
361395
attributes=attrs,
362396
)
363-
span.end()
364397

398+
# Capture the trace context with this CLIENT span as the current span,
399+
# so that the downstream SERVER span is parented by this CLIENT span.
400+
ctx = trace.set_span_in_context(span)
401+
propagator = TraceContextTextMapPropagator()
402+
carrier: dict[str, str] = {}
403+
propagator.inject(carrier, context=ctx)
365404

366-
def emit_activity_schedule_span_failed(
367-
activity_name: str,
368-
instance_id: str,
369-
task_id: int,
370-
error_message: str,
371-
version: Optional[str] = None,
372-
) -> None:
373-
"""Emit a Client span for a failed activity (emit-and-close pattern)."""
374-
if not _OTEL_AVAILABLE:
375-
return
376-
377-
span_name = create_span_name("activity", activity_name, version)
378-
attrs: dict[str, str] = {
379-
ATTR_TASK_TYPE: "activity",
380-
ATTR_TASK_NAME: activity_name,
381-
ATTR_TASK_INSTANCE_ID: instance_id,
382-
ATTR_TASK_TASK_ID: str(task_id),
383-
}
384-
if version:
385-
attrs[ATTR_TASK_VERSION] = version
405+
trace_ctx = _trace_context_from_carrier(carrier)
406+
if trace_ctx is None:
407+
span.end()
408+
return None
386409

387-
tracer = trace.get_tracer(_TRACER_NAME)
388-
span = tracer.start_span(
389-
span_name,
390-
kind=SpanKind.CLIENT,
391-
attributes=attrs,
392-
)
393-
span.set_status(StatusCode.ERROR, error_message)
394-
span.end()
410+
return trace_ctx, span
395411

396412

397-
def emit_sub_orchestration_schedule_span(
398-
sub_orchestration_name: str,
399-
instance_id: str,
400-
version: Optional[str] = None,
413+
def end_client_span(
414+
span,
415+
is_error: bool = False,
416+
error_message: Optional[str] = None,
401417
) -> None:
402-
"""Emit a Client span for a scheduled sub-orchestration (emit-and-close)."""
403-
if not _OTEL_AVAILABLE:
404-
return
405-
406-
span_name = create_span_name("orchestration", sub_orchestration_name, version)
407-
attrs: dict[str, str] = {
408-
ATTR_TASK_TYPE: "orchestration",
409-
ATTR_TASK_NAME: sub_orchestration_name,
410-
ATTR_TASK_INSTANCE_ID: instance_id,
411-
}
412-
if version:
413-
attrs[ATTR_TASK_VERSION] = version
418+
"""End a CLIENT span previously created by :func:`create_client_span_context`.
414419
415-
tracer = trace.get_tracer(_TRACER_NAME)
416-
span = tracer.start_span(
417-
span_name,
418-
kind=SpanKind.CLIENT,
419-
attributes=attrs,
420-
)
421-
span.end()
422-
423-
424-
def emit_sub_orchestration_schedule_span_failed(
425-
sub_orchestration_name: str,
426-
instance_id: str,
427-
error_message: str,
428-
version: Optional[str] = None,
429-
) -> None:
430-
"""Emit a Client span for a failed sub-orchestration (emit-and-close)."""
431-
if not _OTEL_AVAILABLE:
420+
If *is_error* is ``True`` the span status is set to ERROR before closing.
421+
"""
422+
if span is None or not _OTEL_AVAILABLE:
432423
return
433-
434-
span_name = create_span_name("orchestration", sub_orchestration_name, version)
435-
attrs: dict[str, str] = {
436-
ATTR_TASK_TYPE: "orchestration",
437-
ATTR_TASK_NAME: sub_orchestration_name,
438-
ATTR_TASK_INSTANCE_ID: instance_id,
439-
}
440-
if version:
441-
attrs[ATTR_TASK_VERSION] = version
442-
443-
tracer = trace.get_tracer(_TRACER_NAME)
444-
span = tracer.start_span(
445-
span_name,
446-
kind=SpanKind.CLIENT,
447-
attributes=attrs,
448-
)
449-
span.set_status(StatusCode.ERROR, error_message)
424+
if is_error:
425+
span.set_status(StatusCode.ERROR, error_message or "")
450426
span.end()
451427

452428

@@ -455,8 +431,14 @@ def emit_timer_span(
455431
instance_id: str,
456432
timer_id: int,
457433
fire_at: datetime,
434+
scheduled_time_ns: Optional[int] = None,
458435
) -> None:
459-
"""Emit an Internal span for a timer (emit-and-close pattern)."""
436+
"""Emit an Internal span for a timer (emit-and-close pattern).
437+
438+
When *scheduled_time_ns* is provided the span start time is backdated
439+
to when the timer was originally created, so the span duration covers
440+
the full wait period.
441+
"""
460442
if not _OTEL_AVAILABLE:
461443
return
462444

@@ -474,6 +456,7 @@ def emit_timer_span(
474456
span_name,
475457
kind=SpanKind.INTERNAL,
476458
attributes=attrs,
459+
start_time=scheduled_time_ns,
477460
)
478461
span.end()
479462

0 commit comments

Comments
 (0)