From 65cc779e10b39a8d78db7ca8b5cba12aae3cb0da Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 24 Apr 2025 10:52:36 +0100 Subject: [PATCH 1/3] =?UTF-8?q?Remove=20setting=20the=20invisibility=20tim?= =?UTF-8?q?eout=20because=20it=E2=80=99s=20already=20done=20in=20the=20deq?= =?UTF-8?q?ueue=20Lua=20script?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/redis-worker/src/queue.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index 975c484dc2f..b48257d8a19 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -130,6 +130,7 @@ export class SimpleQueue { throw e; } } + async dequeue(count: number = 1): Promise>> { const now = Date.now(); @@ -179,9 +180,6 @@ export class SimpleQueue { } const visibilityTimeoutMs = parsedItem.visibilityTimeoutMs as number; - const invisibleUntil = now + visibilityTimeoutMs; - - await this.redis.zadd(`queue`, invisibleUntil, id); dequeuedItems.push({ id, From b43043a8f8659ee26a23ccaef2862aaed890d5ad Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 24 Apr 2025 11:09:04 +0100 Subject: [PATCH 2/3] Remove orphaned queue items when dequeuing --- packages/redis-worker/src/queue.test.ts | 62 +++++++++++++++++++++++++ packages/redis-worker/src/queue.ts | 3 ++ 2 files changed, 65 insertions(+) diff --git a/packages/redis-worker/src/queue.test.ts b/packages/redis-worker/src/queue.test.ts index 902dcb9d586..0608222f61f 100644 --- a/packages/redis-worker/src/queue.test.ts +++ b/packages/redis-worker/src/queue.test.ts @@ -4,6 +4,7 @@ import { expect } from "vitest"; import { z } from "zod"; import { SimpleQueue } from "./queue.js"; import { Logger } from "@trigger.dev/core/logger"; +import { createRedisClient } from "@internal/redis"; describe("SimpleQueue", () => { redisTest("enqueue/dequeue", { timeout: 20_000 }, async ({ redisContainer }) => { @@ -328,6 +329,7 @@ describe("SimpleQueue", () => { // Redrive item from DLQ await queue.redriveFromDeadLetterQueue("1"); + await new Promise((resolve) => setTimeout(resolve, 200)); expect(await queue.size()).toBe(1); expect(await queue.sizeOfDeadLetterQueue()).toBe(0); @@ -357,4 +359,64 @@ describe("SimpleQueue", () => { await queue.close(); } }); + + redisTest("cleanup orphaned queue entries", { timeout: 20_000 }, async ({ redisContainer }) => { + const queue = new SimpleQueue({ + name: "test-orphaned", + schema: { + test: z.object({ + value: z.number(), + }), + }, + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // First, add a normal item + await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 }); + + const redisClient = createRedisClient({ + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }); + + // Manually add an orphaned item to the queue (without corresponding hash entry) + await redisClient.zadd(`{queue:test-orphaned:}queue`, Date.now(), "orphaned-id"); + + // Verify both items are in the queue + expect(await queue.size()).toBe(2); + + // Dequeue should process both items, but only return the valid one + // and clean up the orphaned entry + const dequeued = await queue.dequeue(2); + + // Should only get the valid item + expect(dequeued).toHaveLength(1); + expect(dequeued[0]).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); + + // The orphaned item should have been removed + expect(await queue.size({ includeFuture: true })).toBe(1); + + // Verify the orphaned ID is no longer in the queue + const orphanedScore = await redisClient.zscore(`{queue:test-orphaned:}queue`, "orphaned-id"); + expect(orphanedScore).toBeNull(); + } finally { + await queue.close(); + } + }); }); diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index b48257d8a19..4b73171acc5 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -372,6 +372,9 @@ export class SimpleQueue { redis.call('ZADD', queue, invisibleUntil, id) table.insert(dequeued, {id, serializedItem, score}) + else + -- Remove the orphaned queue entry if no corresponding item exists + redis.call('ZREM', queue, id) end end From 599329a8e3d6fe821e6c4f53e9546baa4c8a0755 Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Thu, 24 Apr 2025 11:12:40 +0100 Subject: [PATCH 3/3] Added an ack to the visibility timeout test --- packages/redis-worker/src/queue.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/redis-worker/src/queue.test.ts b/packages/redis-worker/src/queue.test.ts index 0608222f61f..032a1008521 100644 --- a/packages/redis-worker/src/queue.test.ts +++ b/packages/redis-worker/src/queue.test.ts @@ -210,6 +210,10 @@ describe("SimpleQueue", () => { timestamp: expect.any(Date), }) ); + + // Acknowledge the item and verify it's removed + await queue.ack(second!.id); + expect(await queue.size({ includeFuture: true })).toBe(0); } finally { await queue.close(); }