diff --git a/src/app.js b/src/app.js index a5c6ba5..b492ae2 100644 --- a/src/app.js +++ b/src/app.js @@ -20,6 +20,7 @@ const EnvUtil = require('./utils/env.js'); const ChainUtil = require('./utils/chain.js'); const AlchemyUtil = require('./datasources/alchemy.js'); const StartupSeeder = require('./repository/postgres/startup-seeders/startup-seeder.js'); +const WebsocketTaskTrigger = require('./scheduled/websocket.js'); async function appStartup() { // Activate whichever cron jobs are configured, if any @@ -38,6 +39,9 @@ async function appStartup() { // Long-running async seeder process, the api will come online before this is complete. StartupSeeder.seedDatabase(); + // Websocket begins listening for events + WebsocketTaskTrigger.listen(); + const app = new Koa(); app.use( diff --git a/src/constants/runtime-constants.js b/src/constants/runtime-constants.js index e2f300d..60620f0 100644 --- a/src/constants/runtime-constants.js +++ b/src/constants/runtime-constants.js @@ -21,6 +21,8 @@ class RuntimeConstants { get: (target, property, receiver) => { if (property === 'RPC') { return AlchemyUtil.providerForChain(chain); + } else if (property === 'WS') { + return AlchemyUtil.alchemyForChain(chain).ws; } let constants; if (chain) { diff --git a/src/datasources/alchemy.js b/src/datasources/alchemy.js index 84fd1c0..37c4275 100644 --- a/src/datasources/alchemy.js +++ b/src/datasources/alchemy.js @@ -3,6 +3,8 @@ const EnvUtil = require('../utils/env'); const { ethers } = require('ethers'); class AlchemyUtil { + // Contains alchemy object + static _alchemies = {}; // Contains a provider by chain static _providers = {}; // Access to the underlying promise whos execution populates _providers. @@ -20,14 +22,18 @@ class AlchemyUtil { apiKey: EnvUtil.getAlchemyKey(), network: `${chain}-mainnet` // Of type alchemy-sdk.Network }; - const alchemy = new Alchemy(settings); - this._providerPromises[chain] = alchemy.config.getProvider().then((p) => { + this._alchemies[chain] = new Alchemy(settings); + this._providerPromises[chain] = this._alchemies[chain].config.getProvider().then((p) => { this._providers[chain] = p; }); } } } + static alchemyForChain(chain) { + return AlchemyUtil._alchemies[chain]; + } + static providerForChain(chain) { return AlchemyUtil._providers[chain]; } diff --git a/src/datasources/events/silo-events.js b/src/datasources/events/silo-events.js index 83e0f47..de634ff 100644 --- a/src/datasources/events/silo-events.js +++ b/src/datasources/events/silo-events.js @@ -1,7 +1,5 @@ const { C } = require('../../constants/runtime-constants'); -const { BigInt_abs } = require('../../utils/bigint'); const Log = require('../../utils/logging'); -const { fromBigInt, bigintFloatMultiplier, bigintPercent } = require('../../utils/number'); const AlchemyUtil = require('../alchemy'); const FilterLogs = require('./filter-logs'); diff --git a/src/repository/postgres/startup-seeders/dev-seeder.js b/src/repository/postgres/startup-seeders/dev-seeder.js index a27d0e6..7428b7b 100644 --- a/src/repository/postgres/startup-seeders/dev-seeder.js +++ b/src/repository/postgres/startup-seeders/dev-seeder.js @@ -21,11 +21,11 @@ class DevSeeder { }); await AsyncContext.run({ chain: 'base' }, async () => { - try { - TractorTask.__cronLock = true; - while (await TractorTask.update()) {} - } finally { - TractorTask.__cronLock = false; + while (true) { + const { canExecuteAgain } = await TractorTask.queueExecution(); + if (!canExecuteAgain) { + break; + } } }); } diff --git a/src/repository/postgres/startup-seeders/season-seeder.js b/src/repository/postgres/startup-seeders/season-seeder.js index cb6f83b..610f690 100644 --- a/src/repository/postgres/startup-seeders/season-seeder.js +++ b/src/repository/postgres/startup-seeders/season-seeder.js @@ -20,7 +20,7 @@ class SeasonSeeder { await Concurrent.run(TAG, 50, () => retryable(async () => { try { - await SeasonService.insertSeasonFromEvent(season); + await SeasonService.insertSeason(season); if (season % 100 === 0) { Log.info(`Saved season ${season}...`); diff --git a/src/scheduled/cron-schedule.js b/src/scheduled/cron-schedule.js index 362ab56..456dee2 100644 --- a/src/scheduled/cron-schedule.js +++ b/src/scheduled/cron-schedule.js @@ -5,29 +5,7 @@ const Log = require('../utils/logging'); const DepositsTask = require('./tasks/deposits'); const TractorTask = require('./tasks/tractor'); const InflowsTask = require('./tasks/inflows'); - -const genericTask = (Executor, label) => ({ - [label]: { - // 11 seconds into the minute; these tasks have a 5 block buffer, this will ensure it processes the block on the minute - cron: '11 * * * * *', - function: async () => { - if (Executor.__cronLock) { - Log.info(`${label} task is still running, skipping this minute...`); - return; - } - - try { - Executor.__cronLock = true; - let canExecuteAgain = true; - while (canExecuteAgain) { - canExecuteAgain = await Executor.update(); - } - } finally { - Executor.__cronLock = false; - } - } - } -}); +const EnvUtil = require('../utils/env'); // All cron jobs which could be activated are configured here const ALL_JOBS = { @@ -36,9 +14,42 @@ const ALL_JOBS = { cron: '50-59 59 * * * *', function: SunriseTask.handleSunrise }, - ...genericTask(DepositsTask, 'deposits'), - ...genericTask(TractorTask, 'tractor'), - ...genericTask(InflowsTask, 'inflows'), + deposits: { + executeOnStartup: true, + // Updated less frequently because the underlying data is currently unused + cron: '0 10 * * * *', + function: async () => { + while ((await DepositsTask.queueExecution({ minIntervalMinutes: 55 })).canExecuteAgain) {} + } + }, + inflows: { + executeOnStartup: true, + // Updated less frequently because its only used for snapshots (and the ws should invoke it at sunrise) + cron: '0 10 * * * *', + function: async () => { + while ((await InflowsTask.queueExecution({ minIntervalMinutes: 55 })).canExecuteAgain) {} + } + }, + tractor: { + executeOnStartup: !EnvUtil.getDevTractor().seeder, // will execute on startup either from here or from the dev seeder + cron: '0 */5 * * * *', + function: async () => { + const isInitialRun = TractorTask.getLastExecutionTime() === null; + while (true) { + const { countEvents, queuedCallersBehind, canExecuteAgain } = await TractorTask.queueExecution({ + minIntervalMinutes: 4.5 + }); + if (!canExecuteAgain) { + // queuedCallersBehind can be true if the task ran slightly before the websocket got the event, + // and the websocket queued a run while this one was still running. + if (countEvents > 0 && !isInitialRun && !queuedCallersBehind) { + sendWebhookMessage(`Cron task processed ${countEvents} tractor events; websocket might be disconnected?`); + } + break; + } + } + } + }, alert: { cron: '*/10 * * * * *', function: () => Log.info('10 seconds testing Alert') @@ -70,7 +81,7 @@ function activateJobs(jobNames) { for (const jobName of jobNames) { const job = ALL_JOBS[jobName]; if (job) { - cron.schedule(job.cron, () => { + const execute = () => { // This is to mitigate a quirk in node-cron where sometimes jobs are missed. Jobs can specify // a range of seconds they are willing to execute on, making it far less likely to drop. // This guard prevents double-execution. @@ -80,7 +91,11 @@ function activateJobs(jobNames) { job.__lastExecuted = Date.now(); errorWrapper(job.function); - }); + }; + if (job.executeOnStartup) { + execute(); + } + cron.schedule(job.cron, execute); activated.push(jobName); } else { failed.push(jobName); diff --git a/src/scheduled/tasks/IndexingTask.js b/src/scheduled/tasks/IndexingTask.js new file mode 100644 index 0000000..d82cc9f --- /dev/null +++ b/src/scheduled/tasks/IndexingTask.js @@ -0,0 +1,75 @@ +class IndexingTask { + static _lastQueuedBlock = null; + static _lastExecutionTime = null; + static _running = false; + static _queueCounter = 0; + static _isCaughtUp = false; + + /** + * Attempts to execute the tasks, queuing an execution if its already running and the second request is unique. + * @param blockNumber - the block an update is being requested for; ignores processing if already processed up to this block. + * @param minIntervalMinutes - ignores queue requests if the task was recently executed within this interval. + * @returns {Promise<{ countEvents: number, queuedCallersBehind: boolean, canExecuteAgain: boolean }>} + */ + static async queueExecution({ blockNumber = -1, minIntervalMinutes = 0 } = {}) { + if (blockNumber !== -1 && blockNumber <= this._lastQueuedBlock) { + // Requested block number was already queued or processed + return { countEvents: 0, queuedCallersBehind: false, canExecuteAgain: false }; + } else if (this._lastExecutionTime && Date.now() - this._lastExecutionTime < minIntervalMinutes * 60 * 1000) { + // Minimum requested interval hasn't passed since the last execution + return { countEvents: 0, queuedCallersBehind: false, canExecuteAgain: false }; + } + + if (blockNumber !== -1) { + this._lastQueuedBlock = blockNumber; + } + + const localCount = ++this._queueCounter; + // Wait up to 20 seconds for the task to finish executing + for (let i = 0; i < 20 && this._running; ++i) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + // If another execution was queued during the wait, allow that one to execute instead + if (!this._running && localCount === this._queueCounter) { + try { + this._running = true; + // update return sig to be number of events, and boolean? + const countEvents = await this.update(); + this._lastExecutionTime = new Date(); + return { + countEvents, + queuedCallersBehind: this._queueCounter > localCount, + canExecuteAgain: !this.isCaughtUp() + }; + } finally { + this._running = false; + } + } + return { countEvents: 0, queuedCallersBehind: this._queueCounter > localCount, canExecuteAgain: false }; + } + + // Notifies of an event occuring in real-time via a websocket. Task decides how to proceed. + static async handleLiveEvent(event) { + throw new Error('Must be implemented by subclass'); + } + + /** + * Runs the task, updating as many blocks as possible + * @returns {Promise<{updateBlock: number, processedEvents: number}>} + */ + static async update() { + throw new Error('Must be implemented by subclass'); + } + + // Indicates if the task is caught up to the latest block as of its most recent update. + static isCaughtUp() { + return this._isCaughtUp; + } + + static getLastExecutionTime() { + return this._lastExecutionTime; + } +} + +module.exports = IndexingTask; diff --git a/src/scheduled/tasks/deposits.js b/src/scheduled/tasks/deposits.js index dda1b17..de558d2 100644 --- a/src/scheduled/tasks/deposits.js +++ b/src/scheduled/tasks/deposits.js @@ -8,6 +8,7 @@ const { percentDiff } = require('../../utils/number'); const Log = require('../../utils/logging'); const SiloService = require('../../service/silo-service'); const TaskRangeUtil = require('../util/task-range'); +const IndexingTask = require('./IndexingTask'); // If the BDV has changed by at least these amounts, update lambda stats const DEFAULT_UPDATE_THRESHOLD = 0.01; @@ -15,11 +16,21 @@ const HOURLY_UPDATE_THRESHOLD = 0.005; // Maximum number of blocks to process in one invocation const MAX_BLOCKS = 10000; -class DepositsTask { +class DepositsTask extends IndexingTask { // Set by SunriseTask when a new season is encountered. Indicates that all deposits should be updated. // This approach would not work if also taking deposit snapshots (this flag/behavior is only triggered in real-time). static __seasonUpdate = false; + static async handleLiveEvent(event) { + // Deposits task is not currently used for anything, therefore ok to update infrequently + if (event.name === 'Sunrise') { + await this.queueExecution({ blockNumber: event.rawLog.blockNumber }); + } + // if (['AddDeposit', 'RemoveDeposit', 'RemoveDeposits', 'StalkBalanceChanged'].includes(event.name)) { + // await this.queueExecution(); + // } + } + // Returns true if the task can be called again immediately static async update() { const meta = await AppMetaService.getLambdaMeta(); @@ -63,7 +74,9 @@ class DepositsTask { }); DepositsTask.__seasonUpdate = false; - return !isCaughtUp; + this._isCaughtUp = isCaughtUp; + // Unknown number of events, this task should be refactrored to retrieve them upfront within this method instead of separately + return -1; } // Updates the list of deposits in the database, adding/removing entries as needed diff --git a/src/scheduled/tasks/inflows.js b/src/scheduled/tasks/inflows.js index 0e45173..d6bf41e 100644 --- a/src/scheduled/tasks/inflows.js +++ b/src/scheduled/tasks/inflows.js @@ -13,6 +13,7 @@ const SiloEvents = require('../../datasources/events/silo-events'); const SiloInflowService = require('../../service/inflow/silo-inflow-service'); const SiloInflowSnapshotService = require('../../service/inflow/silo-inflow-snapshot-service'); const PriceService = require('../../service/price-service'); +const IndexingTask = require('./IndexingTask'); // Maximum number of blocks to process in one invocation const MAX_BLOCKS = 2000; @@ -21,8 +22,17 @@ const FIELD_EVENTS = new Set(['Sow', 'Harvest', 'PodListingFilled', 'PodOrderFil const SILO_EVENTS = new Set(['AddDeposit', 'RemoveDeposit', 'RemoveDeposits', 'Plant', 'Convert', 'ClaimPlenty']); const ALL_EVENTS = [...FIELD_EVENTS, ...SILO_EVENTS]; -class InflowsTask { - // Returns true if the task can be called again immediately +class InflowsTask extends IndexingTask { + static async handleLiveEvent(event) { + // Inflows are only used for snapshots currently, therefore update on Sunrise only + if (event.name === 'Sunrise') { + await this.queueExecution({ blockNumber: event.rawLog.blockNumber }); + } + // if (ALL_EVENTS.includes(event.name)) { + // await this.queueExecution(); + // } + } + static async update() { const meta = await AppMetaService.getInflowMeta(); let { isInitialized, lastUpdate, updateBlock, isCaughtUp } = await TaskRangeUtil.getUpdateInfo(meta, MAX_BLOCKS); @@ -95,7 +105,8 @@ class InflowsTask { await AppMetaService.setLastInflowUpdate(updateBlock); }); - return !isCaughtUp; + this._isCaughtUp = isCaughtUp; + return events.length; } } module.exports = InflowsTask; diff --git a/src/scheduled/tasks/sunrise.js b/src/scheduled/tasks/sunrise.js index 8a06a48..cfb55a0 100644 --- a/src/scheduled/tasks/sunrise.js +++ b/src/scheduled/tasks/sunrise.js @@ -25,7 +25,7 @@ class SunriseTask { try { // Insert basic season info - await SeasonService.insertSeasonFromEvent(nextSeason); + await SeasonService.insertSeason(nextSeason); // Update whitelisted token info const tokenModels = await SiloService.updateWhitelistedTokenInfo(); diff --git a/src/scheduled/tasks/tractor.js b/src/scheduled/tasks/tractor.js index 6ee87d5..39f5aee 100644 --- a/src/scheduled/tasks/tractor.js +++ b/src/scheduled/tasks/tractor.js @@ -16,14 +16,21 @@ const AsyncContext = require('../../utils/async/context'); const EnvUtil = require('../../utils/env'); const Log = require('../../utils/logging'); const TaskRangeUtil = require('../util/task-range'); +const IndexingTask = require('./IndexingTask'); // Maximum number of blocks to process in one invocation const MAX_BLOCKS = 10000; const SNAPSHOT_SERVICES = [SnapshotSowV0Service, SnapshotConvertUpV0Service]; -class TractorTask { - // Returns true if the task can be called again immediately +class TractorTask extends IndexingTask { + static async handleLiveEvent(event) { + if (['Sunrise', 'PublishRequisition', 'CancelBlueprint', 'Tractor'].includes(event.name)) { + await this.queueExecution({ blockNumber: event.rawLog.blockNumber }); + } + // Silo events could trigger a periodicUpdate, ignoring currently + } + static async update() { const meta = await AppMetaService.getTractorMeta(); if (!meta.lastUpdate) { @@ -102,7 +109,8 @@ class TractorTask { await AppMetaService.setLastTractorUpdate(updateBlock); }); - return !isCaughtUp; + this._isCaughtUp = isCaughtUp; + return events.length + sunrise.length; } static async handlePublishRequsition(event) { diff --git a/src/scheduled/util/task-range.js b/src/scheduled/util/task-range.js index 9b336f7..d5a7fe2 100644 --- a/src/scheduled/util/task-range.js +++ b/src/scheduled/util/task-range.js @@ -19,9 +19,9 @@ class TaskRangeUtil { // Determine range of blocks to update on let updateBlock = (await retryable(() => C().RPC.getBlock())).number; // Buffer to avoid issues with a chain reorg on non-local rpc - if (!EnvUtil.isLocalRpc(C().CHAIN)) { - updateBlock -= ChainUtil.blocksPerInterval(C().CHAIN, 10000); - } + // if (!EnvUtil.isLocalRpc(C().CHAIN)) { + // updateBlock -= ChainUtil.blocksPerInterval(C().CHAIN, 10000); + // } if (updateBlock - lastUpdate > maxBlocksAtOnce) { updateBlock = lastUpdate + maxBlocksAtOnce; isCaughtUp = false; diff --git a/src/scheduled/websocket.js b/src/scheduled/websocket.js new file mode 100644 index 0000000..a2cd4ed --- /dev/null +++ b/src/scheduled/websocket.js @@ -0,0 +1,82 @@ +// Process sitting atop each of the scheduled tasks; Depending on which events are encountered, +// triggers the appropriate task to run immediately. The tasks decide what to do with the provided events. + +const { C } = require('../constants/runtime-constants'); +const Beanstalk = require('../datasources/contracts/upgradeable/beanstalk'); +const SeasonService = require('../service/season-service'); +const { sendWebhookMessage } = require('../utils/discord'); +const Log = require('../utils/logging'); +const DepositsTask = require('./tasks/deposits'); +const InflowsTask = require('./tasks/inflows'); +const TractorTask = require('./tasks/tractor'); + +// These events will be listened for, and the corresponding tasks notified when encountered. +const EVENT_TASKS = { + Sunrise: [DepositsTask, InflowsTask, TractorTask], + PublishRequisition: [TractorTask], + CancelBlueprint: [TractorTask], + Tractor: [TractorTask], + AddDeposit: [DepositsTask, InflowsTask, TractorTask], + RemoveDeposit: [DepositsTask, InflowsTask, TractorTask], + RemoveDeposits: [DepositsTask, InflowsTask, TractorTask], + StalkBalanceChanged: [DepositsTask], + Sow: [InflowsTask], + Harvest: [InflowsTask], + PodListingFilled: [InflowsTask], + PodOrderFilled: [InflowsTask], + Plant: [InflowsTask], + Convert: [InflowsTask], + ClaimPlenty: [InflowsTask] +}; + +class WebsocketTaskTrigger { + static listen(c = C()) { + const interfaces = Beanstalk.getAllInterfaces(c); + + const topics = []; + const ifaceMap = {}; + for (const eventName in EVENT_TASKS) { + for (const iface of interfaces) { + const topicHash = iface.getEventTopic(eventName); + if (topicHash) { + topics.push(topicHash); + // If multiple interfaces have the same name/topicHash mapping, doesn't matter which interface is used. + ifaceMap[topicHash] = iface; + } + } + } + + Log.info(`Websocket activated for task events`); + + c.WS.on( + { + address: [c.BEANSTALK], + topics: [topics] + }, + async (log) => { + const parsedLog = ifaceMap[log.topics[0]].parseLog(log); + parsedLog.rawLog = log; + + if (parsedLog.name === 'Sunrise') { + await SeasonService.handleSunrise(parsedLog); + } + + Log.info(`encountered ${parsedLog.name} log in txn: ${log.transactionHash}`); + for (const task of EVENT_TASKS[parsedLog.name]) { + if (task.isCaughtUp()) { + task.handleLiveEvent(parsedLog); + } + } + + if (log.removed) { + // If this occurs in practice, we may need to start handling it if the underlying task executors + // are not reorg-resistant. + sendWebhookMessage( + `Chain reorg encountered at block ${log.blockNumber}? ${parsedLog.name} log was removed (${log.transactionHash})` + ); + } + } + ); + } +} +module.exports = WebsocketTaskTrigger; diff --git a/src/service/season-service.js b/src/service/season-service.js index c3e4f82..8cb47ab 100644 --- a/src/service/season-service.js +++ b/src/service/season-service.js @@ -14,7 +14,7 @@ class SeasonService { } // Finds the corresponding onchain event and inserts the season info - static async insertSeasonFromEvent(season) { + static async insertSeason(season) { const events = await FilterLogs.getBeanstalkEvents(['Sunrise'], { indexedTopics: [ethers.toBeHex(season, 32)], safeBatch: false @@ -23,7 +23,12 @@ class SeasonService { throw new Error(`No sunrise event found for season ${season}`); } - const dto = await SeasonDto.fromEvent(events[0]); + await this.handleSunrise(events[0]); + } + + // Inserts the season info from the given sunrise event. Event is expected to be a parsed log with a rawLog property. + static async handleSunrise(event) { + const dto = await SeasonDto.fromEvent(event); await SharedRepository.genericUpsert(sequelize.models.Season, [dto], false); } diff --git a/test/scheduled/IndexingTask.test.js b/test/scheduled/IndexingTask.test.js new file mode 100644 index 0000000..a5acceb --- /dev/null +++ b/test/scheduled/IndexingTask.test.js @@ -0,0 +1,199 @@ +const IndexingTask = require('../../src/scheduled/tasks/IndexingTask'); + +describe('IndexingTask', () => { + let mockUpdate; + + beforeEach(() => { + jest.clearAllMocks(); + jest.restoreAllMocks(); + + mockUpdate = jest.spyOn(IndexingTask, 'update').mockResolvedValue(0); + + // Reset static properties before each test + IndexingTask._lastQueuedBlock = null; + IndexingTask._lastExecutionTime = null; + IndexingTask._running = false; + IndexingTask._queueCounter = 0; + IndexingTask._isCaughtUp = false; + }); + + afterEach(() => { + expect(IndexingTask._running).toBeFalsy(); + jest.useRealTimers(); + }); + + describe('queueExecution', () => { + it('should return early if blockNumber was already queued', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution({ blockNumber: 100 }); + + expect(IndexingTask._lastQueuedBlock).toBe(100); + expect(mockUpdate).not.toHaveBeenCalled(); + }); + + it('should return early if blockNumber is less than last queued block', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution({ blockNumber: 50 }); + + expect(IndexingTask._lastQueuedBlock).toBe(100); + expect(mockUpdate).not.toHaveBeenCalled(); + }); + + it('should return early if minimum interval has not passed', async () => { + IndexingTask._lastExecutionTime = new Date(Date.now() - 4 * 60 * 1000); + + await IndexingTask.queueExecution({ minIntervalMinutes: 5 }); + + expect(mockUpdate).not.toHaveBeenCalled(); + }); + + it('should proceed if minimum interval has passed', async () => { + IndexingTask._lastExecutionTime = new Date(Date.now() - 10 * 60 * 1000); + + await IndexingTask.queueExecution({ minIntervalMinutes: 5 }); + + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should update _lastQueuedBlock when valid blockNumber is provided', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution({ blockNumber: 200 }); + + expect(IndexingTask._lastQueuedBlock).toBe(200); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should not update _lastQueuedBlock when blockNumber isnt provided', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution(); + + expect(IndexingTask._lastQueuedBlock).toBe(100); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should return correct result when update is successful', async () => { + mockUpdate.mockResolvedValue(10); + jest.spyOn(IndexingTask, 'isCaughtUp').mockReturnValue(true); + + const { countEvents, canExecuteAgain } = await IndexingTask.queueExecution({ blockNumber: 200 }); + + expect(countEvents).toBe(10); + expect(canExecuteAgain).toBe(false); + }); + + it('should wait for running task to finish before executing', async () => { + jest.useFakeTimers(); + IndexingTask._running = true; + + const queuePromise = IndexingTask.queueExecution({ blockNumber: 500 }); + + // Still waiting on running task + await jest.advanceTimersByTimeAsync(1000); + expect(mockUpdate).not.toHaveBeenCalled(); + await jest.advanceTimersByTimeAsync(1000); + expect(mockUpdate).not.toHaveBeenCalled(); + + // Stop the running task + IndexingTask._running = false; + + // Advance time to complete waiting + await jest.advanceTimersByTimeAsync(1000); + await queuePromise; + + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should not run task if exceeds wait time', async () => { + jest.useFakeTimers(); + IndexingTask._running = true; + + const queuePromise = IndexingTask.queueExecution({ blockNumber: 600 }); + + // Advance many wait times + for (let i = 0; i < 50; ++i) { + await jest.advanceTimersByTimeAsync(1000); + } + + // Task should still be running from another caller, this wont update + await queuePromise; + + expect(mockUpdate).not.toHaveBeenCalled(); + expect(IndexingTask._running).toBeTruthy(); + IndexingTask._running = false; + }); + + it('should not execute if another execution was queued during wait', async () => { + jest.useFakeTimers(); + // Simulate a task that's already running + IndexingTask._running = true; + + mockUpdate.mockResolvedValue(5); + + // First execution starts waiting + const firstPromise = IndexingTask.queueExecution({ blockNumber: 700 }); + // Queue another execution + const secondPromise = IndexingTask.queueExecution({ blockNumber: 701 }); + + // Stop the running task so both can proceed + IndexingTask._running = false; + + // Advance time to complete both executions + await jest.runAllTimersAsync(); + + const firstResult = await firstPromise; + const secondResult = await secondPromise; + + // First execution should be skipped because another arrived during wait + expect(firstResult).toEqual({ countEvents: 0, queuedCallersBehind: true, canExecuteAgain: false }); + expect(secondResult).toEqual({ countEvents: 5, queuedCallersBehind: false, canExecuteAgain: true }); + expect(mockUpdate).toHaveBeenCalledTimes(1); + }); + + it('should set _running flag during execution and clear it after', async () => { + mockUpdate.mockImplementation(async () => { + expect(IndexingTask._running).toBe(true); + return 10; + }); + + await IndexingTask.queueExecution({ blockNumber: 800 }); + + expect(IndexingTask._running).toBe(false); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should clear _running flag even if update throws an error', async () => { + mockUpdate.mockImplementation(async () => { + expect(IndexingTask._running).toBe(true); + throw new Error('Update failed'); + }); + + const resultPromise = IndexingTask.queueExecution({ blockNumber: 900 }); + + // Verify the promise rejects and _running is cleared + try { + await resultPromise; + throw new Error('Expected error to be thrown, but wasnt'); + } catch (error) { + errorThrown = true; + expect(error.message).toBe('Update failed'); + } + + expect(IndexingTask._running).toBe(false); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should update _lastExecutionTime after successful execution', async () => { + expect(IndexingTask._lastExecutionTime).toBeNull(); + + const startTime = Date.now(); + await IndexingTask.queueExecution({ blockNumber: 1000 }); + + expect(IndexingTask._lastExecutionTime).toBeInstanceOf(Date); + expect(IndexingTask._lastExecutionTime.getTime()).toBeGreaterThanOrEqual(startTime); + }); + }); +}); diff --git a/test/scheduled/task-range.test.js b/test/scheduled/task-range.test.js index 3c2d811..f993a3f 100644 --- a/test/scheduled/task-range.test.js +++ b/test/scheduled/task-range.test.js @@ -3,7 +3,7 @@ const TaskRangeUtil = require('../../src/scheduled/util/task-range'); const ChainUtil = require('../../src/utils/chain'); const CHAIN_HEAD = 2500; -const BUFFER = 5; +const BUFFER = 0; // Reduced as the reorg resistance was disabled in src describe('TaskRangeUtil', () => { beforeEach(() => { diff --git a/test/scheduled/websocket.test.js b/test/scheduled/websocket.test.js new file mode 100644 index 0000000..e6fd2f0 --- /dev/null +++ b/test/scheduled/websocket.test.js @@ -0,0 +1,255 @@ +const WebsocketTaskTrigger = require('../../src/scheduled/websocket'); +const Beanstalk = require('../../src/datasources/contracts/upgradeable/beanstalk'); +const SeasonService = require('../../src/service/season-service'); +const DepositsTask = require('../../src/scheduled/tasks/deposits'); +const InflowsTask = require('../../src/scheduled/tasks/inflows'); +const TractorTask = require('../../src/scheduled/tasks/tractor'); +const { mockBeanstalkConstants } = require('../util/mock-constants'); + +describe('WebsocketTaskTrigger', () => { + let mockWS; + let mockConstants; + let registeredCallback; + let mockInterfaces; + + beforeEach(() => { + jest.clearAllMocks(); + jest.restoreAllMocks(); + mockBeanstalkConstants(); + + // Create mock interfaces for parsing logs + mockInterfaces = createMockInterfaces(); + + // Mock Beanstalk.getAllInterfaces + jest.spyOn(Beanstalk, 'getAllInterfaces').mockReturnValue(mockInterfaces); + + // Mock the websocket object and capture the callback + mockWS = { + on: jest.fn((filter, callback) => { + registeredCallback = callback; + }) + }; + + // Create mock constants object with mocked WS + mockConstants = { + BEANSTALK: '0xC1E088fC1323b20BCBee9bd1B9fC9546db5624C5', + CHAIN: 'eth', + WS: mockWS + }; + + // Mock task methods + jest.spyOn(DepositsTask, 'isCaughtUp').mockReturnValue(true); + jest.spyOn(DepositsTask, 'handleLiveEvent').mockImplementation(() => {}); + jest.spyOn(InflowsTask, 'isCaughtUp').mockReturnValue(true); + jest.spyOn(InflowsTask, 'handleLiveEvent').mockImplementation(() => {}); + jest.spyOn(TractorTask, 'isCaughtUp').mockReturnValue(true); + jest.spyOn(TractorTask, 'handleLiveEvent').mockImplementation(() => {}); + + // Mock SeasonService + jest.spyOn(SeasonService, 'handleSunrise').mockImplementation(() => {}); + }); + + test('Registers websocket listener with correct filter', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + expect(mockWS.on).toHaveBeenCalledTimes(1); + const [filter, callback] = mockWS.on.mock.calls[0]; + + expect(filter.address).toEqual([mockConstants.BEANSTALK]); + expect(filter.topics).toEqual([expect.arrayContaining(['sunrise_topic', 'adddeposit_topic', 'tractor_topic'])]); + expect(callback).toBeDefined(); + }); + + test('Parses and handles Sunrise event', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('Sunrise', 'sunrise_topic', { + season: 12345n, + transactionHash: '0xabc123', + blockNumber: 1000 + }); + + await registeredCallback(mockLog); + + expect(SeasonService.handleSunrise).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'Sunrise', + args: expect.objectContaining({ season: 12345n }), + rawLog: mockLog + }) + ); + expect(DepositsTask.handleLiveEvent).toHaveBeenCalled(); + expect(InflowsTask.handleLiveEvent).toHaveBeenCalled(); + expect(TractorTask.handleLiveEvent).toHaveBeenCalled(); + }); + + test('Handles AddDeposit event and notifies correct tasks', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('AddDeposit', 'adddeposit_topic', { + account: '0x123', + token: '0xabc', + amount: 1000n, + transactionHash: '0xdef456', + blockNumber: 2000 + }); + + await registeredCallback(mockLog); + + expect(DepositsTask.handleLiveEvent).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'AddDeposit', + args: expect.objectContaining({ account: '0x123', amount: 1000n }) + }) + ); + expect(InflowsTask.handleLiveEvent).toHaveBeenCalled(); + expect(TractorTask.handleLiveEvent).toHaveBeenCalled(); + expect(SeasonService.handleSunrise).not.toHaveBeenCalled(); + }); + + test('Handles PublishRequisition event and notifies only TractorTask', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('PublishRequisition', 'tractor_topic', { + publisher: '0x789', + requisitionId: '0xreq123', + transactionHash: '0xghi789', + blockNumber: 3000 + }); + + await registeredCallback(mockLog); + + expect(TractorTask.handleLiveEvent).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'PublishRequisition', + args: expect.objectContaining({ publisher: '0x789' }) + }) + ); + expect(DepositsTask.handleLiveEvent).not.toHaveBeenCalled(); + expect(InflowsTask.handleLiveEvent).not.toHaveBeenCalled(); + }); + + test('Does not notify tasks that are not caught up', async () => { + // Set some tasks as not caught up + DepositsTask.isCaughtUp.mockReturnValue(false); + TractorTask.isCaughtUp.mockReturnValue(false); + + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('Sunrise', 'sunrise_topic', { + season: 12345n, + transactionHash: '0xabc123', + blockNumber: 1000 + }); + + await registeredCallback(mockLog); + + // Only InflowsTask is caught up and should be notified + expect(InflowsTask.handleLiveEvent).toHaveBeenCalled(); + expect(DepositsTask.handleLiveEvent).not.toHaveBeenCalled(); + expect(TractorTask.handleLiveEvent).not.toHaveBeenCalled(); + }); + + test('Handles multiple events in sequence', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const logs = [ + createMockLog('Sunrise', 'sunrise_topic', { + season: 100n, + transactionHash: '0xtx1', + blockNumber: 1000 + }), + createMockLog('AddDeposit', 'adddeposit_topic', { + account: '0x111', + token: '0xaaa', + amount: 100n, + transactionHash: '0xtx2', + blockNumber: 1000 + }), + createMockLog('Sow', 'sow_topic', { + account: '0x222', + beans: 500n, + pods: 550n, + transactionHash: '0xtx3', + blockNumber: 1000 + }) + ]; + + for (const log of logs) { + await registeredCallback(log); + } + + expect(SeasonService.handleSunrise).toHaveBeenCalledTimes(1); + expect(DepositsTask.handleLiveEvent).toHaveBeenCalledTimes(2); // Sunrise + AddDeposit + expect(InflowsTask.handleLiveEvent).toHaveBeenCalledTimes(3); // All three events + expect(TractorTask.handleLiveEvent).toHaveBeenCalledTimes(2); // Sunrise + AddDeposit + }); + + test('Attaches rawLog to parsed event', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('AddDeposit', 'adddeposit_topic', { + account: '0x123', + token: '0xabc', + amount: 1000n, + transactionHash: '0xyz567', + blockNumber: 9000 + }); + + await registeredCallback(mockLog); + + expect(DepositsTask.handleLiveEvent).toHaveBeenCalledWith( + expect.objectContaining({ + rawLog: mockLog + }) + ); + }); +}); + +// Helper functions + +function createMockInterfaces() { + const eventTopics = { + Sunrise: 'sunrise_topic', + PublishRequisition: 'tractor_topic', + CancelBlueprint: 'cancelblueprint_topic', + Tractor: 'tractor_topic', + AddDeposit: 'adddeposit_topic', + RemoveDeposit: 'removedeposit_topic', + RemoveDeposits: 'removedeposits_topic', + StalkBalanceChanged: 'stalkbalancechanged_topic', + Sow: 'sow_topic', + Harvest: 'harvest_topic', + PodListingFilled: 'podlistingfilled_topic', + PodOrderFilled: 'podorderfilled_topic', + Plant: 'plant_topic', + Convert: 'convert_topic', + ClaimPlenty: 'claimplenty_topic' + }; + + return [ + { + getEventTopic: jest.fn((eventName) => eventTopics[eventName]), + parseLog: jest.fn((log) => { + // Find the event name from the topic + const eventName = Object.keys(eventTopics).find((name) => eventTopics[name] === log.topics[0]); + return { + name: eventName, + args: log.args || {}, + rawLog: undefined // Will be set by the websocket handler + }; + }) + } + ]; +} + +function createMockLog(eventName, topicHash, data) { + return { + topics: [topicHash], + args: data, + name: eventName, + transactionHash: data.transactionHash, + blockNumber: data.blockNumber, + removed: false + }; +} diff --git a/test/tractor/tractor.test.js b/test/tractor/tractor.test.js index a2e133d..ca86abb 100644 --- a/test/tractor/tractor.test.js +++ b/test/tractor/tractor.test.js @@ -35,6 +35,11 @@ describe('TractorTask', () => { }); describe('Initialized', () => { + let requisitionSpy; + let cancelSpy; + let tractorSpy; + let metaSpy; + beforeEach(() => { jest.spyOn(AppMetaService, 'getTractorMeta').mockResolvedValue({ lastUpdate: 10 @@ -58,18 +63,18 @@ describe('TractorTask', () => { ]); jest.spyOn(SiloEvents, 'getSiloDepositEvents').mockResolvedValue([{ account: '0xabcd' }]); jest.spyOn(SnapshotSowV0Service, 'takeSnapshot').mockImplementation(() => {}); + requisitionSpy = jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); + cancelSpy = jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); + tractorSpy = jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); + metaSpy = jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); }); test('Passes events to correct handlers', async () => { jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({}); - const requisitionSpy = jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); - const cancelSpy = jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); - const tractorSpy = jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); - const metaSpy = jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); const retval = await TractorTask.update(); - expect(retval).toBe(true); + expect(retval).toBe(5); expect(requisitionSpy).toHaveBeenCalledWith(expect.objectContaining({ value: 1 })); expect(cancelSpy).toHaveBeenCalledWith(expect.objectContaining({ value: 2 })); expect(tractorSpy).toHaveBeenCalledWith(expect.objectContaining({ value: 3 })); @@ -82,13 +87,10 @@ describe('TractorTask', () => { periodicUpdate: jest.fn().mockImplementation(() => {}) }; jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({ a: blueprintSpy }); - jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); - jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); await TractorTask.update(); + expect(TractorTask.isCaughtUp()).toBe(false); expect(blueprintSpy.periodicUpdate).toHaveBeenCalledWith( expect.any(Function), expect.any(Function), @@ -111,10 +113,6 @@ describe('TractorTask', () => { periodicUpdate: jest.fn().mockImplementation(() => {}) }; jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({ a: blueprintSpy }); - jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); - jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); await TractorTask.update(); @@ -126,6 +124,21 @@ describe('TractorTask', () => { true ); }); + + test('Task can catch up', async () => { + jest.spyOn(TaskRangeUtil, 'getUpdateInfo').mockResolvedValue({ + isInitialized: true, + lastUpdate: 500, + updateBlock: 1000, + isCaughtUp: true, + meta: null + }); + jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({}); + + await TractorTask.update(); + + expect(TractorTask.isCaughtUp()).toBe(true); + }); }); describe('PublishRequisition', () => {