From 4d0e5bd78bb291b303c89c01de0477b9bc982f34 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Wed, 4 Oct 2023 17:39:37 +0400 Subject: [PATCH 1/2] perf(connection pool): create a pool of websocket connections to speedup indexing affects: @joystream/hydra-indexer --- packages/hydra-indexer/src/node/config.ts | 6 + .../src/substrate/SubstrateService.ts | 109 ++++++++++++++---- packages/hydra-indexer/src/substrate/index.ts | 4 +- 3 files changed, 94 insertions(+), 25 deletions(-) diff --git a/packages/hydra-indexer/src/node/config.ts b/packages/hydra-indexer/src/node/config.ts index 5065397db..226bee4f0 100644 --- a/packages/hydra-indexer/src/node/config.ts +++ b/packages/hydra-indexer/src/node/config.ts @@ -29,6 +29,7 @@ let conf: { BLOCK_PRODUCER_FETCH_RETRIES: number SUBSTRATE_API_TIMEOUT: number SUBSTRATE_API_CALL_RETRIES: number + SUBSTRATE_API_POOL_SIZE: number NEW_BLOCK_TIMEOUT_MS: number HEADER_CACHE_CAPACITY: number FINALITY_THRESHOLD: number @@ -179,6 +180,11 @@ export function configure(): void { default: 5, desc: 'Number of times an API call is retried before giving up and throwing and error', }), + // Api connection pool size + SUBSTRATE_API_POOL_SIZE: num({ + default: 2, + desc: 'Number of websocket connections to setup for API calls', + }), // If the block producer does not recieve a new block within this time limit, // panic and thow an error. This is needed to prevent the situation when the diff --git a/packages/hydra-indexer/src/substrate/SubstrateService.ts b/packages/hydra-indexer/src/substrate/SubstrateService.ts index 74c4e13eb..15e954238 100644 --- a/packages/hydra-indexer/src/substrate/SubstrateService.ts +++ b/packages/hydra-indexer/src/substrate/SubstrateService.ts @@ -26,17 +26,17 @@ import BN from 'bn.js' import { BlockData } from '../model' import { eventEmitter, IndexerEvents } from '../node/event-emitter' import { Subscription } from 'rxjs' +import { ApiDecoration } from '@polkadot/api/types' const debug = Debug('hydra-indexer:substrate-service') export class SubstrateService implements ISubstrateService { private shouldStop = false - private api: ApiPromise | undefined + private apiPool: ApiPromise[] = [] - async init(): Promise { + async init({ poolSize }: { poolSize: number }): Promise { debug(`Initializing SubstrateService`) - await this.connect() - + await this.createConnectionPool(poolSize) eventEmitter.on(IndexerEvents.INDEXER_STOP, async () => await this.stop()) pForever(async () => { @@ -48,12 +48,16 @@ export class SubstrateService implements ISubstrateService { }) } - private async connect() { - this.api = await createApiPromise() + private async createConnectionPool(size: number) { + const poolSize = Math.max(2, size) + debug(`Creating Api pool size=${poolSize}`) + this.apiPool = await Promise.all( + Array(poolSize).fill(null).map(createApiPromise) + ) - const subscriptions = this.subscribeToHeads(this.api) + const subscriptions = this.subscribeToHeads(this.apiPool[0]) - this.api + this.apiPool[0] .once('disconnected', () => { debug('Api disconnected') }) @@ -62,19 +66,36 @@ export class SubstrateService implements ISubstrateService { }) .once('decorated', async () => { debug('Api decorated') + const currentPool = this.apiPool subscriptions.forEach((sub) => sub.unsubscribe()) - const oldApi = this.api - await this.connect() + await this.createConnectionPool(currentPool.length) // allow short time for running queries to complete await delay(1000) try { - oldApi?.isConnected && (await oldApi?.disconnect()) + await Promise.all(currentPool.map((api) => api.disconnect())) } catch (err) { debug(`Error trying to disconnection Api ${err}`) } }) } + private *_getApiRoundRobin(): Generator { + let ix = 0 + while (true) { + if (ix + 1 >= this.apiPool.length) { + ix = 0 + } + if (this.apiPool.length === 0) { + throw Error('Api pool is empty') + } + yield this.apiPool[ix++] + } + } + + private nextApi(): ApiPromise { + return this._getApiRoundRobin().next().value + } + async getHeader(hash: Hash | Uint8Array | string): Promise
{ return this.apiCall( (api) => api.rpc.chain.getHeader(hash), @@ -89,7 +110,7 @@ export class SubstrateService implements ISubstrateService { ) } - subscribeToHeads(api: ApiPromise): Subscription[] { + private subscribeToHeads(api: ApiPromise): Subscription[] { debug(`Subscribing to new heads`) return [ api.rx.rpc.chain.subscribeFinalizedHeads().subscribe({ @@ -140,8 +161,9 @@ export class SubstrateService implements ISubstrateService { hash: Hash | Uint8Array | string ): Promise { debug(`Fething events. BlockHash: ${JSON.stringify(hash)}`) - return this.apiCall( - (api) => api.query.system.events.at(hash), + return this.apiCallAt( + hash, + (api) => api.query.system.events(), `get block events of block ${JSON.stringify(hash)}` ) } @@ -157,10 +179,45 @@ export class SubstrateService implements ISubstrateService { 'The indexer is stopping, aborting all API calls' ) } - if (!this.api || !this.api.isConnected) { - throw Error(`Api connection not ready`) + + return promiseFn(this.nextApi()) + }, + { + retries: getConfig().SUBSTRATE_API_CALL_RETRIES, + onFailedAttempt: async (i) => { + debug( + `Failed to execute "${functionName}" after ${i.attemptNumber} attempts. Retries left: ${i.retriesLeft}` + ) + await delay(200) + }, + } + ) + } + + private async apiCallAt( + hash: Hash | Uint8Array | string, + promiseFn: (api: ApiDecoration<'promise'>) => Promise, + functionName = 'api request' + ): Promise { + return pRetry( + async (attempt: number) => { + if (this.shouldStop) { + throw new pRetry.AbortError( + 'The indexer is stopping, aborting all API calls' + ) + } + + // on first attempt try runtime versioned guess, on second attempt let the api get the version from chain + + const api = await this.nextApi().at(hash) + try { + return await promiseFn(api) + } catch (err) { + debug( + `api.at call error with runtime version ${api.runtimeVersion}, hash=${hash}` + ) + throw err } - return promiseFn(this.api) }, { retries: getConfig().SUBSTRATE_API_CALL_RETRIES, @@ -211,8 +268,9 @@ export class SubstrateService implements ISubstrateService { } async timestamp(hash: Hash): Promise { - return this.apiCall( - (api) => api.query.timestamp.now.at(hash), + return this.apiCallAt( + hash, + (api) => api.query.timestamp.now(), 'get timestamp' ) } @@ -220,8 +278,9 @@ export class SubstrateService implements ISubstrateService { async lastRuntimeUpgrade( hash: Hash ): Promise { - const info = await this.apiCall( - (api) => api.query.system.lastRuntimeUpgrade.at(hash), + const info = await this.apiCallAt( + hash, + (api) => api.query.system.lastRuntimeUpgrade(), 'get last runtime upgrade' ) return info.unwrapOr(undefined) @@ -230,9 +289,11 @@ export class SubstrateService implements ISubstrateService { async stop(): Promise { debug(`Stopping substrate service`) this.shouldStop = true - if (this.api && this.api.isConnected) { - await this.api.disconnect() - } + this.apiPool.map(async (api) => { + if (api && api.isConnected) { + await api.disconnect() + } + }) debug(`Done`) } } diff --git a/packages/hydra-indexer/src/substrate/index.ts b/packages/hydra-indexer/src/substrate/index.ts index edb7219f0..8dabd8b56 100644 --- a/packages/hydra-indexer/src/substrate/index.ts +++ b/packages/hydra-indexer/src/substrate/index.ts @@ -19,7 +19,9 @@ export async function getSubstrateService(): Promise { return substrateService } substrateService = new SubstrateService() - await (substrateService as SubstrateService).init() + await (substrateService as SubstrateService).init({ + poolSize: getConfig().SUBSTRATE_API_POOL_SIZE, + }) return substrateService } From b74b2f00bf48fc62ae7d8e9585be05360811edf8 Mon Sep 17 00:00:00 2001 From: Mokhtar Naamani Date: Thu, 5 Oct 2023 10:04:03 +0400 Subject: [PATCH 2/2] style(remove unused argument): linter fixes affects: @joystream/hydra-indexer --- packages/hydra-indexer/src/substrate/SubstrateService.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/hydra-indexer/src/substrate/SubstrateService.ts b/packages/hydra-indexer/src/substrate/SubstrateService.ts index 15e954238..7c2f95ad5 100644 --- a/packages/hydra-indexer/src/substrate/SubstrateService.ts +++ b/packages/hydra-indexer/src/substrate/SubstrateService.ts @@ -200,15 +200,13 @@ export class SubstrateService implements ISubstrateService { functionName = 'api request' ): Promise { return pRetry( - async (attempt: number) => { + async () => { if (this.shouldStop) { throw new pRetry.AbortError( 'The indexer is stopping, aborting all API calls' ) } - // on first attempt try runtime versioned guess, on second attempt let the api get the version from chain - const api = await this.nextApi().at(hash) try { return await promiseFn(api)