diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index da5e85f35a6..543f13ddb8d 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -717,6 +717,47 @@ const EnvironmentSchema = z.object({ COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + BATCH_TRIGGER_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), + BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), + BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), + BATCH_TRIGGER_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50), + BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), + BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), + BATCH_TRIGGER_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), + + BATCH_TRIGGER_WORKER_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + BATCH_TRIGGER_WORKER_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + BATCH_TRIGGER_WORKER_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + BATCH_TRIGGER_WORKER_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + BATCH_TRIGGER_WORKER_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + BATCH_TRIGGER_WORKER_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? 
process.env.REDIS_PASSWORD), + BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED: z + .string() + .default(process.env.REDIS_TLS_DISABLED ?? "false"), + BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + ALERTS_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), ALERTS_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), ALERTS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), diff --git a/apps/webapp/app/runEngine/services/batchTrigger.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts index 405d7aa831c..9eb6fed0bfb 100644 --- a/apps/webapp/app/runEngine/services/batchTrigger.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -14,7 +14,7 @@ import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; -import { commonWorker } from "~/v3/commonWorker.server"; +import { batchTriggerWorker } from "~/v3/batchTriggerWorker.server"; import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server"; import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server"; @@ -314,7 +314,7 @@ export class RunEngineBatchTriggerService extends WithRunEngine { } async #enqueueBatchTaskRun(options: BatchProcessingOptions) { - await commonWorker.enqueue({ + await batchTriggerWorker.enqueue({ id: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`, job: "runengine.processBatchTaskRun", payload: options, diff --git a/apps/webapp/app/v3/batchTriggerWorker.server.ts b/apps/webapp/app/v3/batchTriggerWorker.server.ts new file mode 100644 index 00000000000..cd00a9c4c11 --- /dev/null +++ b/apps/webapp/app/v3/batchTriggerWorker.server.ts @@ 
-0,0 +1,90 @@ +import { Logger } from "@trigger.dev/core/logger"; +import { Worker as RedisWorker } from "@trigger.dev/redis-worker"; +import { z } from "zod"; +import { env } from "~/env.server"; +import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { BatchTriggerV3Service } from "./services/batchTriggerV3.server"; + +/** Builds the Redis-backed worker that handles the "v3.processBatchTaskRun" and "runengine.processBatchTaskRun" jobs, starts it when BATCH_TRIGGER_WORKER_ENABLED is "true", and returns the worker instance either way. */ function initializeWorker() { + const redisOptions = { /* Connection settings are read from the BATCH_TRIGGER_WORKER_REDIS_* env values. */ + keyPrefix: "batch-trigger:worker:", + host: env.BATCH_TRIGGER_WORKER_REDIS_HOST, + port: env.BATCH_TRIGGER_WORKER_REDIS_PORT, + username: env.BATCH_TRIGGER_WORKER_REDIS_USERNAME, + password: env.BATCH_TRIGGER_WORKER_REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.BATCH_TRIGGER_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), /* A tls option is included unless the flag is exactly the string "true". */ + }; + + logger.debug( + `👨‍🏭 Initializing batch trigger worker at host ${env.BATCH_TRIGGER_WORKER_REDIS_HOST}` + ); + + const worker = new RedisWorker({ + name: "batch-trigger-worker", + redisOptions, + catalog: { /* Each job type declares its payload schema, a 60_000 ms visibility timeout and a retry limit of 5 attempts. */ + "v3.processBatchTaskRun": { + schema: z.object({ + batchId: z.string(), + processingId: z.string(), + range: z.object({ start: z.number().int(), count: z.number().int() }), + attemptCount: z.number().int(), + strategy: z.enum(["sequential", "parallel"]), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + "runengine.processBatchTaskRun": { /* Same payload fields as the v3 job, plus optional parentRunId and resumeParentOnCompletion. */ + schema: z.object({ + batchId: z.string(), + processingId: z.string(), + range: z.object({ start: z.number().int(), count: z.number().int() }), + attemptCount: z.number().int(), + strategy: z.enum(["sequential", "parallel"]), + parentRunId: z.string().optional(), + resumeParentOnCompletion: z.boolean().optional(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 5, + }, + }, + }, + concurrency: { + workers: env.BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS, + tasksPerWorker: env.BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER, + 
limit: env.BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT, + }, + pollIntervalMs: env.BATCH_TRIGGER_WORKER_POLL_INTERVAL, + immediatePollIntervalMs: env.BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL, + shutdownTimeoutMs: env.BATCH_TRIGGER_WORKER_SHUTDOWN_TIMEOUT_MS, + logger: new Logger("BatchTriggerWorker", env.BATCH_TRIGGER_WORKER_LOG_LEVEL), + jobs: { /* Each handler constructs the matching service with the payload's strategy and delegates to its processBatchTaskRun. */ + "v3.processBatchTaskRun": async ({ payload }) => { + const service = new BatchTriggerV3Service(payload.strategy); + await service.processBatchTaskRun(payload); + }, + "runengine.processBatchTaskRun": async ({ payload }) => { + const service = new RunEngineBatchTriggerService(payload.strategy); + await service.processBatchTaskRun(payload); + }, + }, + }); + + if (env.BATCH_TRIGGER_WORKER_ENABLED === "true") { /* Only start the worker when enabled; the instance is returned regardless. NOTE(review): presumably so a process with the worker disabled can still enqueue jobs - confirm. */ + logger.debug( + `👨‍🏭 Starting batch trigger worker at host ${env.BATCH_TRIGGER_WORKER_REDIS_HOST}, pollInterval = ${env.BATCH_TRIGGER_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.BATCH_TRIGGER_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.BATCH_TRIGGER_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.BATCH_TRIGGER_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.BATCH_TRIGGER_WORKER_CONCURRENCY_LIMIT}` + ); + + worker.start(); + } + + return worker; +} + +/* Exported instance obtained by passing initializeWorker to the singleton helper under the name "batchTriggerWorker". NOTE(review): presumably one shared instance per process - confirm in ~/utils/singleton. */ export const batchTriggerWorker = singleton("batchTriggerWorker", initializeWorker); diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts index 78c373a9696..7669c1aed70 100644 --- a/apps/webapp/app/v3/commonWorker.server.ts +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -114,6 +114,7 @@ function initializeWorker() { maxAttempts: 5, }, }, + // @deprecated, moved to batchTriggerWorker.server.ts "v3.processBatchTaskRun": { schema: z.object({ batchId: z.string(), @@ -127,6 +128,7 @@ function initializeWorker() { maxAttempts: 5, }, }, + // @deprecated, moved to batchTriggerWorker.server.ts "runengine.processBatchTaskRun": { schema: z.object({ batchId: z.string(), @@ -229,10 +231,12 @@ function 
initializeWorker() { const service = new CancelDevSessionRunsService(); await service.call(payload); }, + // @deprecated, moved to batchTriggerWorker.server.ts "v3.processBatchTaskRun": async ({ payload }) => { const service = new BatchTriggerV3Service(payload.strategy); await service.processBatchTaskRun(payload); }, + // @deprecated, moved to batchTriggerWorker.server.ts "runengine.processBatchTaskRun": async ({ payload }) => { const service = new RunEngineBatchTriggerService(payload.strategy); await service.processBatchTaskRun(payload); diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 29271f9cdf4..150c2a6f004 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -20,7 +20,7 @@ import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; -import { commonWorker } from "../commonWorker.server"; +import { batchTriggerWorker } from "../batchTriggerWorker.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server"; import { marqs } from "../marqs/index.server"; @@ -892,7 +892,7 @@ export class BatchTriggerV3Service extends BaseService { } async #enqueueBatchTaskRun(options: BatchProcessingOptions) { - await commonWorker.enqueue({ + await batchTriggerWorker.enqueue({ id: `BatchTriggerV2Service.process:${options.batchId}:${options.processingId}`, job: "v3.processBatchTaskRun", payload: options,