diff --git a/api/persistence/v1/executions.pb.go b/api/persistence/v1/executions.pb.go index 2e6d4d5654..ffef9dd74c 100644 --- a/api/persistence/v1/executions.pb.go +++ b/api/persistence/v1/executions.pb.go @@ -3675,19 +3675,7 @@ type NexusOperationInfo struct { NextAttemptScheduleTime *timestamppb.Timestamp `protobuf:"bytes,14,opt,name=next_attempt_schedule_time,json=nextAttemptScheduleTime,proto3" json:"next_attempt_schedule_time,omitempty"` // Endpoint ID, the name is also stored here (field 1) but we use the ID internally to avoid failing operation // requests when an endpoint is renamed. - EndpointId string `protobuf:"bytes,15,opt,name=endpoint_id,json=endpointId,proto3" json:"endpoint_id,omitempty"` - // Schedule-to-start timeout for this operation. - // (-- api-linter: core::0140::prepositions=disabled - // - // aip.dev/not-precedent: "to" is used to indicate interval. --) - ScheduleToStartTimeout *durationpb.Duration `protobuf:"bytes,16,opt,name=schedule_to_start_timeout,json=scheduleToStartTimeout,proto3" json:"schedule_to_start_timeout,omitempty"` - // Start-to-close timeout for this operation. - // (-- api-linter: core::0140::prepositions=disabled - // - // aip.dev/not-precedent: "to" is used to indicate interval. --) - StartToCloseTimeout *durationpb.Duration `protobuf:"bytes,17,opt,name=start_to_close_timeout,json=startToCloseTimeout,proto3" json:"start_to_close_timeout,omitempty"` - // Time the operation was started (only available for async operations). - StartedTime *timestamppb.Timestamp `protobuf:"bytes,18,opt,name=started_time,json=startedTime,proto3" json:"started_time,omitempty"` + EndpointId string `protobuf:"bytes,15,opt,name=endpoint_id,json=endpointId,proto3" json:"endpoint_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -3820,27 +3808,6 @@ func (x *NexusOperationInfo) GetEndpointId() string { return "" } -func (x *NexusOperationInfo) GetScheduleToStartTimeout() *durationpb.Duration { - if x != nil { - return x.ScheduleToStartTimeout - } - return nil -} - -func (x *NexusOperationInfo) GetStartToCloseTimeout() *durationpb.Duration { - if x != nil { - return x.StartToCloseTimeout - } - return nil -} - -func (x *NexusOperationInfo) GetStartedTime() *timestamppb.Timestamp { - if x != nil { - return x.StartedTime - } - return nil -} - // NexusOperationCancellationInfo contains the state of a nexus operation cancelation. type NexusOperationCancellationInfo struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -4980,7 +4947,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x0eWorkflowClosed\x1a\x80\x01\n" + "\aTrigger\x12j\n" + "\x0fworkflow_closed\x18\x01 \x01(\v2?.temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosedH\x00R\x0eworkflowClosedB\t\n" + - "\avariant\"\xf2\a\n" + + "\avariant\"\x8d\x06\n" + "\x12NexusOperationInfo\x12\x1a\n" + "\bendpoint\x18\x01 \x01(\tR\bendpoint\x12\x18\n" + "\aservice\x18\x02 \x01(\tR\aservice\x12\x1c\n" + @@ -4998,10 +4965,7 @@ const file_temporal_server_api_persistence_v1_executions_proto_rawDesc = "" + "\x14last_attempt_failure\x18\r \x01(\v2 .temporal.api.failure.v1.FailureR\x12lastAttemptFailure\x12W\n" + "\x1anext_attempt_schedule_time\x18\x0e \x01(\v2\x1a.google.protobuf.TimestampR\x17nextAttemptScheduleTime\x12\x1f\n" + "\vendpoint_id\x18\x0f \x01(\tR\n" + - "endpointId\x12T\n" + - "\x19schedule_to_start_timeout\x18\x10 \x01(\v2\x19.google.protobuf.DurationR\x16scheduleToStartTimeout\x12N\n" + - "\x16start_to_close_timeout\x18\x11 \x01(\v2\x19.google.protobuf.DurationR\x13startToCloseTimeout\x12=\n" + - "\fstarted_time\x18\x12 \x01(\v2\x1a.google.protobuf.TimestampR\vstartedTimeJ\x04\b\x04\x10\x05\"\xff\x03\n" + + "endpointIdJ\x04\b\x04\x10\x05\"\xff\x03\n" + "\x1eNexusOperationCancellationInfo\x12A\n" + "\x0erequested_time\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\rrequestedTime\x12L\n" + "\x05state\x18\x02 \x01(\x0e26.temporal.api.enums.v1.NexusOperationCancellationStateR\x05state\x12\x18\n" + @@ -5245,32 +5209,29 @@ var file_temporal_server_api_persistence_v1_executions_proto_depIdxs = []int32{ 43, // 121: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp 68, // 122: temporal.server.api.persistence.v1.NexusOperationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure 43, // 123: temporal.server.api.persistence.v1.NexusOperationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp - 44, // 124: temporal.server.api.persistence.v1.NexusOperationInfo.schedule_to_start_timeout:type_name -> google.protobuf.Duration - 44, // 125: temporal.server.api.persistence.v1.NexusOperationInfo.start_to_close_timeout:type_name -> google.protobuf.Duration - 43, // 126: temporal.server.api.persistence.v1.NexusOperationInfo.started_time:type_name -> google.protobuf.Timestamp - 43, // 127: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.requested_time:type_name -> google.protobuf.Timestamp - 79, // 128: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.state:type_name -> temporal.api.enums.v1.NexusOperationCancellationState - 43, // 129: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp - 68, // 130: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure - 43, // 131: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp - 43, // 132: temporal.server.api.persistence.v1.WorkflowPauseInfo.pause_time:type_name -> google.protobuf.Timestamp - 80, // 133: temporal.server.api.persistence.v1.ShardInfo.QueueStatesEntry.value:type_name -> temporal.server.api.persistence.v1.QueueState - 81, // 134: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SearchAttributesEntry.value:type_name -> temporal.api.common.v1.Payload - 81, // 135: temporal.server.api.persistence.v1.WorkflowExecutionInfo.MemoEntry.value:type_name -> temporal.api.common.v1.Payload - 82, // 136: temporal.server.api.persistence.v1.WorkflowExecutionInfo.UpdateInfosEntry.value:type_name -> temporal.server.api.persistence.v1.UpdateInfo - 83, // 137: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SubStateMachinesByTypeEntry.value:type_name -> temporal.server.api.persistence.v1.StateMachineMap - 24, // 138: temporal.server.api.persistence.v1.WorkflowExecutionInfo.ChildrenInitializedPostResetPointEntry.value:type_name -> temporal.server.api.persistence.v1.ResetChildInfo - 4, // 139: temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry.value:type_name -> temporal.server.api.persistence.v1.RequestIDInfo - 43, // 140: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.pause_time:type_name -> google.protobuf.Timestamp - 37, // 141: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.manual:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.Manual - 40, // 142: temporal.server.api.persistence.v1.Callback.Nexus.header:type_name -> temporal.server.api.persistence.v1.Callback.Nexus.HeaderEntry - 84, // 143: temporal.server.api.persistence.v1.Callback.HSM.ref:type_name -> temporal.server.api.persistence.v1.StateMachineRef - 41, // 144: temporal.server.api.persistence.v1.CallbackInfo.Trigger.workflow_closed:type_name -> temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosed - 145, // [145:145] is the sub-list for method output_type - 145, // [145:145] is the sub-list for method input_type - 145, // [145:145] is the sub-list for extension type_name - 145, // [145:145] is the sub-list for extension extendee - 0, // [0:145] is the sub-list for field type_name + 43, // 124: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.requested_time:type_name -> google.protobuf.Timestamp + 79, // 125: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.state:type_name -> temporal.api.enums.v1.NexusOperationCancellationState + 43, // 126: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_complete_time:type_name -> google.protobuf.Timestamp + 68, // 127: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.last_attempt_failure:type_name -> temporal.api.failure.v1.Failure + 43, // 128: temporal.server.api.persistence.v1.NexusOperationCancellationInfo.next_attempt_schedule_time:type_name -> google.protobuf.Timestamp + 43, // 129: temporal.server.api.persistence.v1.WorkflowPauseInfo.pause_time:type_name -> google.protobuf.Timestamp + 80, // 130: temporal.server.api.persistence.v1.ShardInfo.QueueStatesEntry.value:type_name -> temporal.server.api.persistence.v1.QueueState + 81, // 131: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SearchAttributesEntry.value:type_name -> temporal.api.common.v1.Payload + 81, // 132: temporal.server.api.persistence.v1.WorkflowExecutionInfo.MemoEntry.value:type_name -> temporal.api.common.v1.Payload + 82, // 133: temporal.server.api.persistence.v1.WorkflowExecutionInfo.UpdateInfosEntry.value:type_name -> temporal.server.api.persistence.v1.UpdateInfo + 83, // 134: temporal.server.api.persistence.v1.WorkflowExecutionInfo.SubStateMachinesByTypeEntry.value:type_name -> temporal.server.api.persistence.v1.StateMachineMap + 24, // 135: temporal.server.api.persistence.v1.WorkflowExecutionInfo.ChildrenInitializedPostResetPointEntry.value:type_name -> temporal.server.api.persistence.v1.ResetChildInfo + 4, // 136: temporal.server.api.persistence.v1.WorkflowExecutionState.RequestIdsEntry.value:type_name -> temporal.server.api.persistence.v1.RequestIDInfo + 43, // 137: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.pause_time:type_name -> google.protobuf.Timestamp + 37, // 138: temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.manual:type_name -> temporal.server.api.persistence.v1.ActivityInfo.PauseInfo.Manual + 40, // 139: temporal.server.api.persistence.v1.Callback.Nexus.header:type_name -> temporal.server.api.persistence.v1.Callback.Nexus.HeaderEntry + 84, // 140: temporal.server.api.persistence.v1.Callback.HSM.ref:type_name -> temporal.server.api.persistence.v1.StateMachineRef + 41, // 141: temporal.server.api.persistence.v1.CallbackInfo.Trigger.workflow_closed:type_name -> temporal.server.api.persistence.v1.CallbackInfo.WorkflowClosed + 142, // [142:142] is the sub-list for method output_type + 142, // [142:142] is the sub-list for method input_type + 142, // [142:142] is the sub-list for extension type_name + 142, // [142:142] is the sub-list for extension extendee + 0, // [0:142] is the sub-list for field type_name } func init() { file_temporal_server_api_persistence_v1_executions_proto_init() } diff --git a/components/nexusoperations/events_test.go b/components/nexusoperations/events_test.go index 1da26dff45..d5df00d0ec 100644 --- a/components/nexusoperations/events_test.go +++ b/components/nexusoperations/events_test.go @@ -11,15 +11,12 @@ import ( "go.temporal.io/server/components/nexusoperations" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/hsm/hsmtest" - "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) func TestCherryPick(t *testing.T) { setup := func(t *testing.T) (*hsm.Node, nexusoperations.Operation, int64) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour)) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) @@ -162,9 +159,7 @@ func TestTerminalStatesDeletion(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour)) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) diff --git a/components/nexusoperations/executors.go b/components/nexusoperations/executors.go index 717a1c905b..222a5a07e9 100644 --- a/components/nexusoperations/executors.go +++ b/components/nexusoperations/executors.go @@ -32,19 +32,10 @@ import ( "go.uber.org/fx" ) -type operationTimeoutBelowMinError struct { - timeoutType enumspb.TimeoutType -} - -func (o *operationTimeoutBelowMinError) Error() string { - return fmt.Sprintf("not enough time to execute another request before %s timeout", o.timeoutType.String()) -} - +var ErrOperationTimeoutBelowMin = errors.New("remaining operation timeout is less than required minimum") var ErrInvalidOperationToken = errors.New("invalid operation token") var errRequestTimedOut = errors.New("request timed out") -const maxDuration = time.Duration(1<<63 - 1) - // ClientProvider provides a nexus client for a given endpoint. type ClientProvider func(ctx context.Context, namespaceID string, entry *persistencespb.NexusEndpointEntry, service string) (*nexusrpc.HTTPClient, error) @@ -80,19 +71,7 @@ func RegisterExecutor( } if err := hsm.RegisterTimerExecutor( registry, - exec.executeScheduleToCloseTimeoutTask, - ); err != nil { - return err - } - if err := hsm.RegisterTimerExecutor( - registry, - exec.executeScheduleToStartTimeoutTask, - ); err != nil { - return err - } - if err := hsm.RegisterTimerExecutor( - registry, - exec.executeStartToCloseTimeoutTask, + exec.executeTimeoutTask, ); err != nil { return err } @@ -205,33 +184,17 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ return fmt.Errorf("%w: %w", queueserrors.NewUnprocessableTaskError("failed to generate a callback token"), err) } + header := nexus.Header(args.header) callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName) - var timeoutType enumspb.TimeoutType - // Adjust timeout based on remaining operation timeouts. - // ScheduleToStart takes precedence over ScheduleToClose since it is already capped by it. - if args.scheduleToStartTimeout > 0 { - callTimeout = min(callTimeout, args.scheduleToStartTimeout-time.Since(args.scheduledTime)) - timeoutType = enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START - } else if args.scheduleToCloseTimeout > 0 { - callTimeout = min(callTimeout, args.scheduleToCloseTimeout-time.Since(args.scheduledTime)) - timeoutType = enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE - } - // Inform the handler of the operation timeout via header. - // StartToClose takes precedence over ScheduleToClose since it is already capped by it. - opTimeout := maxDuration - if args.startToCloseTimeout > 0 { - opTimeout = args.startToCloseTimeout - } if args.scheduleToCloseTimeout > 0 { - opTimeout = min(args.scheduleToCloseTimeout-time.Since(args.scheduledTime), opTimeout) - } - header := nexus.Header(args.header) - // Set the operation timeout header if not already set. - if opTimeoutHeader := header.Get(nexus.HeaderOperationTimeout); opTimeout != maxDuration && opTimeoutHeader == "" { - if header == nil { - header = make(nexus.Header, 1) + opTimeout := args.scheduleToCloseTimeout - time.Since(args.scheduledTime) + callTimeout = min(callTimeout, opTimeout) + if opTimeoutHeader := header.Get(nexus.HeaderOperationTimeout); opTimeoutHeader == "" { + if header == nil { + header = make(nexus.Header, 1) + } + header[nexus.HeaderOperationTimeout] = opTimeout.String() } - header[nexus.HeaderOperationTimeout] = commonnexus.FormatDuration(opTimeout) } callCtx, cancel := context.WithTimeout(ctx, callTimeout) @@ -271,7 +234,7 @@ func (e taskExecutor) executeInvocationTask(ctx context.Context, env hsm.Environ var rawResult *nexusrpc.ClientStartOperationResponse[*nexus.LazyValue] var callErr error if callTimeout < e.Config.MinRequestTimeout(ns.Name().String()) { - callErr = &operationTimeoutBelowMinError{timeoutType: timeoutType} + callErr = ErrOperationTimeoutBelowMin } else { rawResult, callErr = client.StartOperation(callCtx, args.operation, args.payload, nexus.StartOperationOptions{ Header: header, @@ -348,9 +311,7 @@ type startArgs struct { endpointName string endpointID string scheduledTime time.Time - scheduleToStartTimeout time.Duration scheduleToCloseTimeout time.Duration - startToCloseTimeout time.Duration header map[string]string payload *commonpb.Payload nexusLink nexus.Link @@ -375,18 +336,15 @@ func (e taskExecutor) loadOperationArgs( args.service = operation.Service args.operation = operation.Operation args.requestID = operation.RequestId - args.scheduleToCloseTimeout = operation.ScheduleToCloseTimeout.AsDuration() - args.scheduleToStartTimeout = operation.ScheduleToStartTimeout.AsDuration() - args.startToCloseTimeout = operation.StartToCloseTimeout.AsDuration() eventToken = operation.ScheduledEventToken event, err := node.LoadHistoryEvent(ctx, eventToken) if err != nil { return nil } args.scheduledTime = event.EventTime.AsTime() - attrs := event.GetNexusOperationScheduledEventAttributes() - args.payload = attrs.GetInput() - args.header = attrs.GetNexusHeader() + args.scheduleToCloseTimeout = event.GetNexusOperationScheduledEventAttributes().GetScheduleToCloseTimeout().AsDuration() + args.payload = event.GetNexusOperationScheduledEventAttributes().GetInput() + args.header = event.GetNexusOperationScheduledEventAttributes().GetNexusHeader() args.nexusLink = ConvertLinkWorkflowEventToNexusLink(&commonpb.Link_WorkflowEvent{ Namespace: ns.Name().String(), WorkflowId: ref.WorkflowKey.WorkflowID, @@ -479,7 +437,6 @@ func (e taskExecutor) saveResult(ctx context.Context, env hsm.Environment, ref h func (e taskExecutor) handleStartOperationError(env hsm.Environment, node *hsm.Node, operation Operation, callErr error) error { var handlerErr *nexus.HandlerError var opFailedErr *nexus.OperationError - var opTimeoutBelowMinErr *operationTimeoutBelowMinError switch { case errors.As(callErr, &opFailedErr): @@ -496,9 +453,9 @@ func (e taskExecutor) handleStartOperationError(env hsm.Environment, node *hsm.N // Following practices from workflow task completion payload size limit enforcement, we do not retry this // operation if the response's operation token is too large. return handleNonRetryableStartOperationError(node, operation, callErr) - case errors.As(callErr, &opTimeoutBelowMinErr): + case errors.Is(callErr, ErrOperationTimeoutBelowMin): // Not enough time to execute another request, resolve the operation with a timeout. - return e.recordOperationTimeout(node, opTimeoutBelowMinErr.timeoutType) + return e.recordOperationTimeout(node) case errors.Is(callErr, context.DeadlineExceeded) || errors.Is(callErr, context.Canceled): // If timed out, we don't leak internal info to the user callErr = errRequestTimedOut @@ -556,19 +513,11 @@ func (e taskExecutor) executeBackoffTask(env hsm.Environment, node *hsm.Node, ta }) } -func (e taskExecutor) executeScheduleToCloseTimeoutTask(env hsm.Environment, node *hsm.Node, task ScheduleToCloseTimeoutTask) error { - return e.recordOperationTimeout(node, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE) +func (e taskExecutor) executeTimeoutTask(env hsm.Environment, node *hsm.Node, task TimeoutTask) error { + return e.recordOperationTimeout(node) } -func (e taskExecutor) executeScheduleToStartTimeoutTask(env hsm.Environment, node *hsm.Node, task ScheduleToStartTimeoutTask) error { - return e.recordOperationTimeout(node, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START) -} - -func (e taskExecutor) executeStartToCloseTimeoutTask(env hsm.Environment, node *hsm.Node, task StartToCloseTimeoutTask) error { - return e.recordOperationTimeout(node, enumspb.TIMEOUT_TYPE_START_TO_CLOSE) -} - -func (e taskExecutor) recordOperationTimeout(node *hsm.Node, timeoutType enumspb.TimeoutType) error { +func (e taskExecutor) recordOperationTimeout(node *hsm.Node) error { return hsm.MachineTransition(node, func(op Operation) (hsm.TransitionOutput, error) { eventID, err := hsm.EventIDFromToken(op.ScheduledEventToken) if err != nil { @@ -585,7 +534,7 @@ func (e taskExecutor) recordOperationTimeout(node *hsm.Node, timeoutType enumspb Message: "operation timed out", FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ - TimeoutType: timeoutType, + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, }, }, }, @@ -625,15 +574,9 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro } callTimeout := e.Config.RequestTimeout(ns.Name().String(), task.EndpointName) - var timeoutType enumspb.TimeoutType - // Adjust timeout based on remaining operation timeouts. - if args.startToCloseTimeout > 0 { - callTimeout = min(callTimeout, args.startToCloseTimeout-time.Since(args.startedTime)) - timeoutType = enumspb.TIMEOUT_TYPE_START_TO_CLOSE - } if args.scheduleToCloseTimeout > 0 { - callTimeout = min(callTimeout, args.scheduleToCloseTimeout-time.Since(args.scheduledTime)) - timeoutType = enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE + opTimeout := args.scheduleToCloseTimeout - time.Since(args.scheduledTime) + callTimeout = min(callTimeout, opTimeout) } callCtx, cancel := context.WithTimeout(ctx, callTimeout) defer cancel() @@ -675,7 +618,7 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro var callErr error startTime := time.Now() if callTimeout < e.Config.MinRequestTimeout(ns.Name().String()) { - callErr = &operationTimeoutBelowMinError{timeoutType: timeoutType} + callErr = ErrOperationTimeoutBelowMin } else { callErr = handle.Cancel(callCtx, nexus.CancelOperationOptions{Header: nexus.Header(args.headers)}) } @@ -709,8 +652,6 @@ func (e taskExecutor) executeCancelationTask(ctx context.Context, env hsm.Enviro type cancelArgs struct { service, operation, token, endpointID, endpointName, requestID string scheduledTime time.Time - startedTime time.Time - startToCloseTimeout time.Duration scheduleToCloseTimeout time.Duration scheduledEventID int64 headers map[string]string @@ -736,9 +677,7 @@ func (e taskExecutor) loadArgsForCancelation(ctx context.Context, env hsm.Enviro args.endpointName = op.Endpoint args.requestID = op.RequestId args.scheduledTime = op.ScheduledTime.AsTime() - args.startedTime = op.StartedTime.AsTime() args.scheduleToCloseTimeout = op.ScheduleToCloseTimeout.AsDuration() - args.startToCloseTimeout = op.StartToCloseTimeout.AsDuration() args.scheduledEventID, err = hsm.EventIDFromToken(op.ScheduledEventToken) if err != nil { return err @@ -761,8 +700,7 @@ func (e taskExecutor) saveCancelationResult(ctx context.Context, env hsm.Environ return hsm.MachineTransition(n, func(c Cancelation) (hsm.TransitionOutput, error) { if callErr != nil { var handlerErr *nexus.HandlerError - var opTimeoutBelowMinErr *operationTimeoutBelowMinError - isRetryable := !errors.As(callErr, &opTimeoutBelowMinErr) && (!errors.As(callErr, &handlerErr) || handlerErr.Retryable()) + isRetryable := !errors.Is(callErr, ErrOperationTimeoutBelowMin) && (!errors.As(callErr, &handlerErr) || handlerErr.Retryable()) failure, err := callErrToFailure(callErr, isRetryable) if err != nil { return hsm.TransitionOutput{}, err @@ -865,8 +803,7 @@ func startCallOutcomeTag(callCtx context.Context, result *nexusrpc.ClientStartOp var opFailedError *nexus.OperationError if callErr != nil { - var opTimeoutBelowMinErr *operationTimeoutBelowMinError - if errors.As(callErr, &opTimeoutBelowMinErr) { + if errors.Is(callErr, ErrOperationTimeoutBelowMin) { return "operation-timeout" } if callCtx.Err() != nil { @@ -888,8 +825,7 @@ func startCallOutcomeTag(callCtx context.Context, result *nexusrpc.ClientStartOp func cancelCallOutcomeTag(callCtx context.Context, callErr error) string { var handlerErr *nexus.HandlerError if callErr != nil { - var opTimeoutBelowMinErr *operationTimeoutBelowMinError - if errors.As(callErr, &opTimeoutBelowMinErr) { + if errors.Is(callErr, ErrOperationTimeoutBelowMin) { return "operation-timeout" } if callCtx.Err() != nil { @@ -918,8 +854,10 @@ func isDestinationDown(err error) bool { if errors.Is(err, ErrInvalidOperationToken) { return false } - var opTimeoutBelowMinErr *operationTimeoutBelowMinError - return !errors.As(err, &opTimeoutBelowMinErr) + if errors.Is(err, ErrOperationTimeoutBelowMin) { + return false + } + return true } func callErrToFailure(callErr error, retryable bool) (*failurepb.Failure, error) { diff --git a/components/nexusoperations/executors_test.go b/components/nexusoperations/executors_test.go index afa3347a5a..95dcb3f585 100644 --- a/components/nexusoperations/executors_test.go +++ b/components/nexusoperations/executors_test.go @@ -35,7 +35,6 @@ import ( "go.temporal.io/server/service/history/hsm/hsmtest" queueserrors "go.temporal.io/server/service/history/queues/errors" "go.uber.org/mock/gomock" - "google.golang.org/protobuf/types/known/durationpb" ) var endpointEntry = &persistencespb.NexusEndpointEntry{ @@ -86,8 +85,6 @@ func TestProcessInvocationTask(t *testing.T) { checkOutcome func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) requestTimeout time.Duration schedToCloseTimeout time.Duration - startToCloseTimeout time.Duration - schedToStartTimeout time.Duration destinationDown bool }{ { @@ -149,9 +146,8 @@ func TestProcessInvocationTask(t *testing.T) { name: "sync start", requestTimeout: time.Hour, schedToCloseTimeout: time.Hour, - // To test this value is ignored when ScheduleToCloseTimeout is set (but still set on the header). - header: nexus.Header{nexus.HeaderOperationTimeout: commonnexus.FormatDuration(time.Millisecond)}, - destinationDown: false, + header: nexus.Header{nexus.HeaderOperationTimeout: time.Microsecond.String()}, // to test this value is ignored when ScheduleToCloseTimeout is set + destinationDown: false, onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { // Also use this test case to check the input and options provided. if service != "service" { @@ -166,9 +162,6 @@ func TestProcessInvocationTask(t *testing.T) { if options.CallbackURL != "http://localhost/callback" { return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid callback URL") } - if options.Header.Get(nexus.HeaderOperationTimeout) != "1ms" { - return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid operation timeout header: %s", options.Header.Get(nexus.HeaderOperationTimeout)) - } var v string if err := input.Consume(&v); err != nil || v != "input" { return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid input") @@ -323,30 +316,6 @@ func TestProcessInvocationTask(t *testing.T) { destinationDown: true, expectedMetricOutcome: "request-timeout", onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { - opTimeout, err := time.ParseDuration(options.Header.Get(nexus.HeaderOperationTimeout)) - if err != nil || opTimeout > 10*time.Millisecond { - return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid operation timeout header: %s", options.Header.Get(nexus.HeaderOperationTimeout)) - } - time.Sleep(time.Millisecond * 100) //nolint:forbidigo // Allow time.Sleep for timeout tests - return &nexus.HandlerStartOperationResultAsync{OperationToken: "op-token"}, nil - }, - checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { - require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, op.State()) - require.NotNil(t, op.LastAttemptFailure.GetApplicationFailureInfo()) - require.Regexp(t, "request timed out", op.LastAttemptFailure.Message) - require.Empty(t, events) - }, - }, - { - name: "invocation timeout by ScheduleToStartTimeout", - requestTimeout: time.Hour, - schedToStartTimeout: 10 * time.Millisecond, - destinationDown: true, - expectedMetricOutcome: "request-timeout", - onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { - if options.Header.Get(nexus.HeaderOperationTimeout) != "" { - return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "operation timeout header should not be set, got: %s", options.Header.Get(nexus.HeaderOperationTimeout)) - } time.Sleep(time.Millisecond * 100) //nolint:forbidigo // Allow time.Sleep for timeout tests return &nexus.HandlerStartOperationResultAsync{OperationToken: "op-token"}, nil }, @@ -357,23 +326,6 @@ func TestProcessInvocationTask(t *testing.T) { require.Equal(t, 0, len(events)) }, }, - { - name: "operation timeout header set by StartToCloseTimeout", - requestTimeout: time.Hour, - startToCloseTimeout: 1 * time.Minute, - destinationDown: false, - expectedMetricOutcome: "pending", - onStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { - if options.Header.Get(nexus.HeaderOperationTimeout) != "60000ms" { - return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeBadRequest, "invalid operation timeout header: %s", options.Header.Get(nexus.HeaderOperationTimeout)) - } - return &nexus.HandlerStartOperationResultAsync{OperationToken: "op-token"}, nil - }, - checkOutcome: func(t *testing.T, op nexusoperations.Operation, events []*historypb.HistoryEvent) { - require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_STARTED, op.State()) - require.Len(t, events, 1) - }, - }, { name: "ScheduleToCloseTimeout less than MinRequestTimeout", requestTimeout: time.Hour, @@ -485,11 +437,7 @@ func TestProcessInvocationTask(t *testing.T) { nexustest.NewNexusServer(t, listenAddr, h) reg := newRegistry(t) - event := mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(tc.schedToCloseTimeout), - ScheduleToStartTimeout: durationpb.New(tc.schedToStartTimeout), - StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), - }) + event := mustNewScheduledEvent(time.Now(), tc.schedToCloseTimeout) if tc.eventHasNoEndpointID { event.GetNexusOperationScheduledEventAttributes().EndpointId = "" } @@ -608,9 +556,7 @@ func TestProcessInvocationTask(t *testing.T) { func TestProcessBackoffTask(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) env := fakeEnv{node} require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) @@ -639,9 +585,7 @@ func TestProcessBackoffTask(t *testing.T) { func TestProcessTimeoutTask(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) env := fakeEnv{node} require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) @@ -649,7 +593,7 @@ func TestProcessTimeoutTask(t *testing.T) { err := reg.ExecuteTimerTask( env, node, - nexusoperations.ScheduleToCloseTimeoutTask{}, + nexusoperations.TimeoutTask{}, ) require.NoError(t, err) op, err := hsm.MachineData[nexusoperations.Operation](node) @@ -682,102 +626,6 @@ func TestProcessTimeoutTask(t *testing.T) { }, backend.Events[0].GetNexusOperationTimedOutEventAttributes()) } -func TestProcessScheduleToStartTimeoutTask(t *testing.T) { - reg := newRegistry(t) - backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) - env := fakeEnv{node} - - require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) - - err := reg.ExecuteTimerTask( - env, - node, - nexusoperations.ScheduleToStartTimeoutTask{}, - ) - require.NoError(t, err) - op, err := hsm.MachineData[nexusoperations.Operation](node) - require.NoError(t, err) - require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT, op.State()) - require.Len(t, backend.Events, 1) - require.Equal(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT, backend.Events[0].EventType) - protorequire.ProtoEqual(t, &historypb.NexusOperationTimedOutEventAttributes{ - ScheduledEventId: 1, - RequestId: op.RequestId, - Failure: &failurepb.Failure{ - Message: "nexus operation completed unsuccessfully", - FailureInfo: &failurepb.Failure_NexusOperationExecutionFailureInfo{ - NexusOperationExecutionFailureInfo: &failurepb.NexusOperationFailureInfo{ - ScheduledEventId: 1, - Endpoint: "endpoint", - Service: "service", - Operation: "operation", - }, - }, - Cause: &failurepb.Failure{ - Message: "operation timed out", - FailureInfo: &failurepb.Failure_TimeoutFailureInfo{ - TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ - TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, - }, - }, - }, - }, - }, backend.Events[0].GetNexusOperationTimedOutEventAttributes()) -} - -func TestProcessStartToCloseTimeoutTask(t *testing.T) { - reg := newRegistry(t) - backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) - env := fakeEnv{node} - - require.NoError(t, nexusoperations.RegisterExecutor(reg, nexusoperations.TaskExecutorOptions{})) - - // Transition to STARTED state first - err := hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { - return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ - Node: node, - Time: time.Now(), - Attributes: &historypb.NexusOperationStartedEventAttributes{ - OperationToken: "test-token", - }, - }) - }) - require.NoError(t, err) - - // Now execute the start-to-close timeout - err = reg.ExecuteTimerTask( - env, - node, - nexusoperations.StartToCloseTimeoutTask{}, - ) - require.NoError(t, err) - op, err := hsm.MachineData[nexusoperations.Operation](node) - require.NoError(t, err) - require.Equal(t, enumsspb.NEXUS_OPERATION_STATE_TIMED_OUT, op.State()) - // Should have TIMED_OUT event (STARTED event is not added by the transition) - require.Len(t, backend.Events, 1) - require.Equal(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT, backend.Events[0].EventType) - // Verify timeout type and message - timedOutAttrs := backend.Events[0].GetNexusOperationTimedOutEventAttributes() - require.Equal(t, int64(1), timedOutAttrs.ScheduledEventId) - require.Equal(t, op.RequestId, timedOutAttrs.RequestId) - require.NotNil(t, timedOutAttrs.Failure) - require.Equal(t, "nexus operation completed unsuccessfully", timedOutAttrs.Failure.Message) - require.NotNil(t, timedOutAttrs.Failure.Cause) - require.Equal(t, "operation timed out", timedOutAttrs.Failure.Cause.Message) - require.Equal(t, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, timedOutAttrs.Failure.Cause.GetTimeoutFailureInfo().TimeoutType) - // Verify operation token is present in failure info - nexusFailureInfo := timedOutAttrs.Failure.GetNexusOperationExecutionFailureInfo() - require.NotNil(t, nexusFailureInfo) - require.Equal(t, "test-token", nexusFailureInfo.OperationToken) -} - func TestProcessCancelationTask(t *testing.T) { cases := []struct { name string @@ -787,7 +635,6 @@ func TestProcessCancelationTask(t *testing.T) { checkOutcome func(t *testing.T, op nexusoperations.Cancelation) requestTimeout time.Duration schedToCloseTimeout time.Duration - startToCloseTimeout time.Duration destinationDown bool header map[string]string }{ @@ -870,7 +717,7 @@ func TestProcessCancelationTask(t *testing.T) { }, }, { - name: "operation timeout by ScheduleToCloseTimeout", + name: "operation timeout", requestTimeout: time.Hour, schedToCloseTimeout: time.Microsecond, destinationDown: false, @@ -879,20 +726,7 @@ func TestProcessCancelationTask(t *testing.T) { checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, c.State()) require.NotNil(t, c.LastAttemptFailure.GetApplicationFailureInfo()) - require.Contains(t, "not enough time to execute another request before ScheduleToClose timeout", c.LastAttemptFailure.Message) - }, - }, - { - name: "operation timeout by StartToCloseTimeout", - requestTimeout: time.Hour, - startToCloseTimeout: time.Microsecond, - destinationDown: false, - onCancelOperation: nil, // This should not be called if the operation has timed out. - expectedMetricOutcome: "operation-timeout", - checkOutcome: func(t *testing.T, c nexusoperations.Cancelation) { - require.Equal(t, enumspb.NEXUS_OPERATION_CANCELLATION_STATE_FAILED, c.State()) - require.NotNil(t, c.LastAttemptFailure.GetApplicationFailureInfo()) - require.Contains(t, "not enough time to execute another request before StartToClose timeout", c.LastAttemptFailure.Message) + require.Regexp(t, nexusoperations.ErrOperationTimeoutBelowMin.Error(), c.LastAttemptFailure.Message) }, }, { @@ -919,10 +753,7 @@ func TestProcessCancelationTask(t *testing.T) { nexustest.NewNexusServer(t, listenAddr, h) reg := newRegistry(t) - event := mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(tc.schedToCloseTimeout), - StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), - }) + event := mustNewScheduledEvent(time.Now(), tc.schedToCloseTimeout) if tc.header != nil { event.GetNexusOperationScheduledEventAttributes().NexusHeader = tc.header } @@ -1036,9 +867,7 @@ func TestProcessCancelationTask_OperationCompleted(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) _, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ @@ -1097,9 +926,7 @@ func TestProcessCancelationTask_OperationCompleted(t *testing.T) { func TestProcessCancelationBackoffTask(t *testing.T) { reg := newRegistry(t) backend := &hsmtest.NodeBackend{} - node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + node := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) op, err := hsm.MachineData[nexusoperations.Operation](node) require.NoError(t, err) _, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ diff --git a/components/nexusoperations/helpers_test.go b/components/nexusoperations/helpers_test.go index e111015cda..4ec23bbe37 100644 --- a/components/nexusoperations/helpers_test.go +++ b/components/nexusoperations/helpers_test.go @@ -13,11 +13,11 @@ import ( historypb "go.temporal.io/api/history/v1" "go.temporal.io/sdk/converter" persistencespb "go.temporal.io/server/api/persistence/v1" - "go.temporal.io/server/common" "go.temporal.io/server/components/nexusoperations" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/hsm/hsmtest" "go.temporal.io/server/service/history/workflow" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -72,33 +72,23 @@ func (root) IsTransitionHistoryEnabled() bool { return false } -func mustNewScheduledEvent(schedTime time.Time, defaults *historypb.NexusOperationScheduledEventAttributes) *historypb.HistoryEvent { - if defaults == nil { - defaults = &historypb.NexusOperationScheduledEventAttributes{} +func mustNewScheduledEvent(schedTime time.Time, timeout time.Duration) *historypb.HistoryEvent { + conv := converter.GetDefaultDataConverter() + payload, err := conv.ToPayload("input") + if err != nil { + panic(err) } - attrs := common.CloneProto(defaults) - if attrs.EndpointId == "" { - attrs.EndpointId = "endpoint-id" - } - if attrs.Endpoint == "" { - attrs.Endpoint = "endpoint" - } - if attrs.Service == "" { - attrs.Service = "service" - } - if attrs.Operation == "" { - attrs.Operation = "operation" - } - if attrs.Input == nil { - conv := converter.GetDefaultDataConverter() - payload, err := conv.ToPayload("input") - if err != nil { - panic(err) - } - attrs.Input = payload + + attr := &historypb.NexusOperationScheduledEventAttributes{ + EndpointId: "endpoint-id", + Endpoint: "endpoint", + Service: "service", + Operation: "operation", + Input: payload, + RequestId: uuid.NewString(), } - if attrs.RequestId == "" { - attrs.RequestId = uuid.NewString() + if timeout > 0 { + attr.ScheduleToCloseTimeout = durationpb.New(timeout) } return &historypb.HistoryEvent{ @@ -106,7 +96,7 @@ func mustNewScheduledEvent(schedTime time.Time, defaults *historypb.NexusOperati EventId: 1, EventTime: timestamppb.New(schedTime), Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ - NexusOperationScheduledEventAttributes: attrs, + NexusOperationScheduledEventAttributes: attr, }, } } diff --git a/components/nexusoperations/statemachine.go b/components/nexusoperations/statemachine.go index a8fe1ef5fe..0ad3d888df 100644 --- a/components/nexusoperations/statemachine.go +++ b/components/nexusoperations/statemachine.go @@ -54,8 +54,6 @@ func AddChild(node *hsm.Node, id string, event *historypb.HistoryEvent, eventTok Operation: attrs.Operation, ScheduledTime: event.EventTime, ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout, - ScheduleToStartTimeout: attrs.ScheduleToStartTimeout, - StartToCloseTimeout: attrs.StartToCloseTimeout, RequestId: attrs.RequestId, State: enumsspb.NEXUS_OPERATION_STATE_UNSPECIFIED, ScheduledEventToken: eventToken, @@ -139,34 +137,10 @@ func (o Operation) transitionTasks() ([]hsm.Task, error) { // creationTasks returns tasks that are emitted when the machine is created. func (o Operation) creationTasks() ([]hsm.Task, error) { - var tasks []hsm.Task - if o.ScheduleToCloseTimeout.AsDuration() != 0 { - tasks = append(tasks, ScheduleToCloseTimeoutTask{ - deadline: o.ScheduledTime.AsTime().Add(o.ScheduleToCloseTimeout.AsDuration()), - }) - } - - if o.ScheduleToStartTimeout.AsDuration() != 0 { - tasks = append(tasks, ScheduleToStartTimeoutTask{ - deadline: o.ScheduledTime.AsTime().Add(o.ScheduleToStartTimeout.AsDuration()), - }) - } - - return tasks, nil -} - -// startToCloseTimeoutTask returns the StartToCloseTimeout task if the timeout is set. -// This task is created when an operation transitions to the STARTED state. -func (o Operation) startToCloseTimeoutTask() []hsm.Task { - if o.StartedTime.AsTime().IsZero() || o.StartToCloseTimeout.AsDuration() == 0 { - return nil - } - return []hsm.Task{ - StartToCloseTimeoutTask{ - deadline: o.StartedTime.AsTime().Add(o.StartToCloseTimeout.AsDuration()), - }, + return []hsm.Task{TimeoutTask{deadline: o.ScheduledTime.AsTime().Add(o.ScheduleToCloseTimeout.AsDuration())}}, nil } + return nil, nil } func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) { @@ -178,7 +152,7 @@ func (o Operation) RegenerateTasks(node *hsm.Node) ([]hsm.Task, error) { if err != nil { return nil, err } - return append(append(transitionTasks, creationTasks...), o.startToCloseTimeoutTask()...), nil + return append(transitionTasks, creationTasks...), nil } func (o Operation) output() (hsm.TransitionOutput, error) { @@ -373,8 +347,6 @@ var TransitionStarted = hsm.NewTransition( op.OperationToken = event.Attributes.OperationId //nolint:staticcheck // SA1019 this field might be set in older histories. } - op.StartedTime = timestamppb.New(event.Time) - // If cancelation is requested already, schedule sending the cancelation request. child, err := op.CancelationNode(event.Node) if err != nil { @@ -388,16 +360,7 @@ var TransitionStarted = hsm.NewTransition( }) }) } - - output, err := op.output() - if err != nil { - return output, err - } - - // Schedule start-to-close timeout task if configured - output.Tasks = append(output.Tasks, op.startToCloseTimeoutTask()...) - - return output, nil + return op.output() }, ) diff --git a/components/nexusoperations/statemachine_test.go b/components/nexusoperations/statemachine_test.go index b63731093d..67fc425c55 100644 --- a/components/nexusoperations/statemachine_test.go +++ b/components/nexusoperations/statemachine_test.go @@ -31,7 +31,7 @@ func TestAddChild(t *testing.T) { assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 2, len(tasks)) require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) - require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) + require.Equal(t, nexusoperations.TaskTypeTimeout, tasks[1].Type()) }, }, { @@ -86,216 +86,57 @@ func TestAddChild(t *testing.T) { } } -func TestAddChildWithNewTimeouts(t *testing.T) { - cases := []struct { - name string - scheduleToCloseTimeout time.Duration - scheduleToStartTimeout time.Duration - startToCloseTimeout time.Duration - assertTasks func(t *testing.T, tasks []hsm.Task) - }{ - { - name: "with all timeouts", - scheduleToCloseTimeout: time.Hour, - scheduleToStartTimeout: 30 * time.Minute, - startToCloseTimeout: 45 * time.Minute, - assertTasks: func(t *testing.T, tasks []hsm.Task) { - // Should have Invocation, ScheduleToClose, and ScheduleToStart tasks - require.Len(t, tasks, 3) - require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) - require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) - require.Equal(t, nexusoperations.TaskTypeScheduleToStartTimeout, tasks[2].Type()) - }, - }, - { - name: "with schedule-to-start only", - scheduleToCloseTimeout: 0, - scheduleToStartTimeout: 30 * time.Minute, - startToCloseTimeout: 0, - assertTasks: func(t *testing.T, tasks []hsm.Task) { - require.Len(t, tasks, 2) - require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) - require.Equal(t, nexusoperations.TaskTypeScheduleToStartTimeout, tasks[1].Type()) - }, - }, - { - name: "without new timeouts", - scheduleToCloseTimeout: time.Hour, - scheduleToStartTimeout: 0, - startToCloseTimeout: 0, - assertTasks: func(t *testing.T, tasks []hsm.Task) { - require.Len(t, tasks, 2) - require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) - require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) - }, - }, - } - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - root := newRoot(t, &hsmtest.NodeBackend{}) - schedTime := timestamppb.Now() - event := &historypb.HistoryEvent{ - EventTime: schedTime, - Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ - NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ - EndpointId: "endpoint-id", - Endpoint: "endpoint", - Service: "service", - Operation: "operation", - RequestId: "request-id", - ScheduleToCloseTimeout: durationpb.New(tc.scheduleToCloseTimeout), - ScheduleToStartTimeout: durationpb.New(tc.scheduleToStartTimeout), - StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), - }, - }, - } - child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token")) - require.NoError(t, err) - opLog, err := root.OpLog() - require.NoError(t, err) - require.Len(t, opLog, 1) - transitionOp, ok := opLog[0].(hsm.TransitionOperation) - require.True(t, ok) - tc.assertTasks(t, transitionOp.Output.Tasks) - - op, err := hsm.MachineData[nexusoperations.Operation](child) - require.NoError(t, err) - require.Equal(t, tc.scheduleToCloseTimeout, op.ScheduleToCloseTimeout.AsDuration()) - require.Equal(t, tc.scheduleToStartTimeout, op.ScheduleToStartTimeout.AsDuration()) - require.Equal(t, tc.startToCloseTimeout, op.StartToCloseTimeout.AsDuration()) - }) - } -} - -func TestTransitionStartedEmitsStartToCloseTimeout(t *testing.T) { - root := newRoot(t, &hsmtest.NodeBackend{}) - schedTime := timestamppb.Now() - event := &historypb.HistoryEvent{ - EventTime: schedTime, - Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ - NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ - EndpointId: "endpoint-id", - Endpoint: "endpoint", - Service: "service", - Operation: "operation", - RequestId: "request-id", - ScheduleToCloseTimeout: durationpb.New(time.Hour), - StartToCloseTimeout: durationpb.New(30 * time.Minute), - }, - }, - } - child, err := nexusoperations.AddChild(root, "test-id", event, []byte("token")) - require.NoError(t, err) - - // Transition to STARTED state - err = hsm.MachineTransition(child, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { - return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ - Node: child, - Time: time.Now(), - Attributes: &historypb.NexusOperationStartedEventAttributes{ - OperationToken: "test-token", - }, - }) - }) - require.NoError(t, err) - - opLog, err := root.OpLog() - require.NoError(t, err) - // Should have 2 operations: initial AddChild transition and TransitionStarted - require.Len(t, opLog, 2) - - // Check the TransitionStarted output - transitionOp, ok := opLog[1].(hsm.TransitionOperation) - require.True(t, ok) - // Should have StartToCloseTimeout task - require.Len(t, transitionOp.Output.Tasks, 1) - require.Equal(t, nexusoperations.TaskTypeStartToCloseTimeout, transitionOp.Output.Tasks[0].Type()) -} - func TestRegenerateTasks(t *testing.T) { cases := []struct { - name string - scheduleToCloseTimeout time.Duration - startToCloseTimeout time.Duration - state enumsspb.NexusOperationState - assertTasks func(t *testing.T, tasks []hsm.Task) + name string + timeout time.Duration + state enumsspb.NexusOperationState + assertTasks func(t *testing.T, tasks []hsm.Task) }{ { - name: "scheduled | with timeout", - scheduleToCloseTimeout: time.Hour, - state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, + name: "scheduled | with timeout", + timeout: time.Hour, + state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 2, len(tasks)) require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) require.Equal(t, tasks[0].(nexusoperations.InvocationTask).EndpointName, "endpoint") - require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) + require.Equal(t, nexusoperations.TaskTypeTimeout, tasks[1].Type()) }, }, { - name: "scheduled | without timeout", - state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, + name: "scheduled | without timeout", + timeout: 0, + state: enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 1, len(tasks)) require.Equal(t, nexusoperations.TaskTypeInvocation, tasks[0].Type()) }, }, { - name: "backing off | with timeout", - scheduleToCloseTimeout: time.Hour, - state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, + name: "backing off | with timeout", + timeout: time.Hour, + state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 2, len(tasks)) require.Equal(t, nexusoperations.TaskTypeBackoff, tasks[0].Type()) - require.Equal(t, nexusoperations.TaskTypeScheduleToCloseTimeout, tasks[1].Type()) + require.Equal(t, nexusoperations.TaskTypeTimeout, tasks[1].Type()) }, }, { - name: "backing off | without timeout", - state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, + name: "backing off | without timeout", + timeout: 0, + state: enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF, assertTasks: func(t *testing.T, tasks []hsm.Task) { require.Equal(t, 1, len(tasks)) require.Equal(t, nexusoperations.TaskTypeBackoff, tasks[0].Type()) }, }, - { - name: "started | with start to close timeout", - startToCloseTimeout: 15 * time.Minute, - state: enumsspb.NEXUS_OPERATION_STATE_STARTED, - assertTasks: func(t *testing.T, tasks []hsm.Task) { - require.Len(t, tasks, 1) - require.Equal(t, nexusoperations.TaskTypeStartToCloseTimeout, tasks[0].Type()) - }, - }, - { - name: "started | without start to close timeout", - state: enumsspb.NEXUS_OPERATION_STATE_STARTED, - assertTasks: func(t *testing.T, tasks []hsm.Task) { - require.Empty(t, tasks) - }, - }, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - schedTime := time.Now() - event := &historypb.HistoryEvent{ - EventType: enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, - EventId: 1, - EventTime: timestamppb.New(schedTime), - Attributes: &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ - NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ - EndpointId: "endpoint-id", - Endpoint: "endpoint", - Service: "service", - Operation: "operation", - RequestId: "request-id", - ScheduleToCloseTimeout: durationpb.New(tc.scheduleToCloseTimeout), - StartToCloseTimeout: durationpb.New(tc.startToCloseTimeout), - }, - }, - } - node := newOperationNode(t, &hsmtest.NodeBackend{}, event) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), tc.timeout)) if tc.state == enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF { require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { @@ -306,16 +147,6 @@ func TestRegenerateTasks(t *testing.T) { RetryPolicy: backoff.NewExponentialRetryPolicy(time.Second), }) })) - } else if tc.state == enumsspb.NEXUS_OPERATION_STATE_STARTED { - require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { - return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ - Time: time.Now(), - Node: node, - Attributes: &historypb.NexusOperationStartedEventAttributes{ - OperationToken: "operation-token", - }, - }) - })) } op, err := hsm.MachineData[nexusoperations.Operation](node) @@ -328,9 +159,7 @@ func TestRegenerateTasks(t *testing.T) { } func TestRetry(t *testing.T) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Minute), - })) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) // Reset any outputs generated from nexusoperations.AddChild, we tested those already. node.ClearTransactionState() require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { @@ -454,9 +283,7 @@ func TestCompleteFromAttempt(t *testing.T) { } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Minute), - })) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) // Reset any outputs generated from nexusoperations.AddChild, we tested those already. node.ClearTransactionState() require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { @@ -483,17 +310,13 @@ func TestCompleteExternally(t *testing.T) { { name: "scheduled", fn: func(t *testing.T) *hsm.Node { - return newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Minute), - })) + return newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) }, }, { name: "backing off", fn: func(t *testing.T) *hsm.Node { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Minute), - })) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return nexusoperations.TransitionAttemptFailed.Apply(op, nexusoperations.EventAttemptFailed{ Node: node, @@ -508,9 +331,7 @@ func TestCompleteExternally(t *testing.T) { { name: "started", fn: func(t *testing.T) *hsm.Node { - node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Minute), - })) + node := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Minute)) require.NoError(t, hsm.MachineTransition(node, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ Node: node, @@ -600,9 +421,7 @@ func TestCompleteExternally(t *testing.T) { func TestCancel(t *testing.T) { backend := &hsmtest.NodeBackend{} - root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), time.Hour)) op, err := hsm.MachineData[nexusoperations.Operation](root) require.NoError(t, err) _, err = nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ @@ -625,9 +444,7 @@ func TestCancel(t *testing.T) { func TestCancelationValidTransitions(t *testing.T) { // Setup - root := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), &historypb.NexusOperationScheduledEventAttributes{ - ScheduleToCloseTimeout: durationpb.New(time.Hour), - })) + root := newOperationNode(t, &hsmtest.NodeBackend{}, mustNewScheduledEvent(time.Now(), time.Hour)) // We don't support cancel before started. Mark the operation as started. require.NoError(t, hsm.MachineTransition(root, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return nexusoperations.TransitionStarted.Apply(op, nexusoperations.EventStarted{ @@ -736,7 +553,7 @@ func TestCancelationValidTransitions(t *testing.T) { func TestCancelationBeforeStarted(t *testing.T) { // Setup backend := &hsmtest.NodeBackend{} - root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), nil)) + root := newOperationNode(t, backend, mustNewScheduledEvent(time.Now(), 0)) require.NoError(t, hsm.MachineTransition(root, func(op nexusoperations.Operation) (hsm.TransitionOutput, error) { return op.Cancel(root, time.Now(), 0) })) diff --git a/components/nexusoperations/tasks.go b/components/nexusoperations/tasks.go index 111d426524..c513733344 100644 --- a/components/nexusoperations/tasks.go +++ b/components/nexusoperations/tasks.go @@ -16,38 +16,35 @@ import ( ) const ( + TaskTypeTimeout = "nexusoperations.Timeout" TaskTypeInvocation = "nexusoperations.Invocation" TaskTypeBackoff = "nexusoperations.Backoff" TaskTypeCancelation = "nexusoperations.Cancelation" TaskTypeCancelationBackoff = "nexusoperations.CancelationBackoff" - // NOTE: the name `Timeout` is used for backward compatibility with existing persisted tasks and predates the addition of more flexible timeout types. - TaskTypeScheduleToCloseTimeout = "nexusoperations.Timeout" - TaskTypeScheduleToStartTimeout = "nexusoperations.ScheduleToStartTimeout" - TaskTypeStartToCloseTimeout = "nexusoperations.StartToCloseTimeout" ) var errSerializationCast = errors.New("cannot serialize HSM task. unable to cast to expected type") -type ScheduleToCloseTimeoutTask struct { +type TimeoutTask struct { deadline time.Time } -var _ hsm.Task = ScheduleToCloseTimeoutTask{} +var _ hsm.Task = TimeoutTask{} -func (ScheduleToCloseTimeoutTask) Type() string { - return TaskTypeScheduleToCloseTimeout +func (TimeoutTask) Type() string { + return TaskTypeTimeout } -func (t ScheduleToCloseTimeoutTask) Deadline() time.Time { +func (t TimeoutTask) Deadline() time.Time { return t.deadline } -func (ScheduleToCloseTimeoutTask) Destination() string { +func (TimeoutTask) Destination() string { return "" } // Validate checks if the timeout task is still valid to execute for the given node state. -func (t ScheduleToCloseTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { +func (t TimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { if err := node.CheckRunning(); err != nil { return err } @@ -69,7 +66,7 @@ func (t ScheduleToCloseTimeoutTask) Validate(ref *persistencespb.StateMachineRef type TimeoutTaskSerializer struct{} func (TimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error) { - return ScheduleToCloseTimeoutTask{deadline: attrs.Deadline}, nil + return TimeoutTask{deadline: attrs.Deadline}, nil } func (TimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error) { @@ -238,112 +235,8 @@ func (CancelationBackoffTaskSerializer) Serialize(hsm.Task) ([]byte, error) { return nil, nil } -type ScheduleToStartTimeoutTask struct { - deadline time.Time -} - -var _ hsm.Task = ScheduleToStartTimeoutTask{} - -func (ScheduleToStartTimeoutTask) Type() string { - return TaskTypeScheduleToStartTimeout -} - -func (t ScheduleToStartTimeoutTask) Deadline() time.Time { - return t.deadline -} - -func (ScheduleToStartTimeoutTask) Destination() string { - return "" -} - -// Validate checks if the schedule-to-start timeout task is still valid. -// Only valid if operation is still in SCHEDULED or BACKING_OFF state. -func (t ScheduleToStartTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - if err := node.CheckRunning(); err != nil { - return err - } - op, err := hsm.MachineData[Operation](node) - if err != nil { - return err - } - // Only timeout if we haven't started yet - switch op.State() { - case enumsspb.NEXUS_OPERATION_STATE_SCHEDULED, - enumsspb.NEXUS_OPERATION_STATE_BACKING_OFF: - return nil - default: - // Already started or completed, timeout not applicable - return fmt.Errorf( - "%w: %w: cannot apply schedule-to-start timeout to machine in state %v", - consts.ErrStaleReference, - hsm.ErrInvalidTransition, - op.State(), - ) - } -} - -type ScheduleToStartTimeoutTaskSerializer struct{} - -func (ScheduleToStartTimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error) { - return ScheduleToStartTimeoutTask{deadline: attrs.Deadline}, nil -} - -func (ScheduleToStartTimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error) { - return nil, nil -} - -type StartToCloseTimeoutTask struct { - deadline time.Time -} - -var _ hsm.Task = StartToCloseTimeoutTask{} - -func (StartToCloseTimeoutTask) Type() string { - return TaskTypeStartToCloseTimeout -} - -func (t StartToCloseTimeoutTask) Deadline() time.Time { - return t.deadline -} - -func (StartToCloseTimeoutTask) Destination() string { - return "" -} - -// Validate checks if the start-to-close timeout task is still valid. -// Only valid if operation is in STARTED state. -func (t StartToCloseTimeoutTask) Validate(ref *persistencespb.StateMachineRef, node *hsm.Node) error { - if err := node.CheckRunning(); err != nil { - return err - } - op, err := hsm.MachineData[Operation](node) - if err != nil { - return err - } - // Only timeout if we're in started state - if op.State() != enumsspb.NEXUS_OPERATION_STATE_STARTED { - return fmt.Errorf( - "%w: %w: cannot apply start-to-close timeout to machine in state %v", - consts.ErrStaleReference, - hsm.ErrInvalidTransition, - op.State(), - ) - } - return nil -} - -type StartToCloseTimeoutTaskSerializer struct{} - -func (StartToCloseTimeoutTaskSerializer) Deserialize(data []byte, attrs hsm.TaskAttributes) (hsm.Task, error) { - return StartToCloseTimeoutTask{deadline: attrs.Deadline}, nil -} - -func (StartToCloseTimeoutTaskSerializer) Serialize(hsm.Task) ([]byte, error) { - return nil, nil -} - func RegisterTaskSerializers(reg *hsm.Registry) error { - if err := reg.RegisterTaskSerializer(TaskTypeScheduleToCloseTimeout, TimeoutTaskSerializer{}); err != nil { + if err := reg.RegisterTaskSerializer(TaskTypeTimeout, TimeoutTaskSerializer{}); err != nil { return err } if err := reg.RegisterTaskSerializer(TaskTypeInvocation, InvocationTaskSerializer{}); err != nil { @@ -358,8 +251,5 @@ func RegisterTaskSerializers(reg *hsm.Registry) error { if err := reg.RegisterTaskSerializer(TaskTypeCancelationBackoff, CancelationBackoffTaskSerializer{}); err != nil { // nolint:revive return err } - if err := reg.RegisterTaskSerializer(TaskTypeScheduleToStartTimeout, ScheduleToStartTimeoutTaskSerializer{}); err != nil { - return err - } - return reg.RegisterTaskSerializer(TaskTypeStartToCloseTimeout, StartToCloseTimeoutTaskSerializer{}) + return nil } diff --git a/components/nexusoperations/workflow/commands.go b/components/nexusoperations/workflow/commands.go index 441c82e4b7..8bb59fea12 100644 --- a/components/nexusoperations/workflow/commands.go +++ b/components/nexusoperations/workflow/commands.go @@ -100,22 +100,6 @@ func (ch *commandHandler) HandleScheduleCommand( } } - if err := timestamp.ValidateAndCapProtoDuration(attrs.ScheduleToStartTimeout); err != nil { - return workflow.FailWorkflowTaskError{ - Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, - Message: fmt.Sprintf( - "ScheduleNexusOperationCommandAttributes.ScheduleToStartTimeout is invalid: %v", err), - } - } - - if err := timestamp.ValidateAndCapProtoDuration(attrs.StartToCloseTimeout); err != nil { - return workflow.FailWorkflowTaskError{ - Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, - Message: fmt.Sprintf( - "ScheduleNexusOperationCommandAttributes.StartToCloseTimeout is invalid: %v", err), - } - } - if !validator.IsValidPayloadSize(attrs.Input.Size()) { return workflow.FailWorkflowTaskError{ Cause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, @@ -168,20 +152,6 @@ func (ch *commandHandler) HandleScheduleCommand( attrs.ScheduleToCloseTimeout = durationpb.New(maxTimeout) } - // Trim secondary timeouts to the primary timeout. - scheduleToCloseTimeout := attrs.ScheduleToCloseTimeout.AsDuration() - scheduleToStartTimeout := attrs.ScheduleToStartTimeout.AsDuration() - startToCloseTimeout := attrs.StartToCloseTimeout.AsDuration() - - if scheduleToCloseTimeout > 0 { - if scheduleToStartTimeout > scheduleToCloseTimeout { - attrs.ScheduleToStartTimeout = attrs.ScheduleToCloseTimeout - } - if startToCloseTimeout > scheduleToCloseTimeout { - attrs.StartToCloseTimeout = attrs.ScheduleToCloseTimeout - } - } - event := ms.AddHistoryEvent(enumspb.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED, func(he *historypb.HistoryEvent) { he.Attributes = &historypb.HistoryEvent_NexusOperationScheduledEventAttributes{ NexusOperationScheduledEventAttributes: &historypb.NexusOperationScheduledEventAttributes{ @@ -191,8 +161,6 @@ func (ch *commandHandler) HandleScheduleCommand( Operation: attrs.Operation, Input: attrs.Input, ScheduleToCloseTimeout: attrs.ScheduleToCloseTimeout, - ScheduleToStartTimeout: attrs.ScheduleToStartTimeout, - StartToCloseTimeout: attrs.StartToCloseTimeout, NexusHeader: lowerCaseHeader, RequestId: uuid.NewString(), WorkflowTaskCompletedEventId: workflowTaskCompletedEventID, diff --git a/components/nexusoperations/workflow/commands_test.go b/components/nexusoperations/workflow/commands_test.go index bec7340421..e5f7702c09 100644 --- a/components/nexusoperations/workflow/commands_test.go +++ b/components/nexusoperations/workflow/commands_test.go @@ -388,128 +388,6 @@ func TestHandleScheduleCommand(t *testing.T) { }) } - t.Run("invalid schedule-to-start timeout", func(t *testing.T) { - tcx := newTestContext(t, defaultConfig) - err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: "endpoint", - Service: "service", - Operation: "op", - ScheduleToStartTimeout: durationpb.New(-1 * time.Second), - }, - }, - }) - var failWFTErr workflow.FailWorkflowTaskError - require.ErrorAs(t, err, &failWFTErr) - require.False(t, failWFTErr.TerminateWorkflow) - require.Equal(t, enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, failWFTErr.Cause) - require.Empty(t, tcx.history.Events) - }) - - t.Run("invalid start-to-close timeout", func(t *testing.T) { - tcx := newTestContext(t, defaultConfig) - err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: "endpoint", - Service: "service", - Operation: "op", - StartToCloseTimeout: durationpb.New(-1 * time.Second), - }, - }, - }) - var failWFTErr workflow.FailWorkflowTaskError - require.ErrorAs(t, err, &failWFTErr) - require.False(t, failWFTErr.TerminateWorkflow) - require.Equal(t, enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_SCHEDULE_NEXUS_OPERATION_ATTRIBUTES, failWFTErr.Cause) - require.Empty(t, tcx.history.Events) - }) - - t.Run("schedule-to-start timeout trimmed to schedule-to-close timeout", func(t *testing.T) { - tcx := newTestContext(t, defaultConfig) - err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: "endpoint", - Service: "service", - Operation: "op", - ScheduleToCloseTimeout: durationpb.New(30 * time.Minute), - ScheduleToStartTimeout: durationpb.New(time.Hour), - }, - }, - }) - require.NoError(t, err) - require.Len(t, tcx.history.Events, 1) - eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() - require.Equal(t, 30*time.Minute, eAttrs.ScheduleToStartTimeout.AsDuration()) - require.Equal(t, 30*time.Minute, eAttrs.ScheduleToCloseTimeout.AsDuration()) - }) - - t.Run("start-to-close timeout trimmed to schedule-to-close timeout", func(t *testing.T) { - tcx := newTestContext(t, defaultConfig) - err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: "endpoint", - Service: "service", - Operation: "op", - ScheduleToCloseTimeout: durationpb.New(30 * time.Minute), - StartToCloseTimeout: durationpb.New(time.Hour), - }, - }, - }) - require.NoError(t, err) - require.Len(t, tcx.history.Events, 1) - eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() - require.Equal(t, 30*time.Minute, eAttrs.StartToCloseTimeout.AsDuration()) - require.Equal(t, 30*time.Minute, eAttrs.ScheduleToCloseTimeout.AsDuration()) - }) - - t.Run("both secondary timeouts trimmed to schedule-to-close timeout", func(t *testing.T) { - tcx := newTestContext(t, defaultConfig) - err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: "endpoint", - Service: "service", - Operation: "op", - ScheduleToCloseTimeout: durationpb.New(30 * time.Minute), - ScheduleToStartTimeout: durationpb.New(time.Hour), - StartToCloseTimeout: durationpb.New(2 * time.Hour), - }, - }, - }) - require.NoError(t, err) - require.Len(t, tcx.history.Events, 1) - eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() - require.Equal(t, 30*time.Minute, eAttrs.ScheduleToStartTimeout.AsDuration()) - require.Equal(t, 30*time.Minute, eAttrs.StartToCloseTimeout.AsDuration()) - require.Equal(t, 30*time.Minute, eAttrs.ScheduleToCloseTimeout.AsDuration()) - }) - - t.Run("secondary timeouts not trimmed when less than schedule-to-close timeout", func(t *testing.T) { - tcx := newTestContext(t, defaultConfig) - err := tcx.scheduleHandler(context.Background(), tcx.ms, commandValidator{maxPayloadSize: 1}, 1, &commandpb.Command{ - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: "endpoint", - Service: "service", - Operation: "op", - ScheduleToCloseTimeout: durationpb.New(time.Hour), - ScheduleToStartTimeout: durationpb.New(20 * time.Minute), - StartToCloseTimeout: durationpb.New(30 * time.Minute), - }, - }, - }) - require.NoError(t, err) - require.Len(t, tcx.history.Events, 1) - eAttrs := tcx.history.Events[0].GetNexusOperationScheduledEventAttributes() - require.Equal(t, 20*time.Minute, eAttrs.ScheduleToStartTimeout.AsDuration()) - require.Equal(t, 30*time.Minute, eAttrs.StartToCloseTimeout.AsDuration()) - require.Equal(t, time.Hour, eAttrs.ScheduleToCloseTimeout.AsDuration()) - }) - t.Run("sets event attributes with UserMetadata and spawns a child operation machine", func(t *testing.T) { tcx := newTestContext(t, defaultConfig) cAttrs := &commandpb.ScheduleNexusOperationCommandAttributes{ diff --git a/go.mod b/go.mod index 411dd4c2c1..c19671cead 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.60.1-0.20260113171616-51aa2c291ff8 + go.temporal.io/api v1.60.1-0.20260108175916-49710b8392af go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 62c18cc63e..7b7ab8a5f3 100644 --- a/go.sum +++ b/go.sum @@ -371,8 +371,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.60.1-0.20260113171616-51aa2c291ff8 h1:Btl9joj1fKCqa3ljjAL2TZt87fzx/EHLtVaStOHtISU= -go.temporal.io/api v1.60.1-0.20260113171616-51aa2c291ff8/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.60.1-0.20260108175916-49710b8392af h1:qa9w0zjQ7ZLgqaB8msld09oos/ORj6yQ4qQrFpMwuPc= +go.temporal.io/api v1.60.1-0.20260108175916-49710b8392af/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/proto/internal/temporal/server/api/persistence/v1/executions.proto b/proto/internal/temporal/server/api/persistence/v1/executions.proto index 116770bb82..c14e9baf56 100644 --- a/proto/internal/temporal/server/api/persistence/v1/executions.proto +++ b/proto/internal/temporal/server/api/persistence/v1/executions.proto @@ -818,19 +818,6 @@ message NexusOperationInfo { // Endpoint ID, the name is also stored here (field 1) but we use the ID internally to avoid failing operation // requests when an endpoint is renamed. string endpoint_id = 15; - - // Schedule-to-start timeout for this operation. - // (-- api-linter: core::0140::prepositions=disabled - // aip.dev/not-precedent: "to" is used to indicate interval. --) - google.protobuf.Duration schedule_to_start_timeout = 16; - - // Start-to-close timeout for this operation. - // (-- api-linter: core::0140::prepositions=disabled - // aip.dev/not-precedent: "to" is used to indicate interval. --) - google.protobuf.Duration start_to_close_timeout = 17; - - // Time the operation was started (only available for async operations). - google.protobuf.Timestamp started_time = 18; } // NexusOperationCancellationInfo contains the state of a nexus operation cancelation. diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index 08569e6c5a..6632965c23 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -619,8 +619,6 @@ func buildPendingNexusOperationInfo( OperationToken: op.OperationToken, ScheduledEventId: scheduledEventID, ScheduleToCloseTimeout: op.ScheduleToCloseTimeout, - ScheduleToStartTimeout: op.ScheduleToStartTimeout, - StartToCloseTimeout: op.StartToCloseTimeout, ScheduledTime: op.ScheduledTime, State: state, Attempt: op.Attempt, diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index 385762cdec..c0d8bfeff8 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -520,7 +520,7 @@ func TestTaskGenerator_GenerateDirtySubStateMachineTasks(t *testing.T) { }, MachineTransitionCount: 1, }, - Type: nexusoperations.TaskTypeScheduleToCloseTimeout, + Type: nexusoperations.TaskTypeTimeout, Data: nil, }, timers[1].Infos[0]) } diff --git a/tests/nexus_workflow_test.go b/tests/nexus_workflow_test.go index 881d52372e..7f1d8980ec 100644 --- a/tests/nexus_workflow_test.go +++ b/tests/nexus_workflow_test.go @@ -2706,331 +2706,6 @@ func (s *NexusWorkflowTestSuite) TestNexusAsyncOperationWithMultipleCallers() { } } -func (s *NexusWorkflowTestSuite) TestNexusOperationScheduleToCloseTimeout() { - ctx := testcore.NewContext() - taskQueue := testcore.RandomizeStr(s.T().Name()) - endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) - - _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ - Spec: &nexuspb.EndpointSpec{ - Name: endpointName, - Target: &nexuspb.EndpointTarget{ - Variant: &nexuspb.EndpointTarget_Worker_{ - Worker: &nexuspb.EndpointTarget_Worker{ - Namespace: s.Namespace().String(), - TaskQueue: "unreachable-for-test", - }, - }, - }, - }, - }) - s.NoError(err) - - run, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ - TaskQueue: taskQueue, - }, "workflow") - s.NoError(err) - - // Schedule the operation with a short schedule-to-close timeout - pollResp, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - }, - Identity: "test", - }) - s.NoError(err) - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ - Identity: "test", - TaskToken: pollResp.TaskToken, - Commands: []*commandpb.Command{ - { - CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: endpointName, - Service: "service", - Operation: "operation", - Input: s.mustToPayload("input"), - ScheduleToCloseTimeout: durationpb.New(2 * time.Second), - }, - }, - }, - }, - }) - s.NoError(err) - - descResp, err := s.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID()) - s.NoError(err) - s.Len(descResp.PendingNexusOperations, 1) - s.Equal(2*time.Second, descResp.PendingNexusOperations[0].ScheduleToCloseTimeout.AsDuration()) - - // Now wait for the timeout event - pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - }, - Identity: "test", - }) - s.NoError(err) - - // Verify we got a timeout event with the correct timeout type - timedOutEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { - return e.GetNexusOperationTimedOutEventAttributes() != nil - }) - s.Positive(timedOutEventIdx) - timedOutEvent := pollResp.History.Events[timedOutEventIdx] - s.Equal(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, - timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetTimeoutFailureInfo().GetTimeoutType()) - - // Complete the workflow - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ - Identity: "test", - TaskToken: pollResp.TaskToken, - Commands: []*commandpb.Command{ - { - CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, - Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ - CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, - }, - }, - }, - }) - s.NoError(err) - s.NoError(run.Get(ctx, nil)) -} - -func (s *NexusWorkflowTestSuite) TestNexusOperationScheduleToStartTimeout() { - ctx := testcore.NewContext() - taskQueue := testcore.RandomizeStr(s.T().Name()) - endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) - - _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ - Spec: &nexuspb.EndpointSpec{ - Name: endpointName, - Target: &nexuspb.EndpointTarget{ - Variant: &nexuspb.EndpointTarget_Worker_{ - Worker: &nexuspb.EndpointTarget_Worker{ - Namespace: s.Namespace().String(), - TaskQueue: "unreachable-for-test", - }, - }, - }, - }, - }) - s.NoError(err) - - run, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ - TaskQueue: taskQueue, - }, "workflow") - s.NoError(err) - - // Schedule the operation with a short schedule-to-close timeout - pollResp, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - }, - Identity: "test", - }) - s.NoError(err) - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ - Identity: "test", - TaskToken: pollResp.TaskToken, - Commands: []*commandpb.Command{ - { - CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: endpointName, - Service: "service", - Operation: "operation", - Input: s.mustToPayload("input"), - ScheduleToStartTimeout: durationpb.New(2 * time.Second), - }, - }, - }, - }, - }) - s.NoError(err) - - descResp, err := s.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID()) - s.NoError(err) - s.Len(descResp.PendingNexusOperations, 1) - s.Equal(2*time.Second, descResp.PendingNexusOperations[0].ScheduleToStartTimeout.AsDuration()) - - // Now wait for the timeout event - pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - }, - Identity: "test", - }) - s.NoError(err) - - // Verify we got a timeout event with the correct timeout type - timedOutEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { - return e.GetNexusOperationTimedOutEventAttributes() != nil - }) - s.Positive(timedOutEventIdx) - timedOutEvent := pollResp.History.Events[timedOutEventIdx] - s.Equal(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, - timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetTimeoutFailureInfo().GetTimeoutType()) - - // Complete the workflow - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ - Identity: "test", - TaskToken: pollResp.TaskToken, - Commands: []*commandpb.Command{ - { - CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, - Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ - CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, - }, - }, - }, - }) - s.NoError(err) - s.NoError(run.Get(ctx, nil)) -} - -func (s *NexusWorkflowTestSuite) TestNexusOperationStartToCloseTimeout() { - ctx := testcore.NewContext() - taskQueue := testcore.RandomizeStr(s.T().Name()) - endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) - - // Handler that starts quickly (returns async) but never completes - h := nexustest.Handler{ - OnStartOperation: func(ctx context.Context, service, operation string, input *nexus.LazyValue, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[any], error) { - // Return async start immediately - return &nexus.HandlerStartOperationResultAsync{OperationToken: "test-op-token"}, nil - }, - } - listenAddr := nexustest.AllocListenAddress() - nexustest.NewNexusServer(s.T(), listenAddr, h) - - _, err := s.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ - Spec: &nexuspb.EndpointSpec{ - Name: endpointName, - Target: &nexuspb.EndpointTarget{ - Variant: &nexuspb.EndpointTarget_External_{ - External: &nexuspb.EndpointTarget_External{ - Url: "http://" + listenAddr, - }, - }, - }, - }, - }) - s.NoError(err) - - run, err := s.SdkClient().ExecuteWorkflow(ctx, client.StartWorkflowOptions{ - TaskQueue: taskQueue, - }, "workflow") - s.NoError(err) - - // Schedule the operation with a short start-to-close timeout - pollResp, err := s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - }, - Identity: "test", - }) - s.NoError(err) - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ - Identity: "test", - TaskToken: pollResp.TaskToken, - Commands: []*commandpb.Command{ - { - CommandType: enumspb.COMMAND_TYPE_SCHEDULE_NEXUS_OPERATION, - Attributes: &commandpb.Command_ScheduleNexusOperationCommandAttributes{ - ScheduleNexusOperationCommandAttributes: &commandpb.ScheduleNexusOperationCommandAttributes{ - Endpoint: endpointName, - Service: "service", - Operation: "operation", - Input: s.mustToPayload("input"), - StartToCloseTimeout: durationpb.New(2 * time.Second), - }, - }, - }, - }, - }) - s.NoError(err) - - descResp, err := s.SdkClient().DescribeWorkflowExecution(ctx, run.GetID(), run.GetRunID()) - s.NoError(err) - s.Len(descResp.PendingNexusOperations, 1) - s.Equal(2*time.Second, descResp.PendingNexusOperations[0].StartToCloseTimeout.AsDuration()) - - // Wait for the started event first - pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - }, - Identity: "test", - }) - s.NoError(err) - - // Verify we got a started event - startedEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { - return e.GetNexusOperationStartedEventAttributes() != nil - }) - s.Positive(startedEventIdx) - - // Respond to acknowledge the started event - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ - Identity: "test", - TaskToken: pollResp.TaskToken, - }) - s.NoError(err) - - // Now wait for the timeout event - pollResp, err = s.FrontendClient().PollWorkflowTaskQueue(ctx, &workflowservice.PollWorkflowTaskQueueRequest{ - Namespace: s.Namespace().String(), - TaskQueue: &taskqueuepb.TaskQueue{ - Name: taskQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, - }, - Identity: "test", - }) - s.NoError(err) - - // Verify we got a timeout event with the correct timeout type - timedOutEventIdx := slices.IndexFunc(pollResp.History.Events, func(e *historypb.HistoryEvent) bool { - return e.GetNexusOperationTimedOutEventAttributes() != nil - }) - s.Positive(timedOutEventIdx) - timedOutEvent := pollResp.History.Events[timedOutEventIdx] - s.Equal(enumspb.TIMEOUT_TYPE_START_TO_CLOSE, - timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetTimeoutFailureInfo().GetTimeoutType()) - s.Contains(timedOutEvent.GetNexusOperationTimedOutEventAttributes().GetFailure().GetCause().GetMessage(), "operation timed out") - - // Complete the workflow - _, err = s.FrontendClient().RespondWorkflowTaskCompleted(ctx, &workflowservice.RespondWorkflowTaskCompletedRequest{ - Identity: "test", - TaskToken: pollResp.TaskToken, - Commands: []*commandpb.Command{ - { - CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, - Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ - CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{}, - }, - }, - }, - }) - s.NoError(err) - s.NoError(run.Get(ctx, nil)) -} - // generateValidCallbackToken creates a valid callback token for testing with the given namespace, workflow, and run IDs func (s *NexusWorkflowTestSuite) generateValidCallbackToken(namespaceID, workflowID, runID string) (string, error) { gen := &commonnexus.CallbackTokenGenerator{} diff --git a/tests/xdc/nexus_state_replication_test.go b/tests/xdc/nexus_state_replication_test.go index 4d84b5cc7c..bf836e9b4c 100644 --- a/tests/xdc/nexus_state_replication_test.go +++ b/tests/xdc/nexus_state_replication_test.go @@ -121,7 +121,7 @@ func (s *NexusStateReplicationSuite) TestNexusOperationEventsReplicated() { ns := s.createGlobalNamespace() endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) - // Set URL template after httpAPIAddress is set, see commonnexus.RouteCompletionCallback. + // Set URL template after httpAPAddress is set, see commonnexus.RouteCompletionCallback. for _, cluster := range s.clusters { cluster.OverrideDynamicConfig( s.T(), @@ -274,7 +274,7 @@ func (s *NexusStateReplicationSuite) TestNexusOperationCancelationReplicated() { ns := s.createGlobalNamespace() endpointName := testcore.RandomizedNexusEndpoint(s.T().Name()) - // Set URL template after httpAPIAddress is set, see commonnexus.RouteCompletionCallback. + // Set URL template after httpAPAddress is set, see commonnexus.RouteCompletionCallback. // We don't actually want to deliver callbacks in this test, the config just has to be set for nexus task execution. for _, cluster := range s.clusters { cluster.OverrideDynamicConfig(