From 3c753ec5af3fc1bd914970f9c180bb09dd1f2e23 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 22 Apr 2025 13:36:21 +0100 Subject: [PATCH 1/5] Added some Redis worker debounce tests (one failing that reproduces a bug) --- packages/redis-worker/src/worker.test.ts | 188 +++++++++++++++++++++++ 1 file changed, 188 insertions(+) diff --git a/packages/redis-worker/src/worker.test.ts b/packages/redis-worker/src/worker.test.ts index 1768f391076..1a24a5f881e 100644 --- a/packages/redis-worker/src/worker.test.ts +++ b/packages/redis-worker/src/worker.test.ts @@ -250,4 +250,192 @@ describe("Worker", () => { await redisClient.quit(); } ); + + redisTest( + "Should process a job with the same ID only once when rescheduled", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + await new Promise((resolve) => setTimeout(resolve, 30)); // Simulate work + processedPayloads.push(payload.value); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, // Ensure quick polling to detect the scheduled item + logger: new Logger("test", "log"), + }).start(); + + // Unique ID to use for both enqueues + const testJobId = "duplicate-job-id"; + + // Enqueue the first item immediately + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "first-attempt" }, + availableAt: new Date(Date.now() + 50), + }); + + // Enqueue another item with the same ID but scheduled 50ms in the future + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "second-attempt" }, + availableAt: new Date(Date.now() + 50), + }); + + // Wait enough time for both jobs to be processed if they were going to be + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify that only one job was processed (the second one should have replaced the first) + expect(processedPayloads.length).toBe(1); + + // Verify that the second job's payload was the one processed + expect(processedPayloads[0]).toBe("second-attempt"); + + await worker.stop(); + } + ); + + redisTest( + "Should process second job with same ID when enqueued during first job execution with future availableAt", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + const jobStarted: string[] = []; + let firstJobCompleted = false; + const events: string[] = []; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + // Record when the job starts processing + jobStarted.push(payload.value); + events.push(`Job started: ${payload.value}`); + + if (payload.value === "first-attempt") { + // First job takes a long time to process + await new Promise((resolve) => setTimeout(resolve, 1_000)); + firstJobCompleted = true; + } + + // Record when the job completes + processedPayloads.push(payload.value); + events.push(`Job completed: ${payload.value}`); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, + logger: new Logger("test", "log"), + }).start(); + + const testJobId = "long-running-job-id"; + + // Queue the first job + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "first-attempt" }, + }); + events.push("First job enqueued"); + + // Verify initial queue size + const size1 = await worker.queue.size({ includeFuture: true }); + events.push(`Queue size after first enqueue: ${size1}`); + expect(size1).toBe(1); + + // Wait until we know the first job has started processing + while (jobStarted.length === 0) { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + // Now that first job is running, queue second job with same ID + // Set availableAt to be 1.5 seconds in the future (after first job completes) + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "second-attempt" }, + availableAt: new Date(Date.now() + 1500), + }); + events.push("Second job enqueued with future availableAt"); + + // Verify queue size after second enqueue + const size2 = await worker.queue.size({ includeFuture: true }); + const size2Present = await worker.queue.size({ includeFuture: false }); + events.push(`Queue size after second enqueue (including future): ${size2}`); + events.push(`Queue size after second enqueue (present only): ${size2Present}`); + expect(size2).toBe(1); // Should still be 1 as it's the same ID + + // Wait for the first job to complete + while (!firstJobCompleted) { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + events.push("First job completed"); + + // Check queue size right after first job completes + const size3 = await worker.queue.size({ includeFuture: true }); + const size3Present = await worker.queue.size({ includeFuture: false }); + events.push(`Queue size after first job completes (including future): ${size3}`); + events.push(`Queue size after first job completes (present only): ${size3Present}`); + + // Wait long enough for the second job to become available and potentially run + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Final queue size + const size4 = await worker.queue.size({ includeFuture: true }); + const size4Present = await worker.queue.size({ includeFuture: false }); + events.push(`Final queue size (including future): ${size4}`); + events.push(`Final queue size (present only): ${size4Present}`); + + console.log("Event sequence:", events); + + // First job should have run + expect(processedPayloads).toContain("first-attempt"); + + // These assertions should fail - demonstrating the bug + // The second job should run after its availableAt time, but doesn't because + // the ack from the first job removed it from Redis entirely + expect(jobStarted).toContain("second-attempt"); + expect(processedPayloads).toContain("second-attempt"); + expect(processedPayloads.length).toBe(2); + + await worker.stop(); + } + ); }); From 48f19f0393b9d305da45761cabec70ef8ec136fb Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 22 Apr 2025 14:32:21 +0100 Subject: [PATCH 2/5] Added some tests for acking --- packages/redis-worker/src/worker.test.ts | 125 +++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/packages/redis-worker/src/worker.test.ts b/packages/redis-worker/src/worker.test.ts index 1a24a5f881e..fe69adc4eae 100644 --- a/packages/redis-worker/src/worker.test.ts +++ b/packages/redis-worker/src/worker.test.ts @@ -438,4 +438,129 @@ describe("Worker", () => { await worker.stop(); } ); + + redisTest( + "Should properly remove future-scheduled job after completion", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + processedPayloads.push(payload.value); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, + logger: new Logger("test", "debug"), // Use debug to see all logs + }).start(); + + // Schedule a job 500ms in the future + await worker.enqueue({ + id: "future-job", + job: "testJob", + payload: { value: "test" }, + availableAt: new Date(Date.now() + 500), + }); + + // Verify it's in the future queue + const initialSize = await worker.queue.size(); + const initialSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(initialSize).toBe(0); + expect(initialSizeWithFuture).toBe(1); + + // Wait for job to be processed + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify job was processed + expect(processedPayloads).toContain("test"); + + // Verify queue is completely empty + const finalSize = await worker.queue.size(); + const finalSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(finalSize).toBe(0); + expect(finalSizeWithFuture).toBe(0); + + await worker.stop(); + } + ); + + redisTest( + "Should properly remove immediate job after completion", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + processedPayloads.push(payload.value); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, + logger: new Logger("test", "debug"), // Use debug to see all logs + }).start(); + + // Enqueue a job to run immediately + await worker.enqueue({ + id: "immediate-job", + job: "testJob", + payload: { value: "test" }, + }); + + // Verify it's in the present queue + const initialSize = await worker.queue.size(); + const initialSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(initialSize).toBe(1); + expect(initialSizeWithFuture).toBe(1); + + // Wait for job to be processed + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify job was processed + expect(processedPayloads).toContain("test"); + + // Verify queue is completely empty + const finalSize = await worker.queue.size(); + const finalSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(finalSize).toBe(0); + expect(finalSizeWithFuture).toBe(0); + + await worker.stop(); + } + ); }); From 470cb2fcf01dc37ee4ffbc41a33a1786f3714507 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 22 Apr 2025 15:10:34 +0100 Subject: [PATCH 3/5] Added a deduplicationKey to prevent acking when items are queued --- packages/redis-worker/src/queue.ts | 47 +++++++++++++++++++++++++----- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index 697b82ff5c8..734acc627be 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -27,6 +27,7 @@ export type QueueItem = { visibilityTimeoutMs: number; attempt: number; timestamp: Date; + deduplicationKey?: string; }; export type AnyQueueItem = { @@ -36,6 +37,7 @@ export type AnyQueueItem = { visibilityTimeoutMs: number; attempt: number; timestamp: Date; + deduplicationKey?: string; }; export class SimpleQueue { @@ -98,11 +100,13 @@ export class SimpleQueue { }): Promise { try { const score = availableAt ? availableAt.getTime() : Date.now(); + const deduplicationKey = nanoid(); const serializedItem = JSON.stringify({ job, item, visibilityTimeoutMs, attempt, + deduplicationKey, }); const result = await this.redis.enqueueItem( @@ -136,7 +140,7 @@ export class SimpleQueue { return []; } - const dequeuedItems = []; + const dequeuedItems: Array> = []; for (const [id, serializedItem, score] of results) { const parsedItem = JSON.parse(serializedItem) as any; @@ -186,6 +190,7 @@ export class SimpleQueue { visibilityTimeoutMs, attempt: parsedItem.attempt ?? 0, timestamp, + deduplicationKey: parsedItem.deduplicationKey, }); } @@ -200,14 +205,22 @@ export class SimpleQueue { } } - async ack(id: string): Promise { + async ack(id: string, deduplicationKey?: string): Promise { try { - await this.redis.ackItem(`queue`, `items`, id); + const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? ""); + if (result === 0) { + this.logger.error(`SimpleQueue ${this.name}.ack(): ack operation returned 0`, { + queue: this.name, + id, + deduplicationKey, + }); + } } catch (e) { this.logger.error(`SimpleQueue ${this.name}.ack(): error acknowledging item`, { queue: this.name, error: e, id, + deduplicationKey, }); throw e; } @@ -367,15 +380,32 @@ export class SimpleQueue { this.redis.defineCommand("ackItem", { numberOfKeys: 2, lua: ` - local queue = KEYS[1] - local items = KEYS[2] + local queueKey = KEYS[1] + local itemsKey = KEYS[2] local id = ARGV[1] + local deduplicationKey = ARGV[2] - redis.call('ZREM', queue, id) - redis.call('HDEL', items, id) + -- Get the item from the hash + local item = redis.call('HGET', itemsKey, id) + if not item then + return -1 + end + -- Only check deduplicationKey if a non-empty one was passed in + if deduplicationKey and deduplicationKey ~= "" then + local success, parsed = pcall(cjson.decode, item) + if success then + if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then + return 0 + end + end + end + + -- Remove from sorted set and hash + redis.call('ZREM', queueKey, id) + redis.call('HDEL', itemsKey, id) return 1 - `, + `, }); this.redis.defineCommand("moveToDeadLetterQueue", { @@ -468,6 +498,7 @@ declare module "@internal/redis" { queue: string, items: string, id: string, + deduplicationKey: string, callback?: Callback ): Result; From 72cbabe3487de7367e42d3f4a53409b1042db0ae Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 22 Apr 2025 15:11:08 +0100 Subject: [PATCH 4/5] The worker passes the deduplicationKey back in for acking --- packages/redis-worker/src/worker.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index f78d493faef..13e4fd85f14 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -28,6 +28,7 @@ type JobHandler = (param payload: z.infer; visibilityTimeoutMs: number; attempt: number; + deduplicationKey?: string; }) => Promise; export type WorkerConcurrencyOptions = { @@ -345,7 +346,7 @@ class Worker { * Processes a single item. */ private async processItem( - { id, job, item, visibilityTimeoutMs, attempt, timestamp }: AnyQueueItem, + { id, job, item, visibilityTimeoutMs, attempt, timestamp, deduplicationKey }: AnyQueueItem, batchSize: number, workerId: string ): Promise { @@ -362,7 +363,7 @@ class Worker { async () => { await this.withHistogram( this.metrics.jobDuration, - handler({ id, payload: item, visibilityTimeoutMs, attempt }), + handler({ id, payload: item, visibilityTimeoutMs, attempt, deduplicationKey }), { worker_id: workerId, batch_size: batchSize, @@ -372,7 +373,7 @@ class Worker { ); // On success, acknowledge the item. - await this.queue.ack(id); + await this.queue.ack(id, deduplicationKey); }, { kind: SpanKind.CONSUMER, From be02fe64f19b80d2df91dc11c07f655ae0cbbbfe Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 22 Apr 2025 15:23:49 +0100 Subject: [PATCH 5/5] Improved logs and removed events from test --- packages/redis-worker/src/queue.ts | 16 ++++++++++------ packages/redis-worker/src/worker.test.ts | 15 --------------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index 734acc627be..975c484dc2f 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -208,12 +208,16 @@ export class SimpleQueue { async ack(id: string, deduplicationKey?: string): Promise { try { const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? ""); - if (result === 0) { - this.logger.error(`SimpleQueue ${this.name}.ack(): ack operation returned 0`, { - queue: this.name, - id, - deduplicationKey, - }); + if (result !== 1) { + this.logger.debug( + `SimpleQueue ${this.name}.ack(): ack operation returned ${result}. This means it was not removed from the queue.`, + { + queue: this.name, + id, + deduplicationKey, + result, + } + ); } } catch (e) { this.logger.error(`SimpleQueue ${this.name}.ack(): error acknowledging item`, { diff --git a/packages/redis-worker/src/worker.test.ts b/packages/redis-worker/src/worker.test.ts index fe69adc4eae..8b604be8ae0 100644 --- a/packages/redis-worker/src/worker.test.ts +++ b/packages/redis-worker/src/worker.test.ts @@ -324,7 +324,6 @@ describe("Worker", () => { const processedPayloads: string[] = []; const jobStarted: string[] = []; let firstJobCompleted = false; - const events: string[] = []; const worker = new Worker({ name: "test-worker", @@ -344,7 +343,6 @@ describe("Worker", () => { testJob: async ({ payload }) => { // Record when the job starts processing jobStarted.push(payload.value); - events.push(`Job started: ${payload.value}`); if (payload.value === "first-attempt") { // First job takes a long time to process @@ -354,7 +352,6 @@ describe("Worker", () => { // Record when the job completes processedPayloads.push(payload.value); - events.push(`Job completed: ${payload.value}`); }, }, concurrency: { @@ -373,11 +370,9 @@ describe("Worker", () => { job: "testJob", payload: { value: "first-attempt" }, }); - events.push("First job enqueued"); // Verify initial queue size const size1 = await worker.queue.size({ includeFuture: true }); - events.push(`Queue size after first enqueue: ${size1}`); expect(size1).toBe(1); // Wait until we know the first job has started processing @@ -393,26 +388,20 @@ describe("Worker", () => { payload: { value: "second-attempt" }, availableAt: new Date(Date.now() + 1500), }); - events.push("Second job enqueued with future availableAt"); // Verify queue size after second enqueue const size2 = await worker.queue.size({ includeFuture: true }); const size2Present = await worker.queue.size({ includeFuture: false }); - events.push(`Queue size after second enqueue (including future): ${size2}`); - events.push(`Queue size after second enqueue (present only): ${size2Present}`); expect(size2).toBe(1); // Should still be 1 as it's the same ID // Wait for the first job to complete while (!firstJobCompleted) { await new Promise((resolve) => setTimeout(resolve, 10)); } - events.push("First job completed"); // Check queue size right after first job completes const size3 = await worker.queue.size({ includeFuture: true }); const size3Present = await worker.queue.size({ includeFuture: false }); - events.push(`Queue size after first job completes (including future): ${size3}`); - events.push(`Queue size after first job completes (present only): ${size3Present}`); // Wait long enough for the second job to become available and potentially run await new Promise((resolve) => setTimeout(resolve, 2000)); @@ -420,10 +409,6 @@ describe("Worker", () => { // Final queue size const size4 = await worker.queue.size({ includeFuture: true }); const size4Present = await worker.queue.size({ includeFuture: false }); - events.push(`Final queue size (including future): ${size4}`); - events.push(`Final queue size (present only): ${size4Present}`); - - console.log("Event sequence:", events); // First job should have run expect(processedPayloads).toContain("first-attempt");