diff --git a/CHANGELOG.md b/CHANGELOG.md index 3376787..68aa426 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ### New +- Align tracing attributes with .NET SDK conventions: add `execution_id` on creation spans, `version` on activity execution spans, `name`/`instance_id` on timer spans, and `durabletask.task.status` on orchestration completion ([#126](https://github.com/microsoft/durabletask-js/pull/126)) +- Add retroactive span emission model: emit Client spans at activity/sub-orchestration completion with historical scheduling timestamps, and timer spans with creation-to-fired duration (matching .NET/Java SDK patterns) ([#126](https://github.com/microsoft/durabletask-js/pull/126)) + ### Fixes diff --git a/doc/images/tracing/jaeger-full-trace-detail.png b/doc/images/tracing/jaeger-full-trace-detail.png new file mode 100644 index 0000000..7b311d9 Binary files /dev/null and b/doc/images/tracing/jaeger-full-trace-detail.png differ diff --git a/doc/images/tracing/jaeger-span-detail.png b/doc/images/tracing/jaeger-span-detail.png new file mode 100644 index 0000000..bd5629b Binary files /dev/null and b/doc/images/tracing/jaeger-span-detail.png differ diff --git a/doc/images/tracing/jaeger-trace-list.png b/doc/images/tracing/jaeger-trace-list.png new file mode 100644 index 0000000..8c627be Binary files /dev/null and b/doc/images/tracing/jaeger-trace-list.png differ diff --git a/examples/azure-managed/distributed-tracing/index.ts b/examples/azure-managed/distributed-tracing/index.ts index 0ca3f6e..e37ac13 100644 --- a/examples/azure-managed/distributed-tracing/index.ts +++ b/examples/azure-managed/distributed-tracing/index.ts @@ -13,8 +13,7 @@ // -------------------------------------------------------------------------- import { NodeSDK } from "@opentelemetry/sdk-node"; import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; -import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base"; -import { resourceFromAttributes } from "@opentelemetry/resources"; +import { Resource } from "@opentelemetry/resources"; import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions"; // Load environment variables from .env file @@ -25,15 +24,13 @@ dotenv.config({ path: path.join(__dirname, "..", ".env") }); // Read the OTLP endpoint from the environment (defaults to Jaeger's OTLP HTTP port) const otlpEndpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT || "http://localhost:4318"; -const traceExporter = new OTLPTraceExporter({ - url: `${otlpEndpoint}/v1/traces`, -}); - const sdk = new NodeSDK({ - resource: resourceFromAttributes({ - [ATTR_SERVICE_NAME]: "durabletask-js-tracing-example", + resource: new Resource({ + [ATTR_SERVICE_NAME]: "durabletask-js-tracing-sample", + }), + traceExporter: new OTLPTraceExporter({ + url: `${otlpEndpoint}/v1/traces`, }), - spanProcessors: [new SimpleSpanProcessor(traceExporter)], }); sdk.start(); @@ -53,13 +50,9 @@ import { TOrchestrator } from "@microsoft/durabletask-js/dist/types/orchestrator import { whenAll } from "@microsoft/durabletask-js/dist/task"; // -------------------------------------------------------------------------- -// 3. Application code +// 3. Application code — FanOutFanIn pattern (matches Java tracing sample) // -------------------------------------------------------------------------- (async () => { - // Use ConsoleLogger so structured log events (with event IDs and categories) are - // printed to the console by default, similar to .NET's default ILogger output. - // For production, consider using createAzureLogger() which integrates with @azure/logger - // and respects the AZURE_LOG_LEVEL environment variable. const sdkLogger = new ConsoleLogger(); // --- Configuration --- @@ -80,96 +73,44 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task"; // --- Activity definitions --- - /** Simulates fetching data from an external service. */ - const fetchData = async (_ctx: ActivityContext, source: string): Promise => { - console.log(` [fetchData] Fetching data from "${source}"...`); - // Simulate network latency - await new Promise((r) => setTimeout(r, 300 + Math.random() * 200)); - return `data-from-${source}`; - }; - - /** Simulates transforming a piece of data. */ - const transformData = async (_ctx: ActivityContext, input: string): Promise => { - console.log(` [transformData] Transforming "${input}"...`); - await new Promise((r) => setTimeout(r, 200)); - return `transformed(${input})`; + /** Simulates getting weather for a city. */ + const getWeather = async (_ctx: ActivityContext, city: string): Promise => { + console.log(` [GetWeather] Getting weather for: ${city}`); + await new Promise((r) => setTimeout(r, 20)); + return `${city}=72F`; }; - /** Simulates persisting results to a database. */ - const saveResults = async (_ctx: ActivityContext, results: string[]): Promise => { - console.log(` [saveResults] Saving ${results.length} results...`); - await new Promise((r) => setTimeout(r, 150)); - return results.length; + /** Aggregates weather results into a summary. */ + const createSummary = async (_ctx: ActivityContext, input: string): Promise => { + console.log(` [CreateSummary] Creating summary for: ${input}`); + return `Weather Report: ${input}`; }; - // --- Orchestrator: data pipeline (chaining + fan-out/fan-in) --- + // --- Orchestrator: Fan-Out/Fan-In with timer --- /** - * Demonstrates a realistic data-processing pipeline that produces a rich - * distributed trace: + * Demonstrates a Fan-Out/Fan-In pattern that produces a rich trace: * - * 1. Fan-out – fetch data from multiple sources in parallel. - * 2. Fan-out – transform each result in parallel. - * 3. Chain – save all transformed results. - * - * The resulting trace will show: - * create_orchestration → orchestration (server) - * ├─ activity:fetchData (×N, parallel) - * ├─ activity:transformData (×N, parallel) - * └─ activity:saveResults + * create_orchestration:FanOutFanIn (Producer) + * └─ orchestration:FanOutFanIn (Server) + * ├─ orchestration:FanOutFanIn:timer (Internal, ~1s) + * ├─ activity:getWeather (Client+Server, ×5 parallel) + * └─ activity:createSummary (Client+Server) */ - const dataPipelineOrchestrator: TOrchestrator = async function* ( - ctx: OrchestrationContext, - ): any { - const sources: string[] = yield ctx.callActivity(getDataSources); - - // Step 1 – fan-out: fetch from all sources in parallel - const fetchTasks = []; - for (const source of sources) { - fetchTasks.push(ctx.callActivity(fetchData, source)); - } - const rawData: string[] = yield whenAll(fetchTasks); - - // Step 2 – fan-out: transform all fetched data in parallel - const transformTasks = []; - for (const raw of rawData) { - transformTasks.push(ctx.callActivity(transformData, raw)); - } - const transformed: string[] = yield whenAll(transformTasks); - - // Step 3 – chain: save all results - const savedCount: number = yield ctx.callActivity(saveResults, transformed); + const fanOutFanIn: TOrchestrator = async function* (ctx: OrchestrationContext): any { + // Timer: wait 1 second (demonstrates timer span with creation-to-fired duration) + const timerDelay = new Date(ctx.currentUtcDateTime.getTime() + 1000); + yield ctx.createTimer(timerDelay); - return { - sourcesProcessed: sources.length, - resultsSaved: savedCount, - data: transformed, - }; - }; - - /** Returns the list of data sources to process. */ - const getDataSources = async (_ctx: ActivityContext): Promise => { - return ["users-api", "orders-api", "inventory-api", "analytics-api"]; - }; + // Fan-out: schedule parallel weather lookups + const cities = ["Seattle", "Tokyo", "London", "Paris", "Sydney"]; + const tasks = cities.map((city) => ctx.callActivity(getWeather, city)); + const results: string[] = yield whenAll(tasks); - // --- Orchestrator: simple sequence (for a cleaner trace comparison) --- - - const sequenceOrchestrator: TOrchestrator = async function* ( - ctx: OrchestrationContext, - ): any { - const cities = ["Tokyo", "Seattle", "London"]; - const greetings: string[] = []; - for (const city of cities) { - const greeting: string = yield ctx.callActivity(greetCity, city); - greetings.push(greeting); - } - return greetings; - }; + // Aggregate results + const summary: string = yield ctx.callActivity(createSummary, results.join(", ")); - const greetCity = async (_ctx: ActivityContext, city: string): Promise => { - console.log(` [greetCity] Greeting ${city}`); - await new Promise((r) => setTimeout(r, 100)); - return `Hello, ${city}!`; + return summary; }; // --- Create client & worker --- @@ -193,43 +134,30 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task"; client = clientBuilder.build(); worker = workerBuilder - .addOrchestrator(dataPipelineOrchestrator) - .addOrchestrator(sequenceOrchestrator) - .addActivity(fetchData) - .addActivity(transformData) - .addActivity(saveResults) - .addActivity(getDataSources) - .addActivity(greetCity) + .addOrchestrator(fanOutFanIn) + .addActivity(getWeather) + .addActivity(createSummary) .build(); // --- Start worker --- console.log("Starting worker..."); await worker.start(); - // Allow the worker time to establish the gRPC stream with the scheduler. - // worker.start() returns before the connection is fully established. await new Promise((r) => setTimeout(r, 2000)); - console.log("Worker started."); - - // --- Run orchestrations --- + console.log("Worker started with OpenTelemetry tracing."); - // 1) Sequence orchestration - console.log("\n=== Sequence Orchestration ==="); - const seqId = await client.scheduleNewOrchestration(sequenceOrchestrator); - console.log(`Scheduled: ${seqId}`); - const seqState = await client.waitForOrchestrationCompletion(seqId, true, 60); - console.log(`Completed – result: ${seqState?.serializedOutput}`); + // --- Run orchestration --- + console.log("\nScheduling FanOutFanIn orchestration..."); + const instanceId = await client.scheduleNewOrchestration(fanOutFanIn); + console.log(`Started orchestration: ${instanceId}`); - // 2) Data pipeline orchestration (fan-out/fan-in) - console.log("\n=== Data Pipeline Orchestration ==="); - const pipelineId = await client.scheduleNewOrchestration(dataPipelineOrchestrator); - console.log(`Scheduled: ${pipelineId}`); - const pipelineState = await client.waitForOrchestrationCompletion(pipelineId, true, 60); - console.log(`Completed – result: ${pipelineState?.serializedOutput}`); + console.log("Waiting for completion..."); + const state = await client.waitForOrchestrationCompletion(instanceId, true, 60); + console.log(`Status: ${state?.runtimeStatus}`); + console.log(`Result: ${state?.serializedOutput}`); - console.log("\n=== All orchestrations completed! ==="); - console.log( - `Open Jaeger UI at http://localhost:16686 and search for service "durabletask-js-tracing-example" to view traces.`, - ); + console.log("\nView traces in Jaeger UI: http://localhost:16686"); + console.log(' Search for service: durabletask-js-tracing-sample'); + console.log("View orchestration in DTS Dashboard: http://localhost:8082"); } catch (error) { console.error("Error:", error); process.exit(1); @@ -237,8 +165,6 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task"; console.log("\nShutting down..."); if (worker) await worker.stop(); if (client) await client.stop(); - - // Flush remaining spans before exit await sdk.shutdown(); console.log("Done."); process.exit(0); diff --git a/packages/durabletask-js/src/tracing/index.ts b/packages/durabletask-js/src/tracing/index.ts index d2f8988..8335c72 100644 --- a/packages/durabletask-js/src/tracing/index.ts +++ b/packages/durabletask-js/src/tracing/index.ts @@ -22,9 +22,9 @@ export { OrchestrationSpanInfo, startSpanForNewOrchestration, startSpanForOrchestrationExecution, - startSpanForSchedulingTask, + injectTraceContextForSchedulingTask, startSpanForTaskExecution, - startSpanForSchedulingSubOrchestration, + injectTraceContextForSchedulingSubOrchestration, emitSpanForTimer, emitSpanForEventSent, startSpanForEventRaisedFromClient, @@ -35,6 +35,10 @@ export { setSpanError, setSpanOk, endSpan, + setOrchestrationStatusFromActions, createOrchestrationTraceContextPb, processActionsForTracing, + emitRetroactiveActivityClientSpan, + emitRetroactiveSubOrchClientSpan, + processNewEventsForTracing, } from "./trace-helper"; diff --git a/packages/durabletask-js/src/tracing/trace-helper.ts b/packages/durabletask-js/src/tracing/trace-helper.ts index 1a9f62a..aecbb46 100644 --- a/packages/durabletask-js/src/tracing/trace-helper.ts +++ b/packages/durabletask-js/src/tracing/trace-helper.ts @@ -9,6 +9,8 @@ import { getOtelApi, createPbTraceContextFromSpan, createParentContextFromPb, + generateSpanId, + createTraceparent, } from "./trace-context-utils"; import type { Span, Tracer } from "@opentelemetry/api"; @@ -75,6 +77,7 @@ export function startSpanForNewOrchestration(req: pb.CreateInstanceRequest): Spa const name = req.getName(); const version = req.getVersion()?.getValue(); const instanceId = req.getInstanceid(); + const executionId = req.getExecutionid()?.getValue(); const spanName = createSpanName(TaskType.CREATE_ORCHESTRATION, name, version); const span = ctx.tracer.startSpan(spanName, { @@ -84,6 +87,7 @@ export function startSpanForNewOrchestration(req: pb.CreateInstanceRequest): Spa [DurableTaskAttributes.TASK_NAME]: name, [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId, ...(version ? { [DurableTaskAttributes.TASK_VERSION]: version } : {}), + ...(executionId ? { [DurableTaskAttributes.TASK_EXECUTION_ID]: executionId } : {}), }, }); @@ -179,43 +183,41 @@ export function startSpanForOrchestrationExecution( * @param action - The ScheduleTaskAction to inject trace context into. * @param taskId - The sequential task ID. */ -export function startSpanForSchedulingTask( +/** + * Injects trace context into a ScheduleTaskAction for activity scheduling. + * Generates a random client span ID (matching .NET's CreateTraceContext pattern) + * without creating an actual Client span — the retroactive Client span is emitted + * later from processNewEventsForTracing when the activity completes. + * + * @param orchestrationSpan - The parent orchestration span. + * @param action - The ScheduleTaskAction to inject trace context into. + */ +export function injectTraceContextForSchedulingTask( orchestrationSpan: Span, action: pb.ScheduleTaskAction, - taskId: number, ): void { const ctx = getTracingContext(); if (!ctx) return; - const name = action.getName(); - const version = action.getVersion()?.getValue(); - const spanName = createSpanName(TaskType.ACTIVITY, name, version); + const orchSpanContext = orchestrationSpan.spanContext(); + if (!ctx.otel.isSpanContextValid(orchSpanContext)) return; - // Create a context with the orchestration span as parent - const parentContext = ctx.otel.trace.setSpan(ctx.otel.context.active(), orchestrationSpan); + // Generate a random client span ID (like .NET's ActivitySpanId.CreateRandom()) + const clientSpanId = generateSpanId(); + const traceparent = createTraceparent(orchSpanContext.traceId, clientSpanId, orchSpanContext.traceFlags); - const span = ctx.tracer.startSpan( - spanName, - { - kind: ctx.otel.SpanKind.CLIENT, - attributes: { - [DurableTaskAttributes.TYPE]: TaskType.ACTIVITY, - [DurableTaskAttributes.TASK_NAME]: name, - [DurableTaskAttributes.TASK_TASK_ID]: taskId, - ...(version ? { [DurableTaskAttributes.TASK_VERSION]: version } : {}), - }, - }, - parentContext, - ); + const pbCtx = new pb.TraceContext(); + pbCtx.setTraceparent(traceparent); + pbCtx.setSpanid(clientSpanId); - // Inject trace context into the action - const pbCtx = createPbTraceContextFromSpan(span); - if (pbCtx) { - action.setParenttracecontext(pbCtx); + const tracestate = orchSpanContext.traceState?.serialize(); + if (tracestate) { + const sv = new StringValue(); + sv.setValue(tracestate); + pbCtx.setTracestate(sv); } - // End the span immediately - it represents the act of scheduling, not execution - span.end(); + action.setParenttracecontext(pbCtx); } /** @@ -229,7 +231,8 @@ export function startSpanForTaskExecution(req: pb.ActivityRequest): Span | undef if (!ctx) return undefined; const name = req.getName(); - const spanName = createSpanName(TaskType.ACTIVITY, name); + const version = req.getVersion()?.getValue(); + const spanName = createSpanName(TaskType.ACTIVITY, name, version); const parentPbCtx = req.getParenttracecontext(); const parentContext = createParentContextFromPb(parentPbCtx); @@ -245,6 +248,7 @@ export function startSpanForTaskExecution(req: pb.ActivityRequest): Span | undef [DurableTaskAttributes.TASK_NAME]: name, [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId, [DurableTaskAttributes.TASK_TASK_ID]: req.getTaskid(), + ...(version ? { [DurableTaskAttributes.TASK_VERSION]: version } : {}), }, }, parentContext, @@ -261,44 +265,40 @@ export function startSpanForTaskExecution(req: pb.ActivityRequest): Span | undef * @param action - The CreateSubOrchestrationAction to inject trace context into. * @param taskId - The sequential task ID. */ -export function startSpanForSchedulingSubOrchestration( +/** + * Injects trace context into a CreateSubOrchestrationAction for sub-orchestration scheduling. + * Generates a random client span ID (matching .NET's CreateTraceContext pattern) + * without creating an actual Client span — the retroactive Client span is emitted + * later from processNewEventsForTracing when the sub-orchestration completes. + * + * @param orchestrationSpan - The parent orchestration span. + * @param action - The CreateSubOrchestrationAction to inject trace context into. + */ +export function injectTraceContextForSchedulingSubOrchestration( orchestrationSpan: Span, action: pb.CreateSubOrchestrationAction, - taskId: number, ): void { const ctx = getTracingContext(); if (!ctx) return; - const name = action.getName(); - const version = action.getVersion()?.getValue(); - const instanceId = action.getInstanceid(); - const spanName = createSpanName(TaskType.ORCHESTRATION, name, version); + const orchSpanContext = orchestrationSpan.spanContext(); + if (!ctx.otel.isSpanContextValid(orchSpanContext)) return; - const parentContext = ctx.otel.trace.setSpan(ctx.otel.context.active(), orchestrationSpan); + const clientSpanId = generateSpanId(); + const traceparent = createTraceparent(orchSpanContext.traceId, clientSpanId, orchSpanContext.traceFlags); - const span = ctx.tracer.startSpan( - spanName, - { - kind: ctx.otel.SpanKind.CLIENT, - attributes: { - [DurableTaskAttributes.TYPE]: TaskType.ORCHESTRATION, - [DurableTaskAttributes.TASK_NAME]: name, - [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId, - [DurableTaskAttributes.TASK_TASK_ID]: taskId, - ...(version ? { [DurableTaskAttributes.TASK_VERSION]: version } : {}), - }, - }, - parentContext, - ); + const pbCtx = new pb.TraceContext(); + pbCtx.setTraceparent(traceparent); + pbCtx.setSpanid(clientSpanId); - // Inject trace context into the action - const pbCtx = createPbTraceContextFromSpan(span); - if (pbCtx) { - action.setParenttracecontext(pbCtx); + const tracestate = orchSpanContext.traceState?.serialize(); + if (tracestate) { + const sv = new StringValue(); + sv.setValue(tracestate); + pbCtx.setTracestate(sv); } - // End the span immediately - it represents the act of scheduling - span.end(); + action.setParenttracecontext(pbCtx); } /** @@ -308,12 +308,15 @@ export function startSpanForSchedulingSubOrchestration( * @param orchestrationName - The name of the parent orchestration. * @param fireAt - When the timer fires. * @param timerId - The timer's sequential ID. + * @param instanceId - The orchestration instance ID. */ export function emitSpanForTimer( orchestrationSpan: Span, orchestrationName: string, fireAt: Date, timerId: number, + instanceId?: string, + startTime?: Date, ): void { const ctx = getTracingContext(); if (!ctx) return; @@ -325,10 +328,13 @@ export function emitSpanForTimer( spanName, { kind: ctx.otel.SpanKind.INTERNAL, + startTime: startTime, attributes: { [DurableTaskAttributes.TYPE]: TaskType.TIMER, + [DurableTaskAttributes.TASK_NAME]: orchestrationName, [DurableTaskAttributes.TASK_TASK_ID]: timerId, [DurableTaskAttributes.FIRE_AT]: fireAt.toISOString(), + ...(instanceId ? { [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId } : {}), }, }, parentContext, @@ -337,17 +343,234 @@ export function emitSpanForTimer( span.end(); } +/** + * Emits a retroactive Client-kind span for a completed/failed task or sub-orchestration. + * Common helper for activity and sub-orchestration retroactive spans. + */ +function emitRetroactiveClientSpan( + orchestrationSpan: Span, + taskType: string, + taskName: string, + version: string | undefined, + instanceId: string, + startTime?: Date, + failureMessage?: string, + taskId?: number, +): void { + const ctx = getTracingContext(); + if (!ctx) return; + + const spanName = createSpanName(taskType, taskName, version); + const parentContext = ctx.otel.trace.setSpan(ctx.otel.context.active(), orchestrationSpan); + + const span = ctx.tracer.startSpan( + spanName, + { + kind: ctx.otel.SpanKind.CLIENT, + startTime: startTime, + attributes: { + [DurableTaskAttributes.TYPE]: taskType, + [DurableTaskAttributes.TASK_NAME]: taskName, + [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId, + ...(version ? { [DurableTaskAttributes.TASK_VERSION]: version } : {}), + ...(taskId !== undefined ? { [DurableTaskAttributes.TASK_TASK_ID]: taskId } : {}), + }, + }, + parentContext, + ); + + if (failureMessage) { + span.setStatus({ code: ctx.otel.SpanStatusCode.ERROR, message: failureMessage }); + } + + span.end(); +} + +/** + * Emits a retroactive Client-kind span for a completed/failed activity task. + * This matches the .NET SDK pattern (EmitTraceActivityForTaskCompleted/Failed) where + * client spans are emitted at completion time with startTime from the original + * TaskScheduled event timestamp, providing accurate scheduling-to-completion duration. + * + * @param orchestrationSpan - The parent orchestration span. + * @param taskName - The activity name. + * @param version - The activity version (optional). + * @param instanceId - The orchestration instance ID. + * @param taskId - The task's sequential ID. + * @param startTime - The scheduling timestamp from the TaskScheduled history event. + * @param failureMessage - If the task failed, the error message. + */ +export function emitRetroactiveActivityClientSpan( + orchestrationSpan: Span, + taskName: string, + version: string | undefined, + instanceId: string, + taskId: number, + startTime?: Date, + failureMessage?: string, +): void { + emitRetroactiveClientSpan( + orchestrationSpan, TaskType.ACTIVITY, taskName, version, + instanceId, startTime, failureMessage, taskId, + ); +} + +/** + * Emits a retroactive Client-kind span for a completed/failed sub-orchestration. + * Matches .NET SDK's EmitTraceActivityForSubOrchestrationCompleted/Failed pattern. + * + * @param orchestrationSpan - The parent orchestration span. + * @param subOrchName - The sub-orchestration name. + * @param version - The sub-orchestration version (optional). + * @param instanceId - The parent orchestration instance ID. + * @param startTime - The scheduling timestamp from the SubOrchestrationInstanceCreated event. + * @param failureMessage - If the sub-orchestration failed, the error message. + */ +export function emitRetroactiveSubOrchClientSpan( + orchestrationSpan: Span, + subOrchName: string, + version: string | undefined, + instanceId: string, + startTime?: Date, + failureMessage?: string, +): void { + emitRetroactiveClientSpan( + orchestrationSpan, TaskType.ORCHESTRATION, subOrchName, version, + instanceId, startTime, failureMessage, + ); +} + +/** + * Processes new history events to emit retroactive spans for completed/failed tasks, + * sub-orchestrations, and fired timers. This follows the .NET SDK pattern where the + * worker emits these spans before the orchestrator executor runs. + * + * @param orchestrationSpan - The orchestration span (parent for retroactive spans). + * @param pastEvents - The past (replay) history events to look up scheduling events. + * @param newEvents - The new history events to process for completions/failures. + * @param instanceId - The orchestration instance ID. + * @param orchestrationName - The orchestration name (for timer spans). + */ +export function processNewEventsForTracing( + orchestrationSpan: Span | undefined | null, + pastEvents: pb.HistoryEvent[], + newEvents: pb.HistoryEvent[], + instanceId: string, + orchestrationName: string, +): void { + if (!orchestrationSpan) return; + if (!getTracingContext()) return; + + // Build lookup maps from past events + const taskScheduledEvents = new Map(); + const subOrchCreatedEvents = new Map(); + const timerCreatedEvents = new Map(); + + for (const event of pastEvents) { + const eventId = event.getEventid(); + if (event.hasTaskscheduled()) { + taskScheduledEvents.set(eventId, event); + } else if (event.hasSuborchestrationinstancecreated()) { + subOrchCreatedEvents.set(eventId, event); + } else if (event.hasTimercreated()) { + timerCreatedEvents.set(eventId, event); + } + } + + // Process new events for completions, failures, and timer firings + for (const newEvent of newEvents) { + if (newEvent.hasTaskcompleted()) { + const taskCompleted = newEvent.getTaskcompleted()!; + const scheduledEvent = taskScheduledEvents.get(taskCompleted.getTaskscheduledid()); + if (scheduledEvent) { + const taskScheduled = scheduledEvent.getTaskscheduled()!; + emitRetroactiveActivityClientSpan( + orchestrationSpan, + taskScheduled.getName(), + taskScheduled.getVersion()?.getValue(), + instanceId, + scheduledEvent.getEventid(), + scheduledEvent.getTimestamp()?.toDate(), + ); + } + } else if (newEvent.hasTaskfailed()) { + const taskFailed = newEvent.getTaskfailed()!; + const scheduledEvent = taskScheduledEvents.get(taskFailed.getTaskscheduledid()); + if (scheduledEvent) { + const taskScheduled = scheduledEvent.getTaskscheduled()!; + const failureMessage = + taskFailed.getFailuredetails()?.getErrormessage() ?? "Unspecified task activity failure"; + emitRetroactiveActivityClientSpan( + orchestrationSpan, + taskScheduled.getName(), + taskScheduled.getVersion()?.getValue(), + instanceId, + scheduledEvent.getEventid(), + scheduledEvent.getTimestamp()?.toDate(), + failureMessage, + ); + } + } else if (newEvent.hasSuborchestrationinstancecompleted()) { + const subOrchCompleted = newEvent.getSuborchestrationinstancecompleted()!; + const createdEvent = subOrchCreatedEvents.get(subOrchCompleted.getTaskscheduledid()); + if (createdEvent) { + const subOrchCreated = createdEvent.getSuborchestrationinstancecreated()!; + emitRetroactiveSubOrchClientSpan( + orchestrationSpan, + subOrchCreated.getName(), + subOrchCreated.getVersion()?.getValue(), + instanceId, + createdEvent.getTimestamp()?.toDate(), + ); + } + } else if (newEvent.hasSuborchestrationinstancefailed()) { + const subOrchFailed = newEvent.getSuborchestrationinstancefailed()!; + const createdEvent = subOrchCreatedEvents.get(subOrchFailed.getTaskscheduledid()); + if (createdEvent) { + const subOrchCreated = createdEvent.getSuborchestrationinstancecreated()!; + const failureMessage = + subOrchFailed.getFailuredetails()?.getErrormessage() ?? "Unspecified sub-orchestration failure"; + emitRetroactiveSubOrchClientSpan( + orchestrationSpan, + subOrchCreated.getName(), + subOrchCreated.getVersion()?.getValue(), + instanceId, + createdEvent.getTimestamp()?.toDate(), + failureMessage, + ); + } + } else if (newEvent.hasTimerfired()) { + const timerFired = newEvent.getTimerfired()!; + const timerId = timerFired.getTimerid(); + const createdEvent = timerCreatedEvents.get(timerId); + const fireAt = timerFired.getFireat()?.toDate() ?? new Date(); + emitSpanForTimer( + orchestrationSpan, + orchestrationName, + fireAt, + timerId, + instanceId, + createdEvent?.getTimestamp()?.toDate(), + ); + } + } +} + /** * Emits a span for sending an event to another orchestration. * * @param orchestrationSpan - The parent orchestration span. * @param eventName - The name of the event. * @param targetInstanceId - The target orchestration instance ID. + * @param instanceId - The source orchestration instance ID. + * @param executionId - The source orchestration execution ID. */ export function emitSpanForEventSent( orchestrationSpan: Span, eventName: string, targetInstanceId?: string, + instanceId?: string, + executionId?: string, ): void { const ctx = getTracingContext(); if (!ctx) return; @@ -362,6 +585,8 @@ export function emitSpanForEventSent( attributes: { [DurableTaskAttributes.TYPE]: TaskType.EVENT, [DurableTaskAttributes.TASK_NAME]: eventName, + ...(instanceId ? { [DurableTaskAttributes.TASK_INSTANCE_ID]: instanceId } : {}), + ...(executionId ? { [DurableTaskAttributes.TASK_EXECUTION_ID]: executionId } : {}), ...(targetInstanceId ? { [DurableTaskAttributes.EVENT_TARGET_INSTANCE_ID]: targetInstanceId } : {}), }, }, @@ -440,6 +665,60 @@ export function endSpan(span: Span | undefined | null): void { } } +/** + * Sets the orchestration completion status attribute and span status based on the + * completion action. Matches .NET behavior: sets ERROR status with result message + * when orchestration fails, OK otherwise. + * + * @param span - The orchestration span. + * @param actions - The orchestrator actions to inspect for completion status. + */ +export function setOrchestrationStatusFromActions( + span: Span | undefined | null, + actions: pb.OrchestratorAction[], +): void { + if (!span) return; + const otel = getOtelApi(); + if (!otel) return; + + for (const action of actions) { + if (action.hasCompleteorchestration()) { + const completeAction = action.getCompleteorchestration()!; + const status = completeAction.getOrchestrationstatus(); + const statusName = orchestrationStatusToString(status); + if (statusName) { + span.setAttribute(DurableTaskAttributes.TASK_STATUS, statusName); + } + + // Match .NET: set span error status when orchestration completes with Failed + if (status === pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED) { + const errorMessage = completeAction.getResult()?.getValue() ?? "Orchestration failed"; + span.setStatus({ code: otel.SpanStatusCode.ERROR, message: errorMessage }); + } else { + span.setStatus({ code: otel.SpanStatusCode.OK }); + } + + break; + } + } +} + +/** Maps protobuf OrchestrationStatus enum to a human-readable string. */ +const orchestrationStatusNames: Record = { + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING]: "Running", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED]: "Completed", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW]: "ContinuedAsNew", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED]: "Failed", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_CANCELED]: "Canceled", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED]: "Terminated", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING]: "Pending", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED]: "Suspended", +}; + +function orchestrationStatusToString(status: pb.OrchestrationStatus): string | undefined { + return orchestrationStatusNames[status]; +} + /** * Creates an OrchestrationTraceContext protobuf message for the orchestrator response. * @@ -467,28 +746,37 @@ export function createOrchestrationTraceContextPb(spanInfo: OrchestrationSpanInf * @param orchestrationSpan - The orchestration span (parent for action spans). * @param actions - The OrchestratorAction list to process. * @param orchestrationName - The name of the orchestration (for timer spans). + * @param instanceId - The orchestration instance ID (for enriching span attributes). + * @param executionId - The orchestration execution ID (for event spans). */ export function processActionsForTracing( orchestrationSpan: Span | undefined | null, actions: pb.OrchestratorAction[], orchestrationName: string, + instanceId?: string, + executionId?: string, ): void { if (!orchestrationSpan) return; for (const action of actions) { if (action.hasScheduletask()) { const scheduleTask = action.getScheduletask()!; - startSpanForSchedulingTask(orchestrationSpan, scheduleTask, action.getId()); + injectTraceContextForSchedulingTask(orchestrationSpan, scheduleTask); } else if (action.hasCreatesuborchestration()) { const createSubOrch = action.getCreatesuborchestration()!; - startSpanForSchedulingSubOrchestration(orchestrationSpan, createSubOrch, action.getId()); + injectTraceContextForSchedulingSubOrchestration(orchestrationSpan, createSubOrch); } else if (action.hasCreatetimer()) { - const createTimer = action.getCreatetimer()!; - const fireAt = createTimer.getFireat()?.toDate() ?? new Date(); - emitSpanForTimer(orchestrationSpan, orchestrationName, fireAt, action.getId()); + // Timer spans are emitted retroactively from TimerFired events in + // processNewEventsForTracing (matching .NET/Java), not here. } else if (action.hasSendevent()) { const sendEvent = action.getSendevent()!; - emitSpanForEventSent(orchestrationSpan, sendEvent.getName(), sendEvent.getInstance()?.getInstanceid()); + emitSpanForEventSent( + orchestrationSpan, + sendEvent.getName(), + sendEvent.getInstance()?.getInstanceid(), + instanceId, + executionId, + ); } else if (action.hasSendentitymessage()) { const sendEntityMsg = action.getSendentitymessage()!; if (sendEntityMsg.hasEntityoperationcalled()) { diff --git a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts index e12464d..03a2e12 100644 --- a/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts +++ b/packages/durabletask-js/src/worker/task-hub-grpc-worker.ts @@ -25,10 +25,13 @@ import { VersioningOptions, VersionMatchStrategy, VersionFailureStrategy } from import { compareVersions } from "../utils/versioning.util"; import * as WorkerLogs from "./logs"; import { + DurableTaskAttributes, startSpanForOrchestrationExecution, startSpanForTaskExecution, processActionsForTracing, createOrchestrationTraceContextPb, + setOrchestrationStatusFromActions, + processNewEventsForTracing, setSpanError, setSpanOk, endSpan, @@ -653,6 +656,20 @@ export class TaskHubGrpcWorker { ? startSpanForOrchestrationExecution(executionStartedProtoEvent, orchTraceContext, instanceId) : undefined; + // Emit retroactive spans for tasks/sub-orchestrations that completed/failed and timers + // that fired. This follows the .NET SDK pattern where these spans are emitted from + // history events BEFORE the orchestrator executor runs. + const orchName = executionStartedProtoEvent?.getName() ?? ""; + if (tracingResult) { + processNewEventsForTracing( + tracingResult.span, + req.getPasteventsList(), + req.getNeweventsList(), + instanceId, + orchName, + ); + } + let res; try { @@ -661,8 +678,8 @@ export class TaskHubGrpcWorker { // Process actions to inject trace context into scheduled tasks, sub-orchestrations, etc. if (tracingResult) { - const orchName = executionStartedProtoEvent?.getName() ?? ""; - processActionsForTracing(tracingResult.span, result.actions, orchName); + const executionId = req.getExecutionid()?.getValue(); + processActionsForTracing(tracingResult.span, result.actions, orchName, instanceId, executionId); } res = new pb.OrchestratorResponse(); @@ -678,7 +695,9 @@ export class TaskHubGrpcWorker { const orchTraceCtxPb = createOrchestrationTraceContextPb(tracingResult.spanInfo); res.setOrchestrationtracecontext(orchTraceCtxPb); - setSpanOk(tracingResult.span); + // Set orchestration completion status attribute and span status + // (OK for success, ERROR for failed orchestrations — matching .NET) + setOrchestrationStatusFromActions(tracingResult.span, result.actions); } } catch (e: unknown) { const error = e instanceof Error ? e : new Error(String(e)); @@ -687,6 +706,9 @@ export class TaskHubGrpcWorker { // Record the failure on the tracing span if (tracingResult) { setSpanError(tracingResult.span, error); + // Set just the status attribute — don't call setOrchestrationStatusFromActions + // which would overwrite the specific error message with a generic one + tracingResult.span.setAttribute(DurableTaskAttributes.TASK_STATUS, "Failed"); } const failureDetails = pbh.newFailureDetails(error); diff --git a/packages/durabletask-js/test/tracing.spec.ts b/packages/durabletask-js/test/tracing.spec.ts index 58f2d55..4e2dfa0 100644 --- a/packages/durabletask-js/test/tracing.spec.ts +++ b/packages/durabletask-js/test/tracing.spec.ts @@ -29,6 +29,10 @@ import { emitSpanForEventSent, processActionsForTracing, createOrchestrationTraceContextPb, + setOrchestrationStatusFromActions, + emitRetroactiveActivityClientSpan, + emitRetroactiveSubOrchClientSpan, + processNewEventsForTracing, setSpanError, setSpanOk, endSpan, @@ -545,7 +549,7 @@ describe("Trace Helper - setSpanError and setSpanOk", () => { }); describe("Trace Helper - processActionsForTracing", () => { - it("should create spans for ScheduleTaskAction", () => { + it("should inject trace context for ScheduleTaskAction without creating a span", () => { const tracer = otel.trace.getTracer(TRACER_NAME); const parentSpan = tracer.startSpan("parent-orch"); @@ -560,12 +564,8 @@ describe("Trace Helper - processActionsForTracing", () => { parentSpan.end(); const spans = exporter.getFinishedSpans(); - // Should have parent + child (schedule task) - expect(spans.length).toBe(2); - - const childSpan = spans.find((s: any) => s.name === "activity:MyActivity"); - expect(childSpan).toBeDefined(); - expect(childSpan!.kind).toBe(otel.SpanKind.CLIENT); + // Should have only the parent span — no Client span created (matching .NET) + expect(spans.length).toBe(1); // Trace context should have been injected into the action const traceCtx = scheduleTask.getParenttracecontext(); @@ -573,7 +573,7 @@ describe("Trace Helper - processActionsForTracing", () => { expect(traceCtx!.getTraceparent()).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-0[01]$/); }); - it("should create spans for CreateSubOrchestrationAction", () => { + it("should inject trace context for CreateSubOrchestrationAction without creating a span", () => { const tracer = otel.trace.getTracer(TRACER_NAME); const parentSpan = tracer.startSpan("parent-orch"); @@ -589,12 +589,16 @@ describe("Trace Helper - processActionsForTracing", () => { parentSpan.end(); const spans = exporter.getFinishedSpans(); - const childSpan = spans.find((s: any) => s.name === "orchestration:SubOrch"); - expect(childSpan).toBeDefined(); - expect(childSpan!.kind).toBe(otel.SpanKind.CLIENT); + // Should have only the parent span — no Client span created (matching .NET) + expect(spans.length).toBe(1); + + // Trace context should have been injected + const traceCtx = createSubOrch.getParenttracecontext(); + expect(traceCtx).toBeDefined(); + expect(traceCtx!.getTraceparent()).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-0[01]$/); }); - it("should create spans for CreateTimerAction", () => { + it("should skip timer actions (timers are emitted retroactively from TimerFired events)", () => { const tracer = otel.trace.getTracer(TRACER_NAME); const parentSpan = tracer.startSpan("parent-orch"); @@ -613,8 +617,7 @@ describe("Trace Helper - processActionsForTracing", () => { const spans = exporter.getFinishedSpans(); const timerSpan = spans.find((s: any) => s.name === "orchestration:MyOrchestration:timer"); - expect(timerSpan).toBeDefined(); - expect(timerSpan!.kind).toBe(otel.SpanKind.INTERNAL); + expect(timerSpan).toBeUndefined(); }); it("should create spans for SendEventAction", () => { @@ -1001,3 +1004,590 @@ describe("Trace Helper - setSpanError with unknown types", () => { expect(spans[0].status.message).toBe("null"); }); }); + +describe("Trace Helper - execution_id on creation spans", () => { + it("should include execution_id when present on CreateInstanceRequest", () => { + const req = new pb.CreateInstanceRequest(); + req.setName("MyOrchestration"); + req.setInstanceid("test-instance"); + const execId = new StringValue(); + execId.setValue("exec-abc-123"); + req.setExecutionid(execId); + + const span = startSpanForNewOrchestration(req); + span!.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].attributes[DurableTaskAttributes.TASK_EXECUTION_ID]).toBe("exec-abc-123"); + }); + + it("should not include execution_id when not present", () => { + const req = new pb.CreateInstanceRequest(); + req.setName("MyOrchestration"); + req.setInstanceid("test-instance"); + + const span = startSpanForNewOrchestration(req); + span!.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].attributes[DurableTaskAttributes.TASK_EXECUTION_ID]).toBeUndefined(); + }); +}); + +describe("Trace Helper - version on activity execution spans", () => { + it("should include version in span name and attributes when provided", () => { + const req = new pb.ActivityRequest(); + req.setName("MyActivity"); + req.setTaskid(1); + const versionValue = new StringValue(); + versionValue.setValue("1.5.0"); + req.setVersion(versionValue); + + const orchInstance = new pb.OrchestrationInstance(); + orchInstance.setInstanceid("parent-orch-id"); + req.setOrchestrationinstance(orchInstance); + + const span = startSpanForTaskExecution(req); + span!.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].name).toBe("activity:MyActivity@(1.5.0)"); + expect(spans[0].attributes[DurableTaskAttributes.TASK_VERSION]).toBe("1.5.0"); + }); + + it("should not include version when not present", () => { + const req = new pb.ActivityRequest(); + req.setName("MyActivity"); + req.setTaskid(1); + + const orchInstance = new pb.OrchestrationInstance(); + orchInstance.setInstanceid("parent-orch-id"); + req.setOrchestrationinstance(orchInstance); + + const span = startSpanForTaskExecution(req); + span!.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].name).toBe("activity:MyActivity"); + expect(spans[0].attributes[DurableTaskAttributes.TASK_VERSION]).toBeUndefined(); + }); +}); + +describe("Trace Helper - timer span enrichment", () => { + it("should include name and instance_id on timer spans", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const parentSpan = tracer.startSpan("parent-orch-timer-enriched"); + const fireAt = new Date("2025-07-01T12:00:00Z"); + + emitSpanForTimer(parentSpan, "EnrichedOrch", fireAt, 7, "instance-enriched-123"); + parentSpan.end(); + + const spans = exporter.getFinishedSpans(); + const timerSpan = spans.find((s: any) => s.name === "orchestration:EnrichedOrch:timer"); + expect(timerSpan).toBeDefined(); + expect(timerSpan!.attributes[DurableTaskAttributes.TASK_NAME]).toBe("EnrichedOrch"); + expect(timerSpan!.attributes[DurableTaskAttributes.TASK_INSTANCE_ID]).toBe("instance-enriched-123"); + }); + + it("should omit instance_id when not provided", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const parentSpan = tracer.startSpan("parent-orch-timer-no-id"); + const fireAt = new Date("2025-07-01T12:00:00Z"); + + emitSpanForTimer(parentSpan, "NoIdOrch", fireAt, 8); + parentSpan.end(); + + const spans = exporter.getFinishedSpans(); + const timerSpan = spans.find((s: any) => s.name === "orchestration:NoIdOrch:timer"); + expect(timerSpan).toBeDefined(); + expect(timerSpan!.attributes[DurableTaskAttributes.TASK_NAME]).toBe("NoIdOrch"); + expect(timerSpan!.attributes[DurableTaskAttributes.TASK_INSTANCE_ID]).toBeUndefined(); + }); +}); + +describe("Trace Helper - setOrchestrationStatusFromActions", () => { + it("should set status attribute for completed orchestration and OK span status", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const span = tracer.startSpan("orch-status-test"); + + const completeAction = new pb.CompleteOrchestrationAction(); + completeAction.setOrchestrationstatus(pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + + const action = new pb.OrchestratorAction(); + action.setCompleteorchestration(completeAction); + + setOrchestrationStatusFromActions(span, [action]); + span.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].attributes[DurableTaskAttributes.TASK_STATUS]).toBe("Completed"); + expect(spans[0].status.code).toBe(otel.SpanStatusCode.OK); + }); + + it("should set status attribute for failed orchestration and ERROR span status", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const span = tracer.startSpan("orch-status-failed"); + + const completeAction = new pb.CompleteOrchestrationAction(); + completeAction.setOrchestrationstatus(pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED); + const resultValue = new StringValue(); + resultValue.setValue("User code threw an error"); + completeAction.setResult(resultValue); + + const action = new pb.OrchestratorAction(); + action.setCompleteorchestration(completeAction); + + setOrchestrationStatusFromActions(span, [action]); + span.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].attributes[DurableTaskAttributes.TASK_STATUS]).toBe("Failed"); + expect(spans[0].status.code).toBe(otel.SpanStatusCode.ERROR); + expect(spans[0].status.message).toBe("User code threw an error"); + }); + + it("should not set status when no completion action present", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const span = tracer.startSpan("orch-status-none"); + + const action = new pb.OrchestratorAction(); + action.setScheduletask(new pb.ScheduleTaskAction()); + + setOrchestrationStatusFromActions(span, [action]); + span.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].attributes[DurableTaskAttributes.TASK_STATUS]).toBeUndefined(); + }); + + it("should safely handle null/undefined span", () => { + const action = new pb.OrchestratorAction(); + action.setCompleteorchestration(new pb.CompleteOrchestrationAction()); + + // Should not throw + setOrchestrationStatusFromActions(null, [action]); + setOrchestrationStatusFromActions(undefined, [action]); + }); + + it("should handle terminated status", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const span = tracer.startSpan("orch-status-terminated"); + + const completeAction = new pb.CompleteOrchestrationAction(); + completeAction.setOrchestrationstatus(pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED); + + const action = new pb.OrchestratorAction(); + action.setCompleteorchestration(completeAction); + + setOrchestrationStatusFromActions(span, [action]); + span.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].attributes[DurableTaskAttributes.TASK_STATUS]).toBe("Terminated"); + }); + + it("should handle continued_as_new status", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const span = tracer.startSpan("orch-status-can"); + + const completeAction = new pb.CompleteOrchestrationAction(); + completeAction.setOrchestrationstatus(pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW); + + const action = new pb.OrchestratorAction(); + action.setCompleteorchestration(completeAction); + + setOrchestrationStatusFromActions(span, [action]); + span.end(); + + const spans = exporter.getFinishedSpans(); + expect(spans[0].attributes[DurableTaskAttributes.TASK_STATUS]).toBe("ContinuedAsNew"); + }); +}); + +describe("Trace Helper - processActionsForTracing with instanceId", () => { + it("should pass instanceId to event spans", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("orch-with-instance"); + + const sendEvent = new pb.SendEventAction(); + sendEvent.setName("TestEvent"); + const instance = new pb.OrchestrationInstance(); + instance.setInstanceid("target-instance-99"); + sendEvent.setInstance(instance); + + const action = new pb.OrchestratorAction(); + action.setId(10); + action.setSendevent(sendEvent); + + processActionsForTracing(orchSpan, [action], "TestOrch", "source-orch-instance-42"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const eventSpan = spans.find((s: any) => s.name === "orchestration_event:TestEvent"); + expect(eventSpan).toBeDefined(); + expect(eventSpan!.attributes[DurableTaskAttributes.TASK_INSTANCE_ID]).toBe("source-orch-instance-42"); + expect(eventSpan!.attributes[DurableTaskAttributes.EVENT_TARGET_INSTANCE_ID]).toBe("target-instance-99"); + }); +}); + +describe("Retroactive span emission", () => { + beforeEach(() => { + exporter.reset(); + }); + + describe("emitRetroactiveActivityClientSpan", () => { + it("should emit a Client span with historical startTime for completed activities", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("parent-orch"); + const schedulingTime = new Date("2025-01-15T10:00:00Z"); + + emitRetroactiveActivityClientSpan( + orchSpan, + "GetWeather", + "1.0", + "instance-abc", + 5, + schedulingTime, + ); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "activity:GetWeather@(1.0)"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.kind).toBe(otel.SpanKind.CLIENT); + expect(clientSpan!.attributes[DurableTaskAttributes.TYPE]).toBe(TaskType.ACTIVITY); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_NAME]).toBe("GetWeather"); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_VERSION]).toBe("1.0"); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_INSTANCE_ID]).toBe("instance-abc"); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_TASK_ID]).toBe(5); + // Verify historical startTime is used + expect(clientSpan!.startTime[0]).toBe(Math.floor(schedulingTime.getTime() / 1000)); + }); + + it("should set error status for failed activities", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("parent-orch"); + + emitRetroactiveActivityClientSpan( + orchSpan, + "FailingTask", + undefined, + "instance-def", + 7, + new Date(), + "Task timed out", + ); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "activity:FailingTask"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.status.code).toBe(otel.SpanStatusCode.ERROR); + expect(clientSpan!.status.message).toBe("Task timed out"); + }); + + it("should work without version", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("parent-orch"); + + emitRetroactiveActivityClientSpan( + orchSpan, + "SimpleTask", + undefined, + "instance-ghi", + 3, + ); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "activity:SimpleTask"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_VERSION]).toBeUndefined(); + }); + }); + + describe("emitRetroactiveSubOrchClientSpan", () => { + it("should emit a Client span for completed sub-orchestrations", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("parent-orch"); + const schedulingTime = new Date("2025-02-20T14:30:00Z"); + + emitRetroactiveSubOrchClientSpan( + orchSpan, + "ChildWorkflow", + "2.0", + "parent-instance", + schedulingTime, + ); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "orchestration:ChildWorkflow@(2.0)"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.kind).toBe(otel.SpanKind.CLIENT); + expect(clientSpan!.attributes[DurableTaskAttributes.TYPE]).toBe(TaskType.ORCHESTRATION); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_NAME]).toBe("ChildWorkflow"); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_INSTANCE_ID]).toBe("parent-instance"); + expect(clientSpan!.startTime[0]).toBe(Math.floor(schedulingTime.getTime() / 1000)); + }); + + it("should set error status for failed sub-orchestrations", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("parent-orch"); + + emitRetroactiveSubOrchClientSpan( + orchSpan, + "FailingChild", + undefined, + "parent-instance", + new Date(), + "Sub-orchestration crashed", + ); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "orchestration:FailingChild"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.status.code).toBe(otel.SpanStatusCode.ERROR); + }); + }); + + describe("emitSpanForTimer with startTime", () => { + it("should use historical startTime for timer spans", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("parent-orch"); + const creationTime = new Date("2025-03-01T08:00:00Z"); + const fireTime = new Date("2025-03-01T08:05:00Z"); + + emitSpanForTimer(orchSpan, "TimerOrch", fireTime, 42, "timer-instance", creationTime); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const timerSpan = spans.find((s: any) => s.name === "orchestration:TimerOrch:timer"); + expect(timerSpan).toBeDefined(); + expect(timerSpan!.kind).toBe(otel.SpanKind.INTERNAL); + expect(timerSpan!.startTime[0]).toBe(Math.floor(creationTime.getTime() / 1000)); + }); + }); + + describe("processNewEventsForTracing", () => { + function makeHistoryEvent(eventId: number, timestampDate?: Date): pb.HistoryEvent { + const event = new pb.HistoryEvent(); + event.setEventid(eventId); + if (timestampDate) { + const ts = new Timestamp(); + ts.fromDate(timestampDate); + event.setTimestamp(ts); + } + return event; + } + + it("should emit retroactive Client span for TaskCompleted", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("test-orch"); + const schedulingTime = new Date("2025-01-10T09:00:00Z"); + + // Past event: TaskScheduled + const pastEvent = makeHistoryEvent(1, schedulingTime); + const taskScheduled = new pb.TaskScheduledEvent(); + taskScheduled.setName("ProcessData"); + pastEvent.setTaskscheduled(taskScheduled); + + // New event: TaskCompleted + const newEvent = makeHistoryEvent(2); + const taskCompleted = new pb.TaskCompletedEvent(); + taskCompleted.setTaskscheduledid(1); + newEvent.setTaskcompleted(taskCompleted); + + processNewEventsForTracing(orchSpan, [pastEvent], [newEvent], "instance-123", "MainOrch"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "activity:ProcessData"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.kind).toBe(otel.SpanKind.CLIENT); + expect(clientSpan!.attributes[DurableTaskAttributes.TASK_INSTANCE_ID]).toBe("instance-123"); + expect(clientSpan!.startTime[0]).toBe(Math.floor(schedulingTime.getTime() / 1000)); + }); + + it("should emit retroactive Client span with error for TaskFailed", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("test-orch"); + + const pastEvent = makeHistoryEvent(3, new Date()); + const taskScheduled = new pb.TaskScheduledEvent(); + taskScheduled.setName("RiskyOperation"); + pastEvent.setTaskscheduled(taskScheduled); + + const newEvent = makeHistoryEvent(4); + const taskFailed = new pb.TaskFailedEvent(); + taskFailed.setTaskscheduledid(3); + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrormessage("Connection refused"); + taskFailed.setFailuredetails(failureDetails); + newEvent.setTaskfailed(taskFailed); + + processNewEventsForTracing(orchSpan, [pastEvent], [newEvent], "instance-456", "MainOrch"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "activity:RiskyOperation"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.status.code).toBe(otel.SpanStatusCode.ERROR); + expect(clientSpan!.status.message).toBe("Connection refused"); + }); + + it("should emit retroactive Client span for SubOrchestrationCompleted", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("test-orch"); + const schedulingTime = new Date("2025-02-01T10:00:00Z"); + + const pastEvent = makeHistoryEvent(5, schedulingTime); + const subOrchCreated = new pb.SubOrchestrationInstanceCreatedEvent(); + subOrchCreated.setName("ChildWorkflow"); + pastEvent.setSuborchestrationinstancecreated(subOrchCreated); + + const newEvent = makeHistoryEvent(6); + const subOrchCompleted = new pb.SubOrchestrationInstanceCompletedEvent(); + subOrchCompleted.setTaskscheduledid(5); + newEvent.setSuborchestrationinstancecompleted(subOrchCompleted); + + processNewEventsForTracing(orchSpan, [pastEvent], [newEvent], "parent-789", "MainOrch"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "orchestration:ChildWorkflow"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.kind).toBe(otel.SpanKind.CLIENT); + expect(clientSpan!.attributes[DurableTaskAttributes.TYPE]).toBe(TaskType.ORCHESTRATION); + expect(clientSpan!.startTime[0]).toBe(Math.floor(schedulingTime.getTime() / 1000)); + }); + + it("should emit retroactive Client span with error for SubOrchestrationFailed", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("test-orch"); + + const pastEvent = makeHistoryEvent(7, new Date()); + const subOrchCreated = new pb.SubOrchestrationInstanceCreatedEvent(); + subOrchCreated.setName("FailingChild"); + pastEvent.setSuborchestrationinstancecreated(subOrchCreated); + + const newEvent = makeHistoryEvent(8); + const subOrchFailed = new pb.SubOrchestrationInstanceFailedEvent(); + subOrchFailed.setTaskscheduledid(7); + const failureDetails = new pb.TaskFailureDetails(); + failureDetails.setErrormessage("Child orchestration crashed"); + subOrchFailed.setFailuredetails(failureDetails); + newEvent.setSuborchestrationinstancefailed(subOrchFailed); + + processNewEventsForTracing(orchSpan, [pastEvent], [newEvent], "parent-xyz", "MainOrch"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const clientSpan = spans.find((s: any) => s.name === "orchestration:FailingChild"); + expect(clientSpan).toBeDefined(); + expect(clientSpan!.status.code).toBe(otel.SpanStatusCode.ERROR); + }); + + it("should emit timer span with historical startTime for TimerFired", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("test-orch"); + const timerCreationTime = new Date("2025-03-01T12:00:00Z"); + const timerFireTime = new Date("2025-03-01T12:05:00Z"); + + // Past event: TimerCreated + const pastEvent = makeHistoryEvent(10, timerCreationTime); + pastEvent.setTimercreated(new pb.TimerCreatedEvent()); + + // New event: TimerFired + const newEvent = makeHistoryEvent(11); + const timerFired = new pb.TimerFiredEvent(); + timerFired.setTimerid(10); + const fireAtTs = new Timestamp(); + fireAtTs.fromDate(timerFireTime); + timerFired.setFireat(fireAtTs); + newEvent.setTimerfired(timerFired); + + processNewEventsForTracing(orchSpan, [pastEvent], [newEvent], "timer-instance", "TimerOrch"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const timerSpan = spans.find((s: any) => s.name === "orchestration:TimerOrch:timer"); + expect(timerSpan).toBeDefined(); + expect(timerSpan!.kind).toBe(otel.SpanKind.INTERNAL); + expect(timerSpan!.attributes[DurableTaskAttributes.TASK_INSTANCE_ID]).toBe("timer-instance"); + expect(timerSpan!.startTime[0]).toBe(Math.floor(timerCreationTime.getTime() / 1000)); + }); + + it("should handle multiple completions in a single batch", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("test-orch"); + + // Two past TaskScheduled events + const past1 = makeHistoryEvent(1, new Date("2025-01-01T10:00:00Z")); + const ts1 = new pb.TaskScheduledEvent(); + ts1.setName("TaskA"); + past1.setTaskscheduled(ts1); + + const past2 = makeHistoryEvent(2, new Date("2025-01-01T10:01:00Z")); + const ts2 = new pb.TaskScheduledEvent(); + ts2.setName("TaskB"); + past2.setTaskscheduled(ts2); + + // Two new TaskCompleted events + const new1 = makeHistoryEvent(3); + const tc1 = new pb.TaskCompletedEvent(); + tc1.setTaskscheduledid(1); + new1.setTaskcompleted(tc1); + + const new2 = makeHistoryEvent(4); + const tc2 = new pb.TaskCompletedEvent(); + tc2.setTaskscheduledid(2); + new2.setTaskcompleted(tc2); + + processNewEventsForTracing(orchSpan, [past1, past2], [new1, new2], "batch-instance", "BatchOrch"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + const taskASpan = spans.find((s: any) => s.name === "activity:TaskA"); + const taskBSpan = spans.find((s: any) => s.name === "activity:TaskB"); + expect(taskASpan).toBeDefined(); + expect(taskBSpan).toBeDefined(); + }); + + it("should not emit spans when orchestration span is null", () => { + const pastEvent = makeHistoryEvent(1, new Date()); + const taskScheduled = new pb.TaskScheduledEvent(); + taskScheduled.setName("Orphan"); + pastEvent.setTaskscheduled(taskScheduled); + + const newEvent = makeHistoryEvent(2); + const taskCompleted = new pb.TaskCompletedEvent(); + taskCompleted.setTaskscheduledid(1); + newEvent.setTaskcompleted(taskCompleted); + + processNewEventsForTracing(null, [pastEvent], [newEvent], "instance", "Orch"); + + const spans = exporter.getFinishedSpans(); + expect(spans.find((s: any) => s.name === "activity:Orphan")).toBeUndefined(); + }); + + it("should skip events with no matching past event", () => { + const tracer = otel.trace.getTracer(TRACER_NAME); + const orchSpan = tracer.startSpan("test-orch"); + + // New event referencing non-existent past event + const newEvent = makeHistoryEvent(2); + const taskCompleted = new pb.TaskCompletedEvent(); + taskCompleted.setTaskscheduledid(999); + newEvent.setTaskcompleted(taskCompleted); + + processNewEventsForTracing(orchSpan, [], [newEvent], "instance", "Orch"); + orchSpan.end(); + + const spans = exporter.getFinishedSpans(); + // Only the parent orch span should exist + expect(spans.length).toBe(1); + }); + }); +}); diff --git a/packages/durabletask-js/test/worker-tracing.spec.ts b/packages/durabletask-js/test/worker-tracing.spec.ts index ba42e04..ea4f659 100644 --- a/packages/durabletask-js/test/worker-tracing.spec.ts +++ b/packages/durabletask-js/test/worker-tracing.spec.ts @@ -291,18 +291,17 @@ describe("Worker Tracing - Scheduled Actions Trace Context Injection", () => { // Should have a valid traceparent expect(traceCtx!.getTraceparent()).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-0[01]$/); - // Verify OTEL spans were created: orchestration span + scheduling span + // Verify OTEL spans were created: orchestration span only (no scheduling CLIENT span — matching .NET) const spans = exporter.getFinishedSpans(); const orchSpan = spans.find( (s) => s.attributes[DurableTaskAttributes.TYPE] === TaskType.ORCHESTRATION, ); + expect(orchSpan).toBeDefined(); + // No activity CLIENT span should exist — trace context is injected via random span ID const activityScheduleSpan = spans.find( (s) => s.attributes[DurableTaskAttributes.TYPE] === TaskType.ACTIVITY, ); - expect(orchSpan).toBeDefined(); - expect(activityScheduleSpan).toBeDefined(); - // The schedule span should be a CLIENT span (act of scheduling) - expect(activityScheduleSpan!.kind).toBe(otel.SpanKind.CLIENT); + expect(activityScheduleSpan).toBeUndefined(); }); it("should inject parenttracecontext into CreateSubOrchestrationAction", async () => { @@ -334,13 +333,12 @@ describe("Worker Tracing - Scheduled Actions Trace Context Injection", () => { // Should have a valid traceparent expect(traceCtx!.getTraceparent()).toMatch(/^00-[0-9a-f]{32}-[0-9a-f]{16}-0[01]$/); - // Verify the scheduling span was created + // No CLIENT span should exist — trace context is injected via random span ID (matching .NET) const spans = exporter.getFinishedSpans(); const subOrchScheduleSpan = spans.find( (s) => s.attributes[DurableTaskAttributes.TYPE] === TaskType.ORCHESTRATION && s.kind === otel.SpanKind.CLIENT, ); - expect(subOrchScheduleSpan).toBeDefined(); - expect(subOrchScheduleSpan!.kind).toBe(otel.SpanKind.CLIENT); + expect(subOrchScheduleSpan).toBeUndefined(); }); it("should inject parenttracecontext with trace ID from parent span", async () => {