From 706ceb93e35d5cd8fd6940d2d433802a49f58bca Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 2 Jul 2025 11:24:52 +0100 Subject: [PATCH] Fixes the jitter option for the scanConcurrencySets cron job --- internal-packages/run-engine/src/run-queue/index.ts | 2 +- .../src/run-queue/tests/concurrencySweeper.test.ts | 7 ++++++- packages/redis-worker/src/worker.ts | 7 ++++--- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 0b6d5c1d1dd..3fc4d0883cc 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -212,7 +212,7 @@ export class RunQueue { scanConcurrencySets: { ...workerCatalog.scanConcurrencySets, cron: options.concurrencySweeper?.scanSchedule ?? workerCatalog.scanConcurrencySets.cron, - jitter: + jitterInMs: options.concurrencySweeper?.scanJitterInMs ?? workerCatalog.scanConcurrencySets.jitterInMs, }, diff --git a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts index 8a1399911f6..342cba674a3 100644 --- a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts @@ -13,7 +13,7 @@ const testOptions = { tracer: trace.getTracer("rq"), workers: 1, defaultEnvConcurrency: 25, - logger: new Logger("RunQueue", "warn"), + logger: new Logger("RunQueue", "debug"), retryOptions: { maxAttempts: 5, factor: 1.1, @@ -59,6 +59,7 @@ describe("RunQueue Concurrency Sweeper", () => { const queue = new RunQueue({ ...testOptions, + logLevel: "debug", queueSelectionStrategy: new FairQueueSelectionStrategy({ redis: { keyPrefix: "runqueue:test:", @@ -67,6 +68,10 @@ describe("RunQueue Concurrency Sweeper", () => { }, keys: testOptions.keys, }), + workerOptions: { + pollIntervalMs: 100, + immediatePollIntervalMs: 100, + }, redis: { keyPrefix: "runqueue:test:", host: redisContainer.getHost(), diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index 3542f305fe6..96c41ed9006 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -693,9 +693,8 @@ class Worker { const scheduledAt = this.calculateNextScheduledAt(cron, lastTimestamp); const identifier = [job, this.timestampIdentifier(scheduledAt)].join(":"); // Calculate the availableAt date by calculating a random number between -jitter/2 and jitter/2 and adding it to the scheduledAt - const availableAt = jitter - ? new Date(scheduledAt.getTime() + Math.random() * jitter - jitter / 2) - : scheduledAt; + const appliedJitter = typeof jitter === "number" ? Math.random() * jitter - jitter / 2 : 0; + const availableAt = new Date(scheduledAt.getTime() + appliedJitter); const enqueued = await this.enqueueOnce({ id: identifier, @@ -715,6 +714,8 @@ class Worker { scheduledAt, enqueued, availableAt, + appliedJitter, + jitter, }); return {