Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions backend/src/repositories/market.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ export class MarketRepository extends BaseRepository<Market> {
});
}

async getMarketsPastClosingTime(): Promise<Market[]> {
return await this.prisma.market.findMany({
where: {
status: MarketStatus.OPEN,
closingAt: {
lte: new Date(),
},
},
orderBy: { closingAt: 'asc' },
});
}

async getMarketStatistics() {
const [totalMarkets, activeMarkets, totalVolume, avgParticipants] =
await Promise.all([
Expand Down
66 changes: 66 additions & 0 deletions backend/src/services/cron.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ export class CronService {
async initialize() {
logger.info('Initializing scheduled jobs');

// Market Auto-Close: Every 1 minute
cron.schedule('* * * * *', async () => {
await this.autoCloseMarkets();
});

// Weekly Ranking Reset: Every Monday at 00:00 UTC
// Cron pattern: minute hour day-of-month month day-of-week
cron.schedule('0 0 * * 1', async () => {
Expand All @@ -45,6 +50,67 @@ export class CronService {
logger.info('Scheduled jobs initialized successfully');
}

/**
* Auto-closes markets that have passed their closing time.
* Runs every 1 minute.
*/
async autoCloseMarkets() {
logger.info('Running market auto-close job');

let markets;
try {
markets = await this.marketRepository.getMarketsPastClosingTime();
} catch (error) {
logger.error('Market auto-close: failed to fetch markets past closing time', { error });
return;
}

if (markets.length === 0) {
logger.info('Market auto-close: no markets to close');
return;
}

logger.info(`Market auto-close: found ${markets.length} market(s) to close`);

let successCount = 0;
let failureCount = 0;

for (const market of markets) {
try {
await this.marketService.closeMarket(market.id);
successCount++;
logger.info(`Market auto-close: successfully closed market ${market.id}`, {
marketId: market.id,
title: market.title,
closingAt: market.closingAt,
});
} catch (error) {
failureCount++;
logger.error(`Market auto-close: failed to close market ${market.id}`, {
error,
marketId: market.id,
title: market.title,
closingAt: market.closingAt,
});
// Continue processing remaining markets
}
}

logger.info('Market auto-close: job completed', {
total: markets.length,
success: successCount,
failures: failureCount,
});

// Alert on failures if any
if (failureCount > 0) {
logger.warn(`Market auto-close: ${failureCount} market(s) failed to close`, {
failureCount,
totalAttempted: markets.length,
});
}
}

/**
* Polls oracle contract for all CLOSED markets and resolves any that have reached consensus.
*/
Expand Down
132 changes: 132 additions & 0 deletions backend/src/services/scheduler.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Scheduler service - centralized scheduling configuration and management
import cron from 'node-cron';
import { logger } from '../utils/logger.js';

export interface ScheduledJob {
name: string;
schedule: string;
task: () => Promise<void>;
enabled: boolean;
}

export class SchedulerService {
private jobs: Map<string, cron.ScheduledTask> = new Map();
private jobConfigs: Map<string, ScheduledJob> = new Map();

/**
* Register a scheduled job
*/
registerJob(config: ScheduledJob): void {
if (this.jobConfigs.has(config.name)) {
logger.warn(`Scheduler: job '${config.name}' already registered, skipping`);
return;
}

this.jobConfigs.set(config.name, config);
logger.info(`Scheduler: registered job '${config.name}' with schedule '${config.schedule}'`);
}

/**
* Start a specific job by name
*/
startJob(name: string): boolean {
const config = this.jobConfigs.get(name);
if (!config) {
logger.error(`Scheduler: job '${name}' not found`);
return false;
}

if (!config.enabled) {
logger.info(`Scheduler: job '${name}' is disabled, not starting`);
return false;
}

if (this.jobs.has(name)) {
logger.warn(`Scheduler: job '${name}' already running`);
return false;
}

try {
const task = cron.schedule(config.schedule, async () => {
logger.info(`Scheduler: executing job '${name}'`);
try {
await config.task();
} catch (error) {
logger.error(`Scheduler: job '${name}' failed`, { error });
}
});

this.jobs.set(name, task);
logger.info(`Scheduler: started job '${name}'`);
return true;
} catch (error) {
logger.error(`Scheduler: failed to start job '${name}'`, { error });
return false;
}
}

/**
* Stop a specific job by name
*/
stopJob(name: string): boolean {
const task = this.jobs.get(name);
if (!task) {
logger.warn(`Scheduler: job '${name}' not running`);
return false;
}

task.stop();
this.jobs.delete(name);
logger.info(`Scheduler: stopped job '${name}'`);
return true;
}

/**
* Start all registered jobs
*/
startAll(): void {
logger.info('Scheduler: starting all registered jobs');
for (const [name] of this.jobConfigs) {
this.startJob(name);
}
}

/**
* Stop all running jobs
*/
stopAll(): void {
logger.info('Scheduler: stopping all jobs');
for (const [name, task] of this.jobs) {
task.stop();
logger.info(`Scheduler: stopped job '${name}'`);
}
this.jobs.clear();
}

/**
* Get status of all jobs
*/
getJobStatus(): Array<{ name: string; schedule: string; enabled: boolean; running: boolean }> {
const status: Array<{ name: string; schedule: string; enabled: boolean; running: boolean }> = [];

for (const [name, config] of this.jobConfigs) {
status.push({
name,
schedule: config.schedule,
enabled: config.enabled,
running: this.jobs.has(name),
});
}

return status;
}

/**
* Validate cron expression
*/
static validateCronExpression(expression: string): boolean {
return cron.validate(expression);
}
}

export const schedulerService = new SchedulerService();
Loading