Skip to content

Commit 998ca94

Browse files
torosent and Copilot committed
Add distributed tracing (OpenTelemetry) support
Add W3C Trace Context propagation throughout the SDK, enabling end-to-end distributed tracing from client to orchestrations, activities, and sub-orchestrations.

Core changes:
- TracingHelper.java: utility class for trace context capture, extraction, and span management
- DurableTaskGrpcClient: refactored to use TracingHelper
- TaskOrchestrationExecutor: reads parentTraceContext from ExecutionStartedEvent and propagates to ScheduleTaskAction and CreateSubOrchestrationAction
- DurableTaskGrpcWorker: wraps activity and orchestration execution in OTel spans with proper scope management
- OrchestrationRunner: adds orchestration span for Azure Functions execution path

Tests:
- TracingHelperTest: 12 tests covering all utility methods
- TaskOrchestrationExecutorTest: 3 new tests verifying trace context propagation to activities and sub-orchestrations

Samples:
- TracingPattern.java: standalone SDK sample with DTS emulator and Jaeger OTLP exporter
- TracingChain.java: Azure Functions sample with chained activities and sub-orchestration
- README.md with screenshots showing Jaeger traces and DTS dashboard

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b446edf commit 998ca94

File tree

15 files changed

+1115
-40
lines changed

15 files changed

+1115
-40
lines changed

client/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,9 @@ dependencies {
4444
testImplementation(platform('org.junit:junit-bom:5.14.2'))
4545
testImplementation('org.junit.jupiter:junit-jupiter')
4646
testRuntimeOnly('org.junit.platform:junit-platform-launcher')
47+
testImplementation "io.opentelemetry:opentelemetry-sdk:${openTelemetryVersion}"
48+
testImplementation "io.opentelemetry:opentelemetry-sdk-trace:${openTelemetryVersion}"
49+
testImplementation "io.opentelemetry:opentelemetry-sdk-testing:${openTelemetryVersion}"
4750
testImplementation project(':azuremanaged')
4851
testImplementation "com.azure:azure-core:${azureCoreVersion}"
4952
testImplementation "com.azure:azure-identity:${azureIdentityVersion}"

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 3 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,6 @@
1717
import java.util.concurrent.TimeUnit;
1818
import java.util.concurrent.TimeoutException;
1919
import java.util.logging.Logger;
20-
import java.util.stream.Collectors;
21-
22-
import io.opentelemetry.api.trace.Span;
23-
import io.opentelemetry.api.trace.SpanContext;
2420

2521
/**
2622
* Durable Task client implementation that uses gRPC to connect to a remote "sidecar" process.
@@ -139,35 +135,9 @@ public String scheduleNewOrchestrationInstance(
139135
builder.putAllTags(options.getTags());
140136
}
141137

142-
Span currentSpan = Span.current();
143-
String traceParent = null;
144-
String traceState = null;
145-
146-
if (currentSpan != null && currentSpan.getSpanContext().isValid()) {
147-
SpanContext spanContext = currentSpan.getSpanContext();
148-
149-
// Construct the traceparent according to the W3C Trace Context specification
150-
// https://www.w3.org/TR/trace-context/#traceparent-header
151-
traceParent = String.format("00-%s-%s-%02x",
152-
spanContext.getTraceId(), // 32-character trace ID
153-
spanContext.getSpanId(), // 16-character span ID
154-
spanContext.getTraceFlags().asByte() // Trace flags (i.e. sampled or not)
155-
);
156-
157-
// Get the tracestate
158-
traceState = spanContext.getTraceState().asMap()
159-
.entrySet()
160-
.stream()
161-
.map(entry -> entry.getKey() + "=" + entry.getValue())
162-
.collect(Collectors.joining(","));
163-
}
164-
165-
if (traceParent != null) {
166-
TraceContext traceContext = TraceContext.newBuilder()
167-
.setTraceParent(traceParent)
168-
.setTraceState(traceState != null ? StringValue.of(traceState) : StringValue.getDefaultInstance())
169-
.build();
170-
builder.setParentTraceContext(traceContext); // Set the TraceContext in the CreateInstanceRequest
138+
TraceContext traceContext = TracingHelper.getCurrentTraceContext();
139+
if (traceContext != null) {
140+
builder.setParentTraceContext(traceContext);
171141
}
172142

173143
CreateInstanceRequest request = builder.build();

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 54 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,9 @@
1111
import com.microsoft.durabletask.util.VersionUtils;
1212

1313
import io.grpc.*;
14+
import io.opentelemetry.api.trace.Span;
15+
import io.opentelemetry.api.trace.SpanKind;
16+
import io.opentelemetry.context.Scope;
1417

1518
import java.time.Duration;
1619
import java.util.*;
@@ -173,9 +176,36 @@ public void startAndBlock() {
173176
// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
174177
// TODO: Error handling
175178
if (!versioningFailed) {
176-
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
177-
orchestratorRequest.getPastEventsList(),
178-
orchestratorRequest.getNewEventsList());
179+
// Extract trace context from ExecutionStartedEvent in the history
180+
TraceContext orchTraceCtx = Stream.concat(
181+
orchestratorRequest.getPastEventsList().stream(),
182+
orchestratorRequest.getNewEventsList().stream())
183+
.filter(event -> event.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONSTARTED)
184+
.filter(event -> event.getExecutionStarted().hasParentTraceContext())
185+
.map(event -> event.getExecutionStarted().getParentTraceContext())
186+
.findFirst()
187+
.orElse(null);
188+
Map<String, String> orchSpanAttrs = new HashMap<>();
189+
orchSpanAttrs.put("durabletask.task.instance_id", orchestratorRequest.getInstanceId());
190+
Span orchestrationSpan = TracingHelper.startSpan(
191+
"orchestration:" + orchestratorRequest.getInstanceId(),
192+
orchTraceCtx,
193+
SpanKind.INTERNAL,
194+
orchSpanAttrs);
195+
Scope orchestrationScope = orchestrationSpan.makeCurrent();
196+
197+
TaskOrchestratorResult taskOrchestratorResult;
198+
try {
199+
taskOrchestratorResult = taskOrchestrationExecutor.execute(
200+
orchestratorRequest.getPastEventsList(),
201+
orchestratorRequest.getNewEventsList());
202+
} catch (Throwable e) {
203+
TracingHelper.endSpan(orchestrationSpan, e);
204+
orchestrationScope.close();
205+
throw e;
206+
}
207+
orchestrationScope.close();
208+
TracingHelper.endSpan(orchestrationSpan, null);
179209

180210
OrchestratorResponse response = OrchestratorResponse.newBuilder()
181211
.setInstanceId(orchestratorRequest.getInstanceId())
@@ -218,6 +248,21 @@ public void startAndBlock() {
218248
}
219249
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
220250
ActivityRequest activityRequest = workItem.getActivityRequest();
251+
String activityInstanceId = activityRequest.getOrchestrationInstance().getInstanceId();
252+
253+
// Start a tracing span for this activity execution
254+
TraceContext activityTraceCtx = activityRequest.hasParentTraceContext()
255+
? activityRequest.getParentTraceContext() : null;
256+
Map<String, String> spanAttributes = new HashMap<>();
257+
spanAttributes.put("durabletask.task.instance_id", activityInstanceId);
258+
spanAttributes.put("durabletask.task.name", activityRequest.getName());
259+
spanAttributes.put("durabletask.task.task_id", String.valueOf(activityRequest.getTaskId()));
260+
Span activitySpan = TracingHelper.startSpan(
261+
"activity:" + activityRequest.getName(),
262+
activityTraceCtx,
263+
SpanKind.INTERNAL,
264+
spanAttributes);
265+
Scope activityScope = activitySpan.makeCurrent();
221266

222267
// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
223268
String output = null;
@@ -228,15 +273,20 @@ public void startAndBlock() {
228273
activityRequest.getInput().getValue(),
229274
activityRequest.getTaskId());
230275
} catch (Throwable e) {
276+
TracingHelper.endSpan(activitySpan, e);
277+
activitySpan = null;
231278
failureDetails = TaskFailureDetails.newBuilder()
232279
.setErrorType(e.getClass().getName())
233280
.setErrorMessage(e.getMessage())
234281
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
235282
.build();
283+
} finally {
284+
activityScope.close();
285+
TracingHelper.endSpan(activitySpan, null);
236286
}
237287

238288
ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
239-
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
289+
.setInstanceId(activityInstanceId)
240290
.setTaskId(activityRequest.getTaskId())
241291
.setCompletionToken(workItem.getCompletionToken());
242292

client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java

Lines changed: 35 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,9 +8,14 @@
88
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy;
99
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService;
1010

11+
import io.opentelemetry.api.trace.Span;
12+
import io.opentelemetry.api.trace.SpanKind;
13+
import io.opentelemetry.context.Scope;
14+
1115
import java.time.Duration;
1216
import java.util.Base64;
1317
import java.util.HashMap;
18+
import java.util.Map;
1419
import java.util.logging.Logger;
1520

1621
/**
@@ -146,10 +151,37 @@ public TaskOrchestration create() {
146151
logger,
147152
versioningOptions);
148153

154+
// Extract trace context from ExecutionStartedEvent in the history
155+
OrchestratorService.TraceContext orchTraceCtx = java.util.stream.Stream.concat(
156+
orchestratorRequest.getPastEventsList().stream(),
157+
orchestratorRequest.getNewEventsList().stream())
158+
.filter(event -> event.getEventTypeCase() == OrchestratorService.HistoryEvent.EventTypeCase.EXECUTIONSTARTED)
159+
.filter(event -> event.getExecutionStarted().hasParentTraceContext())
160+
.map(event -> event.getExecutionStarted().getParentTraceContext())
161+
.findFirst()
162+
.orElse(null);
163+
Map<String, String> orchSpanAttrs = new HashMap<>();
164+
orchSpanAttrs.put("durabletask.task.instance_id", orchestratorRequest.getInstanceId());
165+
Span orchestrationSpan = TracingHelper.startSpan(
166+
"orchestration:" + orchestratorRequest.getInstanceId(),
167+
orchTraceCtx,
168+
SpanKind.INTERNAL,
169+
orchSpanAttrs);
170+
Scope orchestrationScope = orchestrationSpan.makeCurrent();
171+
149172
// TODO: Error handling
150-
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
151-
orchestratorRequest.getPastEventsList(),
152-
orchestratorRequest.getNewEventsList());
173+
TaskOrchestratorResult taskOrchestratorResult;
174+
try {
175+
taskOrchestratorResult = taskOrchestrationExecutor.execute(
176+
orchestratorRequest.getPastEventsList(),
177+
orchestratorRequest.getNewEventsList());
178+
} catch (Throwable e) {
179+
TracingHelper.endSpan(orchestrationSpan, e);
180+
orchestrationScope.close();
181+
throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
182+
}
183+
orchestrationScope.close();
184+
TracingHelper.endSpan(orchestrationSpan, null);
153185

154186
OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder()
155187
.setInstanceId(orchestratorRequest.getInstanceId())

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
107107
private Object continuedAsNewInput;
108108
private boolean preserveUnprocessedEvents;
109109
private Object customStatus;
110+
private TraceContext parentTraceContext;
110111

111112
public ContextImplTask(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
112113
this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents);
@@ -297,6 +298,9 @@ public <V> Task<V> callActivity(
297298
if (serializedInput != null) {
298299
scheduleTaskBuilder.setInput(StringValue.of(serializedInput));
299300
}
301+
if (this.parentTraceContext != null) {
302+
scheduleTaskBuilder.setParentTraceContext(this.parentTraceContext);
303+
}
300304

301305
TaskFactory<V> taskFactory = () -> {
302306
int id = this.sequenceNumber++;
@@ -407,6 +411,10 @@ public <V> Task<V> callSubOrchestrator(
407411
}
408412
createSubOrchestrationActionBuilder.setInstanceId(instanceId);
409413

414+
if (this.parentTraceContext != null) {
415+
createSubOrchestrationActionBuilder.setParentTraceContext(this.parentTraceContext);
416+
}
417+
410418
if (options instanceof NewSubOrchestrationInstanceOptions && ((NewSubOrchestrationInstanceOptions)options).getVersion() != null) {
411419
NewSubOrchestrationInstanceOptions subOrchestrationOptions = (NewSubOrchestrationInstanceOptions) options;
412420
createSubOrchestrationActionBuilder.setVersion(StringValue.of(subOrchestrationOptions.getVersion()));
@@ -877,6 +885,9 @@ private void processEvent(HistoryEvent e) {
877885
this.setInput(input);
878886
String version = startedEvent.getVersion().getValue();
879887
this.setVersion(version);
888+
if (startedEvent.hasParentTraceContext()) {
889+
this.parentTraceContext = startedEvent.getParentTraceContext();
890+
}
880891
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
881892
if (factory == null) {
882893
// Try getting the default orchestrator

0 commit comments

Comments
 (0)