diff --git a/CHANGELOG.md b/CHANGELOG.md index 9751d3ac..f3551d3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## Unreleased * Adding rewind client API ([#253](https://github.com/microsoft/durabletask-java/pull/253)). Note: orchestration processing for rewind is supported with Azure Functions but not with the standalone `GrpcDurableTaskWorker`. +* Add distributed tracing (OpenTelemetry) support with W3C Trace Context propagation ([#266](https://github.com/microsoft/durabletask-java/pull/266)) ## v1.7.0 * Add descriptive error when orchestration type is not registered ([#261](https://github.com/microsoft/durabletask-java/pull/261)) diff --git a/client/build.gradle b/client/build.gradle index 9ba98dce..350ccf5b 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -44,6 +44,9 @@ dependencies { testImplementation(platform('org.junit:junit-bom:5.14.2')) testImplementation('org.junit.jupiter:junit-jupiter') testRuntimeOnly('org.junit.platform:junit-platform-launcher') + testImplementation "io.opentelemetry:opentelemetry-sdk:${openTelemetryVersion}" + testImplementation "io.opentelemetry:opentelemetry-sdk-trace:${openTelemetryVersion}" + testImplementation "io.opentelemetry:opentelemetry-sdk-testing:${openTelemetryVersion}" testImplementation project(':azuremanaged') testImplementation "com.azure:azure-core:${azureCoreVersion}" testImplementation "com.azure:azure-identity:${azureIdentityVersion}" diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 897c93f0..b64f675a 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -9,6 +9,9 @@ import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; import io.grpc.*; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; 
+import io.opentelemetry.context.Scope; import javax.annotation.Nullable; import java.time.Duration; @@ -17,10 +20,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.logging.Logger; -import java.util.stream.Collectors; - -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; /** * Durable Task client implementation that uses gRPC to connect to a remote "sidecar" process. @@ -139,40 +138,30 @@ public String scheduleNewOrchestrationInstance( builder.putAllTags(options.getTags()); } - Span currentSpan = Span.current(); - String traceParent = null; - String traceState = null; - - if (currentSpan != null && currentSpan.getSpanContext().isValid()) { - SpanContext spanContext = currentSpan.getSpanContext(); - - // Construct the traceparent according to the W3C Trace Context specification - // https://www.w3.org/TR/trace-context/#traceparent-header - traceParent = String.format("00-%s-%s-%02x", - spanContext.getTraceId(), // 32-character trace ID - spanContext.getSpanId(), // 16-character span ID - spanContext.getTraceFlags().asByte() // Trace flags (i.e. 
sampled or not) - ); - - // Get the tracestate - traceState = spanContext.getTraceState().asMap() - .entrySet() - .stream() - .map(entry -> entry.getKey() + "=" + entry.getValue()) - .collect(Collectors.joining(",")); - } + // Create a create_orchestration span (matching .NET SDK pattern) + Map spanAttrs = new HashMap<>(); + spanAttrs.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_CREATE_ORCHESTRATION); + spanAttrs.put(TracingHelper.ATTR_TASK_NAME, orchestratorName); + spanAttrs.put(TracingHelper.ATTR_INSTANCE_ID, instanceId); + Span createSpan = TracingHelper.startSpan( + TracingHelper.TYPE_CREATE_ORCHESTRATION + ":" + orchestratorName, + null, SpanKind.PRODUCER, spanAttrs); + Scope createScope = createSpan.makeCurrent(); - if (traceParent != null) { - TraceContext traceContext = TraceContext.newBuilder() - .setTraceParent(traceParent) - .setTraceState(traceState != null ? StringValue.of(traceState) : StringValue.getDefaultInstance()) - .build(); - builder.setParentTraceContext(traceContext); // Set the TraceContext in the CreateInstanceRequest - } + try { + // Capture trace context from the create_orchestration span + TraceContext traceContext = TracingHelper.getCurrentTraceContext(); + if (traceContext != null) { + builder.setParentTraceContext(traceContext); + } - CreateInstanceRequest request = builder.build(); - CreateInstanceResponse response = this.sidecarClient.startInstance(request); - return response.getInstanceId(); + CreateInstanceRequest request = builder.build(); + CreateInstanceResponse response = this.sidecarClient.startInstance(request); + return response.getInstanceId(); + } finally { + createScope.close(); + createSpan.end(); + } } @Override @@ -180,6 +169,9 @@ public void raiseEvent(String instanceId, String eventName, Object eventPayload) Helpers.throwIfArgumentNull(instanceId, "instanceId"); Helpers.throwIfArgumentNull(eventName, "eventName"); + // Emit an event span StartActivityForNewEventRaisedFromClient + 
TracingHelper.emitEventRaisedFromClientSpan(eventName, instanceId); + RaiseEventRequest.Builder builder = RaiseEventRequest.newBuilder() .setInstanceId(instanceId) .setName(eventName); diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 552cf579..cae734ff 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -11,8 +11,13 @@ import com.microsoft.durabletask.util.VersionUtils; import io.grpc.*; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -173,9 +178,78 @@ public void startAndBlock() { // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava // TODO: Error handling if (!versioningFailed) { - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); + // Extract ExecutionStartedEvent and its timestamp for trace context + HistoryEvent startedHistoryEvent = Stream.concat( + orchestratorRequest.getPastEventsList().stream(), + orchestratorRequest.getNewEventsList().stream()) + .filter(event -> event.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONSTARTED) + .findFirst() + .orElse(null); + + ExecutionStartedEvent startedEvent = startedHistoryEvent != null + ? startedHistoryEvent.getExecutionStarted() : null; + + TraceContext orchTraceCtx = (startedEvent != null && startedEvent.hasParentTraceContext()) + ? startedEvent.getParentTraceContext() : null; + String orchName = startedEvent != null ? 
startedEvent.getName() : ""; + + // Pass parentTraceContext to executor so child spans + // (activities, timers) reference the correct trace. + TaskOrchestratorResult taskOrchestratorResult; + try { + taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList(), + orchTraceCtx); + } catch (Throwable e) { + if (e instanceof Error) { + throw (Error) e; + } + throw new RuntimeException(e); + } + + // Emit a single orchestration span only on the completion dispatch. + // Uses ExecutionStartedEvent timestamp as start time for full lifecycle. + // Java OTel doesn't support SetSpanId() like .NET, so we emit one + // span on completion rather than merging across dispatches. + boolean isCompleting = taskOrchestratorResult.getActions().stream() + .anyMatch(a -> a.getOrchestratorActionTypeCase() == OrchestratorAction.OrchestratorActionTypeCase.COMPLETEORCHESTRATION + || a.getOrchestratorActionTypeCase() == OrchestratorAction.OrchestratorActionTypeCase.TERMINATEORCHESTRATION); + + if (isCompleting && orchTraceCtx != null) { + Map orchSpanAttrs = new HashMap<>(); + orchSpanAttrs.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_ORCHESTRATION); + orchSpanAttrs.put(TracingHelper.ATTR_TASK_NAME, orchName); + orchSpanAttrs.put(TracingHelper.ATTR_INSTANCE_ID, orchestratorRequest.getInstanceId()); + + Instant spanStartTime = null; + if (startedHistoryEvent != null && startedHistoryEvent.hasTimestamp()) { + spanStartTime = DataConverter.getInstantFromTimestamp( + startedHistoryEvent.getTimestamp()); + } + + Span orchestrationSpan = TracingHelper.startSpanWithStartTime( + TracingHelper.TYPE_ORCHESTRATION + ":" + orchName, + orchTraceCtx, + SpanKind.SERVER, + orchSpanAttrs, + spanStartTime); + + // Set error status if orchestration failed + for (OrchestratorAction action : taskOrchestratorResult.getActions()) { + if (action.getOrchestratorActionTypeCase() == 
OrchestratorAction.OrchestratorActionTypeCase.COMPLETEORCHESTRATION) { + CompleteOrchestrationAction complete = action.getCompleteOrchestration(); + if (complete.getOrchestrationStatus() == OrchestrationStatus.ORCHESTRATION_STATUS_FAILED) { + String errorMsg = complete.hasFailureDetails() + ? complete.getFailureDetails().getErrorMessage() + : "Orchestration failed"; + orchestrationSpan.setStatus(StatusCode.ERROR, errorMsg); + } + break; + } + } + orchestrationSpan.end(); + } OrchestratorResponse response = OrchestratorResponse.newBuilder() .setInstanceId(orchestratorRequest.getInstanceId()) @@ -218,25 +292,46 @@ public void startAndBlock() { } } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); + String activityInstanceId = activityRequest.getOrchestrationInstance().getInstanceId(); + + // Start a tracing span for this activity execution + TraceContext activityTraceCtx = activityRequest.hasParentTraceContext() + ? activityRequest.getParentTraceContext() : null; + Map spanAttributes = new HashMap<>(); + spanAttributes.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_ACTIVITY); + spanAttributes.put(TracingHelper.ATTR_TASK_NAME, activityRequest.getName()); + spanAttributes.put(TracingHelper.ATTR_INSTANCE_ID, activityInstanceId); + spanAttributes.put(TracingHelper.ATTR_TASK_ID, String.valueOf(activityRequest.getTaskId())); + Span activitySpan = TracingHelper.startSpan( + TracingHelper.TYPE_ACTIVITY + ":" + activityRequest.getName(), + activityTraceCtx, + SpanKind.SERVER, + spanAttributes); + Scope activityScope = activitySpan.makeCurrent(); // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava String output = null; TaskFailureDetails failureDetails = null; + Throwable activityError = null; try { output = taskActivityExecutor.execute( activityRequest.getName(), activityRequest.getInput().getValue(), activityRequest.getTaskId()); } catch (Throwable e) { + 
activityError = e; failureDetails = TaskFailureDetails.newBuilder() .setErrorType(e.getClass().getName()) .setErrorMessage(e.getMessage()) .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e))) .build(); + } finally { + activityScope.close(); + TracingHelper.endSpan(activitySpan, activityError); } ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder() - .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId()) + .setInstanceId(activityInstanceId) .setTaskId(activityRequest.getTaskId()) .setCompletionToken(workItem.getCompletionToken()); diff --git a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java index b85f24a7..e5819b23 100644 --- a/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java +++ b/client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java @@ -8,9 +8,14 @@ import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Scope; + import java.time.Duration; import java.util.Base64; import java.util.HashMap; +import java.util.Map; import java.util.logging.Logger; /** @@ -146,10 +151,45 @@ public TaskOrchestration create() { logger, versioningOptions); + // Extract ExecutionStartedEvent for trace context and orchestration name + OrchestratorService.ExecutionStartedEvent startedEvent = java.util.stream.Stream.concat( + orchestratorRequest.getPastEventsList().stream(), + orchestratorRequest.getNewEventsList().stream()) + .filter(event -> event.getEventTypeCase() == OrchestratorService.HistoryEvent.EventTypeCase.EXECUTIONSTARTED) + .map(OrchestratorService.HistoryEvent::getExecutionStarted) + .findFirst() + .orElse(null); + + OrchestratorService.TraceContext 
orchTraceCtx = (startedEvent != null && startedEvent.hasParentTraceContext()) + ? startedEvent.getParentTraceContext() : null; + String orchName = startedEvent != null ? startedEvent.getName() : ""; + + Map orchSpanAttrs = new HashMap<>(); + orchSpanAttrs.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_ORCHESTRATION); + orchSpanAttrs.put(TracingHelper.ATTR_TASK_NAME, orchName); + orchSpanAttrs.put(TracingHelper.ATTR_INSTANCE_ID, orchestratorRequest.getInstanceId()); + Span orchestrationSpan = TracingHelper.startSpan( + TracingHelper.TYPE_ORCHESTRATION + ":" + orchName, + orchTraceCtx, + SpanKind.SERVER, + orchSpanAttrs); + Scope orchestrationScope = orchestrationSpan.makeCurrent(); + // TODO: Error handling - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); + TaskOrchestratorResult taskOrchestratorResult; + Throwable orchestrationError = null; + try { + taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList(), + TracingHelper.getCurrentTraceContext(orchestrationSpan)); + } catch (Exception e) { + orchestrationError = e; + throw e instanceof RuntimeException ? 
(RuntimeException) e : new RuntimeException(e); + } finally { + if (orchestrationScope != null) orchestrationScope.close(); + TracingHelper.endSpan(orchestrationSpan, orchestrationError); + } OrchestratorService.OrchestratorResponse response = OrchestratorService.OrchestratorResponse.newBuilder() .setInstanceId(orchestratorRequest.getInstanceId()) diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 9c70db02..6773cf2c 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -45,8 +45,11 @@ public TaskOrchestrationExecutor( this.versioningOptions = versioningOptions; } - public TaskOrchestratorResult execute(List pastEvents, List newEvents) { - ContextImplTask context = new ContextImplTask(pastEvents, newEvents); + public TaskOrchestratorResult execute( + List pastEvents, + List newEvents, + @Nullable TraceContext orchestrationSpanContext) { + ContextImplTask context = new ContextImplTask(pastEvents, newEvents, orchestrationSpanContext); if (this.versioningOptions != null && this.versioningOptions.getDefaultVersion() != null) { // Set the default version for the orchestrator @@ -107,9 +110,20 @@ private class ContextImplTask implements TaskOrchestrationContext { private Object continuedAsNewInput; private boolean preserveUnprocessedEvents; private Object customStatus; + private TraceContext parentTraceContext; - public ContextImplTask(List pastEvents, List newEvents) { + // Stores scheduling metadata (timestamp, name, parentTraceContext) for retroactive client spans + private final HashMap scheduledTaskInfoMap = new HashMap<>(); + // Stores timer creation timestamps for timer spans with duration + private final HashMap timerCreationTimes = new HashMap<>(); + + // Orchestration span context for proper parent-child 
relationships in traces + private TraceContext orchestrationSpanContext; + + public ContextImplTask(List pastEvents, List newEvents, + @Nullable TraceContext orchestrationSpanContext) { this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents); + this.orchestrationSpanContext = orchestrationSpanContext; } @Override @@ -297,9 +311,18 @@ public Task callActivity( if (serializedInput != null) { scheduleTaskBuilder.setInput(StringValue.of(serializedInput)); } - TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; + + // Propagate the orchestration span's context so the Server span on the worker + // is a child of the orchestration span. Falls back to parentTraceContext if no + // orchestration span context is available. + TraceContext propagatedCtx = this.orchestrationSpanContext != null + ? this.orchestrationSpanContext : this.parentTraceContext; + if (propagatedCtx != null) { + scheduleTaskBuilder.setParentTraceContext(propagatedCtx); + } + this.pendingActions.put(id, OrchestratorAction.newBuilder() .setId(id) .setScheduleTask(scheduleTaskBuilder) @@ -378,6 +401,9 @@ public void sendEvent(String instanceId, String eventName, Object eventData) { eventName, id, serializedEventData != null ? serializedEventData : "(null)")); + + // Emit an event span StartTraceActivityForEventRaisedFromWorker + TracingHelper.emitEventRaisedFromWorkerSpan(eventName, this.instanceId, instanceId); } } @@ -417,6 +443,15 @@ public Task callSubOrchestrator( TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; + + // Propagate the orchestration span's context so the Server span on the worker + // is a child of the orchestration span. + TraceContext propagatedCtx = this.orchestrationSpanContext != null + ? 
this.orchestrationSpanContext : this.parentTraceContext; + if (propagatedCtx != null) { + createSubOrchestrationActionBuilder.setParentTraceContext(propagatedCtx); + } + this.pendingActions.put(id, OrchestratorAction.newBuilder() .setId(id) .setCreateSubOrchestration(createSubOrchestrationActionBuilder) @@ -510,6 +545,8 @@ private void handleTaskScheduled(HistoryEvent e) { TaskScheduledEvent taskScheduled = e.getTaskScheduled(); + storeSchedulingMetadata(taskId, taskScheduled.getName(), TracingHelper.TYPE_ACTIVITY, e); + // The history shows that this orchestrator created a durable task in a previous execution. // We can therefore remove it from the map of pending actions. If we can't find the pending // action, then we assume a non-deterministic code violation in the orchestrator. @@ -545,6 +582,9 @@ private void handleTaskCompleted(HistoryEvent e) { taskId, rawResult != null ? rawResult : "(null)")); + // Emit a retroactive Client span covering scheduling-to-completion duration. + // Matches .NET SDK's EmitTraceActivityForTaskCompleted pattern. + emitClientSpanIfTracked(taskId); } CompletableTask task = record.getTask(); try { @@ -568,6 +608,9 @@ private void handleTaskFailed(HistoryEvent e) { if (!this.isReplaying) { // TODO: Log task failure, including the number of bytes in the result + + // Emit a retroactive Client span covering scheduling-to-failure duration. + emitClientSpanIfTracked(taskId); } CompletableTask task = record.getTask(); @@ -672,6 +715,11 @@ private void handleTimerCreated(HistoryEvent e) { TimerCreatedEvent timerCreatedEvent = e.getTimerCreated(); + // Store the timer creation timestamp for the timer span duration + if (e.hasTimestamp()) { + this.timerCreationTimes.put(timerEventId, DataConverter.getInstantFromTimestamp(e.getTimestamp())); + } + // The history shows that this orchestrator created a durable timer in a previous execution. // We can therefore remove it from the map of pending actions. 
If we can't find the pending // action, then we assume a non-deterministic code violation in the orchestrator. @@ -695,7 +743,20 @@ public void handleTimerFired(HistoryEvent e) { } if (!this.isReplaying) { - // TODO: Log timer fired, including the scheduled fire-time + // Emit a timer span with duration from creation to firing time. + // Matches .NET SDK's EmitTraceActivityForTimer which spans from startTime to now. + String fireAt = timerFiredEvent.hasFireAt() + ? DataConverter.getInstantFromTimestamp(timerFiredEvent.getFireAt()).toString() + : null; + Instant creationTime = this.timerCreationTimes.remove(timerEventId); + TracingHelper.emitTimerSpan( + this.getName(), + this.instanceId, + timerEventId, + fireAt, + this.orchestrationSpanContext != null + ? this.orchestrationSpanContext : this.parentTraceContext, + creationTime); } CompletableTask task = record.getTask(); @@ -705,6 +766,10 @@ public void handleTimerFired(HistoryEvent e) { private void handleSubOrchestrationCreated(HistoryEvent e) { int taskId = e.getEventId(); SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = e.getSubOrchestrationInstanceCreated(); + + storeSchedulingMetadata(taskId, subOrchestrationInstanceCreated.getName(), + TracingHelper.TYPE_ORCHESTRATION, e); + OrchestratorAction taskAction = this.pendingActions.remove(taskId); if (taskAction == null) { String message = String.format( @@ -735,6 +800,8 @@ private void handleSubOrchestrationCompleted(HistoryEvent e) { taskId, rawResult != null ? rawResult : "(null)")); + // Emit a retroactive Client span covering scheduling-to-completion duration. + emitClientSpanIfTracked(taskId); } CompletableTask task = record.getTask(); try { @@ -758,6 +825,9 @@ private void handleSubOrchestrationFailed(HistoryEvent e){ if (!this.isReplaying) { // TODO: Log task failure, including the number of bytes in the result + + // Emit a retroactive Client span covering scheduling-to-failure duration. 
+ emitClientSpanIfTracked(taskId); } CompletableTask task = record.getTask(); @@ -877,6 +947,11 @@ private void processEvent(HistoryEvent e) { this.setInput(input); String version = startedEvent.getVersion().getValue(); this.setVersion(version); + if (startedEvent.hasParentTraceContext()) { + this.parentTraceContext = startedEvent.getParentTraceContext(); + } else { + this.parentTraceContext = null; + } TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); if (factory == null) { // Try getting the default orchestrator @@ -969,6 +1044,53 @@ public Class getDataType() { } } + /** + * Emits a retroactive Client span for a completed/failed task, if scheduling metadata was tracked. + */ + private void emitClientSpanIfTracked(int taskId) { + ScheduledTaskInfo info = this.scheduledTaskInfoMap.remove(taskId); + if (info != null) { + TracingHelper.emitRetroactiveClientSpan( + info.spanType + ":" + info.taskName, + info.parentTraceContext, + info.spanType, + info.taskName, + this.instanceId, + taskId, + info.scheduledTime); + } + } + + /** + * Stores scheduling metadata for a task so a retroactive client span can be emitted at completion time. + */ + private void storeSchedulingMetadata(int taskId, String taskName, String spanType, HistoryEvent e) { + Instant scheduledTime = e.hasTimestamp() + ? DataConverter.getInstantFromTimestamp(e.getTimestamp()) : null; + TraceContext spanParent = this.orchestrationSpanContext != null + ? this.orchestrationSpanContext : this.parentTraceContext; + this.scheduledTaskInfoMap.put(taskId, new ScheduledTaskInfo( + taskName, scheduledTime, spanParent, spanType)); + } + + /** + * Stores scheduling metadata for retroactive client span creation at completion time. + * Matches .NET SDK pattern where client spans are emitted with the full scheduling-to-completion duration. 
+ */ + private class ScheduledTaskInfo { + final String taskName; + final Instant scheduledTime; + final TraceContext parentTraceContext; + final String spanType; // "activity" or "orchestration" + + ScheduledTaskInfo(String taskName, Instant scheduledTime, TraceContext parentTraceContext, String spanType) { + this.taskName = taskName; + this.scheduledTime = scheduledTime; + this.parentTraceContext = parentTraceContext; + this.spanType = spanType; + } + } + private class OrchestrationHistoryIterator { private final List pastEvents; private final List newEvents; diff --git a/client/src/main/java/com/microsoft/durabletask/TracingHelper.java b/client/src/main/java/com/microsoft/durabletask/TracingHelper.java new file mode 100644 index 00000000..319ce5ee --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/TracingHelper.java @@ -0,0 +1,384 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import com.google.protobuf.StringValue; +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.TraceContext; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.TraceStateBuilder; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utility class for OpenTelemetry distributed tracing integration. + *

<p> + * Provides helpers for propagating W3C Trace Context between orchestrations, + * activities, and sub-orchestrations via the {@code TraceContext} protobuf message. + * <p>

+ * When no OpenTelemetry SDK is configured, the API returns no-op implementations, + * so all methods in this class are safe to call without any tracing overhead. + */ +final class TracingHelper { + + private static final String TRACER_NAME = "Microsoft.DurableTask"; + + // Span type constants matching .NET SDK schema + static final String TYPE_ORCHESTRATION = "orchestration"; + static final String TYPE_ACTIVITY = "activity"; + static final String TYPE_CREATE_ORCHESTRATION = "create_orchestration"; + static final String TYPE_TIMER = "timer"; + static final String TYPE_EVENT = "event"; + static final String TYPE_ORCHESTRATION_EVENT = "orchestration_event"; + + // Attribute keys matching .NET SDK schema + static final String ATTR_TYPE = "durabletask.type"; + static final String ATTR_TASK_NAME = "durabletask.task.name"; + static final String ATTR_INSTANCE_ID = "durabletask.task.instance_id"; + static final String ATTR_TASK_ID = "durabletask.task.task_id"; + static final String ATTR_FIRE_AT = "durabletask.fire_at"; + static final String ATTR_EVENT_TARGET_INSTANCE_ID = "durabletask.event.target_instance_id"; + + private TracingHelper() { + // Static utility class + } + + /** + * Captures the current OpenTelemetry span context as a protobuf {@code TraceContext}. + * + * @return A {@code TraceContext} proto, or {@code null} if there is no valid active span. + */ + @Nullable + static TraceContext getCurrentTraceContext() { + return getCurrentTraceContext(Span.current()); + } + + /** + * Captures a specific OpenTelemetry span's context as a protobuf {@code TraceContext}. + * + * @param span The span to capture context from, may be {@code null}. + * @return A {@code TraceContext} proto, or {@code null} if the span has no valid context. 
+ */ + @Nullable + static TraceContext getCurrentTraceContext(@Nullable Span span) { + if (span == null || !span.getSpanContext().isValid()) { + return null; + } + + SpanContext spanContext = span.getSpanContext(); + + // Construct the traceparent according to the W3C Trace Context specification + // https://www.w3.org/TR/trace-context/#traceparent-header + String traceParent = String.format("00-%s-%s-%02x", + spanContext.getTraceId(), + spanContext.getSpanId(), + spanContext.getTraceFlags().asByte()); + + String traceState = spanContext.getTraceState().asMap() + .entrySet() + .stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining(",")); + + TraceContext.Builder builder = TraceContext.newBuilder() + .setTraceParent(traceParent); + + if (traceState != null && !traceState.isEmpty()) { + builder.setTraceState(StringValue.of(traceState)); + } + + return builder.build(); + } + + /** + * Converts a protobuf {@code TraceContext} into an OpenTelemetry {@code Context} + * that can be used as a parent for new spans. + * + * @param protoCtx The protobuf trace context, may be {@code null}. + * @return An OpenTelemetry {@code Context}, or {@code null} if the input is empty/null. 
+ */ + @Nullable + static Context extractTraceContext(@Nullable TraceContext protoCtx) { + if (protoCtx == null) { + return null; + } + + String traceParent = protoCtx.getTraceParent(); + if (traceParent == null || traceParent.isEmpty()) { + return null; + } + + // Parse W3C traceparent: 00-<trace-id>-<span-id>-<trace-flags> + String[] parts = traceParent.split("-"); + if (parts.length < 4) { + return null; + } + + String traceId = parts[1]; + String spanId = parts[2]; + byte flags = 0; + try { + flags = (byte) Integer.parseInt(parts[3], 16); + } catch (NumberFormatException e) { + // Use default flags + } + + // Parse tracestate if present + TraceState traceState = TraceState.getDefault(); + if (protoCtx.hasTraceState() && protoCtx.getTraceState().getValue() != null + && !protoCtx.getTraceState().getValue().isEmpty()) { + TraceStateBuilder tsBuilder = TraceState.builder(); + String[] entries = protoCtx.getTraceState().getValue().split(","); + for (String entry : entries) { + String[] kv = entry.split("=", 2); + if (kv.length == 2) { + tsBuilder.put(kv[0].trim(), kv[1].trim()); + } + } + traceState = tsBuilder.build(); + } + + SpanContext remoteContext = SpanContext.createFromRemoteParent( + traceId, spanId, TraceFlags.fromByte(flags), traceState); + + if (!remoteContext.isValid()) { + return null; + } + + return Context.current().with(Span.wrap(remoteContext)); + } + + /** + * Starts a new span as a child of the given trace context. + * + * @param name The span name (e.g. "activity:say_hello"). + * @param traceContext The parent trace context from the protobuf message, may be {@code null}. + * @param kind The span kind, may be {@code null} (defaults to INTERNAL). + * @param attributes Optional span attributes, may be {@code null}. + * @return The started {@code Span}. Caller must call {@link Span#end()} when done. 
+ */ + static Span startSpan( + String name, + @Nullable TraceContext traceContext, + @Nullable SpanKind kind, + @Nullable Map attributes) { + return startSpanWithStartTime(name, traceContext, kind, attributes, null); + } + + /** + * Starts a new span as a child of the given trace context, optionally with a custom start time. + * Used for orchestration spans where the engine provides the span start time. + * + * @param name The span name. + * @param traceContext The parent trace context from the protobuf message, may be {@code null}. + * @param kind The span kind, may be {@code null} (defaults to INTERNAL). + * @param attributes Optional span attributes, may be {@code null}. + * @param startTime Optional start time for the span, may be {@code null}. + * @return The started {@code Span}. Caller must call {@link Span#end()} when done. + */ + static Span startSpanWithStartTime( + String name, + @Nullable TraceContext traceContext, + @Nullable SpanKind kind, + @Nullable Map attributes, + @Nullable java.time.Instant startTime) { + Tracer tracer = GlobalOpenTelemetry.getTracer(TRACER_NAME); + SpanBuilder spanBuilder = tracer.spanBuilder(name); + + if (kind != null) { + spanBuilder.setSpanKind(kind); + } + + Context parentCtx = extractTraceContext(traceContext); + if (parentCtx != null) { + spanBuilder.setParent(parentCtx); + } + + if (attributes != null) { + for (Map.Entry entry : attributes.entrySet()) { + spanBuilder.setAttribute(entry.getKey(), entry.getValue()); + } + } + + if (startTime != null) { + spanBuilder.setStartTimestamp(startTime); + } + + Span span = spanBuilder.startSpan(); + return span; + } + + /** + * Ends the given span, optionally recording an error. + * + * @param span The span to end, may be {@code null}. + * @param error If non-null, the exception is recorded on the span. 
+     */
+    static void endSpan(@Nullable Span span, @Nullable Throwable error) {
+        if (span != null) {
+            if (error != null) {
+                // Mark the span as failed, then attach the exception as a span event.
+                span.setStatus(StatusCode.ERROR, error.getMessage());
+                span.recordException(error);
+            }
+            span.end();
+        }
+    }
+
+    /**
+     * Emits an already-finished CLIENT span covering a task from scheduling to completion.
+     * The span is started (optionally back-dated to {@code startTime}) and ended immediately,
+     * mirroring the .NET SDK, which emits client spans at completion time using the timestamp of
+     * the original TaskScheduled/SubOrchestrationCreated event as the start.
+     *
+     * @param spanName The span name (e.g. "activity:GetWeather").
+     * @param parentContext The parent (orchestration) trace context, may be {@code null}.
+     * @param type The durabletask.type value (e.g. "activity" or "orchestration").
+     * @param taskName The task name attribute value.
+     * @param instanceId The orchestration instance ID; the attribute is omitted when {@code null}.
+     * @param taskId The task sequence ID, recorded as a string attribute.
+     * @param startTime The scheduling timestamp used as span start, may be {@code null}.
+     */
+    static void emitRetroactiveClientSpan(
+            String spanName,
+            @Nullable TraceContext parentContext,
+            String type,
+            String taskName,
+            @Nullable String instanceId,
+            int taskId,
+            @Nullable java.time.Instant startTime) {
+        SpanBuilder builder = GlobalOpenTelemetry.getTracer(TRACER_NAME).spanBuilder(spanName);
+        builder.setSpanKind(SpanKind.CLIENT);
+        builder.setAttribute(ATTR_TYPE, type);
+        builder.setAttribute(ATTR_TASK_NAME, taskName);
+        builder.setAttribute(ATTR_TASK_ID, String.valueOf(taskId));
+        if (instanceId != null) {
+            builder.setAttribute(ATTR_INSTANCE_ID, instanceId);
+        }
+
+        Context remoteParent = extractTraceContext(parentContext);
+        if (remoteParent != null) {
+            builder.setParent(remoteParent);
+        }
+        if (startTime != null) {
+            builder.setStartTimestamp(startTime);
+        }
+
+        // Start and end back-to-back: the span only exists to be exported.
+        builder.startSpan().end();
+    }
+
+    /**
+     * Emits a timer span with duration from creation time to now.
+     * Matches .NET SDK's {@code EmitTraceActivityForTimer} which spans from startTime to disposal time.
+     *
+     * @param orchestrationName The name of the orchestration that created the timer.
+     * @param instanceId The orchestration instance ID; the attribute is omitted when {@code null}.
+     * @param timerId The timer event ID, recorded as a string attribute.
+     * @param fireAt The ISO-8601 formatted fire time; the attribute is omitted when {@code null}.
+     * @param parentContext The parent trace context, may be {@code null}.
+     * @param startTime The timer creation time (span start), may be {@code null}.
+     */
+    static void emitTimerSpan(
+            String orchestrationName,
+            @Nullable String instanceId,
+            int timerId,
+            @Nullable String fireAt,
+            @Nullable TraceContext parentContext,
+            @Nullable java.time.Instant startTime) {
+        String spanName = TYPE_ORCHESTRATION + ":" + orchestrationName + ":" + TYPE_TIMER;
+        SpanBuilder builder = GlobalOpenTelemetry.getTracer(TRACER_NAME).spanBuilder(spanName);
+        builder.setSpanKind(SpanKind.INTERNAL);
+        builder.setAttribute(ATTR_TYPE, TYPE_TIMER);
+        builder.setAttribute(ATTR_TASK_NAME, orchestrationName);
+        builder.setAttribute(ATTR_TASK_ID, String.valueOf(timerId));
+
+        Context remoteParent = extractTraceContext(parentContext);
+        if (remoteParent != null) {
+            builder.setParent(remoteParent);
+        }
+        if (startTime != null) {
+            builder.setStartTimestamp(startTime);
+        }
+
+        // Optional attributes are only attached when a value was supplied.
+        if (instanceId != null) {
+            builder.setAttribute(ATTR_INSTANCE_ID, instanceId);
+        }
+        if (fireAt != null) {
+            builder.setAttribute(ATTR_FIRE_AT, fireAt);
+        }
+
+        // Start and end back-to-back: the span only exists to be exported.
+        builder.startSpan().end();
+    }
+
+    /**
+     * Emits a short-lived Producer span for an event raised from the orchestrator (worker side).
+     * Matches .NET SDK's {@code StartTraceActivityForEventRaisedFromWorker}.
+     *
+     * @param eventName The name of the event being raised.
+     * @param instanceId The orchestration instance ID sending the event.
+     * @param targetInstanceId The target orchestration instance ID, may be {@code null}.
+     */
+    static void emitEventRaisedFromWorkerSpan(
+            String eventName,
+            @Nullable String instanceId,
+            @Nullable String targetInstanceId) {
+        SpanBuilder builder = GlobalOpenTelemetry.getTracer(TRACER_NAME)
+                .spanBuilder(TYPE_ORCHESTRATION_EVENT + ":" + eventName);
+        builder.setSpanKind(SpanKind.PRODUCER);
+        builder.setAttribute(ATTR_TYPE, TYPE_EVENT);
+        builder.setAttribute(ATTR_TASK_NAME, eventName);
+
+        // Sender and target IDs are optional; each attribute is skipped when its value is null.
+        if (instanceId != null) {
+            builder.setAttribute(ATTR_INSTANCE_ID, instanceId);
+        }
+        if (targetInstanceId != null) {
+            builder.setAttribute(ATTR_EVENT_TARGET_INSTANCE_ID, targetInstanceId);
+        }
+
+        // Start and end back-to-back: the span only marks that the event was sent.
+        builder.startSpan().end();
+    }
+
+    /**
+     * Emits a short-lived Producer span for an event raised from the client.
+     * Matches .NET SDK's {@code StartActivityForNewEventRaisedFromClient}.
+     * The span has the same name, kind and attributes as the worker-side event span, except
+     * that no sender instance ID attribute is recorded.
+     *
+     * @param eventName The name of the event being raised.
+     * @param targetInstanceId The target orchestration instance ID.
+     */
+    static void emitEventRaisedFromClientSpan(
+            String eventName,
+            @Nullable String targetInstanceId) {
+        // A null sender makes the worker-side emitter skip the instance ID attribute.
+        emitEventRaisedFromWorkerSpan(eventName, null, targetInstanceId);
+    }
+}
diff --git a/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationExecutorTest.java b/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationExecutorTest.java
index 7d62df2b..51679263 100644
--- a/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationExecutorTest.java
+++ b/client/src/test/java/com/microsoft/durabletask/TaskOrchestrationExecutorTest.java
@@ -11,6 +11,7 @@
 import java.time.Duration;
 import java.util.*;
 import
java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.*;
 
@@ -64,7 +65,7 @@ void execute_unregisteredOrchestrationType_failsWithDescriptiveMessage() {
         List<HistoryEvent> newEvents = Collections.emptyList();
 
         // Act
-        TaskOrchestratorResult result = executor.execute(pastEvents, newEvents);
+        TaskOrchestratorResult result = executor.execute(pastEvents, newEvents, null);
 
         // Assert: the result should contain a CompleteOrchestrationAction with FAILED status
         // and a failure message mentioning the unknown orchestration name
@@ -82,4 +83,198 @@ void execute_unregisteredOrchestrationType_failsWithDescriptiveMessage() {
         assertTrue(failureDetails.getErrorMessage().contains("worker"),
                 "Error message should mention workers: " + failureDetails.getErrorMessage());
     }
+
+    @Test
+    void execute_propagatesTraceContextToActivities() {
+        // Arrange: create an orchestration that calls an activity
+        String orchName = "TestOrchestration";
+        String activityName = "TestActivity";
+        TraceContext parentTrace = TraceContext.newBuilder()
+                .setTraceParent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
+                .setTraceState(StringValue.of("vendorname=opaqueValue"))
+                .build();
+
+        HashMap<String, TaskOrchestrationFactory> factories = new HashMap<>();
+        factories.put(orchName, new TaskOrchestrationFactory() {
+            @Override
+            public String getName() { return orchName; }
+            @Override
+            public TaskOrchestration create() {
+                return ctx -> {
+                    ctx.callActivity(activityName, null, String.class);
+                };
+            }
+        });
+
+        TaskOrchestrationExecutor executor = new TaskOrchestrationExecutor(
+                factories, new JacksonDataConverter(), Duration.ofDays(3), logger, null);
+
+        // Build history events with trace context
+        List<HistoryEvent> newEvents = Arrays.asList(
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setOrchestratorStarted(OrchestratorStartedEvent.getDefaultInstance())
+                        .build(),
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setExecutionStarted(ExecutionStartedEvent.newBuilder()
+                                .setName(orchName)
+                                .setVersion(StringValue.of(""))
+                                .setInput(StringValue.of("\"test\""))
+                                .setOrchestrationInstance(OrchestrationInstance.newBuilder()
+                                        .setInstanceId("test-instance")
+                                        .build())
+                                .setParentTraceContext(parentTrace)
+                                .build())
+                        .build(),
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setOrchestratorCompleted(OrchestratorCompletedEvent.getDefaultInstance())
+                        .build()
+        );
+
+        // Act
+        TaskOrchestratorResult result = executor.execute(Collections.emptyList(), newEvents, null);
+
+        // Assert: find the ScheduleTaskAction and verify it has parentTraceContext set
+        List<OrchestratorAction> actions = new ArrayList<>(result.getActions());
+        OrchestratorAction scheduleAction = actions.stream()
+                .filter(OrchestratorAction::hasScheduleTask)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Expected a ScheduleTaskAction"));
+
+        ScheduleTaskAction taskAction = scheduleAction.getScheduleTask();
+        assertEquals(activityName, taskAction.getName());
+        assertTrue(taskAction.hasParentTraceContext(), "ScheduleTaskAction should have parentTraceContext");
+        assertEquals(parentTrace.getTraceParent(), taskAction.getParentTraceContext().getTraceParent());
+        assertEquals(parentTrace.getTraceState().getValue(), taskAction.getParentTraceContext().getTraceState().getValue());
+    }
+
+    @Test
+    void execute_propagatesTraceContextToSubOrchestrations() {
+        // Arrange: create an orchestration that calls a sub-orchestration
+        String orchName = "ParentOrch";
+        String subOrchName = "ChildOrch";
+        TraceContext parentTrace = TraceContext.newBuilder()
+                .setTraceParent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
+                .build();
+
+        HashMap<String, TaskOrchestrationFactory> factories = new HashMap<>();
+        factories.put(orchName, new TaskOrchestrationFactory() {
+            @Override
+            public String getName() { return orchName; }
+            @Override
+            public TaskOrchestration create() {
+                return ctx -> {
+                    ctx.callSubOrchestrator(subOrchName, null, String.class);
+                };
+            }
+        });
+
+        TaskOrchestrationExecutor executor = new TaskOrchestrationExecutor(
+                factories, new JacksonDataConverter(), Duration.ofDays(3), logger, null);
+
+        List<HistoryEvent> newEvents = Arrays.asList(
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setOrchestratorStarted(OrchestratorStartedEvent.getDefaultInstance())
+                        .build(),
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setExecutionStarted(ExecutionStartedEvent.newBuilder()
+                                .setName(orchName)
+                                .setVersion(StringValue.of(""))
+                                .setInput(StringValue.of("\"test\""))
+                                .setOrchestrationInstance(OrchestrationInstance.newBuilder()
+                                        .setInstanceId("parent-instance")
+                                        .build())
+                                .setParentTraceContext(parentTrace)
+                                .build())
+                        .build(),
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setOrchestratorCompleted(OrchestratorCompletedEvent.getDefaultInstance())
+                        .build()
+        );
+
+        // Act
+        TaskOrchestratorResult result = executor.execute(Collections.emptyList(), newEvents, null);
+
+        // Assert: find the CreateSubOrchestrationAction and verify it has parentTraceContext
+        List<OrchestratorAction> actions = new ArrayList<>(result.getActions());
+        OrchestratorAction subOrchAction = actions.stream()
+                .filter(OrchestratorAction::hasCreateSubOrchestration)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Expected a CreateSubOrchestrationAction"));
+
+        CreateSubOrchestrationAction createSubOrch = subOrchAction.getCreateSubOrchestration();
+        assertEquals(subOrchName, createSubOrch.getName());
+        assertTrue(createSubOrch.hasParentTraceContext(), "CreateSubOrchestrationAction should have parentTraceContext");
+        assertEquals(parentTrace.getTraceParent(), createSubOrch.getParentTraceContext().getTraceParent());
+    }
+
+    @Test
+    void execute_noTraceContext_actionsDoNotHaveTraceContext() {
+        // Arrange: orchestration without trace context
+        String orchName = "NoTraceOrch";
+
+        HashMap<String, TaskOrchestrationFactory> factories = new HashMap<>();
+        factories.put(orchName, new TaskOrchestrationFactory() {
+            @Override
+            public String getName() { return orchName; }
+            @Override
+            public TaskOrchestration create() {
+                return ctx -> {
+                    ctx.callActivity("SomeActivity", null, String.class);
+                };
+            }
+        });
+
+        TaskOrchestrationExecutor executor = new TaskOrchestrationExecutor(
+                factories, new JacksonDataConverter(), Duration.ofDays(3), logger, null);
+
+        List<HistoryEvent> newEvents = Arrays.asList(
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setOrchestratorStarted(OrchestratorStartedEvent.getDefaultInstance())
+                        .build(),
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setExecutionStarted(ExecutionStartedEvent.newBuilder()
+                                .setName(orchName)
+                                .setVersion(StringValue.of(""))
+                                .setInput(StringValue.of("\"test\""))
+                                .setOrchestrationInstance(OrchestrationInstance.newBuilder()
+                                        .setInstanceId("no-trace-instance")
+                                        .build())
+                                .build())
+                        .build(),
+                HistoryEvent.newBuilder()
+                        .setEventId(-1)
+                        .setTimestamp(Timestamp.getDefaultInstance())
+                        .setOrchestratorCompleted(OrchestratorCompletedEvent.getDefaultInstance())
+                        .build()
+        );
+
+        // Act
+        TaskOrchestratorResult result = executor.execute(Collections.emptyList(), newEvents, null);
+
+        // Assert: actions should not have trace context when none was provided
+        List<OrchestratorAction> actions = new ArrayList<>(result.getActions());
+        OrchestratorAction scheduleAction = actions.stream()
+                .filter(OrchestratorAction::hasScheduleTask)
+                .findFirst()
+                .orElseThrow(() -> new AssertionError("Expected a ScheduleTaskAction"));
+
+        assertFalse(scheduleAction.getScheduleTask().hasParentTraceContext(),
+                "ScheduleTaskAction should not have parentTraceContext when none was provided");
+    }
 }
diff --git a/client/src/test/java/com/microsoft/durabletask/TracingHelperTest.java
b/client/src/test/java/com/microsoft/durabletask/TracingHelperTest.java new file mode 100644 index 00000000..ca82b0d9 --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/TracingHelperTest.java @@ -0,0 +1,319 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.durabletask; + +import com.google.protobuf.StringValue; +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.TraceContext; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for TracingHelper. 
+ */
+public class TracingHelperTest {
+
+    private InMemorySpanExporter spanExporter;
+    private SdkTracerProvider tracerProvider;
+    private OpenTelemetrySdk openTelemetry;
+
+    @BeforeEach
+    void setUp() {
+        // Reset first in case another test class triggered GlobalOpenTelemetry.get()
+        io.opentelemetry.api.GlobalOpenTelemetry.resetForTest();
+        spanExporter = InMemorySpanExporter.create();
+        tracerProvider = SdkTracerProvider.builder()
+                .addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
+                .build();
+        openTelemetry = OpenTelemetrySdk.builder()
+                .setTracerProvider(tracerProvider)
+                .buildAndRegisterGlobal();
+    }
+
+    @AfterEach
+    void tearDown() {
+        openTelemetry.close();
+        // Reset the global OpenTelemetry to avoid affecting other tests
+        io.opentelemetry.api.GlobalOpenTelemetry.resetForTest();
+    }
+
+    @Test
+    void getCurrentTraceContext_noActiveSpan_returnsNull() {
+        TraceContext result = TracingHelper.getCurrentTraceContext();
+        assertNull(result);
+    }
+
+    @Test
+    void getCurrentTraceContext_withActiveSpan_returnsTraceContext() {
+        Tracer tracer = openTelemetry.getTracer("test");
+        Span span = tracer.spanBuilder("test-span").startSpan();
+        try (Scope ignored = span.makeCurrent()) {
+            TraceContext result = TracingHelper.getCurrentTraceContext();
+            assertNotNull(result);
+            assertNotNull(result.getTraceParent());
+            assertTrue(result.getTraceParent().startsWith("00-"));
+
+            // traceparent format: 00-<trace-id>-<span-id>-<flags>
+            String[] parts = result.getTraceParent().split("-");
+            assertEquals(4, parts.length);
+            assertEquals(32, parts[1].length()); // trace ID
+            assertEquals(16, parts[2].length()); // span ID
+        } finally {
+            span.end();
+        }
+    }
+
+    @Test
+    void extractTraceContext_null_returnsNull() {
+        Context result = TracingHelper.extractTraceContext(null);
+        assertNull(result);
+    }
+
+    @Test
+    void extractTraceContext_emptyTraceParent_returnsNull() {
+        TraceContext emptyCtx = TraceContext.newBuilder().build();
+        Context result = TracingHelper.extractTraceContext(emptyCtx);
+        assertNull(result);
+    }
+
+    @Test
+    void extractTraceContext_validTraceParent_returnsContext() {
+        TraceContext protoCtx = TraceContext.newBuilder()
+                .setTraceParent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
+                .build();
+
+        Context result = TracingHelper.extractTraceContext(protoCtx);
+        assertNotNull(result);
+
+        // Verify we can extract the span context from the OTel context
+        Span span = Span.fromContext(result);
+        assertTrue(span.getSpanContext().isValid());
+        assertEquals("0af7651916cd43dd8448eb211c80319c", span.getSpanContext().getTraceId());
+        assertEquals("b7ad6b7169203331", span.getSpanContext().getSpanId());
+    }
+
+    @Test
+    void extractTraceContext_withTraceState_preservesState() {
+        TraceContext protoCtx = TraceContext.newBuilder()
+                .setTraceParent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01")
+                .setTraceState(StringValue.of("vendorname=opaqueValue"))
+                .build();
+
+        Context result = TracingHelper.extractTraceContext(protoCtx);
+        assertNotNull(result);
+
+        Span span = Span.fromContext(result);
+        assertEquals("opaqueValue", span.getSpanContext().getTraceState().get("vendorname"));
+    }
+
+    @Test
+    void startSpan_createsSpanWithAttributes() {
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put(TracingHelper.ATTR_TYPE, TracingHelper.TYPE_ACTIVITY);
+        attrs.put(TracingHelper.ATTR_TASK_NAME, "test-activity");
+        attrs.put(TracingHelper.ATTR_INSTANCE_ID, "abc123");
+
+        Span span = TracingHelper.startSpan("activity:test-activity", null, SpanKind.SERVER, attrs);
+        assertNotNull(span);
+        span.end();
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        SpanData spanData = spans.get(0);
+        assertEquals("activity:test-activity", spanData.getName());
+        assertEquals(io.opentelemetry.api.trace.SpanKind.SERVER, spanData.getKind());
+        assertEquals("test-activity", spanData.getAttributes().get(
+                io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.name")));
+        assertEquals("abc123", spanData.getAttributes().get(
+                io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.instance_id")));
+        assertEquals("activity", spanData.getAttributes().get(
+                io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.type")));
+    }
+
+    @Test
+    void startSpan_withParentTraceContext_createsChildSpan() {
+        // Create a parent span first
+        Tracer tracer = openTelemetry.getTracer("test");
+        Span parentSpan = tracer.spanBuilder("parent").startSpan();
+        TraceContext parentCtx;
+        try (Scope ignored = parentSpan.makeCurrent()) {
+            parentCtx = TracingHelper.getCurrentTraceContext();
+        } finally {
+            parentSpan.end();
+        }
+
+        assertNotNull(parentCtx);
+
+        // Create a child span using the trace context
+        Span childSpan = TracingHelper.startSpan("child", parentCtx, SpanKind.INTERNAL, null);
+        childSpan.end();
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(2, spans.size());
+
+        // Find parent and child spans
+        SpanData parentData = spans.stream()
+                .filter(s -> s.getName().equals("parent"))
+                .findFirst().orElseThrow();
+        SpanData childData = spans.stream()
+                .filter(s -> s.getName().equals("child"))
+                .findFirst().orElseThrow();
+
+        // Verify child has same trace ID as parent
+        assertEquals(parentData.getTraceId(), childData.getTraceId());
+        // Verify child's parent span ID matches the parent span
+        assertEquals(parentData.getSpanId(), childData.getParentSpanId());
+    }
+
+    @Test
+    void endSpan_withError_recordsException() {
+        Span span = TracingHelper.startSpan("error-span", null, SpanKind.INTERNAL, null);
+        RuntimeException error = new RuntimeException("test error");
+        TracingHelper.endSpan(span, error);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        SpanData spanData = spans.get(0);
+        assertEquals(io.opentelemetry.api.trace.StatusCode.ERROR, spanData.getStatus().getStatusCode());
+        assertFalse(spanData.getEvents().isEmpty(), "Should have recorded exception event");
+    }
+
+    @Test
+    void endSpan_withNullSpan_doesNotThrow() {
+        assertDoesNotThrow(() -> TracingHelper.endSpan(null, null));
+        assertDoesNotThrow(() -> TracingHelper.endSpan(null, new RuntimeException("test")));
+    }
+
+    @Test
+    void endSpan_withoutError_endsCleanly() {
+        Span span = TracingHelper.startSpan("clean-span", null, SpanKind.INTERNAL, null);
+        TracingHelper.endSpan(span, null);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        assertEquals(io.opentelemetry.api.trace.StatusCode.UNSET, spans.get(0).getStatus().getStatusCode());
+    }
+
+    @Test
+    void getCurrentTraceContext_roundTrip() {
+        // Create a span, capture trace context, extract it, create child - verify full round trip
+        Tracer tracer = openTelemetry.getTracer("test");
+        Span originalSpan = tracer.spanBuilder("original").startSpan();
+        TraceContext captured;
+        try (Scope ignored = originalSpan.makeCurrent()) {
+            captured = TracingHelper.getCurrentTraceContext();
+        } finally {
+            originalSpan.end();
+        }
+
+        assertNotNull(captured);
+
+        // Extract back to OTel context
+        Context extractedCtx = TracingHelper.extractTraceContext(captured);
+        assertNotNull(extractedCtx);
+
+        // Verify the extracted context matches the original span
+        Span extractedSpan = Span.fromContext(extractedCtx);
+        assertEquals(originalSpan.getSpanContext().getTraceId(),
+                extractedSpan.getSpanContext().getTraceId());
+        assertEquals(originalSpan.getSpanContext().getSpanId(),
+                extractedSpan.getSpanContext().getSpanId());
+    }
+
+    @Test
+    void emitTimerSpan_createsInternalSpanWithFireAt() {
+        TracingHelper.emitTimerSpan("MyOrchestration", "instance-1", 5, "2026-01-01T00:00:00Z", null, null);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        SpanData sd = spans.get(0);
+        assertEquals("orchestration:MyOrchestration:timer", sd.getName());
+        assertEquals(SpanKind.INTERNAL, sd.getKind());
+        assertEquals("timer", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.type")));
+        assertEquals("MyOrchestration", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.name")));
+        assertEquals("instance-1", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.instance_id")));
+        assertEquals("5", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.task_id")));
+        assertEquals("2026-01-01T00:00:00Z", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.fire_at")));
+    }
+
+    @Test
+    void emitTimerSpan_withStartTime_setsStartTimestamp() {
+        java.time.Instant startTime = java.time.Instant.parse("2026-01-01T00:00:00Z");
+        TracingHelper.emitTimerSpan("MyOrchestration", "instance-1", 5, "2026-01-01T00:01:00Z", null, startTime);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        SpanData sd = spans.get(0);
+        assertEquals("orchestration:MyOrchestration:timer", sd.getName());
+        // The start time should be set to the provided startTime
+        long startEpochNanos = startTime.getEpochSecond() * 1_000_000_000L + startTime.getNano();
+        assertEquals(startEpochNanos, sd.getStartEpochNanos());
+    }
+
+    @Test
+    void emitRetroactiveClientSpan_createsClientSpanWithStartTime() {
+        java.time.Instant startTime = java.time.Instant.parse("2026-01-01T00:00:00Z");
+        TracingHelper.emitRetroactiveClientSpan(
+                "activity:GetWeather", null, TracingHelper.TYPE_ACTIVITY,
+                "GetWeather", "instance-1", 3, startTime);
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        SpanData sd = spans.get(0);
+        assertEquals("activity:GetWeather", sd.getName());
+        assertEquals(SpanKind.CLIENT, sd.getKind());
+        assertEquals("activity", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.type")));
+        assertEquals("GetWeather", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.name")));
+        assertEquals("instance-1", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.instance_id")));
+        assertEquals("3", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.task_id")));
+        long startEpochNanos = startTime.getEpochSecond() * 1_000_000_000L + startTime.getNano();
+        assertEquals(startEpochNanos, sd.getStartEpochNanos());
+    }
+
+    @Test
+    void emitEventRaisedFromWorkerSpan_createsProducerSpan() {
+        TracingHelper.emitEventRaisedFromWorkerSpan("ApprovalEvent", "orch-1", "target-orch-2");
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        SpanData sd = spans.get(0);
+        assertEquals("orchestration_event:ApprovalEvent", sd.getName());
+        assertEquals(SpanKind.PRODUCER, sd.getKind());
+        assertEquals("event", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.type")));
+        assertEquals("ApprovalEvent", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.name")));
+        assertEquals("orch-1", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.task.instance_id")));
+        assertEquals("target-orch-2", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.event.target_instance_id")));
+    }
+
+    @Test
+    void emitEventRaisedFromClientSpan_createsProducerSpan() {
+        TracingHelper.emitEventRaisedFromClientSpan("ApprovalEvent", "target-orch-1");
+
+        List<SpanData> spans = spanExporter.getFinishedSpanItems();
+        assertEquals(1, spans.size());
+        SpanData sd = spans.get(0);
+        assertEquals("orchestration_event:ApprovalEvent", sd.getName());
+        assertEquals(SpanKind.PRODUCER, sd.getKind());
+        assertEquals("event", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.type")));
+        assertEquals("target-orch-1", sd.getAttributes().get(io.opentelemetry.api.common.AttributeKey.stringKey("durabletask.event.target_instance_id")));
+    }
+}
diff --git a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
index fdb90d6a..0ef1ed22 100644
--- a/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
+++ b/internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH
@@ -1 +1 @@
-026329c53fe6363985655857b9ca848ec7238bd2
\ No newline at end of file
+1caadbd7ecfdf5f2309acbeac28a3e36d16aa156
\ No newline at end of file
diff --git a/internal/durabletask-protobuf/protos/orchestrator_service.proto b/internal/durabletask-protobuf/protos/orchestrator_service.proto
index 8ef46a4a..0c34d986 100644
--- a/internal/durabletask-protobuf/protos/orchestrator_service.proto
+++ b/internal/durabletask-protobuf/protos/orchestrator_service.proto
@@ -822,6 +822,7 @@ message GetWorkItemsRequest {
     int32 maxConcurrentEntityWorkItems = 3;
 
     repeated WorkerCapability capabilities = 10;
+    WorkItemFilters workItemFilters = 11;
 }
 
 enum WorkerCapability {
@@ -844,6 +845,26 @@ enum WorkerCapability {
     WORKER_CAPABILITY_LARGE_PAYLOADS = 3;
 }
 
+message WorkItemFilters {
+    repeated OrchestrationFilter orchestrations = 1;
+    repeated ActivityFilter activities = 2;
+    repeated EntityFilter entities = 3;
+}
+
+message OrchestrationFilter {
+    string name = 1;
+    repeated string versions = 2;
+}
+
+message ActivityFilter {
+    string name = 1;
+    repeated string versions = 2;
+}
+
+message EntityFilter {
+    string name = 1;
+}
+
 message WorkItem {
     oneof request {
         OrchestratorRequest orchestratorRequest = 1;
diff --git a/samples-azure-functions/src/main/java/com/functions/TracingChain.java b/samples-azure-functions/src/main/java/com/functions/TracingChain.java
new file mode 100644
index 00000000..be026331
--- /dev/null
+++ b/samples-azure-functions/src/main/java/com/functions/TracingChain.java
@@ -0,0 +1,87 @@
+package com.functions;
+
+import
com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.HttpResponseMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import com.microsoft.durabletask.DurableTaskClient; +import com.microsoft.durabletask.Task; +import com.microsoft.durabletask.TaskOrchestrationContext; +import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger; +import com.microsoft.durabletask.azurefunctions.DurableClientContext; +import com.microsoft.durabletask.azurefunctions.DurableClientInput; +import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Sample demonstrating distributed tracing with Durable Functions. + *

+ * Uses a Fan-Out/Fan-In pattern with a timer, matching the DTS sample (TracingPattern.java).
+ * Trace context is automatically propagated from the HTTP trigger through the
+ * orchestration to each activity. When Application Insights is configured,
+ * you will see correlated traces across the entire workflow.
+ */
+public class TracingChain {
+
+    @FunctionName("StartFanOutFanIn")
+    public HttpResponseMessage startFanOutFanIn(
+            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST},
+                    authLevel = AuthorizationLevel.ANONYMOUS)
+            HttpRequestMessage<Optional<String>> request,
+            @DurableClientInput(name = "durableContext") DurableClientContext durableContext,
+            final ExecutionContext context) {
+        context.getLogger().info("Starting FanOutFanIn orchestration");
+
+        DurableTaskClient client = durableContext.getClient();
+        String instanceId = client.scheduleNewOrchestrationInstance("FanOutFanIn");
+        context.getLogger().info("Created orchestration with instance ID = " + instanceId);
+        return durableContext.createCheckStatusResponse(request, instanceId);
+    }
+
+    /**
+     * Fan-Out/Fan-In orchestration: waits 1s, then runs 5 GetWeather activities
+     * in parallel, and finally calls CreateSummary to aggregate the results.
+     */
+    @FunctionName("FanOutFanIn")
+    public String fanOutFanIn(
+            @DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
+
+        // Wait 1 second (creates a timer span)
+        ctx.createTimer(Duration.ofSeconds(1)).await();
+
+        // Fan-out: schedule 5 GetWeather activities in parallel
+        List<String> cities = Arrays.asList("Seattle", "Tokyo", "London", "Paris", "Sydney");
+        List<Task<String>> tasks = cities.stream()
+                .map(city -> ctx.callActivity("GetWeather", city, String.class))
+                .collect(Collectors.toList());
+        List<String> results = ctx.allOf(tasks).await();
+
+        // Fan-in: aggregate results
+        String combined = String.join(", ", results);
+        return ctx.callActivity("CreateSummary", combined, String.class).await();
+    }
+
+    @FunctionName("GetWeather")
+    public String getWeather(
+            @DurableActivityTrigger(name = "city") String city,
+            final ExecutionContext context) {
+        context.getLogger().info("[GetWeather] Getting weather for: " + city);
+        return city + "=72F";
+    }
+
+    @FunctionName("CreateSummary")
+    public String createSummary(
+            @DurableActivityTrigger(name = "input") String input,
+            final ExecutionContext context) {
+        context.getLogger().info("[CreateSummary] Creating summary for: " + input);
+        return "Weather Report: " + input;
+    }
+}
diff --git a/samples/README.md b/samples/README.md
new file mode 100644
index 00000000..ce504a50
--- /dev/null
+++ b/samples/README.md
@@ -0,0 +1,112 @@
+# OpenTelemetry Distributed Tracing Sample
+
+This sample demonstrates distributed tracing with the Durable Task Java SDK using OpenTelemetry. Traces are exported to Jaeger via OTLP/gRPC for visualization.
+
+The Java SDK automatically propagates W3C trace context (traceparent/tracestate) when scheduling orchestrations. The worker creates spans around each activity and orchestration execution, forming a correlated trace tree visible in Jaeger.
+
+## Prerequisites
+
+- Java 17+
+- Docker (for DTS emulator and Jaeger)
+
+## Running the Sample
+
+### 1.
Start the infrastructure + +```bash +# Start the DTS emulator (port 8080 for gRPC, 8082 for dashboard) +docker run -d --name dts-emulator \ + -p 8080:8080 -p 8082:8082 \ + mcr.microsoft.com/dts/dts-emulator:latest + +# Start Jaeger (port 16686 for UI, 4317 for OTLP gRPC) +docker run -d --name jaeger \ + -p 16686:16686 -p 4317:4317 -p 4318:4318 \ + -e COLLECTOR_OTLP_ENABLED=true \ + jaegertracing/all-in-one:latest +``` + +### 2. Run the tracing sample + +```bash +./gradlew :samples:runTracingPattern -PskipSigning -x downloadProtoFiles +``` + +### 3. View traces + +- **Jaeger UI**: http://localhost:16686 — Search for service `durabletask-java-tracing-sample` +- **DTS Dashboard**: http://localhost:8082 + +## What the Sample Does + +The `TracingPattern` sample demonstrates the **Fan-Out/Fan-In** pattern with distributed tracing: + +1. Configures OpenTelemetry with an OTLP exporter pointing to Jaeger +2. Connects a worker and client to the DTS emulator using a connection string +3. Schedules the `FanOutFanIn` orchestration; the SDK client automatically creates the parent span (`create_orchestration:FanOutFanIn`) for this call +4. The orchestration waits on a 1-second **durable timer**, then fans out 5 parallel `GetWeather` activities (Seattle, Tokyo, London, Paris, Sydney), fans in the results, then calls `CreateSummary` to aggregate +5. The SDK automatically propagates trace context through the full execution chain + +## Screenshots + +### Jaeger — Trace Search Results + +Shows the trace from `durabletask-java-tracing-sample` service with spans covering the full fan-out/fan-in orchestration lifecycle. 
+ +![Jaeger trace search results](images/jaeger-trace-list-full.png) + +### Jaeger — Trace Detail + +Full span hierarchy showing the fan-out/fan-in pattern with proper span durations: +- `create_orchestration:FanOutFanIn` (root, internal) + - `orchestration:FanOutFanIn` (server — full lifecycle, ~1.2s) + - `orchestration:FanOutFanIn:timer` (internal — creation-to-fired, ~965ms) + - `activity:GetWeather` ×5 (client — scheduling-to-completion, ~184ms) + ×5 (server — execution, ~25ms) + - `activity:CreateSummary` (client, ~8ms) + (server, ~0.7ms) + +15 spans total, Depth 2. + +> **Note:** Java OTel doesn't support `SetSpanId()` like .NET, so child spans appear as siblings under `create_orchestration` rather than nested under the orchestration span. All spans have meaningful durations. + +![Jaeger trace detail](images/jaeger-full-trace-detail.png) + +### Jaeger — Span Attributes + +Activity span showing attributes aligned with the .NET SDK schema: +- `durabletask.type=activity` +- `durabletask.task.name=GetWeather` +- `durabletask.task.instance_id=` +- `durabletask.task.task_id=1` +- `otel.scope.name=Microsoft.DurableTask` +- `span.kind=server` + +![Jaeger span detail](images/jaeger-span-detail.png) + +### DTS Dashboard — Completed Orchestrations + +The `FanOutFanIn` orchestration completed successfully with all activities. + +![DTS Dashboard](images/dts-dashboard-completed.png) + +## Cleanup + +```bash +docker stop jaeger dts-emulator && docker rm jaeger dts-emulator +``` + +## Azure Functions Sample + +The `samples-azure-functions` module contains a matching **Fan-Out/Fan-In** sample (`TracingChain.java`) for use with Azure Durable Functions. It uses the same pattern (1s timer → 5× `GetWeather` → `CreateSummary`) but runs as an HTTP-triggered Azure Function. 
+ +### Running + +```bash +cd samples-azure-functions +../gradlew azureFunctionsPackage -PskipSigning -x downloadProtoFiles +cd build/azure-functions/ +func start +``` + +Then trigger with: `curl http://localhost:7071/api/StartFanOutFanIn` + +Distributed tracing in Durable Functions exports to **Application Insights** (not Jaeger/OTLP). Configure your `APPLICATIONINSIGHTS_CONNECTION_STRING` in `local.settings.json` to see traces. diff --git a/samples/build.gradle b/samples/build.gradle index 74f13f97..fe03055a 100644 --- a/samples/build.gradle +++ b/samples/build.gradle @@ -17,6 +17,17 @@ task runWebAppToDurableTaskSchedulerSample(type: JavaExec) { mainClass = 'io.durabletask.samples.WebAppToDurableTaskSchedulerSample' } +task runTracingPattern(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + mainClass = 'io.durabletask.samples.TracingPattern' +} + +task printClasspath { + doLast { + println sourceSets.main.runtimeClasspath.asPath + } +} + dependencies { implementation project(':client') implementation project(':azuremanaged') @@ -25,6 +36,11 @@ dependencies { implementation platform("org.springframework.boot:spring-boot-dependencies:2.7.18") implementation 'org.springframework.boot:spring-boot-starter' + // OpenTelemetry for tracing sample + implementation "io.opentelemetry:opentelemetry-sdk:1.58.0" + implementation "io.opentelemetry:opentelemetry-sdk-trace:1.58.0" + implementation "io.opentelemetry:opentelemetry-exporter-otlp:1.58.0" + // https://github.com/grpc/grpc-java#download implementation "io.grpc:grpc-protobuf:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" diff --git a/samples/images/dts-dashboard-completed.png b/samples/images/dts-dashboard-completed.png new file mode 100644 index 00000000..a95f5db8 Binary files /dev/null and b/samples/images/dts-dashboard-completed.png differ diff --git a/samples/images/jaeger-full-trace-detail.png b/samples/images/jaeger-full-trace-detail.png new file mode 100644 index 
00000000..37392036 Binary files /dev/null and b/samples/images/jaeger-full-trace-detail.png differ diff --git a/samples/images/jaeger-span-detail.png b/samples/images/jaeger-span-detail.png new file mode 100644 index 00000000..3abb713e Binary files /dev/null and b/samples/images/jaeger-span-detail.png differ diff --git a/samples/images/jaeger-trace-list-full.png b/samples/images/jaeger-trace-list-full.png new file mode 100644 index 00000000..53e3aae9 Binary files /dev/null and b/samples/images/jaeger-trace-list-full.png differ diff --git a/samples/src/main/java/io/durabletask/samples/TracingPattern.java b/samples/src/main/java/io/durabletask/samples/TracingPattern.java new file mode 100644 index 00000000..4428c021 --- /dev/null +++ b/samples/src/main/java/io/durabletask/samples/TracingPattern.java @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package io.durabletask.samples; + +import com.microsoft.durabletask.*; +import com.microsoft.durabletask.azuremanaged.DurableTaskSchedulerClientExtensions; +import com.microsoft.durabletask.azuremanaged.DurableTaskSchedulerWorkerExtensions; + +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; + +/** + * Demonstrates OpenTelemetry distributed tracing with the Durable Task SDK. + * Traces are exported to Jaeger via OTLP/gRPC for visualization. + * + *

<p>The Java SDK's client automatically propagates W3C trace context + * (traceparent/tracestate) when scheduling orchestrations. Activity spans + * are created automatically by the worker around each activity execution. + * + * <p>Prerequisites: + * <ul> + * <li>DTS emulator: {@code docker run -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest}</li> + * <li>Jaeger: {@code docker run -d -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one:latest}</li> + * </ul>
+ */ +final class TracingPattern { + private static final Logger logger = Logger.getLogger(TracingPattern.class.getName()); + + public static void main(String[] args) throws IOException, InterruptedException, TimeoutException { + // Configure OpenTelemetry with OTLP exporter to Jaeger + String otlpEndpoint = System.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); + if (otlpEndpoint == null) { + otlpEndpoint = "http://localhost:4317"; + } + + OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder() + .setEndpoint(otlpEndpoint) + .build(); + + Resource resource = Resource.builder() + .put("service.name", "durabletask-java-tracing-sample") + .build(); + + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .setResource(resource) + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .build(); + + OpenTelemetrySdk.builder() + .setTracerProvider(tracerProvider) + .buildAndRegisterGlobal(); + + logger.info("OpenTelemetry configured with OTLP exporter at " + otlpEndpoint); + + // Build connection string for DTS emulator + String connectionString = System.getenv("DURABLE_TASK_CONNECTION_STRING"); + if (connectionString == null) { + String endpoint = System.getenv("ENDPOINT"); + String taskHub = System.getenv("TASKHUB"); + if (endpoint == null) endpoint = "http://localhost:8080"; + if (taskHub == null) taskHub = "default"; + + String authType = endpoint.startsWith("http://localhost") ? 
"None" : "DefaultAzure"; + connectionString = String.format("Endpoint=%s;TaskHub=%s;Authentication=%s", + endpoint, taskHub, authType); + } + logger.info("Using connection string: " + connectionString); + + // Create worker with orchestration and activities + DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions + .createWorkerBuilder(connectionString) + .addOrchestration(new TaskOrchestrationFactory() { + @Override + public String getName() { return "FanOutFanIn"; } + + @Override + public TaskOrchestration create() { + return ctx -> { + // Timer: wait briefly (demonstrates timer span) + ctx.createTimer(Duration.ofSeconds(1)).await(); + + // Fan-out: schedule multiple parallel activities + List> parallelTasks = new java.util.ArrayList<>(); + String[] cities = {"Seattle", "Tokyo", "London", "Paris", "Sydney"}; + for (String city : cities) { + parallelTasks.add( + ctx.callActivity("GetWeather", city, String.class)); + } + + // Fan-in: wait for all activities to complete + List results = ctx.allOf(parallelTasks).await(); + + // Aggregate results + String summary = ctx.callActivity( + "CreateSummary", String.join(", ", results), String.class).await(); + + ctx.complete(summary); + }; + } + }) + .addActivity(new TaskActivityFactory() { + @Override public String getName() { return "GetWeather"; } + @Override public TaskActivity create() { + return ctx -> { + String city = ctx.getInput(String.class); + logger.info("[GetWeather] Getting weather for: " + city); + try { Thread.sleep(20); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + return city + "=72F"; + }; + } + }) + .addActivity(new TaskActivityFactory() { + @Override public String getName() { return "CreateSummary"; } + @Override public TaskActivity create() { + return ctx -> { + String input = ctx.getInput(String.class); + logger.info("[CreateSummary] Creating summary for: " + input); + return "Weather Report: " + input; + }; + } + }) + .build(); + + // Start worker and wait 
for it to connect + worker.start(); + Thread.sleep(5000); + logger.info("Worker started with OpenTelemetry tracing."); + + // Create client + DurableTaskClient client = DurableTaskSchedulerClientExtensions + .createClientBuilder(connectionString).build(); + + // The SDK automatically creates a create_orchestration span and propagates W3C trace context + logger.info("Scheduling FanOutFanIn orchestration..."); + String instanceId = client.scheduleNewOrchestrationInstance( + "FanOutFanIn", + new NewOrchestrationInstanceOptions().setInput("weather-request")); + logger.info("Started orchestration: " + instanceId); + + // Wait for completion + logger.info("Waiting for completion..."); + OrchestrationMetadata result = client.waitForInstanceCompletion( + instanceId, Duration.ofSeconds(60), true); + + logger.info("Status: " + result.getRuntimeStatus()); + logger.info("Result: " + result.readOutputAs(String.class)); + logger.info(""); + logger.info("View traces in Jaeger UI: http://localhost:16686"); + logger.info(" Search for service: durabletask-java-tracing-sample"); + logger.info("View orchestration in DTS Dashboard: http://localhost:8082"); + + // Flush traces and shut down + tracerProvider.forceFlush(); + Thread.sleep(2000); + tracerProvider.shutdown(); + worker.stop(); + System.exit(0); + } +}