diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index db1dd25a223..13c9a56ba2f 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -973,6 +973,7 @@ const EnvironmentSchema = z.object({ // Bulk action BULK_ACTION_BATCH_SIZE: z.coerce.number().int().default(100), BULK_ACTION_BATCH_DELAY_MS: z.coerce.number().int().default(200), + BULK_ACTION_SUBBATCH_CONCURRENCY: z.coerce.number().int().default(5), }); export type Environment = z.infer; diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index 86ca632f242..a68d6d330f5 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -24,6 +24,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters"; import parseDuration from "parse-duration"; import { v3BulkActionPath } from "~/utils/pathBuilder"; import { formatDateTime } from "~/components/primitives/DateTime"; +import pMap from "p-map"; export class BulkActionService extends BaseService { public async create( @@ -191,29 +192,33 @@ export class BulkActionService extends BaseService { }, }); - for (const run of runs) { - const [error, result] = await tryCatch( - cancelService.call(run, { - reason: `Bulk action ${group.friendlyId} cancelled run`, - bulkActionId: bulkActionId, - }) - ); - if (error) { - logger.error("Failed to cancel run", { - error, - runId: run.id, - status: run.status, - }); + await pMap( + runs, + async (run) => { + const [error, result] = await tryCatch( + cancelService.call(run, { + reason: `Bulk action ${group.friendlyId} cancelled run`, + bulkActionId: bulkActionId, + }) + ); + if (error) { + logger.error("Failed to cancel run", { + error, + runId: run.id, + status: run.status, + }); - failureCount++; - } else { - if (!result || result.alreadyFinished) { failureCount++; } else { - successCount++; + if (!result || result.alreadyFinished) { + failureCount++; + } else { + successCount++; + } } - } - } + }, + { concurrency: env.BULK_ACTION_SUBBATCH_CONCURRENCY } + ); break; } @@ -228,33 +233,37 @@ export class BulkActionService extends BaseService { }, }); - for (const run of runs) { - const [error, result] = await tryCatch( - replayService.call(run, { - bulkActionId: bulkActionId, - }) - ); - if (error) { - logger.error("Failed to replay run, error", { - error, - runId: run.id, - status: run.status, - }); - - failureCount++; - } else { - if (!result) { - logger.error("Failed to replay run, no result", { + await pMap( + runs, + async (run) => { + const [error, result] = await tryCatch( + replayService.call(run, { + bulkActionId: bulkActionId, + }) + ); + if (error) { + logger.error("Failed to replay run, error", { + error, runId: run.id, status: run.status, }); failureCount++; } else { - successCount++; + if (!result) { + logger.error("Failed to replay run, no result", { + runId: run.id, + status: run.status, + }); + + failureCount++; + } else { + successCount++; + } } - } - } + }, + { concurrency: env.BULK_ACTION_SUBBATCH_CONCURRENCY } + ); break; } } diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 0dbfffc7e25..42a77d62e66 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -156,6 +156,7 @@ "ohash": "^1.1.3", "openai": "^4.33.1", "p-limit": "^6.2.0", + "p-map": "^6.0.0", "parse-duration": "^2.1.0", "posthog-js": "^1.93.3", "posthog-node": "4.17.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 01f186038b8..9389ad43eab 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -563,6 +563,9 @@ importers: p-limit: specifier: ^6.2.0 version: 6.2.0 + p-map: + specifier: ^6.0.0 + version: 6.0.0 parse-duration: specifier: ^2.1.0 version: 2.1.4 @@ -29827,7 +29830,6 @@ packages: /p-map@6.0.0: resolution: {integrity: sha512-T8BatKGY+k5rU+Q/GTYgrEf2r4xRMevAN5mtXc2aPc4rS1j3s+vWTaO2Wag94neXuCAUAs8cxBL9EeB5EA6diw==} engines: {node: '>=16'} - dev: true /p-queue@6.6.2: resolution: {integrity: sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==}