Skip to content

Commit 471bf19

Browse files
committed
Adopt the pre-failed run approach in the legacy run engine batch trigger service
1 parent 7c143dd commit 471bf19

File tree

4 files changed

+198
-404
lines changed

4 files changed

+198
-404
lines changed

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 0 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -221,111 +221,6 @@ export class DefaultQueueManager implements QueueManager {
221221
return task.queue.name ?? defaultQueueName;
222222
}
223223

224-
/**
225-
* Resolves queue names for batch items and groups them by queue.
226-
* Returns a map of queue name -> count of items going to that queue.
227-
*/
228-
async resolveQueueNamesForBatchItems(
229-
environment: AuthenticatedEnvironment,
230-
items: Array<{ task: string; options?: { queue?: { name?: string } } }>
231-
): Promise<Map<string, number>> {
232-
const queueCounts = new Map<string, number>();
233-
234-
// Separate items with explicit queues from those needing lookup
235-
const itemsNeedingLookup: Array<{ task: string; count: number }> = [];
236-
const taskCounts = new Map<string, number>();
237-
238-
for (const item of items) {
239-
const explicitQueueName = extractQueueName(item.options?.queue);
240-
241-
if (explicitQueueName) {
242-
// Item has explicit queue - count it directly
243-
const sanitized = sanitizeQueueName(explicitQueueName) || `task/${item.task}`;
244-
queueCounts.set(sanitized, (queueCounts.get(sanitized) ?? 0) + 1);
245-
} else {
246-
// Need to look up default queue for this task - group by task
247-
taskCounts.set(item.task, (taskCounts.get(item.task) ?? 0) + 1);
248-
}
249-
}
250-
251-
// Batch lookup default queues for all unique tasks
252-
if (taskCounts.size > 0) {
253-
const worker = await findCurrentWorkerFromEnvironment(environment, this.prisma);
254-
const taskSlugs = Array.from(taskCounts.keys());
255-
256-
// Map task slug -> queue name
257-
const taskQueueMap = new Map<string, string>();
258-
259-
if (worker) {
260-
// Single query to get all tasks with their queues
261-
const tasks = await this.prisma.backgroundWorkerTask.findMany({
262-
where: {
263-
workerId: worker.id,
264-
runtimeEnvironmentId: environment.id,
265-
slug: { in: taskSlugs },
266-
},
267-
include: {
268-
queue: true,
269-
},
270-
});
271-
272-
for (const task of tasks) {
273-
const queueName = task.queue?.name ?? `task/${task.slug}`;
274-
taskQueueMap.set(task.slug, sanitizeQueueName(queueName) || `task/${task.slug}`);
275-
}
276-
}
277-
278-
// Count items per queue
279-
for (const [taskSlug, count] of taskCounts) {
280-
const queueName = taskQueueMap.get(taskSlug) ?? `task/${taskSlug}`;
281-
queueCounts.set(queueName, (queueCounts.get(queueName) ?? 0) + count);
282-
}
283-
}
284-
285-
return queueCounts;
286-
}
287-
288-
/**
289-
* Validates queue limits for multiple queues at once.
290-
* Returns the first queue that exceeds limits, or null if all are within limits.
291-
*/
292-
async validateMultipleQueueLimits(
293-
environment: AuthenticatedEnvironment,
294-
queueCounts: Map<string, number>
295-
): Promise<{ ok: true } | { ok: false; queueName: string; maximumSize: number; queueSize: number }> {
296-
const maximumSize = getMaximumSizeForEnvironment(environment);
297-
298-
logger.debug("validateMultipleQueueLimits", {
299-
environmentId: environment.id,
300-
environmentType: environment.type,
301-
organizationId: environment.organization.id,
302-
maximumDevQueueSize: environment.organization.maximumDevQueueSize,
303-
maximumDeployedQueueSize: environment.organization.maximumDeployedQueueSize,
304-
resolvedMaximumSize: maximumSize,
305-
queueCounts: Object.fromEntries(queueCounts),
306-
});
307-
308-
if (typeof maximumSize === "undefined") {
309-
return { ok: true };
310-
}
311-
312-
for (const [queueName, itemCount] of queueCounts) {
313-
const queueSize = await getCachedQueueSize(this.engine, environment, queueName);
314-
const projectedSize = queueSize + itemCount;
315-
316-
if (projectedSize > maximumSize) {
317-
return {
318-
ok: false,
319-
queueName,
320-
maximumSize,
321-
queueSize,
322-
};
323-
}
324-
}
325-
326-
return { ok: true };
327-
}
328-
329224
async validateQueueLimits(
330225
environment: AuthenticatedEnvironment,
331226
queueName: string,

0 commit comments

Comments
 (0)