Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export const WalletSetting = pgTable(
.references(() => Users.id, { onDelete: "cascade" })
.notNull(),
autoReloadEnabled: boolean("auto_reload_enabled").default(false).notNull(),
autoReloadJobId: uuid("auto_reload_job_id"),
createdAt: timestamp("created_at").defaultNow(),
updatedAt: timestamp("updated_at").defaultNow()
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe(WalletBalanceReloadCheckHandler.name, () => {
}),
expect.objectContaining({
startAfter: expect.any(String),
prevAction: "complete"
withCleanup: true
})
);
const scheduleCall = walletReloadJobService.scheduleForWalletSetting.mock.calls[0];
Expand Down Expand Up @@ -191,7 +191,7 @@ describe(WalletBalanceReloadCheckHandler.name, () => {
}),
expect.objectContaining({
startAfter: expect.any(String),
prevAction: "complete"
withCleanup: true
})
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ValidationError = {
};

type InitializedWallet = Require<Pick<UserWalletOutput, "address">, "address">;
type ActionableWalletSetting = Pick<WalletSettingOutput, "id" | "userId" | "autoReloadJobId">;
type ActionableWalletSetting = Pick<WalletSettingOutput, "id" | "userId">;

type Resources = {
walletSetting: ActionableWalletSetting;
Expand All @@ -38,7 +38,7 @@ export class WalletBalanceReloadCheckHandler implements JobHandler<WalletBalance

public readonly concurrency = 10;

public readonly policy = "short";
public readonly policy = "singleton";

#CHECK_INTERVAL_IN_MS = millisecondsInDay;

Expand Down Expand Up @@ -209,7 +209,7 @@ export class WalletBalanceReloadCheckHandler implements JobHandler<WalletBalance
try {
await this.walletReloadJobService.scheduleForWalletSetting(resources.walletSetting, {
startAfter: this.#calculateNextCheckDate().toISOString(),
prevAction: "complete"
withCleanup: true
});
} catch (error) {
this.instrumentationService.recordSchedulingError(resources.wallet.address, error);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import { faker } from "@faker-js/faker";
import { mock } from "jest-mock-extended";

import { WalletBalanceReloadCheck } from "@src/billing/events/wallet-balance-reload-check";
import type { WalletSettingRepository } from "@src/billing/repositories";
import type { JobQueueService } from "@src/core";
import type { LoggerService } from "@src/core/providers/logging.provider";
import { WalletReloadJobService } from "./wallet-reload-job.service";

import { generateWalletSetting } from "@test/seeders/wallet-setting.seeder";

describe(WalletReloadJobService.name, () => {
describe("scheduleImmediate", () => {
it("returns early when walletSetting does not exist", async () => {
const { service, walletSettingRepository, jobQueueService } = setup();
const userId = faker.string.uuid();
walletSettingRepository.findByUserId.mockResolvedValue(undefined);

await service.scheduleImmediate(userId);

expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(userId);
expect(jobQueueService.enqueue).not.toHaveBeenCalled();
});

it("returns early when autoReloadEnabled is false", async () => {
const { service, walletSettingRepository, jobQueueService } = setup();
const userId = faker.string.uuid();
const walletSetting = generateWalletSetting({ autoReloadEnabled: false });
walletSettingRepository.findByUserId.mockResolvedValue(walletSetting);

await service.scheduleImmediate(userId);

expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(userId);
expect(jobQueueService.enqueue).not.toHaveBeenCalled();
});

it("calls scheduleForWalletSetting when conditions are met", async () => {
const { service, walletSettingRepository, jobQueueService } = setup();
const userId = faker.string.uuid();
const walletSetting = generateWalletSetting({ autoReloadEnabled: true });
walletSettingRepository.findByUserId.mockResolvedValue(walletSetting);
const jobId = faker.string.uuid();
jobQueueService.enqueue.mockResolvedValue(jobId);

await service.scheduleImmediate(userId);

expect(walletSettingRepository.findByUserId).toHaveBeenCalledWith(userId);
expect(jobQueueService.enqueue).toHaveBeenCalledWith(
expect.any(WalletBalanceReloadCheck),
expect.objectContaining({
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`
})
);
});
});

describe("scheduleForWalletSetting", () => {
it("creates job successfully without cleanup", async () => {
const { service, jobQueueService } = setup();
const walletSetting = generateWalletSetting({
autoReloadEnabled: true
});
const jobId = faker.string.uuid();
jobQueueService.enqueue.mockResolvedValue(jobId);

const result = await service.scheduleForWalletSetting(walletSetting);

expect(jobQueueService.cancelCreatedBy).not.toHaveBeenCalled();
expect(jobQueueService.enqueue).toHaveBeenCalledWith(
expect.any(WalletBalanceReloadCheck),
expect.objectContaining({
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`
})
);
expect(result).toBe(jobId);
});

it("cancels created jobs when withCleanup is true", async () => {
const { service, jobQueueService } = setup();
const walletSetting = generateWalletSetting({
autoReloadEnabled: true
});
const jobId = faker.string.uuid();
jobQueueService.enqueue.mockResolvedValue(jobId);

await service.scheduleForWalletSetting(walletSetting, { withCleanup: true });

expect(jobQueueService.cancelCreatedBy).toHaveBeenCalledWith({
name: WalletBalanceReloadCheck.name,
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`
});
expect(jobQueueService.enqueue).toHaveBeenCalledWith(
expect.any(WalletBalanceReloadCheck),
expect.objectContaining({
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`
})
);
});

it("handles startAfter option", async () => {
const { service, jobQueueService } = setup();
const walletSetting = generateWalletSetting({
autoReloadEnabled: true
});
const jobId = faker.string.uuid();
const startAfter = new Date();
jobQueueService.enqueue.mockResolvedValue(jobId);

await service.scheduleForWalletSetting(walletSetting, { startAfter });

expect(jobQueueService.enqueue).toHaveBeenCalledWith(
expect.any(WalletBalanceReloadCheck),
expect.objectContaining({
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`,
startAfter
})
);
});

it("handles both withCleanup and startAfter options", async () => {
const { service, jobQueueService } = setup();
const walletSetting = generateWalletSetting({
autoReloadEnabled: true
});
const jobId = faker.string.uuid();
const startAfter = new Date();
jobQueueService.enqueue.mockResolvedValue(jobId);

await service.scheduleForWalletSetting(walletSetting, { withCleanup: true, startAfter });

expect(jobQueueService.cancelCreatedBy).toHaveBeenCalledWith({
name: WalletBalanceReloadCheck.name,
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`
});
expect(jobQueueService.enqueue).toHaveBeenCalledWith(
expect.any(WalletBalanceReloadCheck),
expect.objectContaining({
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`,
startAfter
})
);
});

it("throws error when job creation fails", async () => {
const { service, jobQueueService, logger } = setup();
const walletSetting = generateWalletSetting({
autoReloadEnabled: true
});
jobQueueService.enqueue.mockResolvedValue(null);

await expect(service.scheduleForWalletSetting(walletSetting)).rejects.toThrow("Failed to schedule wallet balance reload check");

expect(logger.error).toHaveBeenCalledWith({
event: "JOB_CREATION_FAILED",
userId: walletSetting.userId
});
});
});

describe("cancelCreatedByUserId", () => {
it("cancels created jobs for user with correct parameters", async () => {
const { service, jobQueueService } = setup();
const userId = faker.string.uuid();

await service.cancelCreatedByUserId(userId);

expect(jobQueueService.cancelCreatedBy).toHaveBeenCalledWith({
name: WalletBalanceReloadCheck.name,
singletonKey: `${WalletBalanceReloadCheck.name}.${userId}`
});
});
});

function setup() {
const walletSettingRepository = mock<WalletSettingRepository>();
const jobQueueService = mock<JobQueueService>();
const logger = mock<LoggerService>();

const service = new WalletReloadJobService(walletSettingRepository, jobQueueService, logger);

return {
service,
walletSettingRepository,
jobQueueService,
logger
};
}
});
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import { singleton } from "tsyringe";
import { v4 as uuidv4 } from "uuid";

import { WalletBalanceReloadCheck } from "@src/billing/events/wallet-balance-reload-check";
import { WalletSettingOutput, WalletSettingRepository } from "@src/billing/repositories";
import { EnqueueOptions, JobQueueService, TxService } from "@src/core";
import { EnqueueOptions, JobQueueService } from "@src/core";
import { LoggerService } from "@src/core/providers/logging.provider";

@singleton()
export class WalletReloadJobService {
constructor(
private readonly walletSettingRepository: WalletSettingRepository,
private readonly jobQueueService: JobQueueService,
private readonly txService: TxService,
private readonly logger: LoggerService
) {
this.logger.setContext(WalletReloadJobService.name);
Expand All @@ -20,67 +18,36 @@ export class WalletReloadJobService {
async scheduleImmediate(userId: string): Promise<void> {
const walletSetting = await this.walletSettingRepository.findByUserId(userId);

if (!walletSetting || !walletSetting.userId || !walletSetting.autoReloadEnabled) {
return;
if (walletSetting?.autoReloadEnabled) {
await this.scheduleForWalletSetting(walletSetting);
}

await this.scheduleForWalletSetting(walletSetting, { prevAction: "cancel" });
}

async scheduleForWalletSetting(
walletSetting: Pick<WalletSettingOutput, "id" | "userId" | "autoReloadJobId">,
options?: Pick<EnqueueOptions, "startAfter"> & { prevAction?: "cancel" | "complete" }
walletSetting: Pick<WalletSettingOutput, "id" | "userId">,
options?: Pick<EnqueueOptions, "startAfter"> & { withCleanup?: boolean }
): Promise<string> {
return await this.txService.transaction(async () => {
// Try to cancel/complete the previous job if it exists
// This may fail if the job is already in a terminal state, which is fine
if (walletSetting.autoReloadJobId) {
this.logger.info({
event: "PREVIOUS_JOB_CLEANUP",
action: options?.prevAction,
previousJobId: walletSetting.autoReloadJobId,
userId: walletSetting.userId
});

if (options?.prevAction === "cancel") {
await this.jobQueueService.cancel(WalletBalanceReloadCheck.name, walletSetting.autoReloadJobId);
} else {
await this.jobQueueService.complete(WalletBalanceReloadCheck.name, walletSetting.autoReloadJobId);
}
}

const jobId = uuidv4();
await this.walletSettingRepository.updateById(walletSetting.id, { autoReloadJobId: jobId });

const createdJobId = await this.jobQueueService.enqueue(new WalletBalanceReloadCheck({ userId: walletSetting.userId }), {
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`,
id: jobId,
...(options?.startAfter && { startAfter: options.startAfter })
});
if (options?.withCleanup) {
await this.cancelCreatedByUserId(walletSetting.userId);
}

if (!createdJobId) {
this.logger.error({
event: "JOB_CREATION_FAILED",
message: "Failed to schedule wallet balance reload check - a job already exists for this user",
userId: walletSetting.userId,
attemptedJobId: jobId,
previousJobId: walletSetting.autoReloadJobId
});
throw new Error("Failed to schedule wallet balance reload check: job already exists");
}
const createdJobId = await this.jobQueueService.enqueue(new WalletBalanceReloadCheck({ userId: walletSetting.userId }), {
singletonKey: `${WalletBalanceReloadCheck.name}.${walletSetting.userId}`,
...(options?.startAfter && { startAfter: options.startAfter })
});

this.logger.info({
event: "JOB_SCHEDULED",
jobId: createdJobId,
userId: walletSetting.userId,
startAfter: options?.startAfter
if (!createdJobId) {
this.logger.error({
event: "JOB_CREATION_FAILED",
userId: walletSetting.userId
});
throw new Error("Failed to schedule wallet balance reload check");
}

return jobId;
});
return createdJobId;
}

async cancel(userId: string, jobId: string): Promise<void> {
await this.jobQueueService.cancel(WalletBalanceReloadCheck.name, jobId);
async cancelCreatedByUserId(userId: string): Promise<void> {
await this.jobQueueService.cancelCreatedBy({ name: WalletBalanceReloadCheck.name, singletonKey: `${WalletBalanceReloadCheck.name}.${userId}` });
}
}
Loading
Loading