Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions task/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ type ActivityContext interface {
GetTaskID() int32
GetTaskExecutionID() string
Context() context.Context
GetTraceContext() *protos.TraceContext
}

type activityContext struct {
TaskID int32
TaskExecutionID string
Name string
TraceContext *protos.TraceContext

rawInput []byte
ctx context.Context
Expand All @@ -122,10 +124,12 @@ type activityContext struct {
type Activity func(ctx ActivityContext) (any, error)

func newTaskActivityContext(ctx context.Context, taskID int32, ts *protos.TaskScheduledEvent) *activityContext {

return &activityContext{
TaskID: taskID,
TaskExecutionID: ts.TaskExecutionId,
Name: ts.Name,
TraceContext: ts.ParentTraceContext,
rawInput: []byte(ts.Input.GetValue()),
ctx: ctx,
}
Expand All @@ -147,3 +151,7 @@ func (actx *activityContext) GetTaskID() int32 {
func (actx *activityContext) GetTaskExecutionID() string {
return actx.TaskExecutionID
}

func (actx *activityContext) GetTraceContext() *protos.TraceContext {
return actx.TraceContext
}
70 changes: 69 additions & 1 deletion tests/orchestrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"

"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/backend/sqlite"
"github.com/dapr/durabletask-go/task"
"github.com/dapr/durabletask-go/tests/utils"
"go.opentelemetry.io/otel"
)

var tracer = otel.Tracer("orchestration-test")
Expand Down Expand Up @@ -231,6 +232,7 @@ func Test_SingleActivity_TaskSpan(t *testing.T) {
if err := ctx.GetInput(&name); err != nil {
return nil, err
}
ctx.GetTraceContext()
_, childSpan := tracer.Start(ctx.Context(), "activityChild")
childSpan.End()
return fmt.Sprintf("Hello, %s!", name), nil
Expand Down Expand Up @@ -1544,6 +1546,72 @@ func Test_TaskExecutionId(t *testing.T) {
})
}

func Test_ActivityTraceContext(t *testing.T) {
t.Run("TraceContext is propagated in the activity context", func(t *testing.T) {
// Registration
r := task.NewTaskRegistry()
require.NoError(t, r.AddOrchestratorN("TraceContextOrchestration", func(ctx *task.OrchestrationContext) (any, error) {
if err := ctx.CallActivity("ActivityWithContext", task.WithActivityRetryPolicy(&task.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 10 * time.Millisecond,
})).Await(nil); err != nil {
return nil, err
}
return nil, nil
}))

traceParentMap := make(map[string]string)
var executionId string
require.NoError(t, r.AddActivityN("ActivityWithContext", func(ctx task.ActivityContext) (any, error) {
executionId = ctx.GetTaskExecutionID()
tp := ctx.GetTraceContext().GetTraceParent()
traceParentMap[executionId] = tp

// Create a new context
newCtx := context.Background()

// Create a TextMapCarrier with the traceparent
carrier := propagation.MapCarrier{}
carrier.Set("traceparent", tp)

// Use the TraceContext propagator to extract the trace context
propagator := propagation.TraceContext{}
newCtx = propagator.Extract(newCtx, carrier)

_, childSpan := tracer.Start(context.Background(), "ActivityWith1Context")
childSpan.End()
return nil, nil
}))

// Initialization
ctx := context.Background()
exporter := utils.InitTracing()

client, worker := initTaskHubWorker(ctx, r)
defer worker.Shutdown(ctx)

// Run the orchestration
id, err := client.ScheduleNewOrchestration(ctx, "TraceContextOrchestration")
require.NoError(t, err)

metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
require.NoError(t, err)

assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED, metadata.RuntimeStatus)
assert.NotEmpty(t, executionId)
assert.NotEmpty(t, traceParentMap[executionId])

// Validate the exported OTel traces include patch spans
spans := exporter.GetSpans().Snapshots()
utils.AssertSpanSequence(t, spans,
utils.AssertOrchestratorCreated("TraceContextOrchestration", id),
utils.AssertSpan("ActivityWith1Context"),
utils.AssertActivity("ActivityWithContext", id, 0),
utils.AssertOrchestratorExecuted("TraceContextOrchestration", id, "COMPLETED"),
)
})
}

func Test_OrchestrationPatching_DefaultToPatched(t *testing.T) {
// Registration
r := task.NewTaskRegistry()
Expand Down