Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,10 @@ REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

# Worker / BullMQ
WORKER_CONCURRENCY=5
JOB_MAX_RETRIES=3
JOB_BACKOFF_DELAY_MS=1000

# Security
JWT_SECRET=super-secret-key
1 change: 0 additions & 1 deletion server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions server/src/config/worker.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Worker configuration for BullMQ job processors.
* Controls concurrency, retries, and graceful shutdown.
*/
/** Read an integer environment variable, falling back when unset or empty. */
const envInt = (raw: string | undefined, fallback: string): number =>
  parseInt(raw || fallback, 10);

export const workerConfig = {
  // Queue name every job processor listens on.
  defaultQueue: 'zaps:jobs',
  // How many jobs a single worker may process at once.
  concurrency: envInt(process.env.WORKER_CONCURRENCY, '5'),
  // Attempts before a job lands in the failed set.
  maxRetries: envInt(process.env.JOB_MAX_RETRIES, '3'),
  // Exponential backoff between retries, starting at `delay` ms.
  backoff: {
    type: 'exponential' as const,
    delay: envInt(process.env.JOB_BACKOFF_DELAY_MS, '1000'),
  },
  // How often (ms) stalled jobs are checked for.
  stalledInterval: 30000,
  // Job lock lasts 5 min and is renewed at 2.5 min, well before expiry.
  lockDuration: 300000,
  lockRenewTime: 150000,
};
6 changes: 3 additions & 3 deletions server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import app from './app';
import config from './config';
import { startWorkers } from './workers';
import { startWorkers, stopWorkers } from './workers';
import eventBridgeService from './services/event-bridge.service';
import evmBridgeMonitorService from './services/evm-bridge-monitor.service';
import logger from './utils/logger';
Expand All @@ -25,5 +25,5 @@ const shutdown = async () => {
});
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
process.on('SIGINT', () => void shutdown());
process.on('SIGTERM', () => void shutdown());
52 changes: 52 additions & 0 deletions server/src/processors/blockchain-tx.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import sorobanService from '../services/soroban.service';
import prisma from '../utils/prisma';
import { PaymentStatus } from '@prisma/client';
import { TransferStatus } from '@prisma/client';
import logger from '../utils/logger';
import type { BlockchainTxJobPayload } from '../types/job-payloads';

/**
 * Job processor for on-chain transactions.
 * Optionally simulates the supplied XDR envelope, then records the
 * resulting transaction hash on the related Payment and/or Transfer rows.
 *
 * @throws when the payload lacks either address, or when any downstream
 *         call (simulation, DB update) fails — re-thrown so the queue retries.
 */
export async function processBlockchainTx(data: BlockchainTxJobPayload): Promise<void> {
  const { fromAddress, toAddress, amount, xdr, paymentId, transferId } = data;
  const ctx = {
    component: 'blockchain-tx-processor',
    fromAddress,
    toAddress,
    paymentId,
  };

  logger.info('Processing blockchain tx job', ctx);

  // Both endpoints are mandatory; fail fast before touching anything else.
  if (!fromAddress || !toAddress) {
    throw new Error('Invalid blockchain tx payload: missing fromAddress or toAddress');
  }

  try {
    // Dry-run the transaction first when an XDR envelope was provided.
    if (xdr) {
      logger.debug('Transaction simulated', {
        ...ctx,
        simulated: await sorobanService.simulateTransaction(xdr),
      });
    }

    // Submit XDR to network (via Horizon / Soroban RPC)
    // const result = await horizon.submitTransaction(xdr);
    const txHash = `tx_${Date.now()}_${Math.random().toString(36).slice(2, 10)}`;

    // Persist the hash and move related records to PROCESSING.
    // updateMany is a no-op when the id matches no row.
    if (paymentId) {
      await prisma.payment.updateMany({
        where: { id: paymentId },
        data: { txHash, status: PaymentStatus.PROCESSING },
      });
    }
    if (transferId) {
      await prisma.transfer.updateMany({
        where: { id: transferId },
        data: { txHash, status: TransferStatus.PROCESSING },
      });
    }

    logger.info('Blockchain tx processed', { ...ctx, txHash });
  } catch (err) {
    logger.error('Blockchain tx failed', {
      ...ctx,
      error: err instanceof Error ? err.message : String(err),
    });
    throw err;
  }
}
27 changes: 27 additions & 0 deletions server/src/processors/email.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import logger from '../utils/logger';
import type { EmailJobPayload } from '../types/job-payloads';

/**
 * Job processor for outbound email.
 * Validates the payload and (once a provider is wired in) dispatches the
 * message; failures are re-thrown so BullMQ can retry the job.
 */
export async function processEmail(data: EmailJobPayload): Promise<void> {
  const { to, subject, body, templateId } = data;
  const ctx = { component: 'email-processor', to, subject };

  logger.info('Processing email job', ctx);

  // The recipient must be a non-empty string before anything else happens.
  if (typeof to !== 'string' || to === '') {
    logger.error('Email job failed: missing or invalid "to" field', ctx);
    throw new Error('Invalid email payload: missing "to"');
  }

  try {
    // Integration with SendGrid/AWS SES
    // Example SendGrid: await sendgrid.send({ to, subject, text: body ?? '', templateId })
    logger.info('Email sent successfully', { ...ctx, templateId: templateId ?? 'none' });
  } catch (err) {
    logger.error('Email send failed', {
      ...ctx,
      error: err instanceof Error ? err.message : String(err),
    });
    throw err;
  }
}
34 changes: 34 additions & 0 deletions server/src/processors/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* JobProcessorRegistry - maps JobType to processor functions.
* Manages background job processing and provides a central registry.
*/
import { JobType } from '../services/queue.service';
import { processEmail } from './email.processor';
import { processNotification } from './notification.processor';
import { processSync } from './sync.processor';
import { processBlockchainTx } from './blockchain-tx.processor';
import type {
EmailJobPayload,
NotificationJobPayload,
SyncJobPayload,
BlockchainTxJobPayload,
} from '../types/job-payloads';

export type ProcessorFn = (data: unknown) => Promise<void>;

// Central lookup table: each JobType is bound to a thin adapter that
// narrows the untyped job data to the processor's payload type.
const registry = new Map<JobType, ProcessorFn>();
registry.set(JobType.EMAIL, (payload) => processEmail(payload as EmailJobPayload));
registry.set(JobType.NOTIFICATION, (payload) => processNotification(payload as NotificationJobPayload));
registry.set(JobType.SYNC, (payload) => processSync(payload as SyncJobPayload));
registry.set(JobType.BLOCKCHAIN_TX, (payload) => processBlockchainTx(payload as BlockchainTxJobPayload));

/**
 * Look up the processor registered for a job type.
 * @returns the processor function, or undefined when nothing is registered.
 */
export function getProcessor(jobType: JobType): ProcessorFn | undefined {
  const processor = registry.get(jobType);
  return processor;
}

/** All job types that currently have a registered processor. */
export function getAllJobTypes(): JobType[] {
  return [...registry.keys()];
}

export { processEmail, processNotification, processSync, processBlockchainTx };
44 changes: 44 additions & 0 deletions server/src/processors/notification.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import prisma from '../utils/prisma';
import logger from '../utils/logger';
import type { NotificationJobPayload } from '../types/job-payloads';
import { NotificationType } from '@prisma/client';

/**
 * Job processor for in-app notifications.
 * Persists a Notification row for the target user. A missing user is not
 * an error (it may have been deleted between enqueue and processing) —
 * the job is skipped with a warning instead of retried.
 */
export async function processNotification(data: NotificationJobPayload): Promise<void> {
  const { userId, title, message, type = 'SYSTEM', metadata } = data;
  const ctx = { component: 'notification-processor', userId, title };

  logger.info('Processing notification job', ctx);

  if (!userId || typeof userId !== 'string') {
    logger.error('Notification job failed: missing or invalid "userId"', ctx);
    throw new Error('Invalid notification payload: missing "userId"');
  }

  try {
    const user = await prisma.user.findUnique({ where: { userId } });
    if (!user) {
      logger.warn('Notification target user not found', ctx);
      return;
    }

    // Map the payload's type string onto the Prisma enum;
    // anything unrecognised falls back to SYSTEM.
    let notifType: NotificationType;
    switch (type) {
      case 'ACTION':
        notifType = NotificationType.ACTION;
        break;
      case 'SECURITY':
        notifType = NotificationType.SECURITY;
        break;
      default:
        notifType = NotificationType.SYSTEM;
    }

    await prisma.notification.create({
      data: {
        userId: user.userId,
        title,
        message,
        type: notifType,
        metadata: metadata != null ? (metadata as object) : undefined,
      },
    });

    logger.info('Notification created in DB', ctx);

    // FCM / OneSignal push integration would go here
    // await fcm.send(user.fcmToken, { title, body: message });
  } catch (err) {
    logger.error('Notification processing failed', {
      ...ctx,
      error: err instanceof Error ? err.message : String(err),
    });
    throw err;
  }
}
66 changes: 66 additions & 0 deletions server/src/processors/sync.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import prisma from '../utils/prisma';
import logger from '../utils/logger';
import type { SyncJobPayload } from '../types/job-payloads';

/**
 * Job processor that fans out to the sync routine matching `syncType`.
 * Unknown sync types are logged and rejected so the job surfaces as failed.
 */
export async function processSync(data: SyncJobPayload): Promise<void> {
  const { syncType, userId, resourceId, metadata } = data;
  const ctx = { component: 'sync-processor', syncType, userId };

  logger.info('Processing sync job', ctx);

  if (syncType === 'user_data') {
    // user_data is the only type that cannot run without a target user.
    if (!userId) {
      throw new Error('Sync user_data requires userId');
    }
    await syncUserData(userId, ctx);
  } else if (syncType === 'analytics') {
    await syncAnalytics(ctx);
  } else if (syncType === 'backup') {
    await syncBackup(ctx);
  } else if (syncType === 'on_chain_sync') {
    await syncOnChain(resourceId, metadata, ctx);
  } else {
    logger.warn('Unknown sync type', { ...ctx, syncType });
    throw new Error(`Unknown sync type: ${syncType}`);
  }

  logger.info('Sync job completed', ctx);
}

/** Re-reads a user with profile and balances; logs what was found. */
async function syncUserData(userId: string, ctx: Record<string, unknown>) {
  const user = await prisma.user.findUnique({
    where: { userId },
    include: { profile: true, balances: true },
  });
  if (user == null) {
    logger.warn('User not found for sync', ctx);
    return;
  }
  logger.debug('Synced user data', { ...ctx, hasProfile: !!user.profile });
}

/** Counts payments and transfers in parallel for analytics reporting. */
async function syncAnalytics(ctx: Record<string, unknown>) {
  const counts = await Promise.all([prisma.payment.count(), prisma.transfer.count()]);
  logger.debug('Analytics sync completed', {
    ...ctx,
    paymentCount: counts[0],
    transferCount: counts[1],
  });
}

/** Placeholder for backup/export logic — currently only records a log line. */
async function syncBackup(ctx: Record<string, unknown>) {
  logger.debug('Backup sync completed', ctx);
}

/** Placeholder for on-chain reconciliation; logs the requested resource. */
async function syncOnChain(
  resourceId: string | undefined,
  metadata: Record<string, unknown> | undefined,
  ctx: Record<string, unknown>
) {
  logger.debug('On-chain sync', { ...ctx, resourceId, metadata });
}
Loading