diff --git a/packages/redis-worker/src/queue.test.ts b/packages/redis-worker/src/queue.test.ts index 902dcb9d586..032a1008521 100644 --- a/packages/redis-worker/src/queue.test.ts +++ b/packages/redis-worker/src/queue.test.ts @@ -4,6 +4,7 @@ import { expect } from "vitest"; import { z } from "zod"; import { SimpleQueue } from "./queue.js"; import { Logger } from "@trigger.dev/core/logger"; +import { createRedisClient } from "@internal/redis"; describe("SimpleQueue", () => { redisTest("enqueue/dequeue", { timeout: 20_000 }, async ({ redisContainer }) => { @@ -209,6 +210,10 @@ describe("SimpleQueue", () => { timestamp: expect.any(Date), }) ); + + // Acknowledge the item and verify it's removed + await queue.ack(second!.id); + expect(await queue.size({ includeFuture: true })).toBe(0); } finally { await queue.close(); } @@ -328,6 +333,7 @@ describe("SimpleQueue", () => { // Redrive item from DLQ await queue.redriveFromDeadLetterQueue("1"); + await new Promise((resolve) => setTimeout(resolve, 200)); expect(await queue.size()).toBe(1); expect(await queue.sizeOfDeadLetterQueue()).toBe(0); @@ -357,4 +363,64 @@ describe("SimpleQueue", () => { await queue.close(); } }); + + redisTest("cleanup orphaned queue entries", { timeout: 20_000 }, async ({ redisContainer }) => { + const queue = new SimpleQueue({ + name: "test-orphaned", + schema: { + test: z.object({ + value: z.number(), + }), + }, + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + logger: new Logger("test", "log"), + }); + + try { + // First, add a normal item + await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 }); + + const redisClient = createRedisClient({ + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }); + + // Manually add an orphaned item to the queue (without corresponding hash entry) + await redisClient.zadd(`{queue:test-orphaned:}queue`, Date.now(), "orphaned-id"); + + // Verify both items are in the queue + expect(await queue.size()).toBe(2); + + // Dequeue should process both items, but only return the valid one + // and clean up the orphaned entry + const dequeued = await queue.dequeue(2); + + // Should only get the valid item + expect(dequeued).toHaveLength(1); + expect(dequeued[0]).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); + + // The orphaned item should have been removed + expect(await queue.size({ includeFuture: true })).toBe(1); + + // Verify the orphaned ID is no longer in the queue + const orphanedScore = await redisClient.zscore(`{queue:test-orphaned:}queue`, "orphaned-id"); + expect(orphanedScore).toBeNull(); + } finally { + await queue.close(); + } + }); }); diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index 975c484dc2f..4b73171acc5 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -130,6 +130,7 @@ export class SimpleQueue { throw e; } } + async dequeue(count: number = 1): Promise>> { const now = Date.now(); @@ -179,9 +180,6 @@ export class SimpleQueue { } const visibilityTimeoutMs = parsedItem.visibilityTimeoutMs as number; - const invisibleUntil = now + visibilityTimeoutMs; - - await this.redis.zadd(`queue`, invisibleUntil, id); dequeuedItems.push({ id, @@ -374,6 +372,9 @@ export class SimpleQueue { redis.call('ZADD', queue, invisibleUntil, id) table.insert(dequeued, {id, serializedItem, score}) + else + -- Remove the orphaned queue entry if no corresponding item exists + redis.call('ZREM', queue, id) end end