diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index ea69814872..aa3cfa99e4 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -3661,7 +3661,19 @@ type NexusOperationInfo struct { NextAttemptScheduleTime *timestamppb.Timestamp `protobuf:"bytes,14,opt,name=next_attempt_schedule_time,json=nextAttemptScheduleTime,proto3" json:"next_attempt_schedule_time,omitempty"` // Endpoint ID, the name is also stored here (field 1) but we use the ID internally to avoid failing operation // requests when an endpoint is renamed. - EndpointId string `protobuf:"bytes,15,opt,name=endpoint_id,json=endpointId,proto3" json:"endpoint_id,omitempty"` + EndpointId string `protobuf:"bytes,15,opt,name=endpoint_id,json=endpointId,proto3" json:"endpoint_id,omitempty"` + // Schedule-to-start timeout for this operation. + // (-- api-linter: core::0140::prepositions=disabled + // + // aip.dev/not-precedent: "to" is used to indicate interval. --) + ScheduleToStartTimeout *durationpb.Duration `protobuf:"bytes,16,opt,name=schedule_to_start_timeout,json=scheduleToStartTimeout,proto3" json:"schedule_to_start_timeout,omitempty"` + // Start-to-close timeout for this operation. + // (-- api-linter: core::0140::prepositions=disabled + // + // aip.dev/not-precedent: "to" is used to indicate interval. --) + StartToCloseTimeout *durationpb.Duration `protobuf:"bytes,17,opt,name=start_to_close_timeout,json=startToCloseTimeout,proto3" json:"start_to_close_timeout,omitempty"` + // Time the operation was started (only available for async operations). + StartedTime *timestamppb.Timestamp `protobuf:"bytes,18,opt,name=started_time,json=startedTime,proto3" json:"started_time,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3794,6 +3806,27 @@ func (x *NexusOperationInfo) GetEndpointId() string { return "" } +func (x *NexusOperationInfo) GetScheduleToStartTimeout() *durationpb.Duration { + if x != nil { + return x.ScheduleToStartTimeout + } + return nil +} + +func (x *NexusOperationInfo) GetStartToCloseTimeout() *durationpb.Duration { + if x != nil { + return x.StartToCloseTimeout + } + return nil +} + +func (x *NexusOperationInfo) GetStartedTime() *timestamppb.Timestamp { + if x != nil { + return x.StartedTime + } + return nil +} + // NexusOperationCancellationInfo contains the state of a nexus operation cancelation. type NexusOperationCancellationInfo struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -4932,7 +4965,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x0eWorkflowClosed\x1a\x80\x01\n" + "\aTrigger\x12j\n" + "\x0fworkflow_closed\x18\x01 \x01(\v2?.temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosedH\x00R\x0eworkflowClosedB\t\n" + - "\avariant\"\x8d\x06\n" + + "\avariant\"\xf2\a\n" + "\x12NexusOperationInfo\x12\x1a\n" + "\bendpoint\x18\x01 \x01(\tR\bendpoint\x12\x18\n" + "\aservice\x18\x02 \x01(\tR\aservice\x12\x1c\n" + @@ -4950,7 +4983,10 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x14last_attempt_failure\x18\r \x01(\v2 .temporal.api.failure.v1.FailureR\x12lastAttemptFailure\x12W\n" + "\x1anext_attempt_schedule_time\x18\x0e \x01(\v2\x1a.google.protobuf.TimestampR\x17nextAttemptScheduleTime\x12\x1f\n" + "\vendpoint_id\x18\x0f \x01(\tR\n" + - "endpointIdJ\x04\b\x04\x10\x05\"\xff\x03\n" + + "endpointId\x12T\n" + + "\x19schedule_to_start_timeout\x18\x10 \x01(\v2\x19.google.protobuf.DurationR\x16scheduleToStartTimeout\x12N\n" + + "\x16start_to_close_timeout\x18\x11 \x01(\v2\x19.google.protobuf.DurationR\x13startToCloseTimeout\x12=\n" + + "\fstarted_time\x18\x12 \x01(\v2\x1a.google.protobuf.TimestampR\vstartedTimeJ\x04\b\x04\x10\x05\"\xff\x03\n" + "\x1eNexusOperationCancellationInfo\x12A\n" + "\x0erequested_time\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\rrequestedTime\x12L\n" + "\x05state\x18\x02 \x01(\x0e26.temporal.api.enums.v1.NexusOperationCancellationStateR\x05state\x12\x18\n" + @@ -5194,29 +5230,32 @@ var file_temporal_server_api_persistence_v1_executions_proto_depIdxs = []int32{ 43, // 121: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp 68, // 122: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure 43, // 123: temporal.server.api.persistence.v1.NexusOperationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp - 43, // 124: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.requested_time:type_name -> google.protobuf.Timestamp - 79, // 125: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.state:type_name -> temporal.api.enums.v1.NexusOperationCancellationState - 43, // 126: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp - 68, // 127: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure - 43, // 128: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp - 43, // 129: temporal.server.api.persistence.v1.WorkflowPauseInfo.pause_time:type_name -> google.protobuf.Timestamp - 80, // 130: temporal.server.api.persistence.v1.ShardInfo.QueueStatesEntry.value:type_name -> temporal.server.api.persistence.v1.QueueState - 81, // 131: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SearchAttributesEntry.value:type_name -> temporal.api.common.v1.Payload - 81, // 132: temporal.server.api.persistence.v1.WorkflowExecutionInfo.MemoEntry.value:type_name -> temporal.api.common.v1.Payload - 82, // 133: temporal.server.api.persistence.v1.WorkflowExecutionInfo.UpdateInfosEntry.value:type_name -> temporal.server.api.persistence.v1.UpdateInfo - 83, // 134: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SubStateMachinesByTypeEntry.value:type_name -> temporal.server.api.persistence.v1.StateMachineMap - 24, // 135: temporal.server.api.persistence.v1.WorkflowExecutionInfo.ChildrenInitializedPostResetPointEntry.value:type_name -> temporal.server.api.persistence.v1.ResetChildInfo - 4, // 136: temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry.value:type_name -> temporal.server.api.persistence.v1.RequestIDInfo - 43, // 137: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.pause_time:type_name -> google.protobuf.Timestamp - 37, // 138: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.manual:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.Manual - 40, // 139: temporal.server.api.persistence.v1.Callback.Nexus.header:type_name -> temporal.server.api.persistence.v1.Callback.Nexus.HeaderEntry - 84, // 140: temporal.server.api.persistence.v1.Callback.HSM.ref:type_name -> temporal.server.api.persistence.v1.StateMachineRef - 41, // 141: temporal.server.api.persistence.v1.CallbackInfo.Trigger.workflow_closed:type_name -> temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosed - 142, // [142:142] is the sub-list for method output_type - 142, // [142:142] is the sub-list for method input_type - 142, // [142:142] is the sub-list for extension type_name - 142, // [142:142] is the sub-list for extension extendee - 0, // [0:142] is the sub-list for field type_name + 44, // 124: temporal.server.api.persistence.v1.NexusOperationInfo.schedule_to_start_timeout:type_name -> google.protobuf.Duration + 44, // 125: temporal.server.api.persistence.v1.NexusOperationInfo.start_to_close_timeout:type_name -> google.protobuf.Duration + 43, // 126: temporal.server.api.persistence.v1.NexusOperationInfo.started_time:type_name -> google.protobuf.Timestamp + 43, // 127: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.requested_time:type_name -> google.protobuf.Timestamp + 79, // 128: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.state:type_name -> temporal.api.enums.v1.NexusOperationCancellationState + 43, // 129: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp + 68, // 130: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure + 43, // 131: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp + 43, // 132: temporal.server.api.persistence.v1.WorkflowPauseInfo.pause_time:type_name -> google.protobuf.Timestamp + 80, // 133: temporal.server.api.persistence.v1.ShardInfo.QueueStatesEntry.value:type_name -> temporal.server.api.persistence.v1.QueueState + 81, // 134: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SearchAttributesEntry.value:type_name -> temporal.api.common.v1.Payload + 81, // 135: temporal.server.api.persistence.v1.WorkflowExecutionInfo.MemoEntry.value:type_name -> temporal.api.common.v1.Payload + 82, // 136: temporal.server.api.persistence.v1.WorkflowExecutionInfo.UpdateInfosEntry.value:type_name -> temporal.server.api.persistence.v1.UpdateInfo + 83, // 137: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SubStateMachinesByTypeEntry.value:type_name -> temporal.server.api.persistence.v1.StateMachineMap + 24, // 138: temporal.server.api.persistence.v1.WorkflowExecutionInfo.ChildrenInitializedPostResetPointEntry.value:type_name -> temporal.server.api.persistence.v1.ResetChildInfo + 4, // 139: temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry.value:type_name -> temporal.server.api.persistence.v1.RequestIDInfo + 43, // 140: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.pause_time:type_name -> google.protobuf.Timestamp + 37, // 141: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.manual:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.Manual + 40, // 142: temporal.server.api.persistence.v1.Callback.Nexus.header:type_name -> temporal.server.api.persistence.v1.Callback.Nexus.HeaderEntry + 84, // 143: temporal.server.api.persistence.v1.Callback.HSM.ref:type_name -> temporal.server.api.persistence.v1.StateMachineRef + 41, // 144: temporal.server.api.persistence.v1.CallbackInfo.Trigger.workflow_closed:type_name -> temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosed + 145, // [145:145] is the sub-list for method output_type + 145, // [145:145] is the sub-list for method input_type + 145, // [145:145] is the sub-list for extension type_name + 145, // [145:145] is the sub-list for extension extendee + 0, // [0:145] is the sub-list for field type_name } func init() { file_temporal_server_api_persistence_v1_executions_proto_init() } diff --git a/components/nexusoperations/events_test.go b/components/nexusoperations/events_test.go index d5df00d0ec..1da26dff45 100644 --- a/components/nexusoperations/events_test.go +++ b/components/nexusoperations/events_test.go @@ -11,12 +11,15 @@ import ( "go.temporal.io/server/components/nexusoperations" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/hsm/hsmtest" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) func TestCherryPick(t *testing.T) { setup := func(t *testing.T) (*hsm.Node, nexusoperations.Operation, int64) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour)) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) @@ -159,7 +162,9 @@ func TestTerminalStatesDeletion(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour)) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index 236c14c3d6..c2acdb8232 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -74,6 +74,18 @@ func RegisterExecutor( ); err != nil { return err } + if err := hsm.RegisterTimerExecutor( + registry, + exec.executeScheduleToStartTimeoutTask, + ); err != nil { + return err + } + if err := hsm.RegisterTimerExecutor( + registry, + exec.executeStartToCloseTimeoutTask, + ); err != nil { + return err + } if err := hsm.RegisterImmediateExecutor( registry, exec.executeCancelationTask, @@ -184,16 +196,23 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ } header := nexus.Header(args.header) + var opTimeout time.Duration callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName) - if args.scheduleToCloseTimeout > 0 { - opTimeout := args.scheduleToCloseTimeout - time.Since(args.scheduledTime) + // Adjust timeout based on remaining operation timeouts. + // StartToClose takes precedence over ScheduleToClose since it is already capped by it. + if args.startToCloseTimeout > 0 { + opTimeout = args.startToCloseTimeout - time.Since(args.scheduledTime) callTimeout = min(callTimeout, opTimeout) - if opTimeoutHeader := header.Get(nexus.HeaderOperationTimeout); opTimeoutHeader == "" { - if header == nil { - header = make(nexus.Header, 1) - } - header[nexus.HeaderOperationTimeout] = opTimeout.String() + } else if args.scheduleToCloseTimeout > 0 { + opTimeout = args.scheduleToCloseTimeout - time.Since(args.scheduledTime) + callTimeout = min(callTimeout, opTimeout) + } + // Set the operation timeout header if not already set. + if opTimeoutHeader := header.Get(nexus.HeaderOperationTimeout); opTimeout > 0 && opTimeoutHeader == "" { + if header == nil { + header = make(nexus.Header, 1) } + header[nexus.HeaderOperationTimeout] = commonnexus.FormatDuration(opTimeout) } callCtx, cancel := context.WithTimeout(ctx, callTimeout) @@ -311,6 +330,7 @@ type startArgs struct { endpointID string scheduledTime time.Time scheduleToCloseTimeout time.Duration + startToCloseTimeout time.Duration header map[string]string payload *commonpb.Payload nexusLink nexus.Link @@ -335,15 +355,17 @@ func (e taskExecutor) loadOperationArgs( args.service = operation.Service args.operation = operation.Operation args.requestID = operation.RequestId + args.scheduleToCloseTimeout = operation.ScheduleToCloseTimeout.AsDuration() + args.startToCloseTimeout = operation.StartToCloseTimeout.AsDuration() eventToken = operation.ScheduledEventToken event, err := node.LoadHistoryEvent(ctx, eventToken) if err != nil { return nil } args.scheduledTime = event.EventTime.AsTime() - args.scheduleToCloseTimeout = event.GetNexusOperationScheduledEventAttributes().GetScheduleToCloseTimeout().AsDuration() - args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput() - args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader() + attrs := event.GetNexusOperationScheduledEventAttributes() + args.payload = attrs.GetInput() + args.header = attrs.GetNexusHeader() args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{ Namespace: ns.Name().String(), WorkflowId: ref.WorkflowKey.WorkflowID, @@ -550,6 +572,74 @@ func (e taskExecutor) recordOperationTimeout(node *hsm.Node) error { }) } +func (e taskExecutor) executeScheduleToStartTimeoutTask(env hsm.Environment, node *hsm.Node, task ScheduleToStartTimeoutTask) error { + return hsm.MachineTransition(node, func(op Operation) (hsm.TransitionOutput, error) { + eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) + if err != nil { + return hsm.TransitionOutput{}, err + } + node.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT, func(e *historypb.HistoryEvent) { + // nolint:revive // We must mutate here even if the linter doesn't like it. + e.Attributes = &historypb.HistoryEvent_NexusOperationTimedOutEventAttributes{ + NexusOperationTimedOutEventAttributes: &historypb.NexusOperationTimedOutEventAttributes{ + Failure: nexusOperationFailure( + op, + eventID, + &failurepb.Failure{ + Message: "operation timed out before starting", + FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ + TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + }, + }, + }, + ), + ScheduledEventId: eventID, + RequestId: op.RequestId, + }, + } + }) + + return TransitionTimedOut.Apply(op, EventTimedOut{ + Node: node, + }) + }) +} + +func (e taskExecutor) executeStartToCloseTimeoutTask(env hsm.Environment, node *hsm.Node, task StartToCloseTimeoutTask) error { + return hsm.MachineTransition(node, func(op Operation) (hsm.TransitionOutput, error) { + eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) + if err != nil { + return hsm.TransitionOutput{}, err + } + node.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT, func(e *historypb.HistoryEvent) { + // nolint:revive // We must mutate here even if the linter doesn't like it. + e.Attributes = &historypb.HistoryEvent_NexusOperationTimedOutEventAttributes{ + NexusOperationTimedOutEventAttributes: &historypb.NexusOperationTimedOutEventAttributes{ + Failure: nexusOperationFailure( + op, + eventID, + &failurepb.Failure{ + Message: "operation timed out after starting", + FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ + TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ + TimeoutType: enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + }, + }, + }, + ), + ScheduledEventId: eventID, + RequestId: op.RequestId, + }, + } + }) + + return TransitionTimedOut.Apply(op, EventTimedOut{ + Node: node, + }) + }) +} + func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Environment, ref hsm.Ref, task CancelationTask) error { ns, err := e.NamespaceRegistry.GetNamespaceByID(namespace.ID(ref.WorkflowKey.NamespaceID)) if err != nil { @@ -573,7 +663,12 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro } callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName) - if args.scheduleToCloseTimeout > 0 { + // Adjust timeout based on remaining operation timeouts. + // StartToClose takes precedence over ScheduleToClose since it is already capped by it. + if args.startToCloseTimeout > 0 { + opTimeout := args.startToCloseTimeout - time.Since(args.scheduledTime) + callTimeout = min(callTimeout, opTimeout) + } else if args.scheduleToCloseTimeout > 0 { opTimeout := args.scheduleToCloseTimeout - time.Since(args.scheduledTime) callTimeout = min(callTimeout, opTimeout) } @@ -652,6 +747,7 @@ type cancelArgs struct { service, operation, token, endpointID, endpointName, requestID string scheduledTime time.Time scheduleToCloseTimeout time.Duration + startToCloseTimeout time.Duration scheduledEventID int64 headers map[string]string } @@ -677,6 +773,7 @@ func (e taskExecutor) loadArgsForCancelation(ctx context.Context, env hsm.Enviro args.requestID = op.RequestId args.scheduledTime = op.ScheduledTime.AsTime() args.scheduleToCloseTimeout = op.ScheduleToCloseTimeout.AsDuration() + args.startToCloseTimeout = op.StartToCloseTimeout.AsDuration() args.scheduledEventID, err = hsm.EventIDFromToken(op.ScheduledEventToken) if err != nil { return err diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index 12b4310ad9..423f97b3d2 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/server/service/history/hsm/hsmtest" queueserrors "go.temporal.io/server/service/history/queues/errors" "go.uber.org/mock/gomock" + "google.golang.org/protobuf/types/known/durationpb" ) var endpointEntry = &persistencespb.NexusEndpointEntry{ @@ -84,6 +85,7 @@ func TestProcessInvocationTask(t *testing.T) { checkOutcome func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) requestTimeout time.Duration schedToCloseTimeout time.Duration + startToCloseTimeout time.Duration destinationDown bool }{ { @@ -145,8 +147,9 @@ func TestProcessInvocationTask(t *testing.T) { name: "sync start", requestTimeout: time.Hour, schedToCloseTimeout: time.Hour, - header: nexus.Header{nexus.HeaderOperationTimeout: time.Microsecond.String()}, // to test this value is ignored when ScheduleToCloseTimeout is set - destinationDown: false, + // To test this value is ignored when ScheduleToCloseTimeout is set (but still set on the header). + header: nexus.Header{nexus.HeaderOperationTimeout: commonnexus.FormatDuration(time.Millisecond)}, + destinationDown: false, onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { // Also use this test case to check the input and options provided. if service != "service" { @@ -161,6 +164,9 @@ func TestProcessInvocationTask(t *testing.T) { if options.CallbackURL != "http://localhost/callback" { return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid callback URL") } + if options.Header.Get(nexus.HeaderOperationTimeout) != "1ms" { + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid operation timeout header: %s", options.Header.Get(nexus.HeaderOperationTimeout)) + } var v string if err := input.Consume(&v); err != nil || v != "input" { return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid input") @@ -315,6 +321,31 @@ func TestProcessInvocationTask(t *testing.T) { destinationDown: true, expectedMetricOutcome: "request-timeout", onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { + opTimeout, err := time.ParseDuration(options.Header.Get(nexus.HeaderOperationTimeout)) + if err != nil || opTimeout > 10*time.Millisecond { + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid operation timeout header: %s", options.Header.Get(nexus.HeaderOperationTimeout)) + } + time.Sleep(time.Millisecond * 100) //nolint:forbidigo // Allow time.Sleep for timeout tests + return &nexus.HandlerStartOperationResultAsync{OperationToken: "op-token"}, nil + }, + checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { + require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) + require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo()) + require.Regexp(t, "request timed out", op.LastAttemptFailure.Message) + require.Empty(t, events) + }, + }, + { + name: "invocation timeout by StartToCloseTimeout", + requestTimeout: time.Hour, + startToCloseTimeout: 10 * time.Millisecond, + destinationDown: true, + expectedMetricOutcome: "request-timeout", + onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { + opTimeout, err := time.ParseDuration(options.Header.Get(nexus.HeaderOperationTimeout)) + if err != nil || opTimeout > 10*time.Millisecond { + return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid operation timeout header: %s", options.Header.Get(nexus.HeaderOperationTimeout)) + } time.Sleep(time.Millisecond * 100) //nolint:forbidigo // Allow time.Sleep for timeout tests return &nexus.HandlerStartOperationResultAsync{OperationToken: "op-token"}, nil }, @@ -436,7 +467,10 @@ func TestProcessInvocationTask(t *testing.T) { nexustest.NewNexusServer(t, listenAddr, h) reg := newRegistry(t) - event := mustNewScheduledEvent(time.Now(), tc.schedToCloseTimeout) + event := mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(tc.schedToCloseTimeout), + StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), + }) if tc.eventHasNoEndpointID { event.GetNexusOperationScheduledEventAttributes().EndpointId = "" } @@ -555,7 +589,9 @@ func TestProcessInvocationTask(t *testing.T) { func TestProcessBackoffTask(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) env := fakeEnv{node} require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) @@ -584,7 +620,9 @@ func TestProcessBackoffTask(t *testing.T) { func TestProcessTimeoutTask(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) env := fakeEnv{node} require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) @@ -625,6 +663,102 @@ func TestProcessTimeoutTask(t *testing.T) { }, backend.Events[0].GetNexusOperationTimedOutEventAttributes()) } +func TestProcessScheduleToStartTimeoutTask(t *testing.T) { + reg := newRegistry(t) + backend := &hsmtest.NodeBackend{} + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) + env := fakeEnv{node} + + require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) + + err := reg.ExecuteTimerTask( + env, + node, + nexusoperations.ScheduleToStartTimeoutTask{}, + ) + require.NoError(t, err) + op, err := hsm.MachineData[nexusoperations.Operation](node) + require.NoError(t, err) + require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT, op.State()) + require.Len(t, backend.Events, 1) + require.Equal(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT, backend.Events[0].EventType) + protorequire.ProtoEqual(t, &historypb.NexusOperationTimedOutEventAttributes{ + ScheduledEventId: 1, + RequestId: op.RequestId, + Failure: &failurepb.Failure{ + Message: "nexus operation completed unsuccessfully", + FailureInfo: &failurepb.Failure_NexusOperationExecutionFailureInfo{ + NexusOperationExecutionFailureInfo: &failurepb.NexusOperationFailureInfo{ + ScheduledEventId: 1, + Endpoint: "endpoint", + Service: "service", + Operation: "operation", + }, + }, + Cause: &failurepb.Failure{ + Message: "operation timed out before starting", + FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ + TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + }, + }, + }, + }, + }, backend.Events[0].GetNexusOperationTimedOutEventAttributes()) +} + +func TestProcessStartToCloseTimeoutTask(t *testing.T) { + reg := newRegistry(t) + backend := &hsmtest.NodeBackend{} + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) + env := fakeEnv{node} + + require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) + + // Transition to STARTED state first + err := hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { + return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ + Node: node, + Time: time.Now(), + Attributes: &historypb.NexusOperationStartedEventAttributes{ + OperationToken: "test-token", + }, + }) + }) + require.NoError(t, err) + + // Now execute the start-to-close timeout + err = reg.ExecuteTimerTask( + env, + node, + nexusoperations.StartToCloseTimeoutTask{}, + ) + require.NoError(t, err) + op, err := hsm.MachineData[nexusoperations.Operation](node) + require.NoError(t, err) + require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT, op.State()) + // Should have TIMED_OUT event (STARTED event is not added by the transition) + require.Len(t, backend.Events, 1) + require.Equal(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT, backend.Events[0].EventType) + // Verify timeout type and message + timedOutAttrs := backend.Events[0].GetNexusOperationTimedOutEventAttributes() + require.Equal(t, int64(1), timedOutAttrs.ScheduledEventId) + require.Equal(t, op.RequestId, timedOutAttrs.RequestId) + require.NotNil(t, timedOutAttrs.Failure) + require.Equal(t, "nexus operation completed unsuccessfully", timedOutAttrs.Failure.Message) + require.NotNil(t, timedOutAttrs.Failure.Cause) + require.Equal(t, "operation timed out after starting", timedOutAttrs.Failure.Cause.Message) + require.Equal(t, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, timedOutAttrs.Failure.Cause.GetTimeoutFailureInfo().TimeoutType) + // Verify operation token is present in failure info + nexusFailureInfo := timedOutAttrs.Failure.GetNexusOperationExecutionFailureInfo() + require.NotNil(t, nexusFailureInfo) + require.Equal(t, "test-token", nexusFailureInfo.OperationToken) +} + func TestProcessCancelationTask(t *testing.T) { cases := []struct { name string @@ -634,6 +768,7 @@ func TestProcessCancelationTask(t *testing.T) { checkOutcome func(t *testing.T, op nexusoperations.Cancelation) requestTimeout time.Duration schedToCloseTimeout time.Duration + startToCloseTimeout time.Duration destinationDown bool header map[string]string }{ @@ -716,7 +851,7 @@ func TestProcessCancelationTask(t *testing.T) { }, }, { - name: "operation timeout", + name: "operation timeout by ScheduleToCloseTimeout", requestTimeout: time.Hour, schedToCloseTimeout: time.Microsecond, destinationDown: false, @@ -728,6 +863,19 @@ func TestProcessCancelationTask(t *testing.T) { require.Regexp(t, nexusoperations.ErrOperationTimeoutBelowMin.Error(), c.LastAttemptFailure.Message) }, }, + { + name: "operation timeout by StartToCloseTimeout", + requestTimeout: time.Hour, + startToCloseTimeout: time.Microsecond, + destinationDown: false, + onCancelOperation: nil, // This should not be called if the operation has timed out. + expectedMetricOutcome: "operation-timeout", + checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { + require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, c.State()) + require.NotNil(t, c.LastAttemptFailure.GetApplicationFailureInfo()) + require.Regexp(t, nexusoperations.ErrOperationTimeoutBelowMin.Error(), c.LastAttemptFailure.Message) + }, + }, { name: "endpoint not found", endpointNotFound: true, @@ -752,7 +900,10 @@ func TestProcessCancelationTask(t *testing.T) { nexustest.NewNexusServer(t, listenAddr, h) reg := newRegistry(t) - event := mustNewScheduledEvent(time.Now(), tc.schedToCloseTimeout) + event := mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(tc.schedToCloseTimeout), + StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), + }) if tc.header != nil { event.GetNexusOperationScheduledEventAttributes().NexusHeader = tc.header } @@ -866,7 +1017,9 @@ func TestProcessCancelationTask_OperationCompleted(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) _, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ @@ -925,7 +1078,9 @@ func TestProcessCancelationTask_OperationCompleted(t *testing.T) { func TestProcessCancelationBackoffTask(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) _, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ diff --git a/components/nexusoperations/helpers_test.go b/components/nexusoperations/helpers_test.go index 4ec23bbe37..e111015cda 100644 --- a/components/nexusoperations/helpers_test.go +++ b/components/nexusoperations/helpers_test.go @@ -13,11 +13,11 @@ import ( historypb "go.temporal.io/api/history/v1" "go.temporal.io/sdk/converter" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/components/nexusoperations" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/hsm/hsmtest" "go.temporal.io/server/service/history/workflow" - "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -72,23 +72,33 @@ func (root) IsTransitionHistoryEnabled() bool { return false } -func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historypb.HistoryEvent { - conv := converter.GetDefaultDataConverter() - payload, err := conv.ToPayload("input") - if err != nil { - panic(err) +func mustNewScheduledEvent(schedTime time.Time, defaults *historypb.NexusOperationScheduledEventAttributes) *historypb.HistoryEvent { + if defaults == nil { + defaults = &historypb.NexusOperationScheduledEventAttributes{} } - - attr := &historypb.NexusOperationScheduledEventAttributes{ - EndpointId: "endpoint-id", - Endpoint: "endpoint", - Service: "service", - Operation: "operation", - Input: payload, - RequestId: uuid.NewString(), + attrs := common.CloneProto(defaults) + if attrs.EndpointId == "" { + attrs.EndpointId = "endpoint-id" + } + if attrs.Endpoint == "" { + attrs.Endpoint = "endpoint" + } + if attrs.Service == "" { + attrs.Service = "service" + } + if attrs.Operation == "" { + attrs.Operation = "operation" + } + if attrs.Input == nil { + conv := converter.GetDefaultDataConverter() + payload, err := conv.ToPayload("input") + if err != nil { + panic(err) + } + attrs.Input = payload } - if timeout > 0 { - attr.ScheduleToCloseTimeout = durationpb.New(timeout) + if attrs.RequestId == "" { + attrs.RequestId = uuid.NewString() } return &historypb.HistoryEvent{ @@ -96,7 +106,7 @@ func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historyp EventId: 1, EventTime: timestamppb.New(schedTime), Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ - NexusOperationScheduledEventAttributes: attr, + NexusOperationScheduledEventAttributes: attrs, }, } } diff --git a/components/nexusoperations/statemachine.go b/components/nexusoperations/statemachine.go index 0ad3d888df..c40d2c870a 100644 --- a/components/nexusoperations/statemachine.go +++ b/components/nexusoperations/statemachine.go @@ -54,6 +54,8 @@ func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventTok Operation: attrs.Operation, ScheduledTime: event.EventTime, ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout, + ScheduleToStartTimeout: attrs.ScheduleToStartTimeout, + StartToCloseTimeout: attrs.StartToCloseTimeout, RequestId: attrs.RequestId, State: enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED, ScheduledEventToken: eventToken, @@ -137,10 +139,34 @@ func (o Operation) transitionTasks() ([]hsm.Task, error) { // creationTasks returns tasks that are emitted when the machine is created. func (o Operation) creationTasks() ([]hsm.Task, error) { + var tasks []hsm.Task + if o.ScheduleToCloseTimeout.AsDuration() != 0 { - return []hsm.Task{TimeoutTask{deadline: o.ScheduledTime.AsTime().Add(o.ScheduleToCloseTimeout.AsDuration())}}, nil + tasks = append(tasks, TimeoutTask{ + deadline: o.ScheduledTime.AsTime().Add(o.ScheduleToCloseTimeout.AsDuration()), + }) + } + + if o.ScheduleToStartTimeout.AsDuration() != 0 { + tasks = append(tasks, ScheduleToStartTimeoutTask{ + deadline: o.ScheduledTime.AsTime().Add(o.ScheduleToStartTimeout.AsDuration()), + }) + } + + return tasks, nil +} + +// startToCloseTimeoutTask returns the StartToCloseTimeout task if the timeout is set. +// This task is created when an operation transitions to the STARTED state. +func (o Operation) startToCloseTimeoutTask() []hsm.Task { + if o.StartedTime.AsTime().IsZero() || o.StartToCloseTimeout.AsDuration() == 0 { + return nil + } + return []hsm.Task{ + StartToCloseTimeoutTask{ + deadline: o.StartedTime.AsTime().Add(o.StartToCloseTimeout.AsDuration()), + }, } - return nil, nil } func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) { @@ -152,7 +178,7 @@ func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) { if err != nil { return nil, err } - return append(transitionTasks, creationTasks...), nil + return append(append(transitionTasks, creationTasks...), o.startToCloseTimeoutTask()...), nil } func (o Operation) output() (hsm.TransitionOutput, error) { @@ -347,6 +373,8 @@ var TransitionStarted = hsm.NewTransition( op.OperationToken = event.Attributes.OperationId //nolint:staticcheck // SA1019 this field might be set in older histories. } + op.StartedTime = timestamppb.New(event.Time) + // If cancelation is requested already, schedule sending the cancelation request. child, err := op.CancelationNode(event.Node) if err != nil { @@ -360,7 +388,16 @@ var TransitionStarted = hsm.NewTransition( }) }) } - return op.output() + + output, err := op.output() + if err != nil { + return output, err + } + + // Schedule start-to-close timeout task if configured + output.Tasks = append(output.Tasks, op.startToCloseTimeoutTask()...) + + return output, nil }, ) diff --git a/components/nexusoperations/statemachine_test.go b/components/nexusoperations/statemachine_test.go index 67fc425c55..b63731093d 100644 --- a/components/nexusoperations/statemachine_test.go +++ b/components/nexusoperations/statemachine_test.go @@ -31,7 +31,7 @@ func TestAddChild(t *testing.T) { assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 2, len(tasks)) require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) - require.Equal(t, nexusoperations.TaskTypeTimeout, tasks[1].Type()) + require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) }, }, { @@ -86,57 +86,216 @@ func TestAddChild(t *testing.T) { } } +func TestAddChildWithNewTimeouts(t *testing.T) { + cases := []struct { + name string + scheduleToCloseTimeout time.Duration + scheduleToStartTimeout time.Duration + startToCloseTimeout time.Duration + assertTasks func(t *testing.T, tasks []hsm.Task) + }{ + { + name: "with all timeouts", + scheduleToCloseTimeout: time.Hour, + scheduleToStartTimeout: 30 * time.Minute, + startToCloseTimeout: 45 * time.Minute, + assertTasks: func(t *testing.T, tasks []hsm.Task) { + // Should have Invocation, ScheduleToClose, and ScheduleToStart tasks + require.Len(t, tasks, 3) + require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) + require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) + require.Equal(t, nexusoperations.TaskTypeScheduleToStartTimeout, tasks[2].Type()) + }, + }, + { + name: "with schedule-to-start only", + scheduleToCloseTimeout: 0, + scheduleToStartTimeout: 30 * time.Minute, + startToCloseTimeout: 0, + assertTasks: func(t *testing.T, tasks []hsm.Task) { + require.Len(t, tasks, 2) + require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) + require.Equal(t, nexusoperations.TaskTypeScheduleToStartTimeout, tasks[1].Type()) + }, + }, + { + name: "without new timeouts", + scheduleToCloseTimeout: time.Hour, + scheduleToStartTimeout: 0, + startToCloseTimeout: 0, + assertTasks: func(t *testing.T, tasks []hsm.Task) { + require.Len(t, tasks, 2) + require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) + require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + root := newRoot(t, &hsmtest.NodeBackend{}) + schedTime := timestamppb.Now() + event := &historypb.HistoryEvent{ + EventTime: schedTime, + Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ + NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ + EndpointId: "endpoint-id", + Endpoint: "endpoint", + Service: "service", + Operation: "operation", + RequestId: "request-id", + ScheduleToCloseTimeout: durationpb.New(tc.scheduleToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(tc.scheduleToStartTimeout), + StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), + }, + }, + } + child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token")) + require.NoError(t, err) + opLog, err := root.OpLog() + require.NoError(t, err) + require.Len(t, opLog, 1) + transitionOp, ok := opLog[0].(hsm.TransitionOperation) + require.True(t, ok) + tc.assertTasks(t, transitionOp.Output.Tasks) + + op, err := hsm.MachineData[nexusoperations.Operation](child) + require.NoError(t, err) + require.Equal(t, tc.scheduleToCloseTimeout, op.ScheduleToCloseTimeout.AsDuration()) + require.Equal(t, tc.scheduleToStartTimeout, op.ScheduleToStartTimeout.AsDuration()) + require.Equal(t, tc.startToCloseTimeout, op.StartToCloseTimeout.AsDuration()) + }) + } +} + +func TestTransitionStartedEmitsStartToCloseTimeout(t *testing.T) { + root := newRoot(t, &hsmtest.NodeBackend{}) + schedTime := timestamppb.Now() + event := &historypb.HistoryEvent{ + EventTime: schedTime, + Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ + NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ + EndpointId: "endpoint-id", + Endpoint: "endpoint", + Service: "service", + Operation: "operation", + RequestId: "request-id", + ScheduleToCloseTimeout: durationpb.New(time.Hour), + StartToCloseTimeout: durationpb.New(30 * time.Minute), + }, + }, + } + child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token")) + require.NoError(t, err) + + // Transition to STARTED state + err = hsm.MachineTransition(child, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { + return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ + Node: child, + Time: time.Now(), + Attributes: &historypb.NexusOperationStartedEventAttributes{ + OperationToken: "test-token", + }, + }) + }) + require.NoError(t, err) + + opLog, err := root.OpLog() + require.NoError(t, err) + // Should have 2 operations: initial AddChild transition and TransitionStarted + require.Len(t, opLog, 2) + + // Check the TransitionStarted output + transitionOp, ok := opLog[1].(hsm.TransitionOperation) + require.True(t, ok) + // Should have StartToCloseTimeout task + require.Len(t, transitionOp.Output.Tasks, 1) + require.Equal(t, nexusoperations.TaskTypeStartToCloseTimeout, transitionOp.Output.Tasks[0].Type()) +} + func TestRegenerateTasks(t *testing.T) { cases := []struct { - name string - timeout time.Duration - state enumsspb.NexusOperationState - assertTasks func(t *testing.T, tasks []hsm.Task) + name string + scheduleToCloseTimeout time.Duration + startToCloseTimeout time.Duration + state enumsspb.NexusOperationState + assertTasks func(t *testing.T, tasks []hsm.Task) }{ { - name: "scheduled | with timeout", - timeout: time.Hour, - state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, + name: "scheduled | with timeout", + scheduleToCloseTimeout: time.Hour, + state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 2, len(tasks)) require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) require.Equal(t, tasks[0].(nexusoperations.InvocationTask).EndpointName, "endpoint") - require.Equal(t, nexusoperations.TaskTypeTimeout, tasks[1].Type()) + require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) }, }, { - name: "scheduled | without timeout", - timeout: 0, - state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, + name: "scheduled | without timeout", + state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 1, len(tasks)) require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) }, }, { - name: "backing off | with timeout", - timeout: time.Hour, - state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, + name: "backing off | with timeout", + scheduleToCloseTimeout: time.Hour, + state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 2, len(tasks)) require.Equal(t, nexusoperations.TaskTypeBackoff, tasks[0].Type()) - require.Equal(t, nexusoperations.TaskTypeTimeout, tasks[1].Type()) + require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) }, }, { - name: "backing off | without timeout", - timeout: 0, - state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, + name: "backing off | without timeout", + state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 1, len(tasks)) require.Equal(t, nexusoperations.TaskTypeBackoff, tasks[0].Type()) }, }, + { + name: "started | with start to close timeout", + startToCloseTimeout: 15 * time.Minute, + state: enumsspb.NEXUS_OPERATION_STATE_STARTED, + assertTasks: func(t *testing.T, tasks []hsm.Task) { + require.Len(t, tasks, 1) + require.Equal(t, nexusoperations.TaskTypeStartToCloseTimeout, tasks[0].Type()) + }, + }, + { + name: "started | without start to close timeout", + state: enumsspb.NEXUS_OPERATION_STATE_STARTED, + assertTasks: func(t *testing.T, tasks []hsm.Task) { + require.Empty(t, tasks) + }, + }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), tc.timeout)) + schedTime := time.Now() + event := &historypb.HistoryEvent{ + EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, + EventId: 1, + EventTime: timestamppb.New(schedTime), + Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ + NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ + EndpointId: "endpoint-id", + Endpoint: "endpoint", + Service: "service", + Operation: "operation", + RequestId: "request-id", + ScheduleToCloseTimeout: durationpb.New(tc.scheduleToCloseTimeout), + StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), + }, + }, + } + node := newOperationNode(t, &hsmtest.NodeBackend{}, event) if tc.state == enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF { require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { @@ -147,6 +306,16 @@ func TestRegenerateTasks(t *testing.T) { RetryPolicy: backoff.NewExponentialRetryPolicy(time.Second), }) })) + } else if tc.state == enumsspb.NEXUS_OPERATION_STATE_STARTED { + require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { + return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ + Time: time.Now(), + Node: node, + Attributes: &historypb.NexusOperationStartedEventAttributes{ + OperationToken: "operation-token", + }, + }) + })) } op, err := hsm.MachineData[nexusoperations.Operation](node) @@ -159,7 +328,9 @@ func TestRegenerateTasks(t *testing.T) { } func TestRetry(t *testing.T) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Minute), + })) // Reset any outputs generated from nexusoperations.AddChild, we tested those already. node.ClearTransactionState() require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { @@ -283,7 +454,9 @@ func TestCompleteFromAttempt(t *testing.T) { } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Minute), + })) // Reset any outputs generated from nexusoperations.AddChild, we tested those already. node.ClearTransactionState() require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { @@ -310,13 +483,17 @@ func TestCompleteExternally(t *testing.T) { { name: "scheduled", fn: func(t *testing.T) *hsm.Node { - return newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) + return newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Minute), + })) }, }, { name: "backing off", fn: func(t *testing.T) *hsm.Node { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Minute), + })) require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return nexusoperations.TransitionAttemptFailed.Apply(op, nexusoperations.EventAttemptFailed{ Node: node, @@ -331,7 +508,9 @@ func TestCompleteExternally(t *testing.T) { { name: "started", fn: func(t *testing.T) *hsm.Node { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Minute), + })) require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ Node: node, @@ -421,7 +600,9 @@ func TestCompleteExternally(t *testing.T) { func TestCancel(t *testing.T) { backend := &hsmtest.NodeBackend{} - root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) + root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) op, err := hsm.MachineData[nexusoperations.Operation](root) require.NoError(t, err) _, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ @@ -444,7 +625,9 @@ func TestCancel(t *testing.T) { func TestCancelationValidTransitions(t *testing.T) { // Setup - root := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour)) + root := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ + ScheduleToCloseTimeout: durationpb.New(time.Hour), + })) // We don't support cancel before started. Mark the operation as started. require.NoError(t, hsm.MachineTransition(root, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ @@ -553,7 +736,7 @@ func TestCancelationValidTransitions(t *testing.T) { func TestCancelationBeforeStarted(t *testing.T) { // Setup backend := &hsmtest.NodeBackend{} - root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), 0)) + root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), nil)) require.NoError(t, hsm.MachineTransition(root, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return op.Cancel(root, time.Now(), 0) })) diff --git a/components/nexusoperations/tasks.go b/components/nexusoperations/tasks.go index c513733344..b7c2a24d62 100644 --- a/components/nexusoperations/tasks.go +++ b/components/nexusoperations/tasks.go @@ -16,11 +16,14 @@ import ( ) const ( - TaskTypeTimeout = "nexusoperations.Timeout" TaskTypeInvocation = "nexusoperations.Invocation" TaskTypeBackoff = "nexusoperations.Backoff" TaskTypeCancelation = "nexusoperations.Cancelation" TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff" + // NOTE: the name `Timeout` is used for backward compatibility with existing persisted tasks and predates the addition of more flexible timeout types. + TaskTypeScheduleToCloseTimeout = "nexusoperations.Timeout" + TaskTypeScheduleToStartTimeout = "nexusoperations.ScheduleToStartTimeout" + TaskTypeStartToCloseTimeout = "nexusoperations.StartToCloseTimeout" ) var errSerializationCast = errors.New("cannot serialize HSM task. unable to cast to expected type") @@ -32,7 +35,7 @@ type TimeoutTask struct { var _ hsm.Task = TimeoutTask{} func (TimeoutTask) Type() string { - return TaskTypeTimeout + return TaskTypeScheduleToCloseTimeout } func (t TimeoutTask) Deadline() time.Time { @@ -235,8 +238,112 @@ func (CancelationBackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error) { return nil, nil } +type ScheduleToStartTimeoutTask struct { + deadline time.Time +} + +var _ hsm.Task = ScheduleToStartTimeoutTask{} + +func (ScheduleToStartTimeoutTask) Type() string { + return TaskTypeScheduleToStartTimeout +} + +func (t ScheduleToStartTimeoutTask) Deadline() time.Time { + return t.deadline +} + +func (ScheduleToStartTimeoutTask) Destination() string { + return "" +} + +// Validate checks if the schedule-to-start timeout task is still valid. +// Only valid if operation is still in SCHEDULED or BACKING_OFF state. +func (t ScheduleToStartTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { + if err := node.CheckRunning(); err != nil { + return err + } + op, err := hsm.MachineData[Operation](node) + if err != nil { + return err + } + // Only timeout if we haven't started yet + switch op.State() { + case enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, + enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF: + return nil + default: + // Already started or completed, timeout not applicable + return fmt.Errorf( + "%w: %w: cannot apply schedule-to-start timeout to machine in state %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + op.State(), + ) + } +} + +type ScheduleToStartTimeoutTaskSerializer struct{} + +func (ScheduleToStartTimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error) { + return ScheduleToStartTimeoutTask{deadline: attrs.Deadline}, nil +} + +func (ScheduleToStartTimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error) { + return nil, nil +} + +type StartToCloseTimeoutTask struct { + deadline time.Time +} + +var _ hsm.Task = StartToCloseTimeoutTask{} + +func (StartToCloseTimeoutTask) Type() string { + return TaskTypeStartToCloseTimeout +} + +func (t StartToCloseTimeoutTask) Deadline() time.Time { + return t.deadline +} + +func (StartToCloseTimeoutTask) Destination() string { + return "" +} + +// Validate checks if the start-to-close timeout task is still valid. +// Only valid if operation is in STARTED state. +func (t StartToCloseTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { + if err := node.CheckRunning(); err != nil { + return err + } + op, err := hsm.MachineData[Operation](node) + if err != nil { + return err + } + // Only timeout if we're in started state + if op.State() != enumsspb.NEXUS_OPERATION_STATE_STARTED { + return fmt.Errorf( + "%w: %w: cannot apply start-to-close timeout to machine in state %v", + consts.ErrStaleReference, + hsm.ErrInvalidTransition, + op.State(), + ) + } + return nil +} + +type StartToCloseTimeoutTaskSerializer struct{} + +func (StartToCloseTimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error) { + return StartToCloseTimeoutTask{deadline: attrs.Deadline}, nil +} + +func (StartToCloseTimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error) { + return nil, nil +} + func RegisterTaskSerializers(reg *hsm.Registry) error { - if err := reg.RegisterTaskSerializer(TaskTypeTimeout, TimeoutTaskSerializer{}); err != nil { + if err := reg.RegisterTaskSerializer(TaskTypeScheduleToCloseTimeout, TimeoutTaskSerializer{}); err != nil { return err } if err := reg.RegisterTaskSerializer(TaskTypeInvocation, InvocationTaskSerializer{}); err != nil { @@ -251,5 +358,8 @@ func RegisterTaskSerializers(reg *hsm.Registry) error { if err := reg.RegisterTaskSerializer(TaskTypeCancelationBackoff, CancelationBackoffTaskSerializer{}); err != nil { // nolint:revive return err } - return nil + if err := reg.RegisterTaskSerializer(TaskTypeScheduleToStartTimeout, ScheduleToStartTimeoutTaskSerializer{}); err != nil { + return err + } + return reg.RegisterTaskSerializer(TaskTypeStartToCloseTimeout, StartToCloseTimeoutTaskSerializer{}) } diff --git a/components/nexusoperations/workflow/commands.go b/components/nexusoperations/workflow/commands.go index 8bb59fea12..cdd0239a94 100644 --- a/components/nexusoperations/workflow/commands.go +++ b/components/nexusoperations/workflow/commands.go @@ -100,6 +100,22 @@ func (ch *commandHandler) HandleScheduleCommand( } } + if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil { + return workflow.FailWorkflowTaskError{ + Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, + Message: fmt.Sprintf( + "ScheduleNexusOperationCommandAttributes.ScheduleToStartTimeout is invalid: %v", err), + } + } + + if err := timestamp.ValidateAndCapProtoDuration(attrs.StartToCloseTimeout); err != nil { + return workflow.FailWorkflowTaskError{ + Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, + Message: fmt.Sprintf( + "ScheduleNexusOperationCommandAttributes.StartToCloseTimeout is invalid: %v", err), + } + } + if !validator.IsValidPayloadSize(attrs.Input.Size()) { return workflow.FailWorkflowTaskError{ Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, @@ -152,6 +168,19 @@ func (ch *commandHandler) HandleScheduleCommand( attrs.ScheduleToCloseTimeout = durationpb.New(maxTimeout) } + // Trim secondary timeouts to the primary timeout. + scheduleToCloseTimeout := attrs.ScheduleToCloseTimeout.AsDuration() + scheduleToStartTimeout := attrs.ScheduleToStartTimeout.AsDuration() + startToCloseTimeout := attrs.StartToCloseTimeout.AsDuration() + + if scheduleToCloseTimeout > 0 && scheduleToStartTimeout > 0 && scheduleToStartTimeout > scheduleToCloseTimeout { + attrs.ScheduleToStartTimeout = attrs.ScheduleToCloseTimeout + } + + if scheduleToCloseTimeout > 0 && startToCloseTimeout > 0 && startToCloseTimeout > scheduleToCloseTimeout { + attrs.StartToCloseTimeout = attrs.ScheduleToCloseTimeout + } + event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) { he.Attributes = &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ @@ -161,6 +190,8 @@ func (ch *commandHandler) HandleScheduleCommand( Operation: attrs.Operation, Input: attrs.Input, ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout, + ScheduleToStartTimeout: attrs.ScheduleToStartTimeout, + StartToCloseTimeout: attrs.StartToCloseTimeout, NexusHeader: lowerCaseHeader, RequestId: uuid.NewString(), WorkflowTaskCompletedEventId: workflowTaskCompletedEventID, diff --git a/components/nexusoperations/workflow/commands_test.go b/components/nexusoperations/workflow/commands_test.go index e5f7702c09..bec7340421 100644 --- a/components/nexusoperations/workflow/commands_test.go +++ b/components/nexusoperations/workflow/commands_test.go @@ -388,6 +388,128 @@ func TestHandleScheduleCommand(t *testing.T) { }) } + t.Run("invalid schedule-to-start timeout", func(t *testing.T) { + tcx := newTestContext(t, defaultConfig) + err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: "endpoint", + Service: "service", + Operation: "op", + ScheduleToStartTimeout: durationpb.New(-1 * time.Second), + }, + }, + }) + var failWFTErr workflow.FailWorkflowTaskError + require.ErrorAs(t, err, &failWFTErr) + require.False(t, failWFTErr.TerminateWorkflow) + require.Equal(t, enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, failWFTErr.Cause) + require.Empty(t, tcx.history.Events) + }) + + t.Run("invalid start-to-close timeout", func(t *testing.T) { + tcx := newTestContext(t, defaultConfig) + err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: "endpoint", + Service: "service", + Operation: "op", + StartToCloseTimeout: durationpb.New(-1 * time.Second), + }, + }, + }) + var failWFTErr workflow.FailWorkflowTaskError + require.ErrorAs(t, err, &failWFTErr) + require.False(t, failWFTErr.TerminateWorkflow) + require.Equal(t, enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, failWFTErr.Cause) + require.Empty(t, tcx.history.Events) + }) + + t.Run("schedule-to-start timeout trimmed to schedule-to-close timeout", func(t *testing.T) { + tcx := newTestContext(t, defaultConfig) + err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: "endpoint", + Service: "service", + Operation: "op", + ScheduleToCloseTimeout: durationpb.New(30 * time.Minute), + ScheduleToStartTimeout: durationpb.New(time.Hour), + }, + }, + }) + require.NoError(t, err) + require.Len(t, tcx.history.Events, 1) + eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() + require.Equal(t, 30*time.Minute, eAttrs.ScheduleToStartTimeout.AsDuration()) + require.Equal(t, 30*time.Minute, eAttrs.ScheduleToCloseTimeout.AsDuration()) + }) + + t.Run("start-to-close timeout trimmed to schedule-to-close timeout", func(t *testing.T) { + tcx := newTestContext(t, defaultConfig) + err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: "endpoint", + Service: "service", + Operation: "op", + ScheduleToCloseTimeout: durationpb.New(30 * time.Minute), + StartToCloseTimeout: durationpb.New(time.Hour), + }, + }, + }) + require.NoError(t, err) + require.Len(t, tcx.history.Events, 1) + eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() + require.Equal(t, 30*time.Minute, eAttrs.StartToCloseTimeout.AsDuration()) + require.Equal(t, 30*time.Minute, eAttrs.ScheduleToCloseTimeout.AsDuration()) + }) + + t.Run("both secondary timeouts trimmed to schedule-to-close timeout", func(t *testing.T) { + tcx := newTestContext(t, defaultConfig) + err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: "endpoint", + Service: "service", + Operation: "op", + ScheduleToCloseTimeout: durationpb.New(30 * time.Minute), + ScheduleToStartTimeout: durationpb.New(time.Hour), + StartToCloseTimeout: durationpb.New(2 * time.Hour), + }, + }, + }) + require.NoError(t, err) + require.Len(t, tcx.history.Events, 1) + eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() + require.Equal(t, 30*time.Minute, eAttrs.ScheduleToStartTimeout.AsDuration()) + require.Equal(t, 30*time.Minute, eAttrs.StartToCloseTimeout.AsDuration()) + require.Equal(t, 30*time.Minute, eAttrs.ScheduleToCloseTimeout.AsDuration()) + }) + + t.Run("secondary timeouts not trimmed when less than schedule-to-close timeout", func(t *testing.T) { + tcx := newTestContext(t, defaultConfig) + err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: "endpoint", + Service: "service", + Operation: "op", + ScheduleToCloseTimeout: durationpb.New(time.Hour), + ScheduleToStartTimeout: durationpb.New(20 * time.Minute), + StartToCloseTimeout: durationpb.New(30 * time.Minute), + }, + }, + }) + require.NoError(t, err) + require.Len(t, tcx.history.Events, 1) + eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() + require.Equal(t, 20*time.Minute, eAttrs.ScheduleToStartTimeout.AsDuration()) + require.Equal(t, 30*time.Minute, eAttrs.StartToCloseTimeout.AsDuration()) + require.Equal(t, time.Hour, eAttrs.ScheduleToCloseTimeout.AsDuration()) + }) + t.Run("sets event attributes with UserMetadata and spawns a child operation machine", func(t *testing.T) { tcx := newTestContext(t, defaultConfig) cAttrs := &commandpb.ScheduleNexusOperationCommandAttributes{ diff --git a/go.mod b/go.mod index c19671cead..411dd4c2c1 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.60.1-0.20260108175916-49710b8392af + go.temporal.io/api v1.60.1-0.20260113171616-51aa2c291ff8 go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 7b7ab8a5f3..62c18cc63e 100644 --- a/go.sum +++ b/go.sum @@ -371,8 +371,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.60.1-0.20260108175916-49710b8392af h1:qa9w0zjQ7ZLgqaB8msld09oos/ORj6yQ4qQrFpMwuPc= -go.temporal.io/api v1.60.1-0.20260108175916-49710b8392af/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.60.1-0.20260113171616-51aa2c291ff8 h1:Btl9joj1fKCqa3ljjAL2TZt87fzx/EHLtVaStOHtISU= +go.temporal.io/api v1.60.1-0.20260113171616-51aa2c291ff8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index d747ac0d7c..8e1c519f6a 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -812,6 +812,19 @@ message NexusOperationInfo { // Endpoint ID, the name is also stored here (field 1) but we use the ID internally to avoid failing operation // requests when an endpoint is renamed. string endpoint_id = 15; + + // Schedule-to-start timeout for this operation. + // (-- api-linter: core::0140::prepositions=disabled + // aip.dev/not-precedent: "to" is used to indicate interval. --) + google.protobuf.Duration schedule_to_start_timeout = 16; + + // Start-to-close timeout for this operation. + // (-- api-linter: core::0140::prepositions=disabled + // aip.dev/not-precedent: "to" is used to indicate interval. --) + google.protobuf.Duration start_to_close_timeout = 17; + + // Time the operation was started (only available for async operations). + google.protobuf.Timestamp started_time = 18; } // NexusOperationCancellationInfo contains the state of a nexus operation cancelation. diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index 6632965c23..08569e6c5a 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -619,6 +619,8 @@ func buildPendingNexusOperationInfo( OperationToken: op.OperationToken, ScheduledEventId: scheduledEventID, ScheduleToCloseTimeout: op.ScheduleToCloseTimeout, + ScheduleToStartTimeout: op.ScheduleToStartTimeout, + StartToCloseTimeout: op.StartToCloseTimeout, ScheduledTime: op.ScheduledTime, State: state, Attempt: op.Attempt, diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index 93423893f4..2fe570139d 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -518,7 +518,7 @@ func TestTaskGenerator_GenerateDirtySubStateMachineTasks(t *testing.T) { }, MachineTransitionCount: 1, }, - Type: nexusoperations.TaskTypeTimeout, + Type: nexusoperations.TaskTypeScheduleToCloseTimeout, Data: nil, }, timers[1].Infos[0]) } diff --git a/tests/nexus_workflow_test.go b/tests/nexus_workflow_test.go index 7f1d8980ec..3acc6a8549 100644 --- a/tests/nexus_workflow_test.go +++ b/tests/nexus_workflow_test.go @@ -2706,6 +2706,331 @@ func (s *NexusWorkflowTestSuite) TestNexusAsyncOperationWithMultipleCallers() { } } +func (s *NexusWorkflowTestSuite) TestNexusOperationScheduleToCloseTimeout() { + ctx := testcore.NewContext() + taskQueue := testcore.RandomizeStr(s.T().Name()) + endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) + + _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ + Spec: &nexuspb.EndpointSpec{ + Name: endpointName, + Target: &nexuspb.EndpointTarget{ + Variant: &nexuspb.EndpointTarget_Worker_{ + Worker: &nexuspb.EndpointTarget_Worker{ + Namespace: s.Namespace().String(), + TaskQueue: "unreachable-for-test", + }, + }, + }, + }, + }) + s.NoError(err) + + run, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + TaskQueue: taskQueue, + }, "workflow") + s.NoError(err) + + // Schedule the operation with a short schedule-to-close timeout + pollResp, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: "test", + }) + s.NoError(err) + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + TaskToken: pollResp.TaskToken, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: endpointName, + Service: "service", + Operation: "operation", + Input: s.mustToPayload("input"), + ScheduleToCloseTimeout: durationpb.New(2 * time.Second), + }, + }, + }, + }, + }) + s.NoError(err) + + descResp, err := s.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID()) + s.NoError(err) + s.Len(descResp.PendingNexusOperations, 1) + s.Equal(2*time.Second, descResp.PendingNexusOperations[0].ScheduleToCloseTimeout.AsDuration()) + + // Now wait for the timeout event + pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: "test", + }) + s.NoError(err) + + // Verify we got a timeout event with the correct timeout type + timedOutEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { + return e.GetNexusOperationTimedOutEventAttributes() != nil + }) + s.Positive(timedOutEventIdx) + timedOutEvent := pollResp.History.Events[timedOutEventIdx] + s.Equal(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetTimeoutFailureInfo().GetTimeoutType()) + + // Complete the workflow + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + TaskToken: pollResp.TaskToken, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, + }, + }, + }, + }) + s.NoError(err) + s.NoError(run.Get(ctx, nil)) +} + +func (s *NexusWorkflowTestSuite) TestNexusOperationScheduleToStartTimeout() { + ctx := testcore.NewContext() + taskQueue := testcore.RandomizeStr(s.T().Name()) + endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) + + _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ + Spec: &nexuspb.EndpointSpec{ + Name: endpointName, + Target: &nexuspb.EndpointTarget{ + Variant: &nexuspb.EndpointTarget_Worker_{ + Worker: &nexuspb.EndpointTarget_Worker{ + Namespace: s.Namespace().String(), + TaskQueue: "unreachable-for-test", + }, + }, + }, + }, + }) + s.NoError(err) + + run, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + TaskQueue: taskQueue, + }, "workflow") + s.NoError(err) + + // Schedule the operation with a short schedule-to-close timeout + pollResp, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: "test", + }) + s.NoError(err) + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + TaskToken: pollResp.TaskToken, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: endpointName, + Service: "service", + Operation: "operation", + Input: s.mustToPayload("input"), + ScheduleToStartTimeout: durationpb.New(2 * time.Second), + }, + }, + }, + }, + }) + s.NoError(err) + + descResp, err := s.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID()) + s.NoError(err) + s.Len(descResp.PendingNexusOperations, 1) + s.Equal(2*time.Second, descResp.PendingNexusOperations[0].ScheduleToStartTimeout.AsDuration()) + + // Now wait for the timeout event + pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: "test", + }) + s.NoError(err) + + // Verify we got a timeout event with the correct timeout type + timedOutEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { + return e.GetNexusOperationTimedOutEventAttributes() != nil + }) + s.Positive(timedOutEventIdx) + timedOutEvent := pollResp.History.Events[timedOutEventIdx] + s.Equal(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetTimeoutFailureInfo().GetTimeoutType()) + + // Complete the workflow + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + TaskToken: pollResp.TaskToken, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, + }, + }, + }, + }) + s.NoError(err) + s.NoError(run.Get(ctx, nil)) +} + +func (s *NexusWorkflowTestSuite) TestNexusOperationStartToCloseTimeout() { + ctx := testcore.NewContext() + taskQueue := testcore.RandomizeStr(s.T().Name()) + endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) + + // Handler that starts quickly (returns async) but never completes + h := nexustest.Handler{ + OnStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { + // Return async start immediately + return &nexus.HandlerStartOperationResultAsync{OperationToken: "test-op-token"}, nil + }, + } + listenAddr := nexustest.AllocListenAddress() + nexustest.NewNexusServer(s.T(), listenAddr, h) + + _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ + Spec: &nexuspb.EndpointSpec{ + Name: endpointName, + Target: &nexuspb.EndpointTarget{ + Variant: &nexuspb.EndpointTarget_External_{ + External: &nexuspb.EndpointTarget_External{ + Url: "http://" + listenAddr, + }, + }, + }, + }, + }) + s.NoError(err) + + run, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + TaskQueue: taskQueue, + }, "workflow") + s.NoError(err) + + // Schedule the operation with a short start-to-close timeout + pollResp, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: "test", + }) + s.NoError(err) + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + TaskToken: pollResp.TaskToken, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, + Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ + ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ + Endpoint: endpointName, + Service: "service", + Operation: "operation", + Input: s.mustToPayload("input"), + StartToCloseTimeout: durationpb.New(2 * time.Second), + }, + }, + }, + }, + }) + s.NoError(err) + + descResp, err := s.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID()) + s.NoError(err) + s.Len(descResp.PendingNexusOperations, 1) + s.Equal(2*time.Second, descResp.PendingNexusOperations[0].StartToCloseTimeout.AsDuration()) + + // Wait for the started event first + pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: "test", + }) + s.NoError(err) + + // Verify we got a started event + startedEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { + return e.GetNexusOperationStartedEventAttributes() != nil + }) + s.Positive(startedEventIdx) + + // Respond to acknowledge the started event + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + TaskToken: pollResp.TaskToken, + }) + s.NoError(err) + + // Now wait for the timeout event + pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: "test", + }) + s.NoError(err) + + // Verify we got a timeout event with the correct timeout type + timedOutEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { + return e.GetNexusOperationTimedOutEventAttributes() != nil + }) + s.Positive(timedOutEventIdx) + timedOutEvent := pollResp.History.Events[timedOutEventIdx] + s.Equal(enumspb.TIMEOUT_TYPE_START_TO_CLOSE, + timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetTimeoutFailureInfo().GetTimeoutType()) + s.Contains(timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetMessage(), "after starting") + + // Complete the workflow + _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ + Identity: "test", + TaskToken: pollResp.TaskToken, + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, + }, + }, + }, + }) + s.NoError(err) + s.NoError(run.Get(ctx, nil)) +} + // generateValidCallbackToken creates a valid callback token for testing with the given namespace, workflow, and run IDs func (s *NexusWorkflowTestSuite) generateValidCallbackToken(namespaceID, workflowID, runID string) (string, error) { gen := &commonnexus.CallbackTokenGenerator{}