From be144d8b235ad79dc500acb30239aeed4932702b Mon Sep 17 00:00:00 2001 From: caxtonacollins Date: Thu, 19 Feb 2026 23:49:05 +0100 Subject: [PATCH 1/6] feat: implement specific queues in QueueService --- server/src/services/queue.service.ts | 42 +++++++++++++++++++++------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index 5ec8985..bbec3fa 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -14,25 +14,47 @@ export interface JobPayload { } class QueueService { - private queues: Map = new Map(); + private emailQueue: Queue; + private pushQueue: Queue; + private syncQueue: Queue; + private blockchainTxQueue: Queue; constructor() { - this.createQueue('default'); + this.emailQueue = new Queue('email-queue', { connection: connection as any }); + this.pushQueue = new Queue('push-queue', { connection: connection as any }); + this.syncQueue = new Queue('sync-queue', { connection: connection as any }); + this.blockchainTxQueue = new Queue('blockchain-tx-queue', { connection: connection as any }); } - private createQueue(name: string) { - const queue = new Queue(name, { connection: connection as any }); - this.queues.set(name, queue); - return queue; + public getEmailQueue(): Queue { + return this.emailQueue; } - public getQueue(name: string = 'default'): Queue { - return this.queues.get(name) || this.createQueue(name); + public getPushQueue(): Queue { + return this.pushQueue; + } + + public getSyncQueue(): Queue { + return this.syncQueue; + } + + public getBlockchainTxQueue(): Queue { + return this.blockchainTxQueue; } public async addJob(payload: JobPayload, options?: JobsOptions) { - const queue = this.getQueue(); - return queue.add(payload.type, payload.data, options); + switch (payload.type) { + case JobType.EMAIL: + return this.emailQueue.add(JobType.EMAIL, payload.data, options); + case JobType.NOTIFICATION: + return this.pushQueue.add(JobType.NOTIFICATION, payload.data, options); + case JobType.SYNC: + return this.syncQueue.add(JobType.SYNC, payload.data, options); + case JobType.BLOCKCHAIN_TX: + return this.blockchainTxQueue.add(JobType.BLOCKCHAIN_TX, payload.data, options); + default: + throw new Error(`Unknown job type: ${payload.type}`); + } } } From 8dd6e971d2adc5b7bce9a0154987516937363b96 Mon Sep 17 00:00:00 2001 From: caxtonacollins Date: Thu, 19 Feb 2026 23:49:37 +0100 Subject: [PATCH 2/6] feat: use specific workers for each queue --- server/src/workers/index.ts | 63 +++++++++++++++---------------------- 1 file changed, 25 insertions(+), 38 deletions(-) diff --git a/server/src/workers/index.ts b/server/src/workers/index.ts index a7a28cb..b382026 100644 --- a/server/src/workers/index.ts +++ b/server/src/workers/index.ts @@ -4,44 +4,31 @@ import { JobType } from '../services/queue.service'; import logger from '../utils/logger'; export const startWorkers = () => { - const worker = new Worker('default', async (job: Job) => { - logger.info(`Processing job ${job.id} of type ${job.name}`); - - try { - switch (job.name) { - case JobType.EMAIL: - await processEmail(job.data); - break; - case JobType.NOTIFICATION: - await processNotification(job.data); - break; - case JobType.BLOCKCHAIN_TX: - await processBlockchainTx(job.data); - break; - case JobType.SYNC: - await processSync(job.data); - break; - default: - logger.warn(`Unknown job type: ${job.name}`); - } - } catch (err: any) { - logger.error(`Error processing job ${job.id}: ${err.message}`, { stack: err.stack }); - throw err; // Allow BullMQ to handle retries - } - }, { - connection: connection as any, - concurrency: 5 - }); - - worker.on('completed', (job) => { - logger.info(`Job ${job.id} completed successfully`); - }); - - worker.on('failed', (job, err) => { - logger.error(`Job ${job?.id} failed with error: ${err.message}`); - }); - - logger.info('Background workers started...'); + // Email Worker + new Worker('email-queue', async (job: Job) => { + logger.info(`Processing EMAIL job ${job.id}`); + await processEmail(job.data); + }, { connection: connection as any, concurrency: 5 }); + + // Push Notification Worker + new Worker('push-queue', async (job: Job) => { + logger.info(`Processing PUSH job ${job.id}`); + await processNotification(job.data); + }, { connection: connection as any, concurrency: 5 }); + + // Sync Worker + new Worker('sync-queue', async (job: Job) => { + logger.info(`Processing SYNC job ${job.id}`); + await processSync(job.data); + }, { connection: connection as any, concurrency: 1 }); // Sequential processing for sync might be safer or just 1 for now + + // Blockchain Tx Worker + new Worker('blockchain-tx-queue', async (job: Job) => { + logger.info(`Processing BLOCKCHAIN_TX job ${job.id}`); + await processBlockchainTx(job.data); + }, { connection: connection as any, concurrency: 5 }); + + logger.info('Background workers started for all queues...'); }; const processEmail = async (data: any) => { From f30c9605d1c5e8382571c6ce08f07e16db7a1f4c Mon Sep 17 00:00:00 2001 From: caxtonacollins Date: Thu, 19 Feb 2026 23:50:01 +0100 Subject: [PATCH 3/6] feat: implement Soroban polling and event bridging to Sync queue --- server/src/services/event-bridge.service.ts | 35 +++++++++++++++------ server/src/services/soroban.service.ts | 5 +-- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/server/src/services/event-bridge.service.ts b/server/src/services/event-bridge.service.ts index b99fb53..c8fa9e3 100644 --- a/server/src/services/event-bridge.service.ts +++ b/server/src/services/event-bridge.service.ts @@ -1,7 +1,7 @@ import sorobanService from './soroban.service'; import prisma from '../utils/prisma'; -import { PaymentStatus } from '@prisma/client'; import logger from '../utils/logger'; +import queueService, { JobType } from './queue.service'; class EventBridgeService { private isRunning: boolean = false; @@ -53,16 +53,31 @@ class EventBridgeService { private async processEvent(event: any) { try { // Port logic from indexer_service.rs - // Example: Handle PAY_DONE event (this depends on the contract event schema) - if (event.type === 'contract' && event.topic?.[0] === 'PAY_DONE') { - const paymentId = event.value?.paymentId; - if (!paymentId) return; + // Checks if event is a contract event and has the expected structure + if (event.type === 'contract' && event.topic) { + // Topic decoding would happen here using scValToNative + // For now assuming existing string topics for simplicity or raw match + // In production, use scValToNative(xdr.ScVal.fromXDR(event.topic[0], 'base64')) - await prisma.payment.update({ - where: { id: paymentId }, - data: { status: PaymentStatus.COMPLETED }, - }); - logger.info(`Payment ${paymentId} completed on-chain via Event Bridge`); + // Simplified matching for topic strings if they are not XDR encoded in this mock/stub + const topic = event.topic[0]; + + if (topic === 'PAY_DONE' || topic === 'TRANSFER_DONE') { + const paymentId = event.value?.paymentId; // Need to decode value too + + if (paymentId) { + await queueService.addJob({ + type: JobType.SYNC, + data: { + syncType: 'ON_CHAIN_COMPLETION', + eventType: topic, + paymentId: paymentId, + rawEvent: event + } + }); + logger.info(`Queued SYNC job for ${topic} event: ${paymentId}`); + } + } } } catch (err: any) { logger.error('Error processing event:', { error: err.message, event }); diff --git a/server/src/services/soroban.service.ts b/server/src/services/soroban.service.ts index dc7ddd6..d7a47fe 100644 --- a/server/src/services/soroban.service.ts +++ b/server/src/services/soroban.service.ts @@ -39,12 +39,9 @@ class SorobanService { } async getEvents(startLedger: number) { - // Logic to poll for events return this.server.getEvents({ startLedger, - filters: [ - // Filter by contract ID - ] + limit: 100 // Reasonable limit per poll }); } } From 5e7b524fc43a46040745bc1972e9cd2d8df75a44 Mon Sep 17 00:00:00 2001 From: caxtonacollins Date: Thu, 19 Feb 2026 23:51:30 +0100 Subject: [PATCH 4/6] fix: processSync implementation and imports --- server/src/workers/index.ts | 54 ++++++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/server/src/workers/index.ts b/server/src/workers/index.ts index b382026..91b3cb9 100644 --- a/server/src/workers/index.ts +++ b/server/src/workers/index.ts @@ -1,7 +1,9 @@ import { Worker, Job } from 'bullmq'; import { connection } from '../utils/redis'; -import { JobType } from '../services/queue.service'; +import queueService, { JobType } from '../services/queue.service'; import logger from '../utils/logger'; +import prisma from '../utils/prisma'; +import { PaymentStatus } from '@prisma/client'; export const startWorkers = () => { // Email Worker @@ -47,6 +49,52 @@ const processBlockchainTx = async (data: any) => { }; const processSync = async (data: any) => { - // Logic for analytical syncs or database maintenance - logger.info('Performing sync operation:', { syncType: data.syncType }); + logger.info('Processing SYNC job', { data }); + + if (data.syncType === 'ON_CHAIN_COMPLETION' && (data.eventType === 'PAY_DONE' || data.eventType === 'TRANSFER_DONE')) { + const { paymentId } = data; + + if (!paymentId) return; + + const payment = await prisma.payment.findUnique({ where: { id: paymentId } }); + + if (!payment) { + logger.warn(`Payment not found for sync: ${paymentId}`); + return; + } + + if (payment.status === PaymentStatus.COMPLETED) { + logger.info(`Payment ${paymentId} already completed. Skipping.`); + return; + } + + await prisma.payment.update({ + where: { id: paymentId }, + data: { status: PaymentStatus.COMPLETED }, + }); + logger.info(`Payment ${paymentId} marked as COMPLETED.`); + + // Dispatch downstream jobs + await queueService.addJob({ + type: JobType.EMAIL, + data: { + to: 'user@example.com', // Placeholder + subject: 'Payment Completed', + paymentId, + amount: payment.sendAmount.toString() + } + }); + + if (payment.userAddress) { + await queueService.addJob({ + type: JobType.NOTIFICATION, + data: { + userId: payment.userAddress, + title: 'Payment Completed', + paymentId + } + }); + } + } }; + From 5140f68137836fd6fd3213b35d0ad128ae97f001 Mon Sep 17 00:00:00 2001 From: caxtonacollins Date: Thu, 19 Feb 2026 23:51:47 +0100 Subject: [PATCH 5/6] feat: implement default exponential backoff retry for all jobs --- server/src/services/queue.service.ts | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/server/src/services/queue.service.ts b/server/src/services/queue.service.ts index bbec3fa..ca7ab16 100644 --- a/server/src/services/queue.service.ts +++ b/server/src/services/queue.service.ts @@ -43,15 +43,26 @@ class QueueService { } public async addJob(payload: JobPayload, options?: JobsOptions) { + const defaultOptions: JobsOptions = { + attempts: 5, + backoff: { + type: 'exponential', + delay: 1000, + }, + removeOnComplete: true, // Auto-remove completed jobs to save Redis space + removeOnFail: false, // Keep failed jobs for inspection + }; + const finalOptions = { ...defaultOptions, ...options }; + switch (payload.type) { case JobType.EMAIL: - return this.emailQueue.add(JobType.EMAIL, payload.data, options); + return this.emailQueue.add(JobType.EMAIL, payload.data, finalOptions); case JobType.NOTIFICATION: - return this.pushQueue.add(JobType.NOTIFICATION, payload.data, options); + return this.pushQueue.add(JobType.NOTIFICATION, payload.data, finalOptions); case JobType.SYNC: - return this.syncQueue.add(JobType.SYNC, payload.data, options); + return this.syncQueue.add(JobType.SYNC, payload.data, finalOptions); case JobType.BLOCKCHAIN_TX: - return this.blockchainTxQueue.add(JobType.BLOCKCHAIN_TX, payload.data, options); + return this.blockchainTxQueue.add(JobType.BLOCKCHAIN_TX, payload.data, finalOptions); default: throw new Error(`Unknown job type: ${payload.type}`); } From 8a3a2e750afaf8bf3739801a470709a0a1e96466 Mon Sep 17 00:00:00 2001 From: caxtonacollins Date: Fri, 20 Feb 2026 00:48:38 +0100 Subject: [PATCH 6/6] feat: On-Chain Sync & Background Jobs --- server/src/services/soroban.service.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/services/soroban.service.ts b/server/src/services/soroban.service.ts index d7a47fe..a9fa07c 100644 --- a/server/src/services/soroban.service.ts +++ b/server/src/services/soroban.service.ts @@ -41,6 +41,7 @@ class SorobanService { async getEvents(startLedger: number) { return this.server.getEvents({ startLedger, + filters: [], limit: 100 // Reasonable limit per poll }); }