Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

### New

- Align tracing attributes with .NET SDK conventions: add `execution_id` on creation spans, `version` on activity execution spans, `name`/`instance_id` on timer spans, and `durabletask.task.status` on orchestration completion ([#126](https://github.com/microsoft/durabletask-js/pull/126))
- Add retroactive span emission model: emit Client spans at activity/sub-orchestration completion with historical scheduling timestamps, and timer spans with creation-to-fired duration (matching .NET/Java SDK patterns) ([#126](https://github.com/microsoft/durabletask-js/pull/126))

### Fixes


Expand Down
Binary file added doc/images/tracing/jaeger-full-trace-detail.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/tracing/jaeger-span-detail.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added doc/images/tracing/jaeger-trace-list.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
172 changes: 49 additions & 123 deletions examples/azure-managed/distributed-tracing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
// --------------------------------------------------------------------------
import { NodeSDK } from "@opentelemetry/sdk-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { SimpleSpanProcessor } from "@opentelemetry/sdk-trace-base";
import { resourceFromAttributes } from "@opentelemetry/resources";
import { Resource } from "@opentelemetry/resources";
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";

// Load environment variables from .env file
Expand All @@ -25,15 +24,13 @@ dotenv.config({ path: path.join(__dirname, "..", ".env") });
// Read the OTLP endpoint from the environment (defaults to Jaeger's OTLP HTTP port)
const otlpEndpoint = process.env.OTEL_EXPORTER_OTLP_ENDPOINT || "http://localhost:4318";

const traceExporter = new OTLPTraceExporter({
url: `${otlpEndpoint}/v1/traces`,
});

const sdk = new NodeSDK({
resource: resourceFromAttributes({
[ATTR_SERVICE_NAME]: "durabletask-js-tracing-example",
resource: new Resource({
[ATTR_SERVICE_NAME]: "durabletask-js-tracing-sample",
}),
traceExporter: new OTLPTraceExporter({
url: `${otlpEndpoint}/v1/traces`,
}),
spanProcessors: [new SimpleSpanProcessor(traceExporter)],
});

sdk.start();
Expand All @@ -53,13 +50,9 @@ import { TOrchestrator } from "@microsoft/durabletask-js/dist/types/orchestrator
import { whenAll } from "@microsoft/durabletask-js/dist/task";

// --------------------------------------------------------------------------
// 3. Application code
// 3. Application code — FanOutFanIn pattern (matches Java tracing sample)
// --------------------------------------------------------------------------
(async () => {
// Use ConsoleLogger so structured log events (with event IDs and categories) are
// printed to the console by default, similar to .NET's default ILogger<T> output.
// For production, consider using createAzureLogger() which integrates with @azure/logger
// and respects the AZURE_LOG_LEVEL environment variable.
const sdkLogger = new ConsoleLogger();

// --- Configuration ---
Expand All @@ -80,96 +73,44 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";

// --- Activity definitions ---

/** Simulates fetching data from an external service. */
const fetchData = async (_ctx: ActivityContext, source: string): Promise<string> => {
console.log(` [fetchData] Fetching data from "${source}"...`);
// Simulate network latency
await new Promise((r) => setTimeout(r, 300 + Math.random() * 200));
return `data-from-${source}`;
};

/** Simulates transforming a piece of data. */
const transformData = async (_ctx: ActivityContext, input: string): Promise<string> => {
console.log(` [transformData] Transforming "${input}"...`);
await new Promise((r) => setTimeout(r, 200));
return `transformed(${input})`;
/** Simulates getting weather for a city. */
const getWeather = async (_ctx: ActivityContext, city: string): Promise<string> => {
console.log(` [GetWeather] Getting weather for: ${city}`);
await new Promise((r) => setTimeout(r, 20));
return `${city}=72F`;
};

/** Simulates persisting results to a database. */
const saveResults = async (_ctx: ActivityContext, results: string[]): Promise<number> => {
console.log(` [saveResults] Saving ${results.length} results...`);
await new Promise((r) => setTimeout(r, 150));
return results.length;
/** Aggregates weather results into a summary. */
const createSummary = async (_ctx: ActivityContext, input: string): Promise<string> => {
console.log(` [CreateSummary] Creating summary for: ${input}`);
return `Weather Report: ${input}`;
};

// --- Orchestrator: data pipeline (chaining + fan-out/fan-in) ---
// --- Orchestrator: Fan-Out/Fan-In with timer ---

/**
* Demonstrates a realistic data-processing pipeline that produces a rich
* distributed trace:
* Demonstrates a Fan-Out/Fan-In pattern that produces a rich trace:
*
* 1. Fan-out – fetch data from multiple sources in parallel.
* 2. Fan-out – transform each result in parallel.
* 3. Chain – save all transformed results.
*
* The resulting trace will show:
* create_orchestration → orchestration (server)
* ├─ activity:fetchData (×N, parallel)
* ├─ activity:transformData (×N, parallel)
* └─ activity:saveResults
* create_orchestration:FanOutFanIn (Producer)
* └─ orchestration:FanOutFanIn (Server)
* ├─ orchestration:FanOutFanIn:timer (Internal, ~1s)
* ├─ activity:getWeather (Client+Server, ×5 parallel)
* └─ activity:createSummary (Client+Server)
*/
const dataPipelineOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
): any {
const sources: string[] = yield ctx.callActivity(getDataSources);

// Step 1 – fan-out: fetch from all sources in parallel
const fetchTasks = [];
for (const source of sources) {
fetchTasks.push(ctx.callActivity(fetchData, source));
}
const rawData: string[] = yield whenAll(fetchTasks);

// Step 2 – fan-out: transform all fetched data in parallel
const transformTasks = [];
for (const raw of rawData) {
transformTasks.push(ctx.callActivity(transformData, raw));
}
const transformed: string[] = yield whenAll(transformTasks);

// Step 3 – chain: save all results
const savedCount: number = yield ctx.callActivity(saveResults, transformed);
const fanOutFanIn: TOrchestrator = async function* (ctx: OrchestrationContext): any {
// Timer: wait 1 second (demonstrates timer span with creation-to-fired duration)
const timerDelay = new Date(ctx.currentUtcDateTime.getTime() + 1000);
yield ctx.createTimer(timerDelay);

return {
sourcesProcessed: sources.length,
resultsSaved: savedCount,
data: transformed,
};
};

/** Returns the list of data sources to process. */
const getDataSources = async (_ctx: ActivityContext): Promise<string[]> => {
return ["users-api", "orders-api", "inventory-api", "analytics-api"];
};
// Fan-out: schedule parallel weather lookups
const cities = ["Seattle", "Tokyo", "London", "Paris", "Sydney"];
const tasks = cities.map((city) => ctx.callActivity(getWeather, city));
const results: string[] = yield whenAll(tasks);

// --- Orchestrator: simple sequence (for a cleaner trace comparison) ---

const sequenceOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
): any {
const cities = ["Tokyo", "Seattle", "London"];
const greetings: string[] = [];
for (const city of cities) {
const greeting: string = yield ctx.callActivity(greetCity, city);
greetings.push(greeting);
}
return greetings;
};
// Aggregate results
const summary: string = yield ctx.callActivity(createSummary, results.join(", "));

const greetCity = async (_ctx: ActivityContext, city: string): Promise<string> => {
console.log(` [greetCity] Greeting ${city}`);
await new Promise((r) => setTimeout(r, 100));
return `Hello, ${city}!`;
return summary;
};

// --- Create client & worker ---
Expand All @@ -193,52 +134,37 @@ import { whenAll } from "@microsoft/durabletask-js/dist/task";

client = clientBuilder.build();
worker = workerBuilder
.addOrchestrator(dataPipelineOrchestrator)
.addOrchestrator(sequenceOrchestrator)
.addActivity(fetchData)
.addActivity(transformData)
.addActivity(saveResults)
.addActivity(getDataSources)
.addActivity(greetCity)
.addOrchestrator(fanOutFanIn)
.addActivity(getWeather)
.addActivity(createSummary)
.build();

// --- Start worker ---
console.log("Starting worker...");
await worker.start();
// Allow the worker time to establish the gRPC stream with the scheduler.
// worker.start() returns before the connection is fully established.
await new Promise((r) => setTimeout(r, 2000));
console.log("Worker started.");

// --- Run orchestrations ---
console.log("Worker started with OpenTelemetry tracing.");

// 1) Sequence orchestration
console.log("\n=== Sequence Orchestration ===");
const seqId = await client.scheduleNewOrchestration(sequenceOrchestrator);
console.log(`Scheduled: ${seqId}`);
const seqState = await client.waitForOrchestrationCompletion(seqId, true, 60);
console.log(`Completed – result: ${seqState?.serializedOutput}`);
// --- Run orchestration ---
console.log("\nScheduling FanOutFanIn orchestration...");
const instanceId = await client.scheduleNewOrchestration(fanOutFanIn);
console.log(`Started orchestration: ${instanceId}`);

// 2) Data pipeline orchestration (fan-out/fan-in)
console.log("\n=== Data Pipeline Orchestration ===");
const pipelineId = await client.scheduleNewOrchestration(dataPipelineOrchestrator);
console.log(`Scheduled: ${pipelineId}`);
const pipelineState = await client.waitForOrchestrationCompletion(pipelineId, true, 60);
console.log(`Completed – result: ${pipelineState?.serializedOutput}`);
console.log("Waiting for completion...");
const state = await client.waitForOrchestrationCompletion(instanceId, true, 60);
console.log(`Status: ${state?.runtimeStatus}`);
console.log(`Result: ${state?.serializedOutput}`);

console.log("\n=== All orchestrations completed! ===");
console.log(
`Open Jaeger UI at http://localhost:16686 and search for service "durabletask-js-tracing-example" to view traces.`,
);
console.log("\nView traces in Jaeger UI: http://localhost:16686");
console.log(' Search for service: durabletask-js-tracing-sample');
console.log("View orchestration in DTS Dashboard: http://localhost:8082");
} catch (error) {
console.error("Error:", error);
process.exit(1);
} finally {
console.log("\nShutting down...");
if (worker) await worker.stop();
if (client) await client.stop();

// Flush remaining spans before exit
await sdk.shutdown();
console.log("Done.");
process.exit(0);
Expand Down
8 changes: 6 additions & 2 deletions packages/durabletask-js/src/tracing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ export {
OrchestrationSpanInfo,
startSpanForNewOrchestration,
startSpanForOrchestrationExecution,
startSpanForSchedulingTask,
injectTraceContextForSchedulingTask,
startSpanForTaskExecution,
startSpanForSchedulingSubOrchestration,
injectTraceContextForSchedulingSubOrchestration,
emitSpanForTimer,
emitSpanForEventSent,
startSpanForEventRaisedFromClient,
Expand All @@ -35,6 +35,10 @@ export {
setSpanError,
setSpanOk,
endSpan,
setOrchestrationStatusFromActions,
createOrchestrationTraceContextPb,
processActionsForTracing,
emitRetroactiveActivityClientSpan,
emitRetroactiveSubOrchClientSpan,
processNewEventsForTracing,
} from "./trace-helper";
Loading
Loading