Skip to content
Merged
4 changes: 4 additions & 0 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const EnvUtil = require('./utils/env.js');
const ChainUtil = require('./utils/chain.js');
const AlchemyUtil = require('./datasources/alchemy.js');
const StartupSeeder = require('./repository/postgres/startup-seeders/startup-seeder.js');
const WebsocketTaskTrigger = require('./scheduled/websocket.js');

async function appStartup() {
// Activate whichever cron jobs are configured, if any
Expand All @@ -38,6 +39,9 @@ async function appStartup() {
// Long-running async seeder process, the api will come online before this is complete.
StartupSeeder.seedDatabase();

// Websocket begins listening for events
WebsocketTaskTrigger.listen();

const app = new Koa();

app.use(
Expand Down
2 changes: 2 additions & 0 deletions src/constants/runtime-constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class RuntimeConstants {
get: (target, property, receiver) => {
if (property === 'RPC') {
return AlchemyUtil.providerForChain(chain);
} else if (property === 'WS') {
return AlchemyUtil.alchemyForChain(chain).ws;
}
let constants;
if (chain) {
Expand Down
10 changes: 8 additions & 2 deletions src/datasources/alchemy.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ const EnvUtil = require('../utils/env');
const { ethers } = require('ethers');

class AlchemyUtil {
// Contains alchemy object
static _alchemies = {};
// Contains a provider by chain
static _providers = {};
// Access to the underlying promise whos execution populates _providers.
Expand All @@ -20,14 +22,18 @@ class AlchemyUtil {
apiKey: EnvUtil.getAlchemyKey(),
network: `${chain}-mainnet` // Of type alchemy-sdk.Network
};
const alchemy = new Alchemy(settings);
this._providerPromises[chain] = alchemy.config.getProvider().then((p) => {
this._alchemies[chain] = new Alchemy(settings);
this._providerPromises[chain] = this._alchemies[chain].config.getProvider().then((p) => {
this._providers[chain] = p;
});
}
}
}

static alchemyForChain(chain) {
return AlchemyUtil._alchemies[chain];
}

static providerForChain(chain) {
return AlchemyUtil._providers[chain];
}
Expand Down
2 changes: 0 additions & 2 deletions src/datasources/events/silo-events.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
const { C } = require('../../constants/runtime-constants');
const { BigInt_abs } = require('../../utils/bigint');
const Log = require('../../utils/logging');
const { fromBigInt, bigintFloatMultiplier, bigintPercent } = require('../../utils/number');
const AlchemyUtil = require('../alchemy');
const FilterLogs = require('./filter-logs');

Expand Down
10 changes: 5 additions & 5 deletions src/repository/postgres/startup-seeders/dev-seeder.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ class DevSeeder {
});

await AsyncContext.run({ chain: 'base' }, async () => {
try {
TractorTask.__cronLock = true;
while (await TractorTask.update()) {}
} finally {
TractorTask.__cronLock = false;
while (true) {
const { canExecuteAgain } = await TractorTask.queueExecution();
if (!canExecuteAgain) {
break;
}
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/repository/postgres/startup-seeders/season-seeder.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SeasonSeeder {
await Concurrent.run(TAG, 50, () =>
retryable(async () => {
try {
await SeasonService.insertSeasonFromEvent(season);
await SeasonService.insertSeason(season);

if (season % 100 === 0) {
Log.info(`Saved season ${season}...`);
Expand Down
71 changes: 43 additions & 28 deletions src/scheduled/cron-schedule.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,7 @@ const Log = require('../utils/logging');
const DepositsTask = require('./tasks/deposits');
const TractorTask = require('./tasks/tractor');
const InflowsTask = require('./tasks/inflows');

const genericTask = (Executor, label) => ({
[label]: {
// 11 seconds into the minute; these tasks have a 5 block buffer, this will ensure it processes the block on the minute
cron: '11 * * * * *',
function: async () => {
if (Executor.__cronLock) {
Log.info(`${label} task is still running, skipping this minute...`);
return;
}

try {
Executor.__cronLock = true;
let canExecuteAgain = true;
while (canExecuteAgain) {
canExecuteAgain = await Executor.update();
}
} finally {
Executor.__cronLock = false;
}
}
}
});
const EnvUtil = require('../utils/env');

// All cron jobs which could be activated are configured here
const ALL_JOBS = {
Expand All @@ -36,9 +14,42 @@ const ALL_JOBS = {
cron: '50-59 59 * * * *',
function: SunriseTask.handleSunrise
},
...genericTask(DepositsTask, 'deposits'),
...genericTask(TractorTask, 'tractor'),
...genericTask(InflowsTask, 'inflows'),
deposits: {
executeOnStartup: true,
// Updated less frequently because the underlying data is currently unused
cron: '0 10 * * * *',
function: async () => {
while ((await DepositsTask.queueExecution({ minIntervalMinutes: 55 })).canExecuteAgain) {}
}
},
inflows: {
executeOnStartup: true,
// Updated less frequently because its only used for snapshots (and the ws should invoke it at sunrise)
cron: '0 10 * * * *',
function: async () => {
while ((await InflowsTask.queueExecution({ minIntervalMinutes: 55 })).canExecuteAgain) {}
}
},
tractor: {
executeOnStartup: !EnvUtil.getDevTractor().seeder, // will execute on startup either from here or from the dev seeder
cron: '0 */5 * * * *',
function: async () => {
const isInitialRun = TractorTask.getLastExecutionTime() === null;
while (true) {
const { countEvents, queuedCallersBehind, canExecuteAgain } = await TractorTask.queueExecution({
minIntervalMinutes: 4.5
});
if (!canExecuteAgain) {
// queuedCallersBehind can be true if the task ran slightly before the websocket got the event,
// and the websocket queued a run while this one was still running.
if (countEvents > 0 && !isInitialRun && !queuedCallersBehind) {
sendWebhookMessage(`Cron task processed ${countEvents} tractor events; websocket might be disconnected?`);
}
break;
}
}
}
},
alert: {
cron: '*/10 * * * * *',
function: () => Log.info('10 seconds testing Alert')
Expand Down Expand Up @@ -70,7 +81,7 @@ function activateJobs(jobNames) {
for (const jobName of jobNames) {
const job = ALL_JOBS[jobName];
if (job) {
cron.schedule(job.cron, () => {
const execute = () => {
// This is to mitigate a quirk in node-cron where sometimes jobs are missed. Jobs can specify
// a range of seconds they are willing to execute on, making it far less likely to drop.
// This guard prevents double-execution.
Expand All @@ -80,7 +91,11 @@ function activateJobs(jobNames) {
job.__lastExecuted = Date.now();

errorWrapper(job.function);
});
};
if (job.executeOnStartup) {
execute();
}
cron.schedule(job.cron, execute);
activated.push(jobName);
} else {
failed.push(jobName);
Expand Down
75 changes: 75 additions & 0 deletions src/scheduled/tasks/IndexingTask.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
class IndexingTask {
static _lastQueuedBlock = null;
static _lastExecutionTime = null;
static _running = false;
static _queueCounter = 0;
static _isCaughtUp = false;

/**
* Attempts to execute the tasks, queuing an execution if its already running and the second request is unique.
* @param blockNumber - the block an update is being requested for; ignores processing if already processed up to this block.
* @param minIntervalMinutes - ignores queue requests if the task was recently executed within this interval.
* @returns {Promise<{ countEvents: number, queuedCallersBehind: boolean, canExecuteAgain: boolean }>}
*/
static async queueExecution({ blockNumber = -1, minIntervalMinutes = 0 } = {}) {
if (blockNumber !== -1 && blockNumber <= this._lastQueuedBlock) {
// Requested block number was already queued or processed
return { countEvents: 0, queuedCallersBehind: false, canExecuteAgain: false };
} else if (this._lastExecutionTime && Date.now() - this._lastExecutionTime < minIntervalMinutes * 60 * 1000) {
// Minimum requested interval hasn't passed since the last execution
return { countEvents: 0, queuedCallersBehind: false, canExecuteAgain: false };
}

if (blockNumber !== -1) {
this._lastQueuedBlock = blockNumber;
}

const localCount = ++this._queueCounter;
// Wait up to 20 seconds for the task to finish executing
for (let i = 0; i < 20 && this._running; ++i) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}

// If another execution was queued during the wait, allow that one to execute instead
if (!this._running && localCount === this._queueCounter) {
try {
this._running = true;
// update return sig to be number of events, and boolean?
const countEvents = await this.update();
this._lastExecutionTime = new Date();
return {
countEvents,
queuedCallersBehind: this._queueCounter > localCount,
canExecuteAgain: !this.isCaughtUp()
};
} finally {
this._running = false;
}
}
return { countEvents: 0, queuedCallersBehind: this._queueCounter > localCount, canExecuteAgain: false };
}

// Notifies of an event occuring in real-time via a websocket. Task decides how to proceed.
static async handleLiveEvent(event) {
throw new Error('Must be implemented by subclass');
}

/**
* Runs the task, updating as many blocks as possible
* @returns {Promise<{updateBlock: number, processedEvents: number}>}
*/
static async update() {
throw new Error('Must be implemented by subclass');
}

// Indicates if the task is caught up to the latest block as of its most recent update.
static isCaughtUp() {
return this._isCaughtUp;
}

static getLastExecutionTime() {
return this._lastExecutionTime;
}
}

module.exports = IndexingTask;
17 changes: 15 additions & 2 deletions src/scheduled/tasks/deposits.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,29 @@ const { percentDiff } = require('../../utils/number');
const Log = require('../../utils/logging');
const SiloService = require('../../service/silo-service');
const TaskRangeUtil = require('../util/task-range');
const IndexingTask = require('./IndexingTask');

// If the BDV has changed by at least these amounts, update lambda stats
const DEFAULT_UPDATE_THRESHOLD = 0.01;
const HOURLY_UPDATE_THRESHOLD = 0.005;
// Maximum number of blocks to process in one invocation
const MAX_BLOCKS = 10000;

class DepositsTask {
class DepositsTask extends IndexingTask {
// Set by SunriseTask when a new season is encountered. Indicates that all deposits should be updated.
// This approach would not work if also taking deposit snapshots (this flag/behavior is only triggered in real-time).
static __seasonUpdate = false;

static async handleLiveEvent(event) {
// Deposits task is not currently used for anything, therefore ok to update infrequently
if (event.name === 'Sunrise') {
await this.queueExecution({ blockNumber: event.rawLog.blockNumber });
}
// if (['AddDeposit', 'RemoveDeposit', 'RemoveDeposits', 'StalkBalanceChanged'].includes(event.name)) {
// await this.queueExecution();
// }
}

// Returns true if the task can be called again immediately
static async update() {
const meta = await AppMetaService.getLambdaMeta();
Expand Down Expand Up @@ -63,7 +74,9 @@ class DepositsTask {
});
DepositsTask.__seasonUpdate = false;

return !isCaughtUp;
this._isCaughtUp = isCaughtUp;
// Unknown number of events, this task should be refactrored to retrieve them upfront within this method instead of separately
return -1;
}

// Updates the list of deposits in the database, adding/removing entries as needed
Expand Down
17 changes: 14 additions & 3 deletions src/scheduled/tasks/inflows.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const SiloEvents = require('../../datasources/events/silo-events');
const SiloInflowService = require('../../service/inflow/silo-inflow-service');
const SiloInflowSnapshotService = require('../../service/inflow/silo-inflow-snapshot-service');
const PriceService = require('../../service/price-service');
const IndexingTask = require('./IndexingTask');

// Maximum number of blocks to process in one invocation
const MAX_BLOCKS = 2000;
Expand All @@ -21,8 +22,17 @@ const FIELD_EVENTS = new Set(['Sow', 'Harvest', 'PodListingFilled', 'PodOrderFil
const SILO_EVENTS = new Set(['AddDeposit', 'RemoveDeposit', 'RemoveDeposits', 'Plant', 'Convert', 'ClaimPlenty']);
const ALL_EVENTS = [...FIELD_EVENTS, ...SILO_EVENTS];

class InflowsTask {
// Returns true if the task can be called again immediately
class InflowsTask extends IndexingTask {
static async handleLiveEvent(event) {
// Inflows are only used for snapshots currently, therefore update on Sunrise only
if (event.name === 'Sunrise') {
await this.queueExecution({ blockNumber: event.rawLog.blockNumber });
}
// if (ALL_EVENTS.includes(event.name)) {
// await this.queueExecution();
// }
}

static async update() {
const meta = await AppMetaService.getInflowMeta();
let { isInitialized, lastUpdate, updateBlock, isCaughtUp } = await TaskRangeUtil.getUpdateInfo(meta, MAX_BLOCKS);
Expand Down Expand Up @@ -95,7 +105,8 @@ class InflowsTask {
await AppMetaService.setLastInflowUpdate(updateBlock);
});

return !isCaughtUp;
this._isCaughtUp = isCaughtUp;
return events.length;
}
}
module.exports = InflowsTask;
2 changes: 1 addition & 1 deletion src/scheduled/tasks/sunrise.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class SunriseTask {

try {
// Insert basic season info
await SeasonService.insertSeasonFromEvent(nextSeason);
await SeasonService.insertSeason(nextSeason);

// Update whitelisted token info
const tokenModels = await SiloService.updateWhitelistedTokenInfo();
Expand Down
14 changes: 11 additions & 3 deletions src/scheduled/tasks/tractor.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,21 @@ const AsyncContext = require('../../utils/async/context');
const EnvUtil = require('../../utils/env');
const Log = require('../../utils/logging');
const TaskRangeUtil = require('../util/task-range');
const IndexingTask = require('./IndexingTask');

// Maximum number of blocks to process in one invocation
const MAX_BLOCKS = 10000;

const SNAPSHOT_SERVICES = [SnapshotSowV0Service, SnapshotConvertUpV0Service];

class TractorTask {
// Returns true if the task can be called again immediately
class TractorTask extends IndexingTask {
static async handleLiveEvent(event) {
if (['Sunrise', 'PublishRequisition', 'CancelBlueprint', 'Tractor'].includes(event.name)) {
await this.queueExecution({ blockNumber: event.rawLog.blockNumber });
}
// Silo events could trigger a periodicUpdate, ignoring currently
}

static async update() {
const meta = await AppMetaService.getTractorMeta();
if (!meta.lastUpdate) {
Expand Down Expand Up @@ -102,7 +109,8 @@ class TractorTask {
await AppMetaService.setLastTractorUpdate(updateBlock);
});

return !isCaughtUp;
this._isCaughtUp = isCaughtUp;
return events.length + sunrise.length;
}

static async handlePublishRequsition(event) {
Expand Down
Loading