diff --git a/src/class/DataSync.ts b/src/class/DataSync.ts index 92e7d9d..d6f4de5 100644 --- a/src/class/DataSync.ts +++ b/src/class/DataSync.ts @@ -23,6 +23,12 @@ interface queryFromDistributorParameters { endCycle?: number } +// Sync tracker interface for data synchronization based on cycles +interface SyncTracker { + lastSavedCycle: number + updateLastSavedCycle(cycle: number): void +} + export const queryFromDistributor = async ( type: DataType, queryParameters: queryFromDistributorParameters @@ -71,6 +77,31 @@ export const queryFromDistributor = async ( } } +// Tracker instances for data synchronization +export const cycleTracker: SyncTracker = { + lastSavedCycle: 0, + updateLastSavedCycle(cycle: number): void { + console.log(`Updating lastSavedCycle from ${this.lastSavedCycle} to ${cycle}`) + this.lastSavedCycle = cycle + }, +} + +export const receiptTracker: SyncTracker = { + lastSavedCycle: 0, + updateLastSavedCycle(cycle: number): void { + console.log(`Updating lastSavedReceiptCycle from ${this.lastSavedCycle} to ${cycle}`) + this.lastSavedCycle = cycle + }, +} + +export const originalTxTracker: SyncTracker = { + lastSavedCycle: 0, + updateLastSavedCycle(cycle: number): void { + console.log(`Updating lastSavedOriginalTxCycle from ${this.lastSavedCycle} to ${cycle}`) + this.lastSavedCycle = cycle + }, +} + export async function compareWithOldReceiptsData( lastStoredReceiptCycle = 0 ): Promise<{ success: boolean; matchedCycle: number }> { @@ -541,6 +572,160 @@ export async function downloadOriginalTxsDataByCycle( } } +/** + * Calculate the safe sync cycle with buffer + * If new cycle is 9, we can safely sync up to cycle 7 (9 - 2) + * This maintains a 1-cycle buffer for safety + * @param newCycle - The current/latest cycle number + * @returns The cycle number that is safe to sync up to + */ +export function getSafeSyncCycle(newCycle: number): number { + const CYCLE_BUFFER = 2 + return Math.max(0, newCycle - CYCLE_BUFFER) +} +/** + * Validate and synchronize receipts data based on cycle tracker + * @param newCycle - The current/latest cycle number from incoming data + */ +export async function validateAndSyncReceipts(newCycle: number): Promise { + const targetCycle = getSafeSyncCycle(newCycle) + + // Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle + // This prevents trying to sync from cycle 0 on first run + if (receiptTracker.lastSavedCycle === 0) { + receiptTracker.lastSavedCycle = targetCycle + if (config.verbose) { + console.log( + `Initialized receipt sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})` + ) + } + } + + const startCycle = receiptTracker.lastSavedCycle + 1 + const shouldSyncCheck = targetCycle > receiptTracker.lastSavedCycle + + if (!shouldSyncCheck) { + if (config.verbose) console.log('Receipts are already synchronized') + return + } + try { + // Compare and validate receipts count between cycles + const unmatchedCycles = await compareReceiptsCountByCycles(startCycle, targetCycle) + + if (unmatchedCycles && unmatchedCycles.length > 0) { + console.log(`Found ${unmatchedCycles.length} cycles with mismatched receipt counts`, unmatchedCycles) + await downloadReceiptsByCycle(unmatchedCycles) + } + } catch (error) { + console.error('Error sync checking receipts:', newCycle, targetCycle, error) + } + // Update the tracker after sync check + receiptTracker.updateLastSavedCycle(targetCycle) + console.log(`✅ Receipts synchronized up to cycle ${targetCycle}`) +} + +/** + * Validate and synchronize original transactions data based on cycle tracker + * @param newCycle - The current/latest cycle number from incoming data + */ +export async function validateAndSyncOriginalTxs(newCycle: number): Promise { + const targetCycle = getSafeSyncCycle(newCycle) + + // Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle + // This prevents trying to sync from cycle 0 on first run + if (originalTxTracker.lastSavedCycle === 0) { + originalTxTracker.lastSavedCycle = targetCycle + if (config.verbose) { + console.log( + `Initialized originalTx sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})` + ) + } + } + + const startCycle = originalTxTracker.lastSavedCycle + 1 + const shouldSyncCheck = targetCycle > originalTxTracker.lastSavedCycle + + if (!shouldSyncCheck) { + if (config.verbose) console.log('OriginalTxs are already synchronized') + return + } + console.log(`Syncing originalTxs from cycle ${startCycle} to ${targetCycle} (current cycle: ${newCycle})`) + try { + // Compare and validate originalTxs count between cycles + const unmatchedCycles = await compareOriginalTxsCountByCycles(startCycle, targetCycle) + + if (unmatchedCycles && unmatchedCycles.length > 0) { + console.log(`Found ${unmatchedCycles.length} cycles with mismatched originalTx counts`, unmatchedCycles) + await downloadOriginalTxsDataByCycle(unmatchedCycles) + } + } catch (error) { + console.error('Error sync checking originalTxs:', newCycle, targetCycle, error) + } + // Update the tracker after sync check + originalTxTracker.updateLastSavedCycle(targetCycle) + console.log(`✅ OriginalTxs synchronized up to cycle ${targetCycle}`) +} + +/** + * Validate and synchronize cycle data based on cycle tracker + * @param newCycle - The current/latest cycle number from incoming data + */ +export async function validateAndSyncCycles(newCycle: number): Promise { + const targetCycle = getSafeSyncCycle(newCycle) + + // Initialize tracker if it's at 0 (first call) - set it to the safe sync cycle + // This prevents trying to sync from cycle 0 on first run + if (cycleTracker.lastSavedCycle === 0) { + cycleTracker.lastSavedCycle = targetCycle + if (config.verbose) { + console.log( + `Initialized cycle sync tracker lastSavedCycle to ${targetCycle} (current cycle: ${newCycle})` + ) + } + } + + const startCycle = cycleTracker.lastSavedCycle + 1 + const shouldSyncCheck = targetCycle > cycleTracker.lastSavedCycle + + if (!shouldSyncCheck) { + if (config.verbose) console.log('Cycles are already synchronized') + return + } + + const isInSync = targetCycle === startCycle + + if (!isInSync) { + try { + console.log(`Syncing cycles from ${startCycle} to ${targetCycle} (current cycle: ${newCycle})`) + await downloadCyclcesBetweenCycles(startCycle, targetCycle, true) + } catch (error) { + console.error('Error syncing cycles:', error) + } + } else { + if (config.verbose) console.log('Cycles are already synchronized') + } + + // Update the tracker after sync check + cycleTracker.updateLastSavedCycle(targetCycle) + console.log(`✅ Cycles synchronized up to cycle ${targetCycle}`) +} + +/** + * Check and synchronize all data types based on the current cycle + * This should be called when new cycle data is inserted/updated + * @param newCycle - The current cycle number from newly inserted/updated cycle + */ +export async function checkAndSyncDataByCycle(newCycle: number): Promise { + if (config.verbose) console.log(`Checking and syncing data for cycle: ${newCycle}`) + + // Run all sync operations in parallel + await Promise.all([ + validateAndSyncCycles(newCycle), + validateAndSyncReceipts(newCycle), + validateAndSyncOriginalTxs(newCycle), + ]) +} + export const downloadCyclcesBetweenCycles = async ( startCycle: number, totalCyclesToSync: number, diff --git a/src/server.ts b/src/server.ts index bf578c6..39cb68d 100644 --- a/src/server.ts +++ b/src/server.ts @@ -54,7 +54,7 @@ import { ValidatorStats } from './stats/validatorStats' import { TransactionStats, convertBaseTxStatsAsArray } from './stats/transactionStats' import { DailyTransactionStats } from './stats/dailyTransactionStats' import { DailyAccountStats } from './stats/dailyAccountStats' -import { DailyCoinStats, DailyCoinStatsSummary } from './stats/dailyCoinStats' +import { DailyCoinStats } from './stats/dailyCoinStats' if (config.env == envEnum.DEV) { //default debug mode diff --git a/src/storage/cycle.ts b/src/storage/cycle.ts index c28f0d4..6ddc964 100644 --- a/src/storage/cycle.ts +++ b/src/storage/cycle.ts @@ -5,17 +5,13 @@ import { config } from '../config/index' import { cleanOldReceiptsMap } from './receipt' import { cleanOldOriginalTxsMap } from './originalTxData' import { Utils as StringUtils } from '@shardus/types' +import { checkAndSyncDataByCycle } from '../class/DataSync' type DbCycle = Cycle & { cycleRecord: string } -const CYCLE_COLUMNS: readonly (keyof Cycle)[] = [ - 'cycleMarker', - 'counter', - 'start', - 'cycleRecord', -] as const +const CYCLE_COLUMNS: readonly (keyof Cycle)[] = ['cycleMarker', 'counter', 'start', 'cycleRecord'] as const export function isCycle(obj: Cycle): obj is Cycle { return (obj as Cycle).cycleRecord !== undefined && (obj as Cycle).cycleMarker !== undefined @@ -110,6 +106,14 @@ export async function insertOrUpdateCycle(cycle: Cycle): Promise { cleanOldReceiptsMap(CLEAN_UP_TIMESTMAP_MS) cleanOldOriginalTxsMap(CLEAN_UP_TIMESTMAP_MS) } + + // Trigger cycle-based synchronization check when new cycle data is received + if (cycleInfo.counter > 0) { + // Run sync in background + checkAndSyncDataByCycle(cycleInfo.counter).catch((error) => { + console.error('Error in checkAndSyncDataByCycle:', error) + }) + } } else { console.log('No cycleRecord or cycleMarker in cycle,', cycle) }