diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx index a87cc4530d2..c4b98eb0245 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx @@ -59,6 +59,7 @@ import { cn } from "~/utils/cn"; import { docsPath, v3RunSpanPath, v3TaskParamsSchema, v3TestPath } from "~/utils/pathBuilder"; import { TestTaskService } from "~/v3/services/testTask.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; import { TestTaskData } from "~/v3/testTask"; import { RunTagInput } from "~/components/runs/v3/RunTagInput"; import { type loader as queuesLoader } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues"; @@ -237,6 +238,13 @@ export const action: ActionFunction = async ({ request, params }) => { ); } + if (e instanceof ServiceValidationError) { + return redirectBackWithErrorMessage( + request, + `Unable to start a test run: ${e.message}` + ); + } + logger.error("Failed to start a test run", { error: e instanceof Error ? e.message : e }); return redirectBackWithErrorMessage( diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index 0980dc2a75d..1b5a1d2d8ce 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -57,6 +57,7 @@ export class DefaultQueueManager implements QueueManager { ): Promise { let queueName: string; let lockedQueueId: string | undefined; + let resolvedRegion: string | undefined; // Determine queue name based on lockToVersion and provided options if (lockedBackgroundWorker) { @@ -83,6 +84,17 @@ export class DefaultQueueManager implements QueueManager { // Use the validated queue name directly queueName = specifiedQueue.name; lockedQueueId = specifiedQueue.id; + + // Get task-level region config for the locked worker + const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({ + where: { + workerId: lockedBackgroundWorker.id, + runtimeEnvironmentId: request.environment.id, + slug: request.taskId, + }, + select: { region: true }, + }); + resolvedRegion = lockedTask?.region ?? undefined; } else { // No specific queue name provided, use the default queue for the task on the locked worker const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({ @@ -121,6 +133,7 @@ export class DefaultQueueManager implements QueueManager { // Use the task's default queue name queueName = lockedTask.queue.name; lockedQueueId = lockedTask.queue.id; + resolvedRegion = lockedTask.region ?? undefined; } } else { // Task is not locked to a specific version, use regular logic @@ -131,8 +144,10 @@ export class DefaultQueueManager implements QueueManager { ); } - // Get queue name using the helper for non-locked case (handles provided name or finds default) - queueName = await this.getQueueName(request); + // Get queue name and task-level region using the helper for non-locked case + const queueInfo = await this._getQueueInfo(request); + queueName = queueInfo.queueName; + resolvedRegion = queueInfo.region ?? undefined; } // Sanitize the final determined queue name once @@ -148,19 +163,21 @@ export class DefaultQueueManager implements QueueManager { return { queueName, lockedQueueId, + resolvedRegion, }; } async getQueueName(request: TriggerTaskRequest): Promise { + const { queueName } = await this._getQueueInfo(request); + return queueName; + } + + private async _getQueueInfo( + request: TriggerTaskRequest + ): Promise<{ queueName: string; region?: string | null }> { const { taskId, environment, body } = request; const { queue } = body.options ?? {}; - // Use extractQueueName to handle double-wrapped queue objects - const queueName = extractQueueName(queue); - if (queueName) { - return queueName; - } - const defaultQueueName = `task/${taskId}`; // Find the current worker for the environment @@ -172,7 +189,9 @@ export class DefaultQueueManager implements QueueManager { environmentId: environment.id, }); - return defaultQueueName; + // Use extractQueueName to handle double-wrapped queue objects + const queueName = extractQueueName(queue); + return { queueName: queueName ?? defaultQueueName }; } const task = await this.prisma.backgroundWorkerTask.findFirst({ @@ -192,7 +211,14 @@ export class DefaultQueueManager implements QueueManager { environmentId: environment.id, }); - return defaultQueueName; + const queueName = extractQueueName(queue); + return { queueName: queueName ?? defaultQueueName }; + } + + // Use specified queue name if provided, otherwise fall back to task's queue + const specifiedQueueName = extractQueueName(queue); + if (specifiedQueueName) { + return { queueName: specifiedQueueName, region: task.region }; } if (!task.queue) { @@ -202,10 +228,13 @@ export class DefaultQueueManager implements QueueManager { queueConfig: task.queueConfig, }); - return defaultQueueName; + return { queueName: defaultQueueName, region: task.region }; } - return task.queue.name ?? defaultQueueName; + return { + queueName: task.queue.name ?? defaultQueueName, + region: task.region, + }; } async validateQueueLimits( diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 73b4febcc92..3243e06bc07 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -268,10 +268,11 @@ export class RunEngineTriggerTaskService { }) : undefined; - const { queueName, lockedQueueId } = await this.queueConcern.resolveQueueProperties( - triggerRequest, - lockedToBackgroundWorker ?? undefined - ); + const { queueName, lockedQueueId, resolvedRegion } = + await this.queueConcern.resolveQueueProperties( + triggerRequest, + lockedToBackgroundWorker ?? undefined + ); //upsert tags const tags = await createTags( @@ -284,7 +285,10 @@ export class RunEngineTriggerTaskService { const depth = parentRun ? parentRun.depth + 1 : 0; - const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region); + // Trigger-time region overrides task-level region, which overrides project default + const effectiveRegion = body.options?.region ?? resolvedRegion; + + const workerQueue = await this.queueConcern.getWorkerQueue(environment, effectiveRegion); try { return await this.traceEventConcern.traceRun( diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index 7186d81ff9b..c0ffae309d8 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -48,6 +48,7 @@ export type QueueValidationResult = export type QueueProperties = { queueName: string; lockedQueueId?: string; + resolvedRegion?: string; }; export type LockedBackgroundWorker = Pick< diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 2938164b74b..bbe19ac84ce 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -277,6 +277,7 @@ async function createWorkerTask( retryConfig: task.retry, queueConfig: task.queue, machineConfig: task.machine, + region: task.region, triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD", fileId: tasksToBackgroundFiles?.get(task.id) ?? null, maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null, diff --git a/internal-packages/database/prisma/migrations/20260217120000_add_region_config_to_background_worker_task/migration.sql b/internal-packages/database/prisma/migrations/20260217120000_add_region_config_to_background_worker_task/migration.sql new file mode 100644 index 00000000000..462e8299af3 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20260217120000_add_region_config_to_background_worker_task/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "BackgroundWorkerTask" ADD COLUMN "region" TEXT; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index e28b951f05d..259f904790c 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -559,6 +559,7 @@ model BackgroundWorkerTask { queueConfig Json? retryConfig Json? machineConfig Json? + regionConfig String? queueId String? queue TaskQueue? @relation(fields: [queueId], references: [id], onDelete: SetNull, onUpdate: Cascade) diff --git a/packages/cli-v3/src/entryPoints/dev-index-worker.ts b/packages/cli-v3/src/entryPoints/dev-index-worker.ts index da5c6ee7508..e5c32a86af6 100644 --- a/packages/cli-v3/src/entryPoints/dev-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-index-worker.ts @@ -147,6 +147,20 @@ if (typeof config.machine === "string") { }); } +// If the config has a region, we need to apply it to all tasks that don't have a region +if (typeof config.region === "string") { + tasks = tasks.map((task) => { + if (typeof task.region !== "string") { + return { + ...task, + region: config.region, + } satisfies TaskManifest; + } + + return task; + }); +} + await sendMessageInCatalog( indexerToWorkerMessages, "INDEX_COMPLETE", diff --git a/packages/cli-v3/src/entryPoints/managed-index-worker.ts b/packages/cli-v3/src/entryPoints/managed-index-worker.ts index 5ff9f1b62ed..5575b08f3e7 100644 --- a/packages/cli-v3/src/entryPoints/managed-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-index-worker.ts @@ -147,6 +147,20 @@ if (typeof config.machine === "string") { }); } +// If the config has a region, we need to apply it to all tasks that don't have a region +if (typeof config.region === "string") { + tasks = tasks.map((task) => { + if (typeof task.region !== "string") { + return { + ...task, + region: config.region, + } satisfies TaskManifest; + } + + return task; + }); +} + const processKeepAlive = config.processKeepAlive ?? config.experimental_processKeepAlive; await sendMessageInCatalog( diff --git a/packages/core/src/v3/config.ts b/packages/core/src/v3/config.ts index 9c6871a2646..2ab60605ef3 100644 --- a/packages/core/src/v3/config.ts +++ b/packages/core/src/v3/config.ts @@ -141,6 +141,13 @@ export type TriggerConfig = { */ machine?: MachinePresetName; + /** + * The default region to run your tasks in. You can override this on a per-task basis or at trigger time. + * + * Check the Regions page in the dashboard for regions that are available to you. + */ + region?: string; + /** * Set the log level for the logger. Defaults to "info", so you will see "log", "info", "warn", and "error" messages, but not "debug" messages. * diff --git a/packages/core/src/v3/schemas/resources.ts b/packages/core/src/v3/schemas/resources.ts index 08764906ede..fff7d0bc701 100644 --- a/packages/core/src/v3/schemas/resources.ts +++ b/packages/core/src/v3/schemas/resources.ts @@ -10,6 +10,7 @@ export const TaskResource = z.object({ queue: QueueManifest.extend({ name: z.string().optional() }).optional(), retry: RetryOptions.optional(), machine: MachineConfig.optional(), + region: z.string().optional(), triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), maxDuration: z.number().optional(), diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 233068c0b7b..d6e7b2448ae 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -186,6 +186,7 @@ const taskMetadata = { queue: QueueManifest.extend({ name: z.string().optional() }).optional(), retry: RetryOptions.optional(), machine: MachineConfig.optional(), + region: z.string().optional(), triggerSource: z.string().optional(), schedule: ScheduleMetadata.optional(), maxDuration: z.number().optional(), diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index 3b8b2e9ecdd..67d013efd29 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -277,6 +277,25 @@ type CommonTaskOptions< */ maxDuration?: number; + /** + * The default region to run this task in. This can be overridden at trigger time via `options.region`. + * + * Check the Regions page in the dashboard for regions that are available to you. + * + * @example + * + * ```ts + * export const heavyTask = task({ + * id: "heavy-task", + * region: "us-east-1", + * run: async ({ payload, ctx }) => { + * // ... + * }, + * }); + * ``` + */ + region?: string; + /** This gets called when a task is triggered. It's where you put the code you want to execute. * * @param payload - The payload that is passed to your task when it's triggered. This must be JSON serializable. diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index 7b7fa1b9797..304651508b0 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -235,6 +235,7 @@ export function createTask< queue: params.queue, retry: params.retry ? { ...defaultRetryOptions, ...params.retry } : undefined, machine: typeof params.machine === "string" ? { preset: params.machine } : params.machine, + region: params.region, maxDuration: params.maxDuration, payloadSchema: params.jsonSchema, fns: { @@ -366,6 +367,7 @@ export function createSchemaTask< queue: params.queue, retry: params.retry ? { ...defaultRetryOptions, ...params.retry } : undefined, machine: typeof params.machine === "string" ? { preset: params.machine } : params.machine, + region: params.region, maxDuration: params.maxDuration, fns: { run: params.run,