Skip to content

Commit 8d6acce

Browse files
committed
feat: add region configuration support for background worker tasks and queues
1 parent 1ec7722 commit 8d6acce

File tree

14 files changed

+121
-17
lines changed

14 files changed

+121
-17
lines changed

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.test.tasks.$taskParam/route.tsx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import { cn } from "~/utils/cn";
5959
import { docsPath, v3RunSpanPath, v3TaskParamsSchema, v3TestPath } from "~/utils/pathBuilder";
6060
import { TestTaskService } from "~/v3/services/testTask.server";
6161
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
62+
import { ServiceValidationError } from "~/v3/services/baseService.server";
6263
import { TestTaskData } from "~/v3/testTask";
6364
import { RunTagInput } from "~/components/runs/v3/RunTagInput";
6465
import { type loader as queuesLoader } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues";
@@ -237,6 +238,13 @@ export const action: ActionFunction = async ({ request, params }) => {
237238
);
238239
}
239240

241+
if (e instanceof ServiceValidationError) {
242+
return redirectBackWithErrorMessage(
243+
request,
244+
`Unable to start a test run: ${e.message}`
245+
);
246+
}
247+
240248
logger.error("Failed to start a test run", { error: e instanceof Error ? e.message : e });
241249

242250
return redirectBackWithErrorMessage(

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export class DefaultQueueManager implements QueueManager {
5757
): Promise<QueueProperties> {
5858
let queueName: string;
5959
let lockedQueueId: string | undefined;
60+
let resolvedRegion: string | undefined;
6061

6162
// Determine queue name based on lockToVersion and provided options
6263
if (lockedBackgroundWorker) {
@@ -83,6 +84,17 @@ export class DefaultQueueManager implements QueueManager {
8384
// Use the validated queue name directly
8485
queueName = specifiedQueue.name;
8586
lockedQueueId = specifiedQueue.id;
87+
88+
// Get task-level region config for the locked worker
89+
const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({
90+
where: {
91+
workerId: lockedBackgroundWorker.id,
92+
runtimeEnvironmentId: request.environment.id,
93+
slug: request.taskId,
94+
},
95+
select: { region: true },
96+
});
97+
resolvedRegion = lockedTask?.region ?? undefined;
8698
} else {
8799
// No specific queue name provided, use the default queue for the task on the locked worker
88100
const lockedTask = await this.prisma.backgroundWorkerTask.findFirst({
@@ -121,6 +133,7 @@ export class DefaultQueueManager implements QueueManager {
121133
// Use the task's default queue name
122134
queueName = lockedTask.queue.name;
123135
lockedQueueId = lockedTask.queue.id;
136+
resolvedRegion = lockedTask.region ?? undefined;
124137
}
125138
} else {
126139
// Task is not locked to a specific version, use regular logic
@@ -131,8 +144,10 @@ export class DefaultQueueManager implements QueueManager {
131144
);
132145
}
133146

134-
// Get queue name using the helper for non-locked case (handles provided name or finds default)
135-
queueName = await this.getQueueName(request);
147+
// Get queue name and task-level region using the helper for non-locked case
148+
const queueInfo = await this._getQueueInfo(request);
149+
queueName = queueInfo.queueName;
150+
resolvedRegion = queueInfo.region ?? undefined;
136151
}
137152

138153
// Sanitize the final determined queue name once
@@ -148,19 +163,21 @@ export class DefaultQueueManager implements QueueManager {
148163
return {
149164
queueName,
150165
lockedQueueId,
166+
resolvedRegion,
151167
};
152168
}
153169

154170
async getQueueName(request: TriggerTaskRequest): Promise<string> {
171+
const { queueName } = await this._getQueueInfo(request);
172+
return queueName;
173+
}
174+
175+
private async _getQueueInfo(
176+
request: TriggerTaskRequest
177+
): Promise<{ queueName: string; region?: string | null }> {
155178
const { taskId, environment, body } = request;
156179
const { queue } = body.options ?? {};
157180

158-
// Use extractQueueName to handle double-wrapped queue objects
159-
const queueName = extractQueueName(queue);
160-
if (queueName) {
161-
return queueName;
162-
}
163-
164181
const defaultQueueName = `task/${taskId}`;
165182

166183
// Find the current worker for the environment
@@ -172,7 +189,9 @@ export class DefaultQueueManager implements QueueManager {
172189
environmentId: environment.id,
173190
});
174191

175-
return defaultQueueName;
192+
// Use extractQueueName to handle double-wrapped queue objects
193+
const queueName = extractQueueName(queue);
194+
return { queueName: queueName ?? defaultQueueName };
176195
}
177196

178197
const task = await this.prisma.backgroundWorkerTask.findFirst({
@@ -192,7 +211,14 @@ export class DefaultQueueManager implements QueueManager {
192211
environmentId: environment.id,
193212
});
194213

195-
return defaultQueueName;
214+
const queueName = extractQueueName(queue);
215+
return { queueName: queueName ?? defaultQueueName };
216+
}
217+
218+
// Use specified queue name if provided, otherwise fall back to task's queue
219+
const specifiedQueueName = extractQueueName(queue);
220+
if (specifiedQueueName) {
221+
return { queueName: specifiedQueueName, region: task.region };
196222
}
197223

198224
if (!task.queue) {
@@ -202,10 +228,13 @@ export class DefaultQueueManager implements QueueManager {
202228
queueConfig: task.queueConfig,
203229
});
204230

205-
return defaultQueueName;
231+
return { queueName: defaultQueueName, region: task.region };
206232
}
207233

208-
return task.queue.name ?? defaultQueueName;
234+
return {
235+
queueName: task.queue.name ?? defaultQueueName,
236+
region: task.region,
237+
};
209238
}
210239

211240
async validateQueueLimits(

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,11 @@ export class RunEngineTriggerTaskService {
268268
})
269269
: undefined;
270270

271-
const { queueName, lockedQueueId } = await this.queueConcern.resolveQueueProperties(
272-
triggerRequest,
273-
lockedToBackgroundWorker ?? undefined
274-
);
271+
const { queueName, lockedQueueId, resolvedRegion } =
272+
await this.queueConcern.resolveQueueProperties(
273+
triggerRequest,
274+
lockedToBackgroundWorker ?? undefined
275+
);
275276

276277
//upsert tags
277278
const tags = await createTags(
@@ -284,7 +285,10 @@ export class RunEngineTriggerTaskService {
284285

285286
const depth = parentRun ? parentRun.depth + 1 : 0;
286287

287-
const workerQueue = await this.queueConcern.getWorkerQueue(environment, body.options?.region);
288+
// Trigger-time region overrides task-level region, which overrides project default
289+
const effectiveRegion = body.options?.region ?? resolvedRegion;
290+
291+
const workerQueue = await this.queueConcern.getWorkerQueue(environment, effectiveRegion);
288292

289293
try {
290294
return await this.traceEventConcern.traceRun(

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export type QueueValidationResult =
4848
export type QueueProperties = {
4949
queueName: string;
5050
lockedQueueId?: string;
51+
resolvedRegion?: string;
5152
};
5253

5354
export type LockedBackgroundWorker = Pick<

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,7 @@ async function createWorkerTask(
277277
retryConfig: task.retry,
278278
queueConfig: task.queue,
279279
machineConfig: task.machine,
280+
region: task.region,
280281
triggerSource: task.triggerSource === "schedule" ? "SCHEDULED" : "STANDARD",
281282
fileId: tasksToBackgroundFiles?.get(task.id) ?? null,
282283
maxDurationInSeconds: task.maxDuration ? clampMaxDuration(task.maxDuration) : null,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "BackgroundWorkerTask" ADD COLUMN "region" TEXT;

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ model BackgroundWorkerTask {
559559
queueConfig Json?
560560
retryConfig Json?
561561
machineConfig Json?
562+
regionConfig String?
562563
563564
queueId String?
564565
queue TaskQueue? @relation(fields: [queueId], references: [id], onDelete: SetNull, onUpdate: Cascade)

packages/cli-v3/src/entryPoints/dev-index-worker.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ if (typeof config.machine === "string") {
147147
});
148148
}
149149

150+
// If the config has a region, we need to apply it to all tasks that don't have a region
151+
if (typeof config.region === "string") {
152+
tasks = tasks.map((task) => {
153+
if (typeof task.region !== "string") {
154+
return {
155+
...task,
156+
region: config.region,
157+
} satisfies TaskManifest;
158+
}
159+
160+
return task;
161+
});
162+
}
163+
150164
await sendMessageInCatalog(
151165
indexerToWorkerMessages,
152166
"INDEX_COMPLETE",

packages/cli-v3/src/entryPoints/managed-index-worker.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ if (typeof config.machine === "string") {
147147
});
148148
}
149149

150+
// If the config has a region, we need to apply it to all tasks that don't have a region
151+
if (typeof config.region === "string") {
152+
tasks = tasks.map((task) => {
153+
if (typeof task.region !== "string") {
154+
return {
155+
...task,
156+
region: config.region,
157+
} satisfies TaskManifest;
158+
}
159+
160+
return task;
161+
});
162+
}
163+
150164
const processKeepAlive = config.processKeepAlive ?? config.experimental_processKeepAlive;
151165

152166
await sendMessageInCatalog(

packages/core/src/v3/config.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ export type TriggerConfig = {
141141
*/
142142
machine?: MachinePresetName;
143143

144+
/**
145+
* The default region to run your tasks in. You can override this on a per-task basis or at trigger time.
146+
*
147+
* Check the Regions page in the dashboard for regions that are available to you.
148+
*/
149+
region?: string;
150+
144151
/**
145152
* Set the log level for the logger. Defaults to "info", so you will see "log", "info", "warn", and "error" messages, but not "debug" messages.
146153
*

0 commit comments

Comments
 (0)