Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
998ca94
Add distributed tracing (OpenTelemetry) support
torosent Mar 3, 2026
1622adc
Add CHANGELOG entry for distributed tracing
torosent Mar 3, 2026
823d17f
Validate SpanContext in extractTraceContext
torosent Mar 3, 2026
a300569
Align tracing with .NET SDK schema and conventions
torosent Mar 3, 2026
9174904
Update screenshots and README with .NET-aligned span attributes
torosent Mar 3, 2026
cbfa05b
Update sample to FanOutFanIn pattern with fresh screenshots
torosent Mar 3, 2026
9691eb1
Fix exception handling and stale trace context in distributed tracing…
Copilot Mar 3, 2026
183cb07
Add paired Client+Server spans matching .NET SDK trace structure
torosent Mar 3, 2026
ccdefe9
Fix span lifecycle issues found in code review
torosent Mar 3, 2026
3539919
Add timer and event spans matching .NET SDK
torosent Mar 3, 2026
33d0808
Fix timer span parent context and update screenshots
torosent Mar 3, 2026
6d53eb9
Address PR review comments
torosent Mar 3, 2026
a2a22a5
Remove unused local variable flagged by CodeQL
torosent Mar 3, 2026
ad7937e
fix: orchestration span covers full lifecycle using ExecutionStartedE…
torosent Mar 3, 2026
11ea4f0
Potential fix for pull request finding 'Useless null check'
torosent Mar 3, 2026
0ede985
fix: span durations for timers, client spans, and trace hierarchy
torosent Mar 3, 2026
57d32cc
fix: remove unused subInstanceId variable (CodeQL)
torosent Mar 3, 2026
8bc2fb1
docs: update Jaeger and DTS dashboard screenshots
torosent Mar 3, 2026
a7ecd60
fix: update Azure Functions sample to FanOutFanIn pattern
torosent Mar 3, 2026
35830d8
refactor: simplify tracing code — remove dead code, extract helpers
torosent Mar 4, 2026
1834576
fix: proper trace hierarchy — orchestration span as parent of child s…
torosent Mar 4, 2026
78afdab
Potential fix for pull request finding 'Useless null check'
torosent Mar 4, 2026
1d427b0
fix: create_orchestration span kind to PRODUCER, error status on fail…
torosent Mar 4, 2026
f169ab4
fix: improve orchestration span handling and error status reporting
torosent Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## Unreleased

* Add rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253)). Note: orchestration processing for rewind is supported with Azure Functions but not with the standalone `GrpcDurableTaskWorker`.
* Add distributed tracing (OpenTelemetry) support with W3C Trace Context propagation ([#266](https://github.com/microsoft/durabletask-java/pull/266))

## v1.7.0
* Add descriptive error when orchestration type is not registered ([#261](https://github.com/microsoft/durabletask-java/pull/261))
Expand Down
3 changes: 3 additions & 0 deletions client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ dependencies {
testImplementation(platform('org.junit:junit-bom:5.14.2'))
testImplementation('org.junit.jupiter:junit-jupiter')
testRuntimeOnly('org.junit.platform:junit-platform-launcher')
testImplementation "io.opentelemetry:opentelemetry-sdk:${openTelemetryVersion}"
testImplementation "io.opentelemetry:opentelemetry-sdk-trace:${openTelemetryVersion}"
testImplementation "io.opentelemetry:opentelemetry-sdk-testing:${openTelemetryVersion}"
testImplementation project(':azuremanaged')
testImplementation "com.azure:azure-core:${azureCoreVersion}"
testImplementation "com.azure:azure-identity:${azureIdentityVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;

import io.grpc.*;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;

import javax.annotation.Nullable;
import java.time.Duration;
Expand All @@ -17,10 +20,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;

/**
* Durable Task client implementation that uses gRPC to connect to a remote "sidecar" process.
Expand Down Expand Up @@ -139,47 +138,40 @@ public String scheduleNewOrchestrationInstance(
builder.putAllTags(options.getTags());
}

Span currentSpan = Span.current();
String traceParent = null;
String traceState = null;

if (currentSpan != null && currentSpan.getSpanContext().isValid()) {
SpanContext spanContext = currentSpan.getSpanContext();

// Construct the traceparent according to the W3C Trace Context specification
// https://www.w3.org/TR/trace-context/#traceparent-header
traceParent = String.format("00-%s-%s-%02x",
spanContext.getTraceId(), // 32-character trace ID
spanContext.getSpanId(), // 16-character span ID
spanContext.getTraceFlags().asByte() // Trace flags (i.e. sampled or not)
);

// Get the tracestate
traceState = spanContext.getTraceState().asMap()
.entrySet()
.stream()
.map(entry -> entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(","));
}
// Create a create_orchestration span (matching .NET SDK pattern)
Map<String, String> spanAttrs = new HashMap<>();
spanAttrs.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_CREATE_ORCHESTRATION);
spanAttrs.put(TracingHelper.ATTR_TASK_NAME, orchestratorName);
spanAttrs.put(TracingHelper.ATTR_INSTANCE_ID, instanceId);
Span createSpan = TracingHelper.startSpan(
TracingHelper.TYPE_CREATE_ORCHESTRATION + ":" + orchestratorName,
null, SpanKind.PRODUCER, spanAttrs);
Scope createScope = createSpan.makeCurrent();

if (traceParent != null) {
TraceContext traceContext = TraceContext.newBuilder()
.setTraceParent(traceParent)
.setTraceState(traceState != null ? StringValue.of(traceState) : StringValue.getDefaultInstance())
.build();
builder.setParentTraceContext(traceContext); // Set the TraceContext in the CreateInstanceRequest
}
try {
// Capture trace context from the create_orchestration span
TraceContext traceContext = TracingHelper.getCurrentTraceContext();
if (traceContext != null) {
builder.setParentTraceContext(traceContext);
}

CreateInstanceRequest request = builder.build();
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
return response.getInstanceId();
CreateInstanceRequest request = builder.build();
CreateInstanceResponse response = this.sidecarClient.startInstance(request);
return response.getInstanceId();
} finally {
createScope.close();
createSpan.end();
}
}

@Override
public void raiseEvent(String instanceId, String eventName, Object eventPayload) {
Helpers.throwIfArgumentNull(instanceId, "instanceId");
Helpers.throwIfArgumentNull(eventName, "eventName");

// Emit a span for the event raised from the client (cf. StartActivityForNewEventRaisedFromClient)
TracingHelper.emitEventRaisedFromClientSpan(eventName, instanceId);

RaiseEventRequest.Builder builder = RaiseEventRequest.newBuilder()
.setInstanceId(instanceId)
.setName(eventName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,13 @@
import com.microsoft.durabletask.util.VersionUtils;

import io.grpc.*;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
Expand Down Expand Up @@ -173,9 +178,78 @@ public void startAndBlock() {
// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
// TODO: Error handling
if (!versioningFailed) {
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());
// Extract ExecutionStartedEvent and its timestamp for trace context
HistoryEvent startedHistoryEvent = Stream.concat(
orchestratorRequest.getPastEventsList().stream(),
orchestratorRequest.getNewEventsList().stream())
.filter(event -> event.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONSTARTED)
.findFirst()
.orElse(null);

ExecutionStartedEvent startedEvent = startedHistoryEvent != null
? startedHistoryEvent.getExecutionStarted() : null;

TraceContext orchTraceCtx = (startedEvent != null && startedEvent.hasParentTraceContext())
? startedEvent.getParentTraceContext() : null;
String orchName = startedEvent != null ? startedEvent.getName() : "";

// Pass parentTraceContext to executor so child spans
// (activities, timers) reference the correct trace.
TaskOrchestratorResult taskOrchestratorResult;
try {
taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList(),
orchTraceCtx);
} catch (Throwable e) {
if (e instanceof Error) {
throw (Error) e;
}
throw new RuntimeException(e);
}

// Emit a single orchestration span only on the completion dispatch.
// Uses ExecutionStartedEvent timestamp as start time for full lifecycle.
// Java OTel doesn't support SetSpanId() like .NET, so we emit one
// span on completion rather than merging across dispatches.
boolean isCompleting = taskOrchestratorResult.getActions().stream()
.anyMatch(a -> a.getOrchestratorActionTypeCase() == OrchestratorAction.OrchestratorActionTypeCase.COMPLETEORCHESTRATION
|| a.getOrchestratorActionTypeCase() == OrchestratorAction.OrchestratorActionTypeCase.TERMINATEORCHESTRATION);

if (isCompleting && orchTraceCtx != null) {
Map<String, String> orchSpanAttrs = new HashMap<>();
orchSpanAttrs.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_ORCHESTRATION);
orchSpanAttrs.put(TracingHelper.ATTR_TASK_NAME, orchName);
orchSpanAttrs.put(TracingHelper.ATTR_INSTANCE_ID, orchestratorRequest.getInstanceId());

Instant spanStartTime = null;
if (startedHistoryEvent != null && startedHistoryEvent.hasTimestamp()) {
spanStartTime = DataConverter.getInstantFromTimestamp(
startedHistoryEvent.getTimestamp());
}

Span orchestrationSpan = TracingHelper.startSpanWithStartTime(
TracingHelper.TYPE_ORCHESTRATION + ":" + orchName,
orchTraceCtx,
SpanKind.SERVER,
orchSpanAttrs,
spanStartTime);

// Set error status if orchestration failed
for (OrchestratorAction action : taskOrchestratorResult.getActions()) {
if (action.getOrchestratorActionTypeCase() == OrchestratorAction.OrchestratorActionTypeCase.COMPLETEORCHESTRATION) {
CompleteOrchestrationAction complete = action.getCompleteOrchestration();
if (complete.getOrchestrationStatus() == OrchestrationStatus.ORCHESTRATION_STATUS_FAILED) {
String errorMsg = complete.hasFailureDetails()
? complete.getFailureDetails().getErrorMessage()
: "Orchestration failed";
orchestrationSpan.setStatus(StatusCode.ERROR, errorMsg);
}
break;
}
}
orchestrationSpan.end();
}

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
Expand Down Expand Up @@ -218,25 +292,46 @@ public void startAndBlock() {
}
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
ActivityRequest activityRequest = workItem.getActivityRequest();
String activityInstanceId = activityRequest.getOrchestrationInstance().getInstanceId();

// Start a tracing span for this activity execution
TraceContext activityTraceCtx = activityRequest.hasParentTraceContext()
? activityRequest.getParentTraceContext() : null;
Map<String, String> spanAttributes = new HashMap<>();
spanAttributes.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_ACTIVITY);
spanAttributes.put(TracingHelper.ATTR_TASK_NAME, activityRequest.getName());
spanAttributes.put(TracingHelper.ATTR_INSTANCE_ID, activityInstanceId);
spanAttributes.put(TracingHelper.ATTR_TASK_ID, String.valueOf(activityRequest.getTaskId()));
Span activitySpan = TracingHelper.startSpan(
TracingHelper.TYPE_ACTIVITY + ":" + activityRequest.getName(),
activityTraceCtx,
SpanKind.SERVER,
spanAttributes);
Scope activityScope = activitySpan.makeCurrent();

// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
String output = null;
TaskFailureDetails failureDetails = null;
Throwable activityError = null;
try {
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskId());
} catch (Throwable e) {
activityError = e;
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
} finally {
activityScope.close();
TracingHelper.endSpan(activitySpan, activityError);
}

ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setInstanceId(activityInstanceId)
.setTaskId(activityRequest.getTaskId())
.setCompletionToken(workItem.getCompletionToken());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;

import java.time.Duration;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -146,10 +151,45 @@ public TaskOrchestration create() {
logger,
versioningOptions);

// Extract ExecutionStartedEvent for trace context and orchestration name
OrchestratorService.ExecutionStartedEvent startedEvent = java.util.stream.Stream.concat(
orchestratorRequest.getPastEventsList().stream(),
orchestratorRequest.getNewEventsList().stream())
.filter(event -> event.getEventTypeCase() == OrchestratorService.HistoryEvent.EventTypeCase.EXECUTIONSTARTED)
.map(OrchestratorService.HistoryEvent::getExecutionStarted)
.findFirst()
.orElse(null);

OrchestratorService.TraceContext orchTraceCtx = (startedEvent != null && startedEvent.hasParentTraceContext())
? startedEvent.getParentTraceContext() : null;
String orchName = startedEvent != null ? startedEvent.getName() : "";

Map<String, String> orchSpanAttrs = new HashMap<>();
orchSpanAttrs.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_ORCHESTRATION);
orchSpanAttrs.put(TracingHelper.ATTR_TASK_NAME, orchName);
orchSpanAttrs.put(TracingHelper.ATTR_INSTANCE_ID, orchestratorRequest.getInstanceId());
Span orchestrationSpan = TracingHelper.startSpan(
TracingHelper.TYPE_ORCHESTRATION + ":" + orchName,
orchTraceCtx,
SpanKind.SERVER,
orchSpanAttrs);
Scope orchestrationScope = orchestrationSpan.makeCurrent();

// TODO: Error handling
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());
TaskOrchestratorResult taskOrchestratorResult;
Throwable orchestrationError = null;
try {
taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList(),
TracingHelper.getCurrentTraceContext(orchestrationSpan));
} catch (Exception e) {
orchestrationError = e;
throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
} finally {
if (orchestrationScope != null) orchestrationScope.close();
TracingHelper.endSpan(orchestrationSpan, orchestrationError);
}

OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
Expand Down
Loading
Loading