Skip to content
31 changes: 26 additions & 5 deletions server/src/services/event-bridge.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sorobanService from './soroban.service';
import prisma from '../utils/prisma';
import logger from '../utils/logger';
import queueService, { JobType } from './queue.service';

interface SorobanEvent {
id?: string;
Expand Down Expand Up @@ -117,11 +118,31 @@ class EventBridgeService {

try {
// Port logic from indexer_service.rs
// Example: Handle PAY_DONE event (this depends on the contract event schema)
if (event.type === 'contract' && event.topic?.[0] === 'PAY_DONE') {
const paymentId = event.value?.paymentId;
if (!paymentId) return;

// Checks if event is a contract event and has the expected structure
if (event.type === 'contract' && event.topic) {
// Topic decoding would happen here using scValToNative
// For now assuming existing string topics for simplicity or raw match
// In production, use scValToNative(xdr.ScVal.fromXDR(event.topic[0], 'base64'))

// Simplified matching for topic strings if they are not XDR encoded in this mock/stub
const topic = event.topic[0];

if (topic === 'PAY_DONE' || topic === 'TRANSFER_DONE') {
const paymentId = event.value?.paymentId; // Need to decode value too

if (paymentId) {
await queueService.addJob({
type: JobType.SYNC,
data: {
syncType: 'ON_CHAIN_COMPLETION',
eventType: topic,
paymentId: paymentId,
rawEvent: event
}
});
logger.info(`Queued SYNC job for ${topic} event: ${paymentId}`);
}
}
await prisma.payment.update({
where: { id: paymentId },
data: { status: 'COMPLETED' },
Expand Down
49 changes: 49 additions & 0 deletions server/src/services/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,32 @@ const DEFAULT_OPTIONS: JobsOptions = {
};

class QueueService {
private emailQueue: Queue;
private pushQueue: Queue;
private syncQueue: Queue;
private blockchainTxQueue: Queue;

constructor() {
this.emailQueue = new Queue('email-queue', { connection: connection as any });
this.pushQueue = new Queue('push-queue', { connection: connection as any });
this.syncQueue = new Queue('sync-queue', { connection: connection as any });
this.blockchainTxQueue = new Queue('blockchain-tx-queue', { connection: connection as any });
}

public getEmailQueue(): Queue {
return this.emailQueue;
}

public getPushQueue(): Queue {
return this.pushQueue;
}

public getSyncQueue(): Queue {
return this.syncQueue;
}

public getBlockchainTxQueue(): Queue {
return this.blockchainTxQueue;
private queue: Queue;

constructor() {
Expand Down Expand Up @@ -79,6 +105,29 @@ class QueueService {

/** Generic add for backwards compatibility - validates payload structure */
public async addJob(payload: JobPayload, options?: JobsOptions) {
const defaultOptions: JobsOptions = {
attempts: 5,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: true, // Auto-remove completed jobs to save Redis space
removeOnFail: false, // Keep failed jobs for inspection
};
const finalOptions = { ...defaultOptions, ...options };

switch (payload.type) {
case JobType.EMAIL:
return this.emailQueue.add(JobType.EMAIL, payload.data, finalOptions);
case JobType.NOTIFICATION:
return this.pushQueue.add(JobType.NOTIFICATION, payload.data, finalOptions);
case JobType.SYNC:
return this.syncQueue.add(JobType.SYNC, payload.data, finalOptions);
case JobType.BLOCKCHAIN_TX:
return this.blockchainTxQueue.add(JobType.BLOCKCHAIN_TX, payload.data, finalOptions);
default:
throw new Error(`Unknown job type: ${payload.type}`);
}
return this.queue.add(payload.type, payload.data, {
...DEFAULT_OPTIONS,
...options,
Expand Down
2 changes: 2 additions & 0 deletions server/src/services/soroban.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ class SorobanService {
async getEvents(startLedger: number) {
return this.server.getEvents({
startLedger,
filters: [],
limit: 100 // Reasonable limit per poll
filters: [
{
type: 'contract',
Expand Down
80 changes: 80 additions & 0 deletions server/src/workers/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,40 @@
import { Worker } from 'bullmq';
import { connection } from '../utils/redis';
import queueService, { JobType } from '../services/queue.service';
import { JobType } from '../services/queue.service';
import { workerConfig } from '../config/worker.config';
import { getProcessor } from '../processors';
import logger from '../utils/logger';
import prisma from '../utils/prisma';
import { PaymentStatus } from '@prisma/client';

export const startWorkers = () => {
// Email Worker
new Worker('email-queue', async (job: Job) => {
logger.info(`Processing EMAIL job ${job.id}`);
await processEmail(job.data);
}, { connection: connection as any, concurrency: 5 });

// Push Notification Worker
new Worker('push-queue', async (job: Job) => {
logger.info(`Processing PUSH job ${job.id}`);
await processNotification(job.data);
}, { connection: connection as any, concurrency: 5 });

// Sync Worker
new Worker('sync-queue', async (job: Job) => {
logger.info(`Processing SYNC job ${job.id}`);
await processSync(job.data);
}, { connection: connection as any, concurrency: 1 }); // Sequential processing for sync might be safer or just 1 for now

// Blockchain Tx Worker
new Worker('blockchain-tx-queue', async (job: Job) => {
logger.info(`Processing BLOCKCHAIN_TX job ${job.id}`);
await processBlockchainTx(job.data);
}, { connection: connection as any, concurrency: 5 });

logger.info('Background workers started for all queues...');
};
let worker: Worker | null = null;

export function startWorkers(): Worker {
Expand Down Expand Up @@ -84,6 +114,56 @@ export async function stopWorkers(): Promise<void> {
}
}

const processSync = async (data: any) => {
logger.info('Processing SYNC job', { data });

if (data.syncType === 'ON_CHAIN_COMPLETION' && (data.eventType === 'PAY_DONE' || data.eventType === 'TRANSFER_DONE')) {
const { paymentId } = data;

if (!paymentId) return;

const payment = await prisma.payment.findUnique({ where: { id: paymentId } });

if (!payment) {
logger.warn(`Payment not found for sync: ${paymentId}`);
return;
}

if (payment.status === PaymentStatus.COMPLETED) {
logger.info(`Payment ${paymentId} already completed. Skipping.`);
return;
}

await prisma.payment.update({
where: { id: paymentId },
data: { status: PaymentStatus.COMPLETED },
});
logger.info(`Payment ${paymentId} marked as COMPLETED.`);

// Dispatch downstream jobs
await queueService.addJob({
type: JobType.EMAIL,
data: {
to: 'user@example.com', // Placeholder
subject: 'Payment Completed',
paymentId,
amount: payment.sendAmount.toString()
}
});

if (payment.userAddress) {
await queueService.addJob({
type: JobType.NOTIFICATION,
data: {
userId: payment.userAddress,
title: 'Payment Completed',
paymentId
}
});
}
}
};

export function getWorker(): Worker | null {
return worker;
}
Loading