@@ -124,11 +124,11 @@ export class BatchTriggerV3Service extends BaseService {
124124
125125 const existingBatch = options . idempotencyKey
126126 ? await this . _prisma . batchTaskRun . findFirst ( {
127- where : {
128- runtimeEnvironmentId : environment . id ,
129- idempotencyKey : options . idempotencyKey ,
130- } ,
131- } )
127+ where : {
128+ runtimeEnvironmentId : environment . id ,
129+ idempotencyKey : options . idempotencyKey ,
130+ } ,
131+ } )
132132 : undefined ;
133133
134134 if ( existingBatch ) {
@@ -167,16 +167,16 @@ export class BatchTriggerV3Service extends BaseService {
167167
168168 const dependentAttempt = body ?. dependentAttempt
169169 ? await this . _prisma . taskRunAttempt . findFirst ( {
170- where : { friendlyId : body . dependentAttempt } ,
171- include : {
172- taskRun : {
173- select : {
174- id : true ,
175- status : true ,
176- } ,
170+ where : { friendlyId : body . dependentAttempt } ,
171+ include : {
172+ taskRun : {
173+ select : {
174+ id : true ,
175+ status : true ,
177176 } ,
178177 } ,
179- } )
178+ } ,
179+ } )
180180 : undefined ;
181181
182182 if (
@@ -890,7 +890,69 @@ export class BatchTriggerV3Service extends BaseService {
890890 }
891891 }
892892
893- return false ;
893+ // FIX for Issue #2965: When a run is cached (duplicate idempotencyKey),
894+ // we need to create a BatchTaskRunItem and immediately mark it as completed.
895+ // This ensures the batch completion check (completedCount === expectedCount) works correctly.
896+ const isAlreadyComplete = isFinalRunStatus ( result . run . status ) ;
897+
898+ logger . debug (
899+ "[BatchTriggerV2][processBatchTaskRunItem] Cached run detected, creating batch item" ,
900+ {
901+ batchId : batch . friendlyId ,
902+ runId : task . runId ,
903+ cachedRunId : result . run . id ,
904+ cachedRunStatus : result . run . status ,
905+ isAlreadyComplete,
906+ currentIndex,
907+ }
908+ ) ;
909+
910+ try {
911+ // Create a BatchTaskRunItem for the cached run
912+ await this . _prisma . batchTaskRunItem . create ( {
913+ data : {
914+ batchTaskRunId : batch . id ,
915+ taskRunId : result . run . id ,
916+ // Mark as COMPLETED if the cached run is already finished
917+ status : isAlreadyComplete ? "COMPLETED" : batchTaskRunItemStatusForRunStatus ( result . run . status ) ,
918+ } ,
919+ } ) ;
920+
921+ // If the cached run is already complete, we need to increment completedCount
922+ // since this item won't go through the normal completeBatchTaskRunItemV3 flow
923+ if ( isAlreadyComplete ) {
924+ await this . _prisma . batchTaskRun . update ( {
925+ where : { id : batch . id } ,
926+ data : {
927+ completedCount : {
928+ increment : 1 ,
929+ } ,
930+ } ,
931+ } ) ;
932+ }
933+
934+ // Return true so expectedCount is incremented
935+ return true ;
936+ } catch ( error ) {
937+ if ( isUniqueConstraintError ( error , [ "batchTaskRunId" , "taskRunId" ] ) ) {
938+ // BatchTaskRunItem already exists for this batch and cached run
939+ // This can happen if the same idempotencyKey is used multiple times in the same batch
940+ logger . debug (
941+ "[BatchTriggerV2][processBatchTaskRunItem] BatchTaskRunItem already exists for cached run" ,
942+ {
943+ batchId : batch . friendlyId ,
944+ runId : task . runId ,
945+ cachedRunId : result . run . id ,
946+ currentIndex,
947+ }
948+ ) ;
949+
950+ // Don't increment expectedCount since this item is already tracked
951+ return false ;
952+ }
953+
954+ throw error ;
955+ }
894956 }
895957
896958 async #enqueueBatchTaskRun( options : BatchProcessingOptions ) {
0 commit comments