Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/hydra-indexer/src/node/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ let conf: {
BLOCK_PRODUCER_FETCH_RETRIES: number
SUBSTRATE_API_TIMEOUT: number
SUBSTRATE_API_CALL_RETRIES: number
SUBSTRATE_API_POOL_SIZE: number
NEW_BLOCK_TIMEOUT_MS: number
HEADER_CACHE_CAPACITY: number
FINALITY_THRESHOLD: number
Expand Down Expand Up @@ -179,6 +180,11 @@ export function configure(): void {
default: 5,
desc: 'Number of times an API call is retried before giving up and throwing and error',
}),
// Api connection pool size
SUBSTRATE_API_POOL_SIZE: num({
default: 2,
desc: 'Number of websocket connections to setup for API calls',
}),

// If the block producer does not recieve a new block within this time limit,
// panic and thow an error. This is needed to prevent the situation when the
Expand Down
109 changes: 84 additions & 25 deletions packages/hydra-indexer/src/substrate/SubstrateService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ import BN from 'bn.js'
import { BlockData } from '../model'
import { eventEmitter, IndexerEvents } from '../node/event-emitter'
import { Subscription } from 'rxjs'
import { ApiDecoration } from '@polkadot/api/types'

const debug = Debug('hydra-indexer:substrate-service')

export class SubstrateService implements ISubstrateService {
private shouldStop = false
private api: ApiPromise | undefined
private apiPool: ApiPromise[] = []

async init(): Promise<void> {
async init({ poolSize }: { poolSize: number }): Promise<void> {
debug(`Initializing SubstrateService`)
await this.connect()

await this.createConnectionPool(poolSize)
eventEmitter.on(IndexerEvents.INDEXER_STOP, async () => await this.stop())

pForever(async () => {
Expand All @@ -48,12 +48,16 @@ export class SubstrateService implements ISubstrateService {
})
}

private async connect() {
this.api = await createApiPromise()
private async createConnectionPool(size: number) {
const poolSize = Math.max(2, size)
debug(`Creating Api pool size=${poolSize}`)
this.apiPool = await Promise.all(
Array(poolSize).fill(null).map(createApiPromise)
)

const subscriptions = this.subscribeToHeads(this.api)
const subscriptions = this.subscribeToHeads(this.apiPool[0])

this.api
this.apiPool[0]
.once('disconnected', () => {
debug('Api disconnected')
})
Expand All @@ -62,19 +66,36 @@ export class SubstrateService implements ISubstrateService {
})
.once('decorated', async () => {
debug('Api decorated')
const currentPool = this.apiPool
subscriptions.forEach((sub) => sub.unsubscribe())
const oldApi = this.api
await this.connect()
await this.createConnectionPool(currentPool.length)
// allow short time for running queries to complete
await delay(1000)
try {
oldApi?.isConnected && (await oldApi?.disconnect())
await Promise.all(currentPool.map((api) => api.disconnect()))
} catch (err) {
debug(`Error trying to disconnection Api ${err}`)
}
})
}

private *_getApiRoundRobin(): Generator<ApiPromise> {
let ix = 0
while (true) {
if (ix + 1 >= this.apiPool.length) {
ix = 0
}
if (this.apiPool.length === 0) {
throw Error('Api pool is empty')
}
yield this.apiPool[ix++]
}
}

private nextApi(): ApiPromise {
return this._getApiRoundRobin().next().value
}

async getHeader(hash: Hash | Uint8Array | string): Promise<Header> {
return this.apiCall(
(api) => api.rpc.chain.getHeader(hash),
Expand All @@ -89,7 +110,7 @@ export class SubstrateService implements ISubstrateService {
)
}

subscribeToHeads(api: ApiPromise): Subscription[] {
private subscribeToHeads(api: ApiPromise): Subscription[] {
debug(`Subscribing to new heads`)
return [
api.rx.rpc.chain.subscribeFinalizedHeads().subscribe({
Expand Down Expand Up @@ -140,8 +161,9 @@ export class SubstrateService implements ISubstrateService {
hash: Hash | Uint8Array | string
): Promise<EventRecord[] & Codec> {
debug(`Fething events. BlockHash: ${JSON.stringify(hash)}`)
return this.apiCall(
(api) => api.query.system.events.at(hash),
return this.apiCallAt(
hash,
(api) => api.query.system.events(),
`get block events of block ${JSON.stringify(hash)}`
)
}
Expand All @@ -157,10 +179,43 @@ export class SubstrateService implements ISubstrateService {
'The indexer is stopping, aborting all API calls'
)
}
if (!this.api || !this.api.isConnected) {
throw Error(`Api connection not ready`)

return promiseFn(this.nextApi())
},
{
retries: getConfig().SUBSTRATE_API_CALL_RETRIES,
onFailedAttempt: async (i) => {
debug(
`Failed to execute "${functionName}" after ${i.attemptNumber} attempts. Retries left: ${i.retriesLeft}`
)
await delay(200)
},
}
)
}

private async apiCallAt<T>(
hash: Hash | Uint8Array | string,
promiseFn: (api: ApiDecoration<'promise'>) => Promise<T>,
functionName = 'api request'
): Promise<T> {
return pRetry(
async () => {
if (this.shouldStop) {
throw new pRetry.AbortError(
'The indexer is stopping, aborting all API calls'
)
}

const api = await this.nextApi().at(hash)
try {
return await promiseFn(api)
} catch (err) {
debug(
`api.at call error with runtime version ${api.runtimeVersion}, hash=${hash}`
)
throw err
}
return promiseFn(this.api)
},
{
retries: getConfig().SUBSTRATE_API_CALL_RETRIES,
Expand Down Expand Up @@ -211,17 +266,19 @@ export class SubstrateService implements ISubstrateService {
}

async timestamp(hash: Hash): Promise<BN> {
return this.apiCall(
(api) => api.query.timestamp.now.at(hash),
return this.apiCallAt(
hash,
(api) => api.query.timestamp.now(),
'get timestamp'
)
}

async lastRuntimeUpgrade(
hash: Hash
): Promise<LastRuntimeUpgradeInfo | undefined> {
const info = await this.apiCall(
(api) => api.query.system.lastRuntimeUpgrade.at(hash),
const info = await this.apiCallAt(
hash,
(api) => api.query.system.lastRuntimeUpgrade(),
'get last runtime upgrade'
)
return info.unwrapOr(undefined)
Expand All @@ -230,10 +287,12 @@ export class SubstrateService implements ISubstrateService {
async stop(): Promise<void> {
debug(`Stopping substrate service`)
this.shouldStop = true
if (this.api && this.api.isConnected) {
await this.api.disconnect()
debug(`Api disconnected`)
}
this.apiPool.map(async (api) => {
if (api && api.isConnected) {
await api.disconnect()
debug(`Api disconnected`)
}
})
debug(`Done`)
}
}
4 changes: 3 additions & 1 deletion packages/hydra-indexer/src/substrate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ export async function getSubstrateService(): Promise<ISubstrateService> {
return substrateService
}
substrateService = new SubstrateService()
await (substrateService as SubstrateService).init()
await (substrateService as SubstrateService).init({
poolSize: getConfig().SUBSTRATE_API_POOL_SIZE,
})
return substrateService
}

Expand Down