diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 91a7370b791..30410053c82 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -460,7 +460,6 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0), RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(), RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), - RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS: z.coerce.number().int().default(10), RUN_ENGINE_WORKER_REDIS_HOST: z .string() diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 4788da84630..ad9e1c9aeb9 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -58,7 +58,6 @@ function createRunEngine() { maximumEnvCount: env.RUN_ENGINE_MAXIMUM_ENV_COUNT, tracer, }, - maxDequeueLoopAttempts: env.RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS, }, runLock: { redis: { diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 2bb9ece95ea..d21b1d6795e 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -113,7 +113,6 @@ export class RunEngine { logger: new Logger("RunQueue", "debug"), redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` }, retryOptions: options.queue?.retryOptions, - maxDequeueLoopAttempts: options.queue?.maxDequeueLoopAttempts ?? 10, }); this.worker = new Worker({ diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 54d6b668c93..a89a7d1fe33 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -35,7 +35,6 @@ export type RunEngineOptions = { FairQueueSelectionStrategyOptions, "parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount" >; - maxDequeueLoopAttempts?: number; }; runLock: { redis: RedisOptions; diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index f2677c64902..c34247f11cb 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -30,6 +30,7 @@ import { type Result, } from "@internal/redis"; import { MessageNotFoundError } from "./errors.js"; +import { tryCatch } from "@trigger.dev/core"; const SemanticAttributes = { QUEUE: "runqueue.queue", @@ -51,7 +52,6 @@ export type RunQueueOptions = { verbose?: boolean; logger?: Logger; retryOptions?: RetryOptions; - maxDequeueLoopAttempts?: number; }; type DequeuedMessage = { @@ -78,7 +78,6 @@ export class RunQueue { private redis: Redis; public keys: RunQueueKeyProducer; private queueSelectionStrategy: RunQueueSelectionStrategy; - private maxDequeueLoopAttempts: number; constructor(private readonly options: RunQueueOptions) { this.retryOptions = options.retryOptions ?? defaultRetrySettings; @@ -94,7 +93,6 @@ export class RunQueue { this.keys = options.keys; this.queueSelectionStrategy = options.queueSelectionStrategy; - this.maxDequeueLoopAttempts = options.maxDequeueLoopAttempts ?? 10; this.subscriber = createRedisClient(options.redis, { onError: (error) => { @@ -396,55 +394,45 @@ export class RunQueue { let attemptedEnvs = 0; let attemptedQueues = 0; - let dequeueLoopAttempts = 0; const messages: DequeuedMessage[] = []; - // Each env starts with its list of candidate queues - const tenantQueues: Record = {}; - - // Initialize tenantQueues with the queues for each env for (const env of envQueues) { - tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array - } - - // Continue until we've hit max count or all tenants have empty queue lists - while ( - messages.length < maxCount && - Object.values(tenantQueues).some((queues) => queues.length > 0) && - dequeueLoopAttempts < this.maxDequeueLoopAttempts - ) { - dequeueLoopAttempts++; + attemptedEnvs++; - for (const env of envQueues) { - attemptedEnvs++; - - // Skip if this tenant has no more queues - if (tenantQueues[env.envId].length === 0) { - continue; - } - - // Pop the next queue (using round-robin order) - const queue = tenantQueues[env.envId].shift()!; + for (const queue of env.queues) { attemptedQueues++; // Attempt to dequeue from this queue - const message = await this.#callDequeueMessage({ - messageQueue: queue, - }); + const [error, message] = await tryCatch( + this.#callDequeueMessage({ + messageQueue: queue, + }) + ); + + if (error) { + this.logger.error( + `[dequeueMessageInSharedQueue][${this.name}] Failed to dequeue from queue ${queue}`, + { + error, + } + ); + } if (message) { messages.push(message); - // Re-add this queue at the end, since it might have more messages - tenantQueues[env.envId].push(queue); } - // If message is null, do not re-add the queue in this cycle - // If we've reached maxCount, break out of the loop + // If we've reached maxCount, we don't want to look at this env anymore if (messages.length >= maxCount) { break; } } + + // If we've reached maxCount, we're completely done + if (messages.length >= maxCount) { + break; + } } span.setAttributes({ diff --git a/references/hello-world/src/trigger/deadlocks.ts b/references/hello-world/src/trigger/deadlocks.ts index beb838e5b4b..8bf35de738c 100644 --- a/references/hello-world/src/trigger/deadlocks.ts +++ b/references/hello-world/src/trigger/deadlocks.ts @@ -14,22 +14,9 @@ export const deadlockReleasingQueue = queue({ export const deadlockTester = task({ id: "deadlock-tester", run: async (payload: any, { ctx }) => { - // await deadlockNestedTask.triggerAndWait({ - // message: "Hello, world!", - // }); - - await deadlockNestedTask.batchTriggerAndWait([ - { - payload: { - message: "Hello, world!", - }, - }, - { - payload: { - message: "Hello, world!", - }, - }, - ]); + await deadlockNestedTask.triggerAndWait({ + message: "Hello, world!", + }); }, }); @@ -37,22 +24,9 @@ export const deadlockNestedTask = task({ id: "deadlock-nested-task", queue: deadlockQueue, run: async (payload: any, { ctx }) => { - // await deadlockTester.triggerAndWait({ - // message: "Hello, world!", - // }); - - await deadlockTester.batchTriggerAndWait([ - { - payload: { - message: "Hello, world!", - }, - }, - { - payload: { - message: "Hello, world!", - }, - }, - ]); + await deadlockTester.triggerAndWait({ + message: "Hello, world!", + }); return { message: "Hello, world!",