diff --git a/server/src/services/event-bridge.service.ts b/server/src/services/event-bridge.service.ts index 2326622..ff1682b 100644 --- a/server/src/services/event-bridge.service.ts +++ b/server/src/services/event-bridge.service.ts @@ -1,6 +1,7 @@ import sorobanService from './soroban.service'; import prisma from '../utils/prisma'; import logger from '../utils/logger'; +import queueService, { JobType } from './queue.service'; interface SorobanEvent { id?: string; @@ -117,11 +118,31 @@ class EventBridgeService { try { // Port logic from indexer_service.rs - // Example: Handle PAY_DONE event (this depends on the contract event schema) - if (event.type === 'contract' && event.topic?.[0] === 'PAY_DONE') { - const paymentId = event.value?.paymentId; - if (!paymentId) return; - + // Checks if event is a contract event and has the expected structure + if (event.type === 'contract' && event.topic) { + // Topic decoding would happen here using scValToNative + // For now assuming existing string topics for simplicity or raw match + // In production, use scValToNative(xdr.ScVal.fromXDR(event.topic[0], 'base64')) + + // Simplified matching for topic strings if they are not XDR encoded in this mock/stub + const topic = event.topic[0]; + + if (topic === 'PAY_DONE' || topic === 'TRANSFER_DONE') { + const paymentId = event.value?.paymentId; // Need to decode value too + + if (paymentId) { + await queueService.addJob({ + type: JobType.SYNC, + data: { + syncType: 'ON_CHAIN_COMPLETION', + eventType: topic, + paymentId: paymentId, + rawEvent: event + } + }); + logger.info(`Queued SYNC job for ${topic} event: ${paymentId}`); + } + } await prisma.payment.update({ where: { id: paymentId }, data: { status: 'COMPLETED' }, diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index b254f7b..19dd6fb 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -32,6 +32,32 @@ const DEFAULT_OPTIONS: JobsOptions = { }; class QueueService { + private emailQueue: Queue; + private pushQueue: Queue; + private syncQueue: Queue; + private blockchainTxQueue: Queue; + + constructor() { + this.emailQueue = new Queue('email-queue', { connection: connection as any }); + this.pushQueue = new Queue('push-queue', { connection: connection as any }); + this.syncQueue = new Queue('sync-queue', { connection: connection as any }); + this.blockchainTxQueue = new Queue('blockchain-tx-queue', { connection: connection as any }); + } + + public getEmailQueue(): Queue { + return this.emailQueue; + } + + public getPushQueue(): Queue { + return this.pushQueue; + } + + public getSyncQueue(): Queue { + return this.syncQueue; + } + + public getBlockchainTxQueue(): Queue { + return this.blockchainTxQueue; private queue: Queue; constructor() { @@ -79,6 +105,29 @@ class QueueService { /** Generic add for backwards compatibility - validates payload structure */ public async addJob(payload: JobPayload, options?: JobsOptions) { + const defaultOptions: JobsOptions = { + attempts: 5, + backoff: { + type: 'exponential', + delay: 1000, + }, + removeOnComplete: true, // Auto-remove completed jobs to save Redis space + removeOnFail: false, // Keep failed jobs for inspection + }; + const finalOptions = { ...defaultOptions, ...options }; + + switch (payload.type) { + case JobType.EMAIL: + return this.emailQueue.add(JobType.EMAIL, payload.data, finalOptions); + case JobType.NOTIFICATION: + return this.pushQueue.add(JobType.NOTIFICATION, payload.data, finalOptions); + case JobType.SYNC: + return this.syncQueue.add(JobType.SYNC, payload.data, finalOptions); + case JobType.BLOCKCHAIN_TX: + return this.blockchainTxQueue.add(JobType.BLOCKCHAIN_TX, payload.data, finalOptions); + default: + throw new Error(`Unknown job type: ${payload.type}`); + } return this.queue.add(payload.type, payload.data, { ...DEFAULT_OPTIONS, ...options, diff --git a/server/src/services/soroban.service.ts b/server/src/services/soroban.service.ts index 15ecf27..b15d73a 100644 --- a/server/src/services/soroban.service.ts +++ b/server/src/services/soroban.service.ts @@ -212,6 +212,8 @@ class SorobanService { async getEvents(startLedger: number) { return this.server.getEvents({ startLedger, + filters: [], + limit: 100 // Reasonable limit per poll filters: [ { type: 'contract', diff --git a/server/src/workers/index.ts b/server/src/workers/index.ts index 8171e42..331717f 100644 --- a/server/src/workers/index.ts +++ b/server/src/workers/index.ts @@ -1,10 +1,40 @@ import { Worker } from 'bullmq'; import { connection } from '../utils/redis'; +import queueService, { JobType } from '../services/queue.service'; import { JobType } from '../services/queue.service'; import { workerConfig } from '../config/worker.config'; import { getProcessor } from '../processors'; import logger from '../utils/logger'; +import prisma from '../utils/prisma'; +import { PaymentStatus } from '@prisma/client'; +export const startWorkers = () => { + // Email Worker + new Worker('email-queue', async (job: Job) => { + logger.info(`Processing EMAIL job ${job.id}`); + await processEmail(job.data); + }, { connection: connection as any, concurrency: 5 }); + + // Push Notification Worker + new Worker('push-queue', async (job: Job) => { + logger.info(`Processing PUSH job ${job.id}`); + await processNotification(job.data); + }, { connection: connection as any, concurrency: 5 }); + + // Sync Worker + new Worker('sync-queue', async (job: Job) => { + logger.info(`Processing SYNC job ${job.id}`); + await processSync(job.data); + }, { connection: connection as any, concurrency: 1 }); // Sequential processing for sync might be safer or just 1 for now + + // Blockchain Tx Worker + new Worker('blockchain-tx-queue', async (job: Job) => { + logger.info(`Processing BLOCKCHAIN_TX job ${job.id}`); + await processBlockchainTx(job.data); + }, { connection: connection as any, concurrency: 5 }); + + logger.info('Background workers started for all queues...'); +}; let worker: Worker | null = null; export function startWorkers(): Worker { @@ -84,6 +114,56 @@ export async function stopWorkers(): Promise { } } +const processSync = async (data: any) => { + logger.info('Processing SYNC job', { data }); + + if (data.syncType === 'ON_CHAIN_COMPLETION' && (data.eventType === 'PAY_DONE' || data.eventType === 'TRANSFER_DONE')) { + const { paymentId } = data; + + if (!paymentId) return; + + const payment = await prisma.payment.findUnique({ where: { id: paymentId } }); + + if (!payment) { + logger.warn(`Payment not found for sync: ${paymentId}`); + return; + } + + if (payment.status === PaymentStatus.COMPLETED) { + logger.info(`Payment ${paymentId} already completed. Skipping.`); + return; + } + + await prisma.payment.update({ + where: { id: paymentId }, + data: { status: PaymentStatus.COMPLETED }, + }); + logger.info(`Payment ${paymentId} marked as COMPLETED.`); + + // Dispatch downstream jobs + await queueService.addJob({ + type: JobType.EMAIL, + data: { + to: 'user@example.com', // Placeholder + subject: 'Payment Completed', + paymentId, + amount: payment.sendAmount.toString() + } + }); + + if (payment.userAddress) { + await queueService.addJob({ + type: JobType.NOTIFICATION, + data: { + userId: payment.userAddress, + title: 'Payment Completed', + paymentId + } + }); + } + } +}; + export function getWorker(): Worker | null { return worker; }