From cacdf0040007aa90b0d1bafcaa3ab429a902512b Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Thu, 6 Nov 2025 17:02:23 -0500 Subject: [PATCH 01/13] ws getter, run indexers immediately on startup --- src/constants/runtime-constants.js | 2 ++ src/datasources/alchemy.js | 10 ++++++++-- src/scheduled/cron-schedule.js | 9 +++++++-- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/constants/runtime-constants.js b/src/constants/runtime-constants.js index e2f300d..60620f0 100644 --- a/src/constants/runtime-constants.js +++ b/src/constants/runtime-constants.js @@ -21,6 +21,8 @@ class RuntimeConstants { get: (target, property, receiver) => { if (property === 'RPC') { return AlchemyUtil.providerForChain(chain); + } else if (property === 'WS') { + return AlchemyUtil.alchemyForChain(chain).ws; } let constants; if (chain) { diff --git a/src/datasources/alchemy.js b/src/datasources/alchemy.js index 84fd1c0..37c4275 100644 --- a/src/datasources/alchemy.js +++ b/src/datasources/alchemy.js @@ -3,6 +3,8 @@ const EnvUtil = require('../utils/env'); const { ethers } = require('ethers'); class AlchemyUtil { + // Contains alchemy object + static _alchemies = {}; // Contains a provider by chain static _providers = {}; // Access to the underlying promise whos execution populates _providers. @@ -20,14 +22,18 @@ class AlchemyUtil { apiKey: EnvUtil.getAlchemyKey(), network: `${chain}-mainnet` // Of type alchemy-sdk.Network }; - const alchemy = new Alchemy(settings); - this._providerPromises[chain] = alchemy.config.getProvider().then((p) => { + this._alchemies[chain] = new Alchemy(settings); + this._providerPromises[chain] = this._alchemies[chain].config.getProvider().then((p) => { this._providers[chain] = p; }); } } } + static alchemyForChain(chain) { + return AlchemyUtil._alchemies[chain]; + } + static providerForChain(chain) { return AlchemyUtil._providers[chain]; } diff --git a/src/scheduled/cron-schedule.js b/src/scheduled/cron-schedule.js index 362ab56..65f4617 100644 --- a/src/scheduled/cron-schedule.js +++ b/src/scheduled/cron-schedule.js @@ -8,6 +8,7 @@ const InflowsTask = require('./tasks/inflows'); const genericTask = (Executor, label) => ({ [label]: { + executeOnStartup: true, // 11 seconds into the minute; these tasks have a 5 block buffer, this will ensure it processes the block on the minute cron: '11 * * * * *', function: async () => { @@ -70,7 +71,7 @@ function activateJobs(jobNames) { for (const jobName of jobNames) { const job = ALL_JOBS[jobName]; if (job) { - cron.schedule(job.cron, () => { + const execute = () => { // This is to mitigate a quirk in node-cron where sometimes jobs are missed. Jobs can specify // a range of seconds they are willing to execute on, making it far less likely to drop. // This guard prevents double-execution. @@ -80,7 +81,11 @@ function activateJobs(jobNames) { job.__lastExecuted = Date.now(); errorWrapper(job.function); - }); + }; + if (job.executeOnStartup) { + execute(); + } + cron.schedule(job.cron, execute); activated.push(jobName); } else { failed.push(jobName); From d7f2fadba7493afa6b19d8752d40857986760d2a Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Thu, 6 Nov 2025 18:10:45 -0500 Subject: [PATCH 02/13] websocket for beanstalk events --- src/datasources/events/silo-events.js | 2 - src/scheduled/websocket.js | 60 +++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 src/scheduled/websocket.js diff --git a/src/datasources/events/silo-events.js b/src/datasources/events/silo-events.js index 83e0f47..de634ff 100644 --- a/src/datasources/events/silo-events.js +++ b/src/datasources/events/silo-events.js @@ -1,7 +1,5 @@ const { C } = require('../../constants/runtime-constants'); -const { BigInt_abs } = require('../../utils/bigint'); const Log = require('../../utils/logging'); -const { fromBigInt, bigintFloatMultiplier, bigintPercent } = require('../../utils/number'); const AlchemyUtil = require('../alchemy'); const FilterLogs = require('./filter-logs'); diff --git a/src/scheduled/websocket.js b/src/scheduled/websocket.js new file mode 100644 index 0000000..0a8e0a7 --- /dev/null +++ b/src/scheduled/websocket.js @@ -0,0 +1,60 @@ +// Process sitting atop each of the scheduled tasks; Depending on which events are encountered, +// triggers the appropriate task to run immediately. The tasks decide what to do with the provided events; +// the task may decide to wait to process until a larger volume of events is encountered, and may also +// re-retrieve the logs on its own (implementation will vary by task). + +const { C } = require('../constants/runtime-constants'); +const Contracts = require('../datasources/contracts/contracts'); +const { sendWebhookMessage } = require('../utils/discord'); +const Log = require('../utils/logging'); + +// Add other events here to support other tasks. +const EVENT_NAMES = ['PublishRequisition', 'CancelBlueprint', 'Tractor']; + +class WebsocketTaskTrigger { + static async listen(c = C()) { + const beanstalk = Contracts.getBeanstalk(c); + const interfaces = [beanstalk.interface]; + + const topics = []; + const ifaceMap = {}; + for (const eventName of EVENT_NAMES) { + for (const iface of interfaces) { + const topicHash = iface.getEventTopic(eventName); + if (topicHash) { + topics.push(topicHash); + ifaceMap[topicHash] = iface; + } + } + } + + Log.info(`Websocket activated for task events`); + + c.WS.on( + { + address: [c.BEANSTALK], + topics: [topics] + }, + (log) => { + const parsedLog = ifaceMap[log.topics[0]].parseLog(log); + + console.log(`encountered ${parsedLog.name} log ${log.transactionHash}`); + // Determine which task(s) to invoke based on the event name. + // Might need to do so at a delay to prevent hitting the reorg protection? + // Or just always call into the task immediately and let that executor decide what to do? + + if (log.removed) { + // If this occurs in practice, we may need to start handling it if the underlying task executors + // are not reorg-resistant. + sendWebhookMessage( + `Chain reorg encountered at block ${log.blockNumber}? ${parsedLog.name} log was removed (${log.transactionHash})` + ); + } + } + ); + } +} +module.exports = WebsocketTaskTrigger; + +// TODO: underlying executors will keep getting invoked by the cron task, if they have already executed within the last interval, +// it is unnecessary to process again. From 29a36ebf688cd7cdfc051ec5a68dfb03465ea545 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:13:28 -0500 Subject: [PATCH 03/13] Individual tasks receive events from websocket --- .../postgres/startup-seeders/season-seeder.js | 2 +- src/scheduled/cron-schedule.js | 27 +++-------- src/scheduled/tasks/IndexingTask.js | 40 ++++++++++++++++ src/scheduled/tasks/deposits.js | 13 +++++- src/scheduled/tasks/inflows.js | 13 +++++- src/scheduled/tasks/sunrise.js | 2 +- src/scheduled/tasks/tractor.js | 10 +++- src/scheduled/websocket.js | 46 +++++++++++++++---- src/service/season-service.js | 9 +++- 9 files changed, 125 insertions(+), 37 deletions(-) create mode 100644 src/scheduled/tasks/IndexingTask.js diff --git a/src/repository/postgres/startup-seeders/season-seeder.js b/src/repository/postgres/startup-seeders/season-seeder.js index cb6f83b..610f690 100644 --- a/src/repository/postgres/startup-seeders/season-seeder.js +++ b/src/repository/postgres/startup-seeders/season-seeder.js @@ -20,7 +20,7 @@ class SeasonSeeder { await Concurrent.run(TAG, 50, () => retryable(async () => { try { - await SeasonService.insertSeasonFromEvent(season); + await SeasonService.insertSeason(season); if (season % 100 === 0) { Log.info(`Saved season ${season}...`); diff --git a/src/scheduled/cron-schedule.js b/src/scheduled/cron-schedule.js index 65f4617..e0e5f34 100644 --- a/src/scheduled/cron-schedule.js +++ b/src/scheduled/cron-schedule.js @@ -6,26 +6,12 @@ const DepositsTask = require('./tasks/deposits'); const TractorTask = require('./tasks/tractor'); const InflowsTask = require('./tasks/inflows'); -const genericTask = (Executor, label) => ({ +const indexingTask = (indexingTask, label, cron) => ({ [label]: { executeOnStartup: true, - // 11 seconds into the minute; these tasks have a 5 block buffer, this will ensure it processes the block on the minute - cron: '11 * * * * *', + cron, function: async () => { - if (Executor.__cronLock) { - Log.info(`${label} task is still running, skipping this minute...`); - return; - } - - try { - Executor.__cronLock = true; - let canExecuteAgain = true; - while (canExecuteAgain) { - canExecuteAgain = await Executor.update(); - } - } finally { - Executor.__cronLock = false; - } + while (await indexingTask.queueExecution()) {} } } }); @@ -37,9 +23,10 @@ const ALL_JOBS = { cron: '50-59 59 * * * *', function: SunriseTask.handleSunrise }, - ...genericTask(DepositsTask, 'deposits'), - ...genericTask(TractorTask, 'tractor'), - ...genericTask(InflowsTask, 'inflows'), + // Deposits/inflows can be updated less frequently because deposits are unused, and inflows are only useful for the snapshots. + ...indexingTask(DepositsTask, 'deposits', '0 0 * * * *'), + ...indexingTask(InflowsTask, 'inflows', '0 0 * * * *'), + ...indexingTask(TractorTask, 'tractor', '0 */5 * * * *'), alert: { cron: '*/10 * * * * *', function: () => Log.info('10 seconds testing Alert') diff --git a/src/scheduled/tasks/IndexingTask.js b/src/scheduled/tasks/IndexingTask.js new file mode 100644 index 0000000..c060f4c --- /dev/null +++ b/src/scheduled/tasks/IndexingTask.js @@ -0,0 +1,40 @@ +class IndexingTask { + // TODO: do something with this var + static lastExecution = null; + static _running = false; + static _queueCounter = 0; + + // Runs update immediately if nothing is executing, otherwise queues an update execution. + static async queueExecution() { + const localCount = ++this._queueCounter; + // Wait up to 20 seconds for the task to finish executing + for (let i = 0; i < 20 && this._running; ++i) { + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + // If another execution was queued during the wait, allow that one to execute instead + if (localCount === this._queueCounter) { + try { + this._running = true; + const canExecuteAgain = await this.update(); + this._lastExecution = new Date(); + return canExecuteAgain; + } finally { + this._running = false; + } + } + return false; + } + + // Notifies of an event occuring in real-time via a websocket. Task decides how to proceed. + static async handleLiveEvent(event) { + throw new Error('Must be implemented by subclass'); + } + + // Runs the task, updating as many blocks as possible + static async update() { + throw new Error('Must be implemented by subclass'); + } +} + +module.exports = IndexingTask; diff --git a/src/scheduled/tasks/deposits.js b/src/scheduled/tasks/deposits.js index dda1b17..c7f3ac1 100644 --- a/src/scheduled/tasks/deposits.js +++ b/src/scheduled/tasks/deposits.js @@ -8,6 +8,7 @@ const { percentDiff } = require('../../utils/number'); const Log = require('../../utils/logging'); const SiloService = require('../../service/silo-service'); const TaskRangeUtil = require('../util/task-range'); +const IndexingTask = require('./IndexingTask'); // If the BDV has changed by at least these amounts, update lambda stats const DEFAULT_UPDATE_THRESHOLD = 0.01; @@ -15,11 +16,21 @@ const HOURLY_UPDATE_THRESHOLD = 0.005; // Maximum number of blocks to process in one invocation const MAX_BLOCKS = 10000; -class DepositsTask { +class DepositsTask extends IndexingTask { // Set by SunriseTask when a new season is encountered. Indicates that all deposits should be updated. // This approach would not work if also taking deposit snapshots (this flag/behavior is only triggered in real-time). static __seasonUpdate = false; + static async handleLiveEvent(event) { + // Deposits task is not currently used for anything, therefore ok to update infrequently + if (event.name === 'Sunrise') { + await this.queueExecution(); + } + // if (['AddDeposit', 'RemoveDeposit', 'RemoveDeposits', 'StalkBalanceChanged'].includes(event.name)) { + // await this.queueExecution(); + // } + } + // Returns true if the task can be called again immediately static async update() { const meta = await AppMetaService.getLambdaMeta(); diff --git a/src/scheduled/tasks/inflows.js b/src/scheduled/tasks/inflows.js index 0e45173..feea0bc 100644 --- a/src/scheduled/tasks/inflows.js +++ b/src/scheduled/tasks/inflows.js @@ -13,6 +13,7 @@ const SiloEvents = require('../../datasources/events/silo-events'); const SiloInflowService = require('../../service/inflow/silo-inflow-service'); const SiloInflowSnapshotService = require('../../service/inflow/silo-inflow-snapshot-service'); const PriceService = require('../../service/price-service'); +const IndexingTask = require('./IndexingTask'); // Maximum number of blocks to process in one invocation const MAX_BLOCKS = 2000; @@ -21,7 +22,17 @@ const FIELD_EVENTS = new Set(['Sow', 'Harvest', 'PodListingFilled', 'PodOrderFil const SILO_EVENTS = new Set(['AddDeposit', 'RemoveDeposit', 'RemoveDeposits', 'Plant', 'Convert', 'ClaimPlenty']); const ALL_EVENTS = [...FIELD_EVENTS, ...SILO_EVENTS]; -class InflowsTask { +class InflowsTask extends IndexingTask { + static async handleLiveEvent(event) { + // Inflows are only used for snapshots currently, therefore update on Sunrise only + if (event.name === 'Sunrise') { + await this.queueExecution(); + } + // if (ALL_EVENTS.includes(event.name)) { + // await this.queueExecution(); + // } + } + // Returns true if the task can be called again immediately static async update() { const meta = await AppMetaService.getInflowMeta(); diff --git a/src/scheduled/tasks/sunrise.js b/src/scheduled/tasks/sunrise.js index 8a06a48..cfb55a0 100644 --- a/src/scheduled/tasks/sunrise.js +++ b/src/scheduled/tasks/sunrise.js @@ -25,7 +25,7 @@ class SunriseTask { try { // Insert basic season info - await SeasonService.insertSeasonFromEvent(nextSeason); + await SeasonService.insertSeason(nextSeason); // Update whitelisted token info const tokenModels = await SiloService.updateWhitelistedTokenInfo(); diff --git a/src/scheduled/tasks/tractor.js b/src/scheduled/tasks/tractor.js index 6ee87d5..bbdc6d2 100644 --- a/src/scheduled/tasks/tractor.js +++ b/src/scheduled/tasks/tractor.js @@ -16,13 +16,21 @@ const AsyncContext = require('../../utils/async/context'); const EnvUtil = require('../../utils/env'); const Log = require('../../utils/logging'); const TaskRangeUtil = require('../util/task-range'); +const IndexingTask = require('./IndexingTask'); // Maximum number of blocks to process in one invocation const MAX_BLOCKS = 10000; const SNAPSHOT_SERVICES = [SnapshotSowV0Service, SnapshotConvertUpV0Service]; -class TractorTask { +class TractorTask extends IndexingTask { + static async handleLiveEvent(event) { + if (['Sunrise', 'PublishRequisition', 'CancelBlueprint', 'Tractor'].includes(event.name)) { + await this.queueExecution(); + } + // Silo events could trigger a periodicUpdate, ignoring currently + } + // Returns true if the task can be called again immediately static async update() { const meta = await AppMetaService.getTractorMeta(); diff --git a/src/scheduled/websocket.js b/src/scheduled/websocket.js index 0a8e0a7..413807f 100644 --- a/src/scheduled/websocket.js +++ b/src/scheduled/websocket.js @@ -4,25 +4,45 @@ // re-retrieve the logs on its own (implementation will vary by task). const { C } = require('../constants/runtime-constants'); -const Contracts = require('../datasources/contracts/contracts'); +const Beanstalk = require('../datasources/contracts/upgradeable/beanstalk'); +const SeasonService = require('../service/season-service'); const { sendWebhookMessage } = require('../utils/discord'); const Log = require('../utils/logging'); +const DepositsTask = require('./tasks/deposits'); +const InflowsTask = require('./tasks/inflows'); +const TractorTask = require('./tasks/tractor'); -// Add other events here to support other tasks. -const EVENT_NAMES = ['PublishRequisition', 'CancelBlueprint', 'Tractor']; +// These events will be listened for, and the corresponding tasks notified when encountered. +const EVENT_TASKS = { + Sunrise: [DepositsTask, InflowsTask, TractorTask], + PublishRequisition: [TractorTask], + CancelBlueprint: [TractorTask], + Tractor: [TractorTask], + AddDeposit: [DepositsTask, InflowsTask, TractorTask], + RemoveDeposit: [DepositsTask, InflowsTask, TractorTask], + RemoveDeposits: [DepositsTask, InflowsTask, TractorTask] + // StalkBalanceChanged: [DepositsTask], + // Sow: [InflowsTask], + // Harvest: [InflowsTask], + // PodListingFilled: [InflowsTask], + // PodOrderFilled: [InflowsTask], + // Plant: [InflowsTask], + // Convert: [InflowsTask], + // ClaimPlenty: [InflowsTask] +}; class WebsocketTaskTrigger { static async listen(c = C()) { - const beanstalk = Contracts.getBeanstalk(c); - const interfaces = [beanstalk.interface]; + const interfaces = Beanstalk.getAllInterfaces(c); const topics = []; const ifaceMap = {}; - for (const eventName of EVENT_NAMES) { + for (const eventName in EVENT_TASKS) { for (const iface of interfaces) { const topicHash = iface.getEventTopic(eventName); if (topicHash) { topics.push(topicHash); + // If multiple interfaces have the same name/topicHash mapping, doesn't matter which interface is used. ifaceMap[topicHash] = iface; } } @@ -35,13 +55,19 @@ class WebsocketTaskTrigger { address: [c.BEANSTALK], topics: [topics] }, - (log) => { + async (log) => { const parsedLog = ifaceMap[log.topics[0]].parseLog(log); + parsedLog.rawLog = log; + + if (log.name === 'Sunrise') { + await SeasonService.handleSunrise(parsedLog); + } console.log(`encountered ${parsedLog.name} log ${log.transactionHash}`); - // Determine which task(s) to invoke based on the event name. - // Might need to do so at a delay to prevent hitting the reorg protection? - // Or just always call into the task immediately and let that executor decide what to do? + for (const task of EVENT_TASKS[parsedLog.name]) { + // TODO: bind this? check it + task.handleLiveEvent(parsedLog); + } if (log.removed) { // If this occurs in practice, we may need to start handling it if the underlying task executors diff --git a/src/service/season-service.js b/src/service/season-service.js index c3e4f82..8cb47ab 100644 --- a/src/service/season-service.js +++ b/src/service/season-service.js @@ -14,7 +14,7 @@ class SeasonService { } // Finds the corresponding onchain event and inserts the season info - static async insertSeasonFromEvent(season) { + static async insertSeason(season) { const events = await FilterLogs.getBeanstalkEvents(['Sunrise'], { indexedTopics: [ethers.toBeHex(season, 32)], safeBatch: false @@ -23,7 +23,12 @@ class SeasonService { throw new Error(`No sunrise event found for season ${season}`); } - const dto = await SeasonDto.fromEvent(events[0]); + await this.handleSunrise(events[0]); + } + + // Inserts the season info from the given sunrise event. Event is expected to be a parsed log with a rawLog property. + static async handleSunrise(event) { + const dto = await SeasonDto.fromEvent(event); await SharedRepository.genericUpsert(sequelize.models.Season, [dto], false); } From 05d42360773e6f9b4b41a9fe751d1aafb6404e92 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Wed, 12 Nov 2025 14:45:37 -0500 Subject: [PATCH 04/13] Adjust cron wrappers --- .../postgres/startup-seeders/dev-seeder.js | 2 +- src/scheduled/cron-schedule.js | 45 +++++++++++++------ src/scheduled/tasks/IndexingTask.js | 15 ++++--- src/scheduled/tasks/deposits.js | 3 +- src/scheduled/tasks/inflows.js | 3 +- src/scheduled/tasks/tractor.js | 3 +- 6 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/repository/postgres/startup-seeders/dev-seeder.js b/src/repository/postgres/startup-seeders/dev-seeder.js index a27d0e6..d22114f 100644 --- a/src/repository/postgres/startup-seeders/dev-seeder.js +++ b/src/repository/postgres/startup-seeders/dev-seeder.js @@ -23,7 +23,7 @@ class DevSeeder { await AsyncContext.run({ chain: 'base' }, async () => { try { TractorTask.__cronLock = true; - while (await TractorTask.update()) {} + while ((await TractorTask.update()).canExecuteAgain) {} } finally { TractorTask.__cronLock = false; } diff --git a/src/scheduled/cron-schedule.js b/src/scheduled/cron-schedule.js index e0e5f34..c4e96b5 100644 --- a/src/scheduled/cron-schedule.js +++ b/src/scheduled/cron-schedule.js @@ -6,16 +6,6 @@ const DepositsTask = require('./tasks/deposits'); const TractorTask = require('./tasks/tractor'); const InflowsTask = require('./tasks/inflows'); -const indexingTask = (indexingTask, label, cron) => ({ - [label]: { - executeOnStartup: true, - cron, - function: async () => { - while (await indexingTask.queueExecution()) {} - } - } -}); - // All cron jobs which could be activated are configured here const ALL_JOBS = { sunrise: { @@ -23,10 +13,37 @@ const ALL_JOBS = { cron: '50-59 59 * * * *', function: SunriseTask.handleSunrise }, - // Deposits/inflows can be updated less frequently because deposits are unused, and inflows are only useful for the snapshots. - ...indexingTask(DepositsTask, 'deposits', '0 0 * * * *'), - ...indexingTask(InflowsTask, 'inflows', '0 0 * * * *'), - ...indexingTask(TractorTask, 'tractor', '0 */5 * * * *'), + deposits: { + executeOnStartup: true, + // Updated less frequently because the underlying data is currently unused + cron: '0 30 * * * *', + function: async () => { + while ((await DepositsTask.queueExecution(50)).canExecuteAgain) {} + } + }, + inflows: { + executeOnStartup: true, + // Updated less frequently because its only used for snapshots (and the ws should invoke it at sunrise) + cron: '0 30 * * * *', + function: async () => { + while ((await InflowsTask.queueExecution(50)).canExecuteAgain) {} + } + }, + tractor: { + executeOnStartup: true, + cron: '0 */5 * * * *', + function: async () => { + while (true) { + const { countEvents, canExecuteAgain } = await TractorTask.queueExecution(4.5); + if (!canExecuteAgain) { + if (countEvents > 0) { + sendWebhookMessage(`Cron task processed ${countEvents} tractor events; websocket might be disconnected?`); + } + break; + } + } + } + }, alert: { cron: '*/10 * * * * *', function: () => Log.info('10 seconds testing Alert') diff --git a/src/scheduled/tasks/IndexingTask.js b/src/scheduled/tasks/IndexingTask.js index c060f4c..b7920d3 100644 --- a/src/scheduled/tasks/IndexingTask.js +++ b/src/scheduled/tasks/IndexingTask.js @@ -1,11 +1,15 @@ class IndexingTask { - // TODO: do something with this var - static lastExecution = null; + static _lastExecution = null; static _running = false; static _queueCounter = 0; // Runs update immediately if nothing is executing, otherwise queues an update execution. - static async queueExecution() { + static async queueExecution(minIntervalMinutes = 0) { + // Ensure at least the minimum requested interval has passed since the last execution + if (this._lastExecution && Date.now() - this._lastExecution < minIntervalMinutes * 60 * 1000) { + return { countEvents: 0, canExecuteAgain: false }; + } + const localCount = ++this._queueCounter; // Wait up to 20 seconds for the task to finish executing for (let i = 0; i < 20 && this._running; ++i) { @@ -16,9 +20,10 @@ class IndexingTask { if (localCount === this._queueCounter) { try { this._running = true; - const canExecuteAgain = await this.update(); + // update return sig to be number of events, and boolean? + const { countEvents, canExecuteAgain } = await this.update(); this._lastExecution = new Date(); - return canExecuteAgain; + return { countEvents, canExecuteAgain }; } finally { this._running = false; } diff --git a/src/scheduled/tasks/deposits.js b/src/scheduled/tasks/deposits.js index c7f3ac1..85abbdc 100644 --- a/src/scheduled/tasks/deposits.js +++ b/src/scheduled/tasks/deposits.js @@ -74,7 +74,8 @@ class DepositsTask extends IndexingTask { }); DepositsTask.__seasonUpdate = false; - return !isCaughtUp; + // Unknown number of events, this task should be refactrored to retrieve them upfront within this method instead of separately + return { countEvents: -1, canExecuteAgain: !isCaughtUp }; } // Updates the list of deposits in the database, adding/removing entries as needed diff --git a/src/scheduled/tasks/inflows.js b/src/scheduled/tasks/inflows.js index feea0bc..dae2303 100644 --- a/src/scheduled/tasks/inflows.js +++ b/src/scheduled/tasks/inflows.js @@ -33,7 +33,6 @@ class InflowsTask extends IndexingTask { // } } - // Returns true if the task can be called again immediately static async update() { const meta = await AppMetaService.getInflowMeta(); let { isInitialized, lastUpdate, updateBlock, isCaughtUp } = await TaskRangeUtil.getUpdateInfo(meta, MAX_BLOCKS); @@ -106,7 +105,7 @@ class InflowsTask extends IndexingTask { await AppMetaService.setLastInflowUpdate(updateBlock); }); - return !isCaughtUp; + return { countEvents: events.length, canExecuteAgain: !isCaughtUp }; } } module.exports = InflowsTask; diff --git a/src/scheduled/tasks/tractor.js b/src/scheduled/tasks/tractor.js index bbdc6d2..bbfaa86 100644 --- a/src/scheduled/tasks/tractor.js +++ b/src/scheduled/tasks/tractor.js @@ -31,7 +31,6 @@ class TractorTask extends IndexingTask { // Silo events could trigger a periodicUpdate, ignoring currently } - // Returns true if the task can be called again immediately static async update() { const meta = await AppMetaService.getTractorMeta(); if (!meta.lastUpdate) { @@ -110,7 +109,7 @@ class TractorTask extends IndexingTask { await AppMetaService.setLastTractorUpdate(updateBlock); }); - return !isCaughtUp; + return { countEvents: events.length + sunrise.length, canExecuteAgain: !isCaughtUp }; } static async handlePublishRequsition(event) { From 1ec989e8bd14925eb010a0fd6758eab64382a774 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:28:13 -0500 Subject: [PATCH 05/13] Websocket events ignored until task is caught up --- src/scheduled/tasks/IndexingTask.js | 10 ++++++++-- src/scheduled/tasks/deposits.js | 3 ++- src/scheduled/tasks/inflows.js | 3 ++- src/scheduled/tasks/tractor.js | 3 ++- src/scheduled/util/task-range.js | 6 +++--- src/scheduled/websocket.js | 25 ++++++++++++------------- 6 files changed, 29 insertions(+), 21 deletions(-) diff --git a/src/scheduled/tasks/IndexingTask.js b/src/scheduled/tasks/IndexingTask.js index b7920d3..0a19333 100644 --- a/src/scheduled/tasks/IndexingTask.js +++ b/src/scheduled/tasks/IndexingTask.js @@ -2,6 +2,7 @@ class IndexingTask { static _lastExecution = null; static _running = false; static _queueCounter = 0; + static _isCaughtUp = false; // Runs update immediately if nothing is executing, otherwise queues an update execution. static async queueExecution(minIntervalMinutes = 0) { @@ -21,9 +22,9 @@ class IndexingTask { try { this._running = true; // update return sig to be number of events, and boolean? - const { countEvents, canExecuteAgain } = await this.update(); + const countEvents = await this.update(); this._lastExecution = new Date(); - return { countEvents, canExecuteAgain }; + return { countEvents, canExecuteAgain: !this.isCaughtUp() }; } finally { this._running = false; } @@ -40,6 +41,11 @@ class IndexingTask { static async update() { throw new Error('Must be implemented by subclass'); } + + // Indicates if the task is caught up to the latest block as of its most recent update. + static isCaughtUp() { + return this._isCaughtUp; + } } module.exports = IndexingTask; diff --git a/src/scheduled/tasks/deposits.js b/src/scheduled/tasks/deposits.js index 85abbdc..d3721d4 100644 --- a/src/scheduled/tasks/deposits.js +++ b/src/scheduled/tasks/deposits.js @@ -74,8 +74,9 @@ class DepositsTask extends IndexingTask { }); DepositsTask.__seasonUpdate = false; + this._isCaughtUp = isCaughtUp; // Unknown number of events, this task should be refactrored to retrieve them upfront within this method instead of separately - return { countEvents: -1, canExecuteAgain: !isCaughtUp }; + return -1; } // Updates the list of deposits in the database, adding/removing entries as needed diff --git a/src/scheduled/tasks/inflows.js b/src/scheduled/tasks/inflows.js index dae2303..0a0c9e7 100644 --- a/src/scheduled/tasks/inflows.js +++ b/src/scheduled/tasks/inflows.js @@ -105,7 +105,8 @@ class InflowsTask extends IndexingTask { await AppMetaService.setLastInflowUpdate(updateBlock); }); - return { countEvents: events.length, canExecuteAgain: !isCaughtUp }; + this._isCaughtUp = isCaughtUp; + return events.length; } } module.exports = InflowsTask; diff --git a/src/scheduled/tasks/tractor.js b/src/scheduled/tasks/tractor.js index bbfaa86..345bc74 100644 --- a/src/scheduled/tasks/tractor.js +++ b/src/scheduled/tasks/tractor.js @@ -109,7 +109,8 @@ class TractorTask extends IndexingTask { await AppMetaService.setLastTractorUpdate(updateBlock); }); - return { countEvents: events.length + sunrise.length, canExecuteAgain: !isCaughtUp }; + this._isCaughtUp = isCaughtUp; + return events.length + sunrise.length; } static async handlePublishRequsition(event) { diff --git a/src/scheduled/util/task-range.js b/src/scheduled/util/task-range.js index 9b336f7..d5a7fe2 100644 --- a/src/scheduled/util/task-range.js +++ b/src/scheduled/util/task-range.js @@ -19,9 +19,9 @@ class TaskRangeUtil { // Determine range of blocks to update on let updateBlock = (await retryable(() => C().RPC.getBlock())).number; // Buffer to avoid issues with a chain reorg on non-local rpc - if (!EnvUtil.isLocalRpc(C().CHAIN)) { - updateBlock -= ChainUtil.blocksPerInterval(C().CHAIN, 10000); - } + // if (!EnvUtil.isLocalRpc(C().CHAIN)) { + // updateBlock -= ChainUtil.blocksPerInterval(C().CHAIN, 10000); + // } if (updateBlock - lastUpdate > maxBlocksAtOnce) { updateBlock = lastUpdate + maxBlocksAtOnce; isCaughtUp = false; diff --git a/src/scheduled/websocket.js b/src/scheduled/websocket.js index 413807f..0d6eded 100644 --- a/src/scheduled/websocket.js +++ b/src/scheduled/websocket.js @@ -20,15 +20,15 @@ const EVENT_TASKS = { Tractor: [TractorTask], AddDeposit: [DepositsTask, InflowsTask, TractorTask], RemoveDeposit: [DepositsTask, InflowsTask, TractorTask], - RemoveDeposits: [DepositsTask, InflowsTask, TractorTask] - // StalkBalanceChanged: [DepositsTask], - // Sow: [InflowsTask], - // Harvest: [InflowsTask], - // PodListingFilled: [InflowsTask], - // PodOrderFilled: [InflowsTask], - // Plant: [InflowsTask], - // Convert: [InflowsTask], - // ClaimPlenty: [InflowsTask] + RemoveDeposits: [DepositsTask, InflowsTask, TractorTask], + StalkBalanceChanged: [DepositsTask], + Sow: [InflowsTask], + Harvest: [InflowsTask], + PodListingFilled: [InflowsTask], + PodOrderFilled: [InflowsTask], + Plant: [InflowsTask], + Convert: [InflowsTask], + ClaimPlenty: [InflowsTask] }; class WebsocketTaskTrigger { @@ -66,7 +66,9 @@ class WebsocketTaskTrigger { console.log(`encountered ${parsedLog.name} log ${log.transactionHash}`); for (const task of EVENT_TASKS[parsedLog.name]) { // TODO: bind this? check it - task.handleLiveEvent(parsedLog); + if (task.isCaughtUp()) { + task.handleLiveEvent(parsedLog); + } } if (log.removed) { @@ -81,6 +83,3 @@ class WebsocketTaskTrigger { } } module.exports = WebsocketTaskTrigger; - -// TODO: underlying executors will keep getting invoked by the cron task, if they have already executed within the last interval, -// it is unnecessary to process again. From d33599919eb60bc715919d9c125539c7908c61b6 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Thu, 13 Nov 2025 12:33:53 -0500 Subject: [PATCH 06/13] fix tests --- test/scheduled/task-range.test.js | 2 +- test/tractor/tractor.test.js | 39 ++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/test/scheduled/task-range.test.js b/test/scheduled/task-range.test.js index 3c2d811..f993a3f 100644 --- a/test/scheduled/task-range.test.js +++ b/test/scheduled/task-range.test.js @@ -3,7 +3,7 @@ const TaskRangeUtil = require('../../src/scheduled/util/task-range'); const ChainUtil = require('../../src/utils/chain'); const CHAIN_HEAD = 2500; -const BUFFER = 5; +const BUFFER = 0; // Reduced as the reorg resistance was disabled in src describe('TaskRangeUtil', () => { beforeEach(() => { diff --git a/test/tractor/tractor.test.js b/test/tractor/tractor.test.js index a2e133d..ca86abb 100644 --- a/test/tractor/tractor.test.js +++ b/test/tractor/tractor.test.js @@ -35,6 +35,11 @@ describe('TractorTask', () => { }); describe('Initialized', () => { + let requisitionSpy; + let cancelSpy; + let tractorSpy; + let metaSpy; + beforeEach(() => { jest.spyOn(AppMetaService, 'getTractorMeta').mockResolvedValue({ lastUpdate: 10 @@ -58,18 +63,18 @@ describe('TractorTask', () => { ]); jest.spyOn(SiloEvents, 'getSiloDepositEvents').mockResolvedValue([{ account: '0xabcd' }]); jest.spyOn(SnapshotSowV0Service, 'takeSnapshot').mockImplementation(() => {}); + requisitionSpy = jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); + cancelSpy = jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); + tractorSpy = jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); + metaSpy = jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); }); test('Passes events to correct handlers', async () => { jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({}); - const requisitionSpy = jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); - const cancelSpy = jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); - const tractorSpy = jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); - const metaSpy = jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); const retval = await TractorTask.update(); - expect(retval).toBe(true); + expect(retval).toBe(5); expect(requisitionSpy).toHaveBeenCalledWith(expect.objectContaining({ value: 1 })); expect(cancelSpy).toHaveBeenCalledWith(expect.objectContaining({ value: 2 })); expect(tractorSpy).toHaveBeenCalledWith(expect.objectContaining({ value: 3 })); @@ -82,13 +87,10 @@ describe('TractorTask', () => { periodicUpdate: jest.fn().mockImplementation(() => {}) }; jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({ a: blueprintSpy }); - jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); - jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); await TractorTask.update(); + expect(TractorTask.isCaughtUp()).toBe(false); expect(blueprintSpy.periodicUpdate).toHaveBeenCalledWith( expect.any(Function), expect.any(Function), @@ -111,10 +113,6 @@ describe('TractorTask', () => { periodicUpdate: jest.fn().mockImplementation(() => {}) }; jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({ a: blueprintSpy }); - jest.spyOn(TractorTask, 'handlePublishRequsition').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleCancelBlueprint').mockImplementation(() => {}); - jest.spyOn(TractorTask, 'handleTractor').mockImplementation(() => {}); - jest.spyOn(AppMetaService, 'setLastTractorUpdate').mockImplementation(() => {}); await TractorTask.update(); @@ -126,6 +124,21 @@ describe('TractorTask', () => { true ); }); + + test('Task can catch up', async () => { + jest.spyOn(TaskRangeUtil, 'getUpdateInfo').mockResolvedValue({ + isInitialized: true, + lastUpdate: 500, + updateBlock: 1000, + isCaughtUp: true, + meta: null + }); + jest.spyOn(TractorConstants, 'knownBlueprints').mockReturnValue({}); + + await TractorTask.update(); + + expect(TractorTask.isCaughtUp()).toBe(true); + }); }); describe('PublishRequisition', () => { From 66d6bd6f7d929ce68d7b108bb796faee61cb8713 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Thu, 13 Nov 2025 13:08:15 -0500 Subject: [PATCH 07/13] Websocket tests --- src/scheduled/websocket.js | 1 - test/scheduled/websocket.test.js | 256 +++++++++++++++++++++++++++++++ 2 files changed, 256 insertions(+), 1 deletion(-) create mode 100644 test/scheduled/websocket.test.js diff --git a/src/scheduled/websocket.js b/src/scheduled/websocket.js index 0d6eded..10d91af 100644 --- a/src/scheduled/websocket.js +++ b/src/scheduled/websocket.js @@ -65,7 +65,6 @@ class WebsocketTaskTrigger { console.log(`encountered ${parsedLog.name} log ${log.transactionHash}`); for (const task of EVENT_TASKS[parsedLog.name]) { - // TODO: bind this? check it if (task.isCaughtUp()) { task.handleLiveEvent(parsedLog); } diff --git a/test/scheduled/websocket.test.js b/test/scheduled/websocket.test.js new file mode 100644 index 0000000..5c8443f --- /dev/null +++ b/test/scheduled/websocket.test.js @@ -0,0 +1,256 @@ +const WebsocketTaskTrigger = require('../../src/scheduled/websocket'); +const Beanstalk = require('../../src/datasources/contracts/upgradeable/beanstalk'); +const SeasonService = require('../../src/service/season-service'); +const DepositsTask = require('../../src/scheduled/tasks/deposits'); +const InflowsTask = require('../../src/scheduled/tasks/inflows'); +const TractorTask = require('../../src/scheduled/tasks/tractor'); +const { sendWebhookMessage } = require('../../src/utils/discord'); +const { mockBeanstalkConstants } = require('../util/mock-constants'); + +describe('WebsocketTaskTrigger', () => { + let mockWS; + let mockConstants; + let registeredCallback; + let mockInterfaces; + + beforeEach(() => { + jest.clearAllMocks(); + jest.restoreAllMocks(); + mockBeanstalkConstants(); + + // Create mock interfaces for parsing logs + mockInterfaces = createMockInterfaces(); + + // Mock Beanstalk.getAllInterfaces + jest.spyOn(Beanstalk, 'getAllInterfaces').mockReturnValue(mockInterfaces); + + // Mock the websocket object and capture the callback + mockWS = { + on: jest.fn((filter, callback) => { + registeredCallback = callback; + }) + }; + + // Create mock constants object with mocked WS + mockConstants = { + BEANSTALK: '0xC1E088fC1323b20BCBee9bd1B9fC9546db5624C5', + CHAIN: 'eth', + WS: mockWS + }; + + // Mock task methods + jest.spyOn(DepositsTask, 'isCaughtUp').mockReturnValue(true); + jest.spyOn(DepositsTask, 'handleLiveEvent').mockImplementation(() => {}); + jest.spyOn(InflowsTask, 'isCaughtUp').mockReturnValue(true); + jest.spyOn(InflowsTask, 'handleLiveEvent').mockImplementation(() => {}); + jest.spyOn(TractorTask, 'isCaughtUp').mockReturnValue(true); + jest.spyOn(TractorTask, 'handleLiveEvent').mockImplementation(() => {}); + + // Mock SeasonService + jest.spyOn(SeasonService, 'handleSunrise').mockImplementation(() => {}); + }); + + test('Registers websocket listener with correct filter', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + expect(mockWS.on).toHaveBeenCalledTimes(1); + const [filter, callback] = mockWS.on.mock.calls[0]; + + expect(filter.address).toEqual([mockConstants.BEANSTALK]); + expect(filter.topics).toEqual([expect.arrayContaining(['sunrise_topic', 'adddeposit_topic', 'tractor_topic'])]); + expect(callback).toBeDefined(); + }); + + test('Parses and handles Sunrise event', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('Sunrise', 'sunrise_topic', { + season: 12345n, + transactionHash: '0xabc123', + blockNumber: 1000 + }); + + await registeredCallback(mockLog); + + expect(SeasonService.handleSunrise).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'Sunrise', + args: expect.objectContaining({ season: 12345n }), + rawLog: mockLog + }) + ); + expect(DepositsTask.handleLiveEvent).toHaveBeenCalled(); + expect(InflowsTask.handleLiveEvent).toHaveBeenCalled(); + expect(TractorTask.handleLiveEvent).toHaveBeenCalled(); + }); + + test('Handles AddDeposit event and notifies correct tasks', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('AddDeposit', 'adddeposit_topic', { + account: '0x123', + token: '0xabc', + amount: 1000n, + transactionHash: '0xdef456', + blockNumber: 2000 + }); + + await registeredCallback(mockLog); + + expect(DepositsTask.handleLiveEvent).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'AddDeposit', + args: expect.objectContaining({ account: '0x123', amount: 1000n }) + }) + ); + expect(InflowsTask.handleLiveEvent).toHaveBeenCalled(); + expect(TractorTask.handleLiveEvent).toHaveBeenCalled(); + expect(SeasonService.handleSunrise).not.toHaveBeenCalled(); + }); + + test('Handles PublishRequisition event and notifies only TractorTask', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('PublishRequisition', 'tractor_topic', { + publisher: '0x789', + requisitionId: '0xreq123', + transactionHash: '0xghi789', + blockNumber: 3000 + }); + + await registeredCallback(mockLog); + + expect(TractorTask.handleLiveEvent).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'PublishRequisition', + args: expect.objectContaining({ publisher: '0x789' }) + }) + ); + expect(DepositsTask.handleLiveEvent).not.toHaveBeenCalled(); + expect(InflowsTask.handleLiveEvent).not.toHaveBeenCalled(); + }); + + test('Does not notify tasks that are not caught up', async () => { + // Set some tasks as not caught up + DepositsTask.isCaughtUp.mockReturnValue(false); + TractorTask.isCaughtUp.mockReturnValue(false); + + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('Sunrise', 'sunrise_topic', { + season: 12345n, + transactionHash: '0xabc123', + blockNumber: 1000 + }); + + await registeredCallback(mockLog); + + // Only InflowsTask is caught up and should be notified + expect(InflowsTask.handleLiveEvent).toHaveBeenCalled(); + expect(DepositsTask.handleLiveEvent).not.toHaveBeenCalled(); + expect(TractorTask.handleLiveEvent).not.toHaveBeenCalled(); + }); + + test('Handles multiple events in sequence', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const logs = [ + createMockLog('Sunrise', 'sunrise_topic', { + season: 100n, + transactionHash: '0xtx1', + blockNumber: 1000 + }), + createMockLog('AddDeposit', 'adddeposit_topic', { + account: '0x111', + token: '0xaaa', + amount: 100n, + transactionHash: '0xtx2', + blockNumber: 1000 + }), + createMockLog('Sow', 'sow_topic', { + account: '0x222', + beans: 500n, + pods: 550n, + transactionHash: '0xtx3', + blockNumber: 1000 + }) + ]; + + for (const log of logs) { + await registeredCallback(log); + } + + expect(SeasonService.handleSunrise).toHaveBeenCalledTimes(1); + expect(DepositsTask.handleLiveEvent).toHaveBeenCalledTimes(2); // Sunrise + AddDeposit + expect(InflowsTask.handleLiveEvent).toHaveBeenCalledTimes(3); // All three events + expect(TractorTask.handleLiveEvent).toHaveBeenCalledTimes(2); // Sunrise + AddDeposit + }); + + test('Attaches rawLog to parsed event', async () => { + await WebsocketTaskTrigger.listen(mockConstants); + + const mockLog = createMockLog('AddDeposit', 'adddeposit_topic', { + account: '0x123', + token: '0xabc', + amount: 1000n, + transactionHash: '0xyz567', + blockNumber: 9000 + }); + + await registeredCallback(mockLog); + + expect(DepositsTask.handleLiveEvent).toHaveBeenCalledWith( + expect.objectContaining({ + rawLog: mockLog + }) + ); + }); +}); + +// Helper functions + +function createMockInterfaces() { + const eventTopics = { + Sunrise: 'sunrise_topic', + PublishRequisition: 'tractor_topic', + CancelBlueprint: 'cancelblueprint_topic', + Tractor: 'tractor_topic', + AddDeposit: 'adddeposit_topic', + RemoveDeposit: 'removedeposit_topic', + RemoveDeposits: 'removedeposits_topic', + StalkBalanceChanged: 'stalkbalancechanged_topic', + Sow: 'sow_topic', + Harvest: 'harvest_topic', + PodListingFilled: 'podlistingfilled_topic', + PodOrderFilled: 'podorderfilled_topic', + Plant: 'plant_topic', + Convert: 'convert_topic', + ClaimPlenty: 'claimplenty_topic' + }; + + return [ + { + getEventTopic: jest.fn((eventName) => eventTopics[eventName]), + parseLog: jest.fn((log) => { + // Find the event name from the topic + const eventName = Object.keys(eventTopics).find((name) => eventTopics[name] === log.topics[0]); + return { + name: eventName, + args: log.args || {}, + rawLog: undefined // Will be set by the websocket handler + }; + }) + } + ]; +} + +function createMockLog(eventName, topicHash, data) { + return { + topics: [topicHash], + args: data, + name: eventName, + transactionHash: data.transactionHash, + blockNumber: data.blockNumber, + removed: false + }; +} From 6eddb102809e78c5e072480a7a1e48b8cbd46b9c Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Fri, 14 Nov 2025 13:00:03 -0500 Subject: [PATCH 08/13] avoids queueing multiple executions for events in the same block --- .../postgres/startup-seeders/dev-seeder.js | 10 +++--- src/scheduled/cron-schedule.js | 15 ++++---- src/scheduled/tasks/IndexingTask.js | 36 ++++++++++++++----- src/scheduled/tasks/deposits.js | 2 +- src/scheduled/tasks/inflows.js | 2 +- src/scheduled/tasks/tractor.js | 2 +- 6 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/repository/postgres/startup-seeders/dev-seeder.js b/src/repository/postgres/startup-seeders/dev-seeder.js index d22114f..7428b7b 100644 --- a/src/repository/postgres/startup-seeders/dev-seeder.js +++ b/src/repository/postgres/startup-seeders/dev-seeder.js @@ -21,11 +21,11 @@ class DevSeeder { }); await AsyncContext.run({ chain: 'base' }, async () => { - try { - TractorTask.__cronLock = true; - while ((await TractorTask.update()).canExecuteAgain) {} - } finally { - TractorTask.__cronLock = false; + while (true) { + const { canExecuteAgain } = await TractorTask.queueExecution(); + if (!canExecuteAgain) { + break; + } } }); } diff --git a/src/scheduled/cron-schedule.js b/src/scheduled/cron-schedule.js index c4e96b5..54b68e0 100644 --- a/src/scheduled/cron-schedule.js +++ b/src/scheduled/cron-schedule.js @@ -16,27 +16,28 @@ const ALL_JOBS = { deposits: { executeOnStartup: true, // Updated less frequently because the underlying data is currently unused - cron: '0 30 * * * *', + cron: '0 10 * * * *', function: async () => { - while ((await DepositsTask.queueExecution(50)).canExecuteAgain) {} + while ((await DepositsTask.queueExecution({ minIntervalMinutes: 55 })).canExecuteAgain) {} } }, inflows: { executeOnStartup: true, // Updated less frequently because its only used for snapshots (and the ws should invoke it at sunrise) - cron: '0 30 * * * *', + cron: '0 10 * * * *', function: async () => { - while ((await InflowsTask.queueExecution(50)).canExecuteAgain) {} + while ((await InflowsTask.queueExecution({ minIntervalMinutes: 55 })).canExecuteAgain) {} } }, tractor: { - executeOnStartup: true, + executeOnStartup: !EnvUtil.getDevTractor().seeder, // will execute on startup either from here or from the dev seeder cron: '0 */5 * * * *', function: async () => { + const isInitialRun = TractorTask.getLastExecutionTime() === null; while (true) { - const { countEvents, canExecuteAgain } = await TractorTask.queueExecution(4.5); + const { countEvents, canExecuteAgain } = await TractorTask.queueExecution({ minIntervalMinutes: 4.5 }); if (!canExecuteAgain) { - if (countEvents > 0) { + if (countEvents > 0 && !isInitialRun) { sendWebhookMessage(`Cron task processed ${countEvents} tractor events; websocket might be disconnected?`); } break; diff --git a/src/scheduled/tasks/IndexingTask.js b/src/scheduled/tasks/IndexingTask.js index 0a19333..810c89d 100644 --- a/src/scheduled/tasks/IndexingTask.js +++ b/src/scheduled/tasks/IndexingTask.js @@ -1,14 +1,27 @@ class IndexingTask { - static _lastExecution = null; + static _lastQueuedBlock = null; + static _lastExecutionTime = null; static _running = false; static _queueCounter = 0; static _isCaughtUp = false; - // Runs update immediately if nothing is executing, otherwise queues an update execution. - static async queueExecution(minIntervalMinutes = 0) { - // Ensure at least the minimum requested interval has passed since the last execution - if (this._lastExecution && Date.now() - this._lastExecution < minIntervalMinutes * 60 * 1000) { + /** + * Attempts to execute the tasks, queuing an execution if its already running and the second request is unique. + * @param blockNumber - the block an update is being requested for; ignores processing if already processed up to this block. + * @param minIntervalMinutes - ignores queue requests if the task was recently executed within this interval. + * @returns {Promise<{ countEvents: number, canExecuteAgain: boolean }>} + */ + static async queueExecution({ blockNumber = -1, minIntervalMinutes = 0 } = {}) { + if (blockNumber !== -1 && blockNumber <= this._lastQueuedBlock) { + // Requested block number was already queued or processed return { countEvents: 0, canExecuteAgain: false }; + } else if (this._lastExecutionTime && Date.now() - this._lastExecutionTime < minIntervalMinutes * 60 * 1000) { + // Minimum requested interval hasn't passed since the last execution + return { countEvents: 0, canExecuteAgain: false }; + } + + if (blockNumber !== -1) { + this._lastQueuedBlock = blockNumber; } const localCount = ++this._queueCounter; @@ -23,13 +36,13 @@ class IndexingTask { this._running = true; // update return sig to be number of events, and boolean? const countEvents = await this.update(); - this._lastExecution = new Date(); + this._lastExecutionTime = new Date(); return { countEvents, canExecuteAgain: !this.isCaughtUp() }; } finally { this._running = false; } } - return false; + return { countEvents: 0, canExecuteAgain: false }; } // Notifies of an event occuring in real-time via a websocket. Task decides how to proceed. @@ -37,7 +50,10 @@ class IndexingTask { throw new Error('Must be implemented by subclass'); } - // Runs the task, updating as many blocks as possible + /** + * Runs the task, updating as many blocks as possible + * @returns {Promise<{updateBlock: number, processedEvents: number}>} + */ static async update() { throw new Error('Must be implemented by subclass'); } @@ -46,6 +62,10 @@ class IndexingTask { static isCaughtUp() { return this._isCaughtUp; } + + static getLastExecutionTime() { + return this._lastExecutionTime; + } } module.exports = IndexingTask; diff --git a/src/scheduled/tasks/deposits.js b/src/scheduled/tasks/deposits.js index d3721d4..de558d2 100644 --- a/src/scheduled/tasks/deposits.js +++ b/src/scheduled/tasks/deposits.js @@ -24,7 +24,7 @@ class DepositsTask extends IndexingTask { static async handleLiveEvent(event) { // Deposits task is not currently used for anything, therefore ok to update infrequently if (event.name === 'Sunrise') { - await this.queueExecution(); + await this.queueExecution({ blockNumber: event.rawLog.blockNumber }); } // if (['AddDeposit', 'RemoveDeposit', 'RemoveDeposits', 'StalkBalanceChanged'].includes(event.name)) { // await this.queueExecution(); diff --git a/src/scheduled/tasks/inflows.js b/src/scheduled/tasks/inflows.js index 0a0c9e7..d6bf41e 100644 --- a/src/scheduled/tasks/inflows.js +++ b/src/scheduled/tasks/inflows.js @@ -26,7 +26,7 @@ class InflowsTask extends IndexingTask { static async handleLiveEvent(event) { // Inflows are only used for snapshots currently, therefore update on Sunrise only if (event.name === 'Sunrise') { - await this.queueExecution(); + await this.queueExecution({ blockNumber: event.rawLog.blockNumber }); } // if (ALL_EVENTS.includes(event.name)) { // await this.queueExecution(); diff --git a/src/scheduled/tasks/tractor.js b/src/scheduled/tasks/tractor.js index 345bc74..39f5aee 100644 --- a/src/scheduled/tasks/tractor.js +++ b/src/scheduled/tasks/tractor.js @@ -26,7 +26,7 @@ const SNAPSHOT_SERVICES = [SnapshotSowV0Service, SnapshotConvertUpV0Service]; class TractorTask extends IndexingTask { static async handleLiveEvent(event) { if (['Sunrise', 'PublishRequisition', 'CancelBlueprint', 'Tractor'].includes(event.name)) { - await this.queueExecution(); + await this.queueExecution({ blockNumber: event.rawLog.blockNumber }); } // Silo events could trigger a periodicUpdate, ignoring currently } From 79bcb8b0f2220eab00f344c7f0399bf2d2a8b67c Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Fri, 14 Nov 2025 13:48:34 -0500 Subject: [PATCH 09/13] Indexing task tests --- test/scheduled/IndexingTask.test.js | 197 ++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 test/scheduled/IndexingTask.test.js diff --git a/test/scheduled/IndexingTask.test.js b/test/scheduled/IndexingTask.test.js new file mode 100644 index 0000000..c4b85c7 --- /dev/null +++ b/test/scheduled/IndexingTask.test.js @@ -0,0 +1,197 @@ +const IndexingTask = require('../../src/scheduled/tasks/IndexingTask'); + +describe('IndexingTask', () => { + let mockUpdate; + + beforeEach(() => { + jest.clearAllMocks(); + jest.restoreAllMocks(); + + mockUpdate = jest.spyOn(IndexingTask, 'update').mockResolvedValue(0); + + // Reset static properties before each test + IndexingTask._lastQueuedBlock = null; + IndexingTask._lastExecutionTime = null; + IndexingTask._running = false; + IndexingTask._queueCounter = 0; + IndexingTask._isCaughtUp = false; + }); + + afterEach(() => { + expect(IndexingTask._running).toBeFalsy(); + jest.useRealTimers(); + }); + + describe('queueExecution', () => { + it('should return early if blockNumber was already queued', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution({ blockNumber: 100 }); + + expect(IndexingTask._lastQueuedBlock).toBe(100); + expect(mockUpdate).not.toHaveBeenCalled(); + }); + + it('should return early if blockNumber is less than last queued block', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution({ blockNumber: 50 }); + + expect(IndexingTask._lastQueuedBlock).toBe(100); + expect(mockUpdate).not.toHaveBeenCalled(); + }); + + it('should return early if minimum interval has not passed', async () => { + IndexingTask._lastExecutionTime = new Date(Date.now() - 4 * 60 * 1000); + + await IndexingTask.queueExecution({ minIntervalMinutes: 5 }); + + expect(mockUpdate).not.toHaveBeenCalled(); + }); + + it('should proceed if minimum interval has passed', async () => { + IndexingTask._lastExecutionTime = new Date(Date.now() - 10 * 60 * 1000); + + await IndexingTask.queueExecution({ minIntervalMinutes: 5 }); + + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should update _lastQueuedBlock when valid blockNumber is provided', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution({ blockNumber: 200 }); + + expect(IndexingTask._lastQueuedBlock).toBe(200); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should not update _lastQueuedBlock when blockNumber isnt provided', async () => { + IndexingTask._lastQueuedBlock = 100; + + await IndexingTask.queueExecution(); + + expect(IndexingTask._lastQueuedBlock).toBe(100); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should return correct result when update is successful', async () => { + mockUpdate.mockResolvedValue(10); + jest.spyOn(IndexingTask, 'isCaughtUp').mockReturnValue(true); + + const { countEvents, canExecuteAgain } = await IndexingTask.queueExecution({ blockNumber: 200 }); + + expect(countEvents).toBe(10); + expect(canExecuteAgain).toBe(true); + }); + + it('should wait for running task to finish before executing', async () => { + jest.useFakeTimers(); + IndexingTask._running = true; + + const queuePromise = IndexingTask.queueExecution({ blockNumber: 500 }); + + // Still waiting on running task + await jest.runAllTimersAsync(); + expect(mockUpdate).not.toHaveBeenCalled(); + await jest.runAllTimersAsync(); + expect(mockUpdate).not.toHaveBeenCalled(); + + // Stop the running task + IndexingTask._running = false; + + // Advance time to complete waiting + await jest.runAllTimersAsync(); + await queuePromise; + + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should not run task if exceeds wait time', async () => { + jest.useFakeTimers(); + IndexingTask._running = true; + + const queuePromise = IndexingTask.queueExecution({ blockNumber: 600 }); + + // Advance many wait times + for (let i = 0; i < 50; ++i) { + await jest.runAllTimersAsync(); + } + + // Task should still be running from another caller, this wont update + await queuePromise; + + expect(mockUpdate).not.toHaveBeenCalled(); + }); + + it('should not execute if another execution was queued during wait', async () => { + jest.useFakeTimers(); + // Simulate a task that's already running + IndexingTask._running = true; + + mockUpdate.mockResolvedValue(5); + + // First execution starts waiting + const firstPromise = IndexingTask.queueExecution({ blockNumber: 700 }); + // Queue another execution + const secondPromise = IndexingTask.queueExecution({ blockNumber: 701 }); + + // Stop the running task so both can proceed + IndexingTask._running = false; + + // Advance time to complete both executions + await jest.runAllTimersAsync(); + + const firstResult = await firstPromise; + const secondResult = await secondPromise; + + // First execution should be skipped because another arrived during wait + expect(firstResult).toEqual({ countEvents: 0, canExecuteAgain: false }); + expect(secondResult).toEqual({ countEvents: 5, canExecuteAgain: true }); + expect(mockUpdate).toHaveBeenCalledTimes(1); + }); + + it('should set _running flag during execution and clear it after', async () => { + mockUpdate.mockImplementation(async () => { + expect(IndexingTask._running).toBe(true); + return 10; + }); + + await IndexingTask.queueExecution({ blockNumber: 800 }); + + expect(IndexingTask._running).toBe(false); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should clear _running flag even if update throws an error', async () => { + mockUpdate.mockImplementation(async () => { + expect(IndexingTask._running).toBe(true); + throw new Error('Update failed'); + }); + + const resultPromise = IndexingTask.queueExecution({ blockNumber: 900 }); + + // Verify the promise rejects and _running is cleared + try { + await resultPromise; + throw new Error('Expected error to be thrown, but wasnt'); + } catch (error) { + errorThrown = true; + expect(error.message).toBe('Update failed'); + } + + expect(IndexingTask._running).toBe(false); + expect(mockUpdate).toHaveBeenCalled(); + }); + + it('should update _lastExecutionTime after successful execution', async () => { + expect(IndexingTask._lastExecutionTime).toBeNull(); + + const startTime = Date.now(); + await IndexingTask.queueExecution({ blockNumber: 1000 }); + + expect(IndexingTask._lastExecutionTime).toBeInstanceOf(Date); + expect(IndexingTask._lastExecutionTime.getTime()).toBeGreaterThanOrEqual(startTime); + }); + }); +}); From 88f6089aa6d5bc0ac68262da8afe2f7e0e77528d Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Wed, 19 Nov 2025 14:16:57 -0500 Subject: [PATCH 10/13] Activate websocket --- src/app.js | 4 ++++ src/scheduled/cron-schedule.js | 1 + src/scheduled/websocket.js | 8 +++----- test/scheduled/websocket.test.js | 1 - 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/app.js b/src/app.js index a5c6ba5..b492ae2 100644 --- a/src/app.js +++ b/src/app.js @@ -20,6 +20,7 @@ const EnvUtil = require('./utils/env.js'); const ChainUtil = require('./utils/chain.js'); const AlchemyUtil = require('./datasources/alchemy.js'); const StartupSeeder = require('./repository/postgres/startup-seeders/startup-seeder.js'); +const WebsocketTaskTrigger = require('./scheduled/websocket.js'); async function appStartup() { // Activate whichever cron jobs are configured, if any @@ -38,6 +39,9 @@ async function appStartup() { // Long-running async seeder process, the api will come online before this is complete. StartupSeeder.seedDatabase(); + // Websocket begins listening for events + WebsocketTaskTrigger.listen(); + const app = new Koa(); app.use( diff --git a/src/scheduled/cron-schedule.js b/src/scheduled/cron-schedule.js index 54b68e0..2cf2d48 100644 --- a/src/scheduled/cron-schedule.js +++ b/src/scheduled/cron-schedule.js @@ -5,6 +5,7 @@ const Log = require('../utils/logging'); const DepositsTask = require('./tasks/deposits'); const TractorTask = require('./tasks/tractor'); const InflowsTask = require('./tasks/inflows'); +const EnvUtil = require('../utils/env'); // All cron jobs which could be activated are configured here const ALL_JOBS = { diff --git a/src/scheduled/websocket.js b/src/scheduled/websocket.js index 10d91af..7812683 100644 --- a/src/scheduled/websocket.js +++ b/src/scheduled/websocket.js @@ -1,7 +1,5 @@ // Process sitting atop each of the scheduled tasks; Depending on which events are encountered, -// triggers the appropriate task to run immediately. The tasks decide what to do with the provided events; -// the task may decide to wait to process until a larger volume of events is encountered, and may also -// re-retrieve the logs on its own (implementation will vary by task). +// triggers the appropriate task to run immediately. The tasks decide what to do with the provided events. const { C } = require('../constants/runtime-constants'); const Beanstalk = require('../datasources/contracts/upgradeable/beanstalk'); @@ -32,7 +30,7 @@ const EVENT_TASKS = { }; class WebsocketTaskTrigger { - static async listen(c = C()) { + static listen(c = C()) { const interfaces = Beanstalk.getAllInterfaces(c); const topics = []; @@ -63,7 +61,7 @@ class WebsocketTaskTrigger { await SeasonService.handleSunrise(parsedLog); } - console.log(`encountered ${parsedLog.name} log ${log.transactionHash}`); + Log.info(`encountered ${parsedLog.name} log in txn: ${log.transactionHash}`); for (const task of EVENT_TASKS[parsedLog.name]) { if (task.isCaughtUp()) { task.handleLiveEvent(parsedLog); diff --git a/test/scheduled/websocket.test.js b/test/scheduled/websocket.test.js index 5c8443f..e6fd2f0 100644 --- a/test/scheduled/websocket.test.js +++ b/test/scheduled/websocket.test.js @@ -4,7 +4,6 @@ const SeasonService = require('../../src/service/season-service'); const DepositsTask = require('../../src/scheduled/tasks/deposits'); const InflowsTask = require('../../src/scheduled/tasks/inflows'); const TractorTask = require('../../src/scheduled/tasks/tractor'); -const { sendWebhookMessage } = require('../../src/utils/discord'); const { mockBeanstalkConstants } = require('../util/mock-constants'); describe('WebsocketTaskTrigger', () => { From d915421537090c7818c4da4cf5838f3f4cece2d4 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Wed, 19 Nov 2025 14:38:02 -0500 Subject: [PATCH 11/13] Correct name comparison --- src/scheduled/websocket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scheduled/websocket.js b/src/scheduled/websocket.js index 7812683..a2cd4ed 100644 --- a/src/scheduled/websocket.js +++ b/src/scheduled/websocket.js @@ -57,7 +57,7 @@ class WebsocketTaskTrigger { const parsedLog = ifaceMap[log.topics[0]].parseLog(log); parsedLog.rawLog = log; - if (log.name === 'Sunrise') { + if (parsedLog.name === 'Sunrise') { await SeasonService.handleSunrise(parsedLog); } From 1373b86da9da053642d1f83139f6c31e87c8bf93 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Wed, 19 Nov 2025 17:58:54 -0500 Subject: [PATCH 12/13] Fix tests --- src/scheduled/tasks/IndexingTask.js | 2 +- test/scheduled/IndexingTask.test.js | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/scheduled/tasks/IndexingTask.js b/src/scheduled/tasks/IndexingTask.js index 810c89d..e3cbad8 100644 --- a/src/scheduled/tasks/IndexingTask.js +++ b/src/scheduled/tasks/IndexingTask.js @@ -31,7 +31,7 @@ class IndexingTask { } // If another execution was queued during the wait, allow that one to execute instead - if (localCount === this._queueCounter) { + if (!this._running && localCount === this._queueCounter) { try { this._running = true; // update return sig to be number of events, and boolean? diff --git a/test/scheduled/IndexingTask.test.js b/test/scheduled/IndexingTask.test.js index c4b85c7..e7644a5 100644 --- a/test/scheduled/IndexingTask.test.js +++ b/test/scheduled/IndexingTask.test.js @@ -82,7 +82,7 @@ describe('IndexingTask', () => { const { countEvents, canExecuteAgain } = await IndexingTask.queueExecution({ blockNumber: 200 }); expect(countEvents).toBe(10); - expect(canExecuteAgain).toBe(true); + expect(canExecuteAgain).toBe(false); }); it('should wait for running task to finish before executing', async () => { @@ -92,16 +92,16 @@ describe('IndexingTask', () => { const queuePromise = IndexingTask.queueExecution({ blockNumber: 500 }); // Still waiting on running task - await jest.runAllTimersAsync(); + await jest.advanceTimersByTimeAsync(1000); expect(mockUpdate).not.toHaveBeenCalled(); - await jest.runAllTimersAsync(); + await jest.advanceTimersByTimeAsync(1000); expect(mockUpdate).not.toHaveBeenCalled(); // Stop the running task IndexingTask._running = false; // Advance time to complete waiting - await jest.runAllTimersAsync(); + await jest.advanceTimersByTimeAsync(1000); await queuePromise; expect(mockUpdate).toHaveBeenCalled(); @@ -115,13 +115,15 @@ describe('IndexingTask', () => { // Advance many wait times for (let i = 0; i < 50; ++i) { - await jest.runAllTimersAsync(); + await jest.advanceTimersByTimeAsync(1000); } // Task should still be running from another caller, this wont update await queuePromise; expect(mockUpdate).not.toHaveBeenCalled(); + expect(IndexingTask._running).toBeTruthy(); + IndexingTask._running = false; }); it('should not execute if another execution was queued during wait', async () => { From 5c62eb79da70fee23fa751919d22fe7a57d45085 Mon Sep 17 00:00:00 2001 From: PintoPirate <189064953+PintoPirate@users.noreply.github.com> Date: Wed, 19 Nov 2025 18:30:23 -0500 Subject: [PATCH 13/13] Prevent incorrect warning log on race condition --- src/scheduled/cron-schedule.js | 8 ++++++-- src/scheduled/tasks/IndexingTask.js | 14 +++++++++----- test/scheduled/IndexingTask.test.js | 4 ++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/scheduled/cron-schedule.js b/src/scheduled/cron-schedule.js index 2cf2d48..456dee2 100644 --- a/src/scheduled/cron-schedule.js +++ b/src/scheduled/cron-schedule.js @@ -36,9 +36,13 @@ const ALL_JOBS = { function: async () => { const isInitialRun = TractorTask.getLastExecutionTime() === null; while (true) { - const { countEvents, canExecuteAgain } = await TractorTask.queueExecution({ minIntervalMinutes: 4.5 }); + const { countEvents, queuedCallersBehind, canExecuteAgain } = await TractorTask.queueExecution({ + minIntervalMinutes: 4.5 + }); if (!canExecuteAgain) { - if (countEvents > 0 && !isInitialRun) { + // queuedCallersBehind can be true if the task ran slightly before the websocket got the event, + // and the websocket queued a run while this one was still running. + if (countEvents > 0 && !isInitialRun && !queuedCallersBehind) { sendWebhookMessage(`Cron task processed ${countEvents} tractor events; websocket might be disconnected?`); } break; diff --git a/src/scheduled/tasks/IndexingTask.js b/src/scheduled/tasks/IndexingTask.js index e3cbad8..d82cc9f 100644 --- a/src/scheduled/tasks/IndexingTask.js +++ b/src/scheduled/tasks/IndexingTask.js @@ -9,15 +9,15 @@ class IndexingTask { * Attempts to execute the tasks, queuing an execution if its already running and the second request is unique. * @param blockNumber - the block an update is being requested for; ignores processing if already processed up to this block. * @param minIntervalMinutes - ignores queue requests if the task was recently executed within this interval. - * @returns {Promise<{ countEvents: number, canExecuteAgain: boolean }>} + * @returns {Promise<{ countEvents: number, queuedCallersBehind: boolean, canExecuteAgain: boolean }>} */ static async queueExecution({ blockNumber = -1, minIntervalMinutes = 0 } = {}) { if (blockNumber !== -1 && blockNumber <= this._lastQueuedBlock) { // Requested block number was already queued or processed - return { countEvents: 0, canExecuteAgain: false }; + return { countEvents: 0, queuedCallersBehind: false, canExecuteAgain: false }; } else if (this._lastExecutionTime && Date.now() - this._lastExecutionTime < minIntervalMinutes * 60 * 1000) { // Minimum requested interval hasn't passed since the last execution - return { countEvents: 0, canExecuteAgain: false }; + return { countEvents: 0, queuedCallersBehind: false, canExecuteAgain: false }; } if (blockNumber !== -1) { @@ -37,12 +37,16 @@ class IndexingTask { // update return sig to be number of events, and boolean? const countEvents = await this.update(); this._lastExecutionTime = new Date(); - return { countEvents, canExecuteAgain: !this.isCaughtUp() }; + return { + countEvents, + queuedCallersBehind: this._queueCounter > localCount, + canExecuteAgain: !this.isCaughtUp() + }; } finally { this._running = false; } } - return { countEvents: 0, canExecuteAgain: false }; + return { countEvents: 0, queuedCallersBehind: this._queueCounter > localCount, canExecuteAgain: false }; } // Notifies of an event occuring in real-time via a websocket. Task decides how to proceed. diff --git a/test/scheduled/IndexingTask.test.js b/test/scheduled/IndexingTask.test.js index e7644a5..a5acceb 100644 --- a/test/scheduled/IndexingTask.test.js +++ b/test/scheduled/IndexingTask.test.js @@ -148,8 +148,8 @@ describe('IndexingTask', () => { const secondResult = await secondPromise; // First execution should be skipped because another arrived during wait - expect(firstResult).toEqual({ countEvents: 0, canExecuteAgain: false }); - expect(secondResult).toEqual({ countEvents: 5, canExecuteAgain: true }); + expect(firstResult).toEqual({ countEvents: 0, queuedCallersBehind: true, canExecuteAgain: false }); + expect(secondResult).toEqual({ countEvents: 5, queuedCallersBehind: false, canExecuteAgain: true }); expect(mockUpdate).toHaveBeenCalledTimes(1); });