Skip to content

Commit b91f5a4

Browse files
committed
Create a more reliable TTL expiration system using an atomic Redis worker
1 parent 7c16590 commit b91f5a4

File tree

4 files changed: +90 -62 lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,15 @@ import {
7575
RunEngineOptions,
7676
TriggerParams,
7777
} from "./types.js";
78+
import { ttlWorkerCatalog } from "./ttlWorkerCatalog.js";
7879
import { workerCatalog } from "./workerCatalog.js";
7980
import pMap from "p-map";
8081

8182
export class RunEngine {
8283
private runLockRedis: Redis;
8384
private runLock: RunLocker;
8485
private worker: EngineWorker;
86+
private ttlWorker: Worker<typeof ttlWorkerCatalog>;
8587
private logger: Logger;
8688
private tracer: Tracer;
8789
private meter: Meter;
@@ -193,7 +195,9 @@ export class RunEngine {
193195
shardCount: options.queue?.ttlSystem?.shardCount,
194196
pollIntervalMs: options.queue?.ttlSystem?.pollIntervalMs,
195197
batchSize: options.queue?.ttlSystem?.batchSize,
196-
callback: this.#ttlExpiredCallback.bind(this),
198+
workerQueueSuffix: "ttl-worker:{queue:ttl-expiration:}queue",
199+
workerItemsSuffix: "ttl-worker:{queue:ttl-expiration:}items",
200+
visibilityTimeoutMs: options.queue?.ttlSystem?.visibilityTimeoutMs ?? 30_000,
197201
},
198202
});
199203

@@ -337,6 +341,31 @@ export class RunEngine {
337341
waitpointSystem: this.waitpointSystem,
338342
});
339343

344+
this.ttlWorker = new Worker({
345+
name: "ttl-expiration",
346+
redisOptions: {
347+
...options.queue.redis,
348+
keyPrefix: `${options.queue.redis.keyPrefix}runqueue:ttl-worker:`,
349+
},
350+
catalog: ttlWorkerCatalog,
351+
concurrency: { limit: 20 },
352+
pollIntervalMs: options.worker.pollIntervalMs ?? 1000,
353+
immediatePollIntervalMs: options.worker.immediatePollIntervalMs ?? 100,
354+
shutdownTimeoutMs: options.worker.shutdownTimeoutMs ?? 10_000,
355+
logger: new Logger("RunEngineTtlWorker", options.logLevel ?? "info"),
356+
jobs: {
357+
expireTtlRun: async ({ payload }) => {
358+
await this.ttlSystem.expireRunsBatch([payload.runId]);
359+
},
360+
},
361+
});
362+
363+
// Start TTL worker whenever TTL system is enabled, so expired runs enqueued by the
364+
// Lua script get processed even when the main engine worker is disabled (e.g. in tests).
365+
if (options.queue?.ttlSystem && !options.queue.ttlSystem.disabled) {
366+
this.ttlWorker.start();
367+
}
368+
340369
this.batchSystem = new BatchSystem({
341370
resources,
342371
waitpointSystem: this.waitpointSystem,
@@ -1621,6 +1650,7 @@ export class RunEngine {
16211650
//stop the run queue
16221651
await this.runQueue.quit();
16231652
await this.worker.stop();
1653+
await this.ttlWorker.stop();
16241654
await this.runLock.quit();
16251655

16261656
// This is just a failsafe
@@ -2229,41 +2259,6 @@ export class RunEngine {
22292259
});
22302260
}
22312261

2232-
/**
2233-
* Callback for the TTL system when runs expire.
2234-
* Uses the optimized batch method that doesn't require run locks
2235-
* since the Lua script already atomically claimed these runs.
2236-
*/
2237-
async #ttlExpiredCallback(
2238-
runs: Array<{ queueKey: string; runId: string; orgId: string }>
2239-
): Promise<void> {
2240-
if (runs.length === 0) return;
2241-
2242-
try {
2243-
const runIds = runs.map((r) => r.runId);
2244-
const result = await this.ttlSystem.expireRunsBatch(runIds);
2245-
2246-
if (result.expired.length > 0) {
2247-
this.logger.debug("TTL system expired runs", {
2248-
expiredCount: result.expired.length,
2249-
expiredRunIds: result.expired,
2250-
});
2251-
}
2252-
2253-
if (result.skipped.length > 0) {
2254-
this.logger.debug("TTL system skipped runs", {
2255-
skippedCount: result.skipped.length,
2256-
skipped: result.skipped,
2257-
});
2258-
}
2259-
} catch (error) {
2260-
this.logger.error("Failed to expire runs via TTL system", {
2261-
runIds: runs.map((r) => r.runId),
2262-
error,
2263-
});
2264-
}
2265-
}
2266-
22672262
/**
22682263
* Applies `defaultMaxTtl` to a run's TTL:
22692264
* - No max configured → pass through as-is.

internal-packages/run-engine/src/engine/ttlWorkerCatalog.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { z } from "zod";
2+
3+
export const ttlWorkerCatalog = {
4+
expireTtlRun: {
5+
schema: z.object({
6+
runId: z.string(),
7+
orgId: z.string(),
8+
queueKey: z.string(),
9+
}),
10+
visibilityTimeoutMs: 30_000,
11+
},
12+
};

internal-packages/run-engine/src/engine/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ export type RunEngineOptions = {
7373
batchSize?: number;
7474
/** Whether TTL consumers are disabled (default: false) */
7575
disabled?: boolean;
76+
/** Visibility timeout for TTL worker jobs (ms, default: 30000) */
77+
visibilityTimeoutMs?: number;
7678
};
7779
};
7880
runLock: {

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -100,15 +100,15 @@ export type RunQueueOptions = {
100100
pollIntervalMs?: number;
101101
/** Max number of runs to expire per poll per shard (default: 100) */
102102
batchSize?: number;
103-
/** Callback to handle expired runs */
104-
callback: TtlSystemCallback;
103+
/** Key suffix for TTL worker's queue sorted set (relative to RunQueue keyPrefix) */
104+
workerQueueSuffix: string;
105+
/** Key suffix for TTL worker's items hash (relative to RunQueue keyPrefix) */
106+
workerItemsSuffix: string;
107+
/** Visibility timeout for TTL worker jobs (ms, default: 30000) */
108+
visibilityTimeoutMs?: number;
105109
};
106110
};
107111

108-
export interface TtlSystemCallback {
109-
(runs: Array<{ queueKey: string; runId: string; orgId: string }>): Promise<void>;
110-
}
111-
112112
export interface ConcurrencySweeperCallback {
113113
(runIds: string[]): Promise<Array<{ id: string; orgId: string }>>;
114114
}
@@ -1289,19 +1289,7 @@ export class RunQueue {
12891289
shard,
12901290
count: expiredRuns.length,
12911291
});
1292-
1293-
// Call the callback with expired runs
1294-
try {
1295-
await this.options.ttlSystem!.callback(expiredRuns);
1296-
processedCount += expiredRuns.length;
1297-
} catch (callbackError) {
1298-
this.logger.error(`TTL callback failed for shard ${shard}`, {
1299-
error: callbackError,
1300-
service: this.name,
1301-
shard,
1302-
runCount: expiredRuns.length,
1303-
});
1304-
}
1292+
processedCount += expiredRuns.length;
13051293
}
13061294
}
13071295
} catch (error) {
@@ -1318,24 +1306,36 @@ export class RunQueue {
13181306
}
13191307

13201308
/**
1321-
* Atomically expire TTL runs: removes from TTL set AND acknowledges from normal queue.
1322-
* This prevents race conditions with the normal dequeue system.
1309+
* Atomically expire TTL runs: removes from TTL set, acknowledges from normal queue,
1310+
* and enqueues each run to the TTL worker for DB updates.
13231311
*/
13241312
async #expireTtlRuns(
13251313
shard: number,
13261314
now: number,
13271315
batchSize: number
13281316
): Promise<Array<{ queueKey: string; runId: string; orgId: string }>> {
1329-
const shardCount = this.options.ttlSystem?.shardCount ?? this.shardCount;
1317+
const ttlSystem = this.options.ttlSystem;
1318+
if (!ttlSystem) {
1319+
return [];
1320+
}
1321+
1322+
const shardCount = ttlSystem.shardCount ?? this.shardCount;
13301323
const ttlQueueKey = this.keys.ttlQueueKeyForShard(shard);
1324+
const keyPrefix = this.options.redis.keyPrefix ?? "";
1325+
const workerQueueKey = keyPrefix + ttlSystem.workerQueueSuffix;
1326+
const workerItemsKey = keyPrefix + ttlSystem.workerItemsSuffix;
1327+
const visibilityTimeoutMs = (ttlSystem.visibilityTimeoutMs ?? 30_000).toString();
13311328

1332-
// Atomically get and remove expired runs from TTL set, and ack them from normal queues
1329+
// Atomically get and remove expired runs from TTL set, ack them from normal queues, and enqueue to TTL worker
13331330
const results = await this.redis.expireTtlRuns(
13341331
ttlQueueKey,
1335-
this.options.redis.keyPrefix ?? "",
1332+
keyPrefix,
13361333
now.toString(),
13371334
batchSize.toString(),
1338-
shardCount.toString()
1335+
shardCount.toString(),
1336+
workerQueueKey,
1337+
workerItemsKey,
1338+
visibilityTimeoutMs
13391339
);
13401340

13411341
if (!results || results.length === 0) {
@@ -2587,7 +2587,7 @@ redis.call('SREM', envCurrentDequeuedKey, messageId)
25872587
`,
25882588
});
25892589

2590-
// Expire TTL runs - atomically removes from TTL set and acknowledges from normal queue
2590+
// Expire TTL runs - atomically removes from TTL set, acknowledges from normal queue, and enqueues to TTL worker
25912591
this.redis.defineCommand("expireTtlRuns", {
25922592
numberOfKeys: 1,
25932593
lua: `
@@ -2596,6 +2596,9 @@ local keyPrefix = ARGV[1]
25962596
local currentTime = tonumber(ARGV[2])
25972597
local batchSize = tonumber(ARGV[3])
25982598
local shardCount = tonumber(ARGV[4])
2599+
local workerQueueKey = ARGV[5]
2600+
local workerItemsKey = ARGV[6]
2601+
local visibilityTimeoutMs = tonumber(ARGV[7])
25992602
26002603
-- Get expired runs from TTL sorted set (score <= currentTime)
26012604
local expiredMembers = redis.call('ZRANGEBYSCORE', ttlQueueKey, '-inf', currentTime, 'LIMIT', 0, batchSize)
@@ -2604,6 +2607,9 @@ if #expiredMembers == 0 then
26042607
return {}
26052608
end
26062609
2610+
local time = redis.call('TIME')
2611+
local nowMs = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000)
2612+
26072613
local results = {}
26082614
26092615
for i, member in ipairs(expiredMembers) do
@@ -2656,6 +2662,16 @@ for i, member in ipairs(expiredMembers) do
26562662
redis.call('SREM', envConcurrencyKey, runId)
26572663
redis.call('SREM', envDequeuedKey, runId)
26582664
2665+
-- Enqueue to TTL worker (runId is natural dedup key)
2666+
local serializedItem = cjson.encode({
2667+
job = "expireTtlRun",
2668+
item = { runId = runId, orgId = orgId, queueKey = rawQueueKey },
2669+
visibilityTimeoutMs = visibilityTimeoutMs,
2670+
attempt = 0
2671+
})
2672+
redis.call('ZADD', workerQueueKey, nowMs, runId)
2673+
redis.call('HSET', workerItemsKey, runId, serializedItem)
2674+
26592675
-- Add to results
26602676
table.insert(results, member)
26612677
end
@@ -3151,6 +3167,9 @@ declare module "@internal/redis" {
31513167
currentTime: string,
31523168
batchSize: string,
31533169
shardCount: string,
3170+
workerQueueKey: string,
3171+
workerItemsKey: string,
3172+
visibilityTimeoutMs: string,
31543173
callback?: Callback<string[]>
31553174
): Result<string[], Context>;
31563175

0 commit comments

Comments
 (0)