-
Notifications
You must be signed in to change notification settings - Fork 8
Open
Description
Having a timeout while waiting for an event prevents workflows from locking up if some external event never happens, and also keeps the logic for tracking whether that happened within the workflow itself (instead of some external timeout tracking).
The current workaround is using this wrapper, which will schedule a mutation for the timeout to send a null body to the event (note this cannot Promise.all with anything else):
export async function awaitEventTimeout<T = unknown, Name extends string = string>(
ctx: WorkflowCtx,
event: ({ name: Name; id?: EventId<Name> } | { name?: Name; id: EventId<Name> }) & {
validator?: Validator<T, any, any>
},
timeoutMs: number
): Promise<T | null> {
// Schedule a mutation to send a blank event after timeout (will be ignored if real event arrives first)
// When id is not provided, we must have name (per the event union type)
const blankEventArgs = "id" in event && event.id !== undefined ? { eventID: event.id } : { workflowID: ctx.workflowId, name: event.name as string }
// Explicitly don't await so it runs concurrent to awaitEvent
ctx.runMutation(internal.workflow_utils.sendBlankEvent, blankEventArgs, {
runAfter: timeoutMs,
})
const result = await ctx.awaitEvent<T | null>({
...event,
validator: event.validator ? v.union(event.validator, v.null()) : undefined,
})
return result
}
export const sendBlankEvent = internalMutation({
args: {
// Either provide eventID alone, or workflowID + name together
eventID: v.optional(vEventId<string>()),
workflowID: v.optional(vWorkflowId),
name: v.optional(v.string()),
},
handler: async (ctx, args) => {
if (args.eventID !== undefined) {
await workflow.sendEvent(ctx, {
id: args.eventID,
value: null as unknown,
})
} else if (args.workflowID !== undefined && args.name !== undefined) {
await workflow.sendEvent(ctx, {
workflowId: args.workflowID,
name: args.name,
value: null as unknown,
})
} else {
throw new Error("sendBlankEvent requires either eventID, or both workflowID and name")
}
},
})Metadata
Metadata
Assignees
Labels
No labels