diff --git a/apps/api/src/billing/model-schemas/wallet-setting/wallet-setting.schema.ts b/apps/api/src/billing/model-schemas/wallet-setting/wallet-setting.schema.ts index df21c03021..bb74d26c75 100644 --- a/apps/api/src/billing/model-schemas/wallet-setting/wallet-setting.schema.ts +++ b/apps/api/src/billing/model-schemas/wallet-setting/wallet-setting.schema.ts @@ -18,7 +18,6 @@ export const WalletSetting = pgTable( .references(() => Users.id, { onDelete: "cascade" }) .notNull(), autoReloadEnabled: boolean("auto_reload_enabled").default(false).notNull(), - autoReloadJobId: uuid("auto_reload_job_id"), createdAt: timestamp("created_at").defaultNow(), updatedAt: timestamp("updated_at").defaultNow() }, diff --git a/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.spec.ts b/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.spec.ts index 4403ad223f..732d6bb9e0 100644 --- a/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.spec.ts +++ b/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.spec.ts @@ -56,7 +56,7 @@ describe(WalletBalanceReloadCheckHandler.name, () => { }), expect.objectContaining({ startAfter: expect.any(String), - prevAction: "complete" + withCleanup: true }) ); const scheduleCall = walletReloadJobService.scheduleForWalletSetting.mock.calls[0]; @@ -191,7 +191,7 @@ describe(WalletBalanceReloadCheckHandler.name, () => { }), expect.objectContaining({ startAfter: expect.any(String), - prevAction: "complete" + withCleanup: true }) ); }); diff --git a/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.ts b/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.ts index e457500861..1454094543 100644 --- a/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.ts +++ b/apps/api/src/billing/services/wallet-balance-reload-check/wallet-balance-reload-check.handler.ts @@ -21,7 +21,7 @@ type ValidationError = { }; type InitializedWallet = Require, "address">; -type ActionableWalletSetting = Pick; +type ActionableWalletSetting = Pick; type Resources = { walletSetting: ActionableWalletSetting; @@ -38,7 +38,7 @@ export class WalletBalanceReloadCheckHandler implements JobHandler { + describe("scheduleImmediate", () => { + it("returns early when walletSetting does not exist", async () => { + const { service, walletSettingRepository, jobQueueService } = setup(); + const userId = faker.string.uuid(); + walletSettingRepository.findByUserId.mockResolvedValue(undefined); + + await service.scheduleImmediate(userId); + + expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(userId); + expect(jobQueueService.enqueue).not.toHaveBeenCalled(); + }); + + it("returns early when autoReloadEnabled is false", async () => { + const { service, walletSettingRepository, jobQueueService } = setup(); + const userId = faker.string.uuid(); + const walletSetting = generateWalletSetting({ autoReloadEnabled: false }); + walletSettingRepository.findByUserId.mockResolvedValue(walletSetting); + + await service.scheduleImmediate(userId); + + expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(userId); + expect(jobQueueService.enqueue).not.toHaveBeenCalled(); + }); + + it("calls scheduleForWalletSetting when conditions are met", async () => { + const { service, walletSettingRepository, jobQueueService } = setup(); + const userId = faker.string.uuid(); + const walletSetting = generateWalletSetting({ autoReloadEnabled: true }); + walletSettingRepository.findByUserId.mockResolvedValue(walletSetting); + const jobId = faker.string.uuid(); + jobQueueService.enqueue.mockResolvedValue(jobId); + + await service.scheduleImmediate(userId); + + expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(userId); + expect(jobQueueService.enqueue).toHaveBeenCalledWith( + expect.any(WalletBalanceReloadCheck), + expect.objectContaining({ + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}` + }) + ); + }); + }); + + describe("scheduleForWalletSetting", () => { + it("creates job successfully without cleanup", async () => { + const { service, jobQueueService } = setup(); + const walletSetting = generateWalletSetting({ + autoReloadEnabled: true + }); + const jobId = faker.string.uuid(); + jobQueueService.enqueue.mockResolvedValue(jobId); + + const result = await service.scheduleForWalletSetting(walletSetting); + + expect(jobQueueService.cancelCreatedBy).not.toHaveBeenCalled(); + expect(jobQueueService.enqueue).toHaveBeenCalledWith( + expect.any(WalletBalanceReloadCheck), + expect.objectContaining({ + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}` + }) + ); + expect(result).toBe(jobId); + }); + + it("cancels created jobs when withCleanup is true", async () => { + const { service, jobQueueService } = setup(); + const walletSetting = generateWalletSetting({ + autoReloadEnabled: true + }); + const jobId = faker.string.uuid(); + jobQueueService.enqueue.mockResolvedValue(jobId); + + await service.scheduleForWalletSetting(walletSetting, { withCleanup: true }); + + expect(jobQueueService.cancelCreatedBy).toHaveBeenCalledWith({ + name: WalletBalanceReloadCheck.name, + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}` + }); + expect(jobQueueService.enqueue).toHaveBeenCalledWith( + expect.any(WalletBalanceReloadCheck), + expect.objectContaining({ + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}` + }) + ); + }); + + it("handles startAfter option", async () => { + const { service, jobQueueService } = setup(); + const walletSetting = generateWalletSetting({ + autoReloadEnabled: true + }); + const jobId = faker.string.uuid(); + const startAfter = new Date(); + jobQueueService.enqueue.mockResolvedValue(jobId); + + await service.scheduleForWalletSetting(walletSetting, { startAfter }); + + expect(jobQueueService.enqueue).toHaveBeenCalledWith( + expect.any(WalletBalanceReloadCheck), + expect.objectContaining({ + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`, + startAfter + }) + ); + }); + + it("handles both withCleanup and startAfter options", async () => { + const { service, jobQueueService } = setup(); + const walletSetting = generateWalletSetting({ + autoReloadEnabled: true + }); + const jobId = faker.string.uuid(); + const startAfter = new Date(); + jobQueueService.enqueue.mockResolvedValue(jobId); + + await service.scheduleForWalletSetting(walletSetting, { withCleanup: true, startAfter }); + + expect(jobQueueService.cancelCreatedBy).toHaveBeenCalledWith({ + name: WalletBalanceReloadCheck.name, + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}` + }); + expect(jobQueueService.enqueue).toHaveBeenCalledWith( + expect.any(WalletBalanceReloadCheck), + expect.objectContaining({ + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`, + startAfter + }) + ); + }); + + it("throws error when job creation fails", async () => { + const { service, jobQueueService, logger } = setup(); + const walletSetting = generateWalletSetting({ + autoReloadEnabled: true + }); + jobQueueService.enqueue.mockResolvedValue(null); + + await expect(service.scheduleForWalletSetting(walletSetting)).rejects.toThrow("Failed to schedule wallet balance reload check"); + + expect(logger.error).toHaveBeenCalledWith({ + event: "JOB_CREATION_FAILED", + userId: walletSetting.userId + }); + }); + }); + + describe("cancelCreatedByUserId", () => { + it("cancels created jobs for user with correct parameters", async () => { + const { service, jobQueueService } = setup(); + const userId = faker.string.uuid(); + + await service.cancelCreatedByUserId(userId); + + expect(jobQueueService.cancelCreatedBy).toHaveBeenCalledWith({ + name: WalletBalanceReloadCheck.name, + singletonKey: `${WalletBalanceReloadCheck.name}.${userId}` + }); + }); + }); + + function setup() { + const walletSettingRepository = mock(); + const jobQueueService = mock(); + const logger = mock(); + + const service = new WalletReloadJobService(walletSettingRepository, jobQueueService, logger); + + return { + service, + walletSettingRepository, + jobQueueService, + logger + }; + } +}); diff --git a/apps/api/src/billing/services/wallet-reload-job/wallet-reload-job.service.ts b/apps/api/src/billing/services/wallet-reload-job/wallet-reload-job.service.ts index 80dffc2ad4..eb83c766b3 100644 --- a/apps/api/src/billing/services/wallet-reload-job/wallet-reload-job.service.ts +++ b/apps/api/src/billing/services/wallet-reload-job/wallet-reload-job.service.ts @@ -1,9 +1,8 @@ import { singleton } from "tsyringe"; -import { v4 as uuidv4 } from "uuid"; import { WalletBalanceReloadCheck } from "@src/billing/events/wallet-balance-reload-check"; import { WalletSettingOutput, WalletSettingRepository } from "@src/billing/repositories"; -import { EnqueueOptions, JobQueueService, TxService } from "@src/core"; +import { EnqueueOptions, JobQueueService } from "@src/core"; import { LoggerService } from "@src/core/providers/logging.provider"; @singleton() @@ -11,7 +10,6 @@ export class WalletReloadJobService { constructor( private readonly walletSettingRepository: WalletSettingRepository, private readonly jobQueueService: JobQueueService, - private readonly txService: TxService, private readonly logger: LoggerService ) { this.logger.setContext(WalletReloadJobService.name); @@ -20,67 +18,36 @@ export class WalletReloadJobService { async scheduleImmediate(userId: string): Promise { const walletSetting = await this.walletSettingRepository.findByUserId(userId); - if (!walletSetting || !walletSetting.userId || !walletSetting.autoReloadEnabled) { - return; + if (walletSetting?.autoReloadEnabled) { + await this.scheduleForWalletSetting(walletSetting); } - - await this.scheduleForWalletSetting(walletSetting, { prevAction: "cancel" }); } async scheduleForWalletSetting( - walletSetting: Pick, - options?: Pick & { prevAction?: "cancel" | "complete" } + walletSetting: Pick, + options?: Pick & { withCleanup?: boolean } ): Promise { - return await this.txService.transaction(async () => { - // Try to cancel/complete the previous job if it exists - // This may fail if the job is already in a terminal state, which is fine - if (walletSetting.autoReloadJobId) { - this.logger.info({ - event: "PREVIOUS_JOB_CLEANUP", - action: options?.prevAction, - previousJobId: walletSetting.autoReloadJobId, - userId: walletSetting.userId - }); - - if (options?.prevAction === "cancel") { - await this.jobQueueService.cancel(WalletBalanceReloadCheck.name, walletSetting.autoReloadJobId); - } else { - await this.jobQueueService.complete(WalletBalanceReloadCheck.name, walletSetting.autoReloadJobId); - } - } - - const jobId = uuidv4(); - await this.walletSettingRepository.updateById(walletSetting.id, { autoReloadJobId: jobId }); - - const createdJobId = await this.jobQueueService.enqueue(new WalletBalanceReloadCheck({ userId: walletSetting.userId }), { - singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`, - id: jobId, - ...(options?.startAfter && { startAfter: options.startAfter }) - }); + if (options?.withCleanup) { + await this.cancelCreatedByUserId(walletSetting.userId); + } - if (!createdJobId) { - this.logger.error({ - event: "JOB_CREATION_FAILED", - message: "Failed to schedule wallet balance reload check - a job already exists for this user", - userId: walletSetting.userId, - attemptedJobId: jobId, - previousJobId: walletSetting.autoReloadJobId - }); - throw new Error("Failed to schedule wallet balance reload check: job already exists"); - } + const createdJobId = await this.jobQueueService.enqueue(new WalletBalanceReloadCheck({ userId: walletSetting.userId }), { + singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`, + ...(options?.startAfter && { startAfter: options.startAfter }) + }); - this.logger.info({ - event: "JOB_SCHEDULED", - jobId: createdJobId, - userId: walletSetting.userId, - startAfter: options?.startAfter + if (!createdJobId) { + this.logger.error({ + event: "JOB_CREATION_FAILED", + userId: walletSetting.userId }); + throw new Error("Failed to schedule wallet balance reload check"); + } - return jobId; - }); + return createdJobId; } - async cancel(userId: string, jobId: string): Promise { - await this.jobQueueService.cancel(WalletBalanceReloadCheck.name, jobId); + async cancelCreatedByUserId(userId: string): Promise { + await this.jobQueueService.cancelCreatedBy({ name: WalletBalanceReloadCheck.name, singletonKey: `${WalletBalanceReloadCheck.name}.${userId}` }); } } diff --git a/apps/api/src/billing/services/wallet-settings/wallet-settings.service.spec.ts b/apps/api/src/billing/services/wallet-settings/wallet-settings.service.spec.ts index a1f0f86b50..3fe13093e5 100644 --- a/apps/api/src/billing/services/wallet-settings/wallet-settings.service.spec.ts +++ b/apps/api/src/billing/services/wallet-settings/wallet-settings.service.spec.ts @@ -51,9 +51,7 @@ describe(WalletSettingService.name, () => { autoReloadEnabled: false }); - const { autoReloadJobId, ...publicUpdatedSetting } = updatedSetting; - - expect(result).toEqual(publicUpdatedSetting); + expect(result).toEqual(updatedSetting); expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(user.id); expect(walletSettingRepository.updateById).toHaveBeenCalledWith(publicSetting.id, { autoReloadEnabled: false }, { returning: true }); }); @@ -73,9 +71,7 @@ describe(WalletSettingService.name, () => { autoReloadEnabled: true }); - const { autoReloadJobId, ...publicSetting } = newSetting; - - expect(result).toEqual(publicSetting); + expect(result).toEqual(newSetting); expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(user.id); expect(userWalletRepository.findOneByUserId).toHaveBeenCalledWith(user.id); expect(walletSettingRepository.create).toHaveBeenCalledWith({ @@ -87,8 +83,7 @@ describe(WalletSettingService.name, () => { expect.objectContaining({ id: newSetting.id, userId: user.id - }), - { prevAction: "cancel" } + }) ); }); @@ -106,9 +101,8 @@ describe(WalletSettingService.name, () => { const result = await service.upsertWalletSetting(user.id, { autoReloadEnabled: true }); - const { autoReloadJobId, ...newPublicSetting } = newSetting; - expect(result).toEqual(newPublicSetting); + expect(result).toEqual(newSetting); expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(user.id); expect(userWalletRepository.findOneByUserId).toHaveBeenCalledWith(user.id); expect(walletSettingRepository.create).toHaveBeenCalledWith({ @@ -132,9 +126,8 @@ describe(WalletSettingService.name, () => { const result = await service.upsertWalletSetting(user.id, { autoReloadEnabled: true }); - const { autoReloadJobId, ...publicUpdatedSetting } = updatedSetting; - expect(result).toEqual(publicUpdatedSetting); + expect(result).toEqual(updatedSetting); expect(walletSettingRepository.updateById).toHaveBeenCalledWith( existingSetting.id, { @@ -146,38 +139,8 @@ describe(WalletSettingService.name, () => { expect.objectContaining({ id: updatedSetting.id, userId: user.id - }), - { prevAction: "cancel" } - ); - }); - - it("cancels job when auto-reload is disabled", async () => { - const { user, walletSetting, walletSettingRepository, walletReloadJobService, service } = setup(); - const existingJobId = faker.string.uuid(); - const existingSetting = { ...walletSetting, autoReloadEnabled: true, autoReloadJobId: existingJobId }; - const updatedSetting = { - ...walletSetting, - id: walletSetting.id, - autoReloadEnabled: false, - autoReloadJobId: existingJobId - }; - walletSettingRepository.findByUserId.mockResolvedValue(existingSetting); - walletSettingRepository.updateById.mockResolvedValue(updatedSetting as any); - - const result = await service.upsertWalletSetting(user.id, { - autoReloadEnabled: false - }); - const { autoReloadJobId, ...publicUpdatedSetting } = updatedSetting; - - expect(result).toEqual(publicUpdatedSetting); - expect(walletSettingRepository.updateById).toHaveBeenCalledWith( - walletSetting.id, - { - autoReloadEnabled: false - }, - { returning: true } + }) ); - expect(walletReloadJobService.cancel).toHaveBeenCalledWith(user.id, existingJobId); }); it("throws 404 when user wallet not found during create", async () => { @@ -202,20 +165,6 @@ describe(WalletSettingService.name, () => { expect(walletSettingRepository.deleteBy).toHaveBeenCalledWith({ userId: user.id }); }); - - it("deletes wallet setting and cancels job when autoReloadJobId exists", async () => { - const { user, walletSetting, walletSettingRepository, walletReloadJobService, service } = setup(); - const existingJobId = faker.string.uuid(); - const settingWithJob = { ...walletSetting, autoReloadJobId: existingJobId }; - walletSettingRepository.findByUserId.mockResolvedValue(settingWithJob); - walletSettingRepository.deleteBy.mockResolvedValue(undefined); - - await service.deleteWalletSetting(user.id); - - expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(user.id); - expect(walletSettingRepository.deleteBy).toHaveBeenCalledWith({ userId: user.id }); - expect(walletReloadJobService.cancel).toHaveBeenCalledWith(user.id, existingJobId); - }); }); function setup() { @@ -241,17 +190,15 @@ describe(WalletSettingService.name, () => { }); const jobId = faker.string.uuid(); const walletReloadJobService = mock({ - scheduleForWalletSetting: jest.fn().mockResolvedValue(jobId), - cancel: jest.fn().mockResolvedValue(undefined) + scheduleForWalletSetting: jest.fn().mockResolvedValue(jobId) }); const service = new WalletSettingService(walletSettingRepository, userWalletRepository, userRepository, stripeService, authService, walletReloadJobService); - const { autoReloadJobId, ...publicSetting } = walletSetting; return { user: userWithStripe, userWallet, walletSetting, - publicSetting, + publicSetting: walletSetting, walletSettingRepository, userWalletRepository, userRepository, diff --git a/apps/api/src/billing/services/wallet-settings/wallet-settings.service.ts b/apps/api/src/billing/services/wallet-settings/wallet-settings.service.ts index 103ea8d772..71c90fb3da 100644 --- a/apps/api/src/billing/services/wallet-settings/wallet-settings.service.ts +++ b/apps/api/src/billing/services/wallet-settings/wallet-settings.service.ts @@ -26,22 +26,14 @@ export class WalletSettingService { private readonly walletReloadJobService: WalletReloadJobService ) {} - async getWalletSetting(userId: string): Promise | undefined> { + async getWalletSetting(userId: string): Promise { const { ability } = this.authService; - const maybeSetting = await this.walletSettingRepository.accessibleBy(ability, "read").findByUserId(userId); - - if (!maybeSetting) { - return undefined; - } - - const { autoReloadJobId, ...setting } = maybeSetting; - - return setting; + return await this.walletSettingRepository.accessibleBy(ability, "read").findByUserId(userId); } @WithTransaction() - async upsertWalletSetting(userId: UserOutput["id"], input: WalletSettingInput): Promise> { + async upsertWalletSetting(userId: UserOutput["id"], input: WalletSettingInput): Promise { let mutationResult = await this.#update(userId, input); if (!mutationResult.next) { @@ -50,9 +42,7 @@ export class WalletSettingService { await this.#arrangeSchedule(mutationResult.prev, mutationResult.next); - const { autoReloadJobId, ...setting } = mutationResult.next!; - - return setting; + return mutationResult.next!; } async #update(userId: UserOutput["id"], settings: WalletSettingInput): Promise<{ prev?: WalletSettingOutput; next?: WalletSettingOutput }> { @@ -123,11 +113,7 @@ export class WalletSettingService { async #arrangeSchedule(prev?: WalletSettingOutput, next?: WalletSettingOutput) { if (!prev?.autoReloadEnabled && next?.autoReloadEnabled) { - await this.walletReloadJobService.scheduleForWalletSetting(next, { prevAction: "cancel" }); - } - - if (!next?.autoReloadEnabled && next?.autoReloadJobId) { - await this.walletReloadJobService.cancel(next.userId, next.autoReloadJobId); + await this.walletReloadJobService.scheduleForWalletSetting(next); } } @@ -137,9 +123,6 @@ export class WalletSettingService { assert(walletSetting, 404, "WalletSetting Not Found"); - await Promise.all([ - this.walletSettingRepository.accessibleBy(ability, "delete").deleteBy({ userId }), - ...(walletSetting.autoReloadJobId ? [this.walletReloadJobService.cancel(userId, walletSetting.autoReloadJobId)] : []) - ]); + await this.walletSettingRepository.accessibleBy(ability, "delete").deleteBy({ userId }); } } diff --git a/apps/api/src/core/services/job-queue/job-queue.service.ts b/apps/api/src/core/services/job-queue/job-queue.service.ts index 84557e4715..4c05cd364f 100644 --- a/apps/api/src/core/services/job-queue/job-queue.service.ts +++ b/apps/api/src/core/services/job-queue/job-queue.service.ts @@ -120,6 +120,31 @@ export class JobQueueService implements Disposable { } } + async cancelCreatedBy(query: { name: string; singletonKey: string }): Promise { + const db = await this.pgBoss.getDb(); + const schema = this.coreConfig.get("POSTGRES_BACKGROUND_JOBS_SCHEMA"); + const result = (await db.executeSql( + ` + WITH results as ( + UPDATE ${schema}.job + SET completed_on = now(), + state = 'cancelled' + WHERE name = $1 + AND state = 'created' + AND singleton_key = $2 + RETURNING id + ) + SELECT id FROM results + `, + [query.name, query.singletonKey] + )) as { rows: { id: string }[] }; + + this.logger.info({ + event: "JOBS_CANCELLED", + jobIds: result.rows.map(r => r.id) + }); + } + async complete(name: string, id: string): Promise { try { await this.pgBoss.complete(name, id); diff --git a/apps/api/test/seeders/wallet-setting.seeder.ts b/apps/api/test/seeders/wallet-setting.seeder.ts index cc20f62ddf..f853b509c2 100644 --- a/apps/api/test/seeders/wallet-setting.seeder.ts +++ b/apps/api/test/seeders/wallet-setting.seeder.ts @@ -10,7 +10,6 @@ export const generateWalletSetting = (overrides: Partial) = autoReloadEnabled: faker.datatype.boolean(), autoReloadThreshold: faker.number.float({ min: 0, max: 1000, fractionDigits: 2 }), autoReloadAmount: faker.number.float({ min: 0, max: 1000, fractionDigits: 2 }), - autoReloadJobId: faker.string.uuid(), createdAt: faker.date.recent(), updatedAt: faker.date.recent(), ...overrides