Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions multinode/mock_pool_chain_info_provider_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions multinode/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH
return nil, err
}

c.lggr.Debugw("Switched to a new active node due to prev node heath issues", "prevNode", prevNodeName, "newNode", c.activeNode.String())
c.lggr.Debugw("Switched to a new active node due to prev node health issues", "prevNode", prevNodeName, "newNode", c.activeNode.String())
return c.activeNode, err
}

Expand Down Expand Up @@ -250,17 +250,19 @@ func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node
}
}

// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync.
// Return highest ChainInfo most recently received by the alive nodes.
// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName
// from the count) and the highest ChainInfo most recently received by alive nodes.
// E.g. if Node A's most recent block is 10 and its highest is 15, while Node B's are 12 and 14, this method will return 12.
func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo) {
func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo(callerName string) (int, ChainInfo) {
var nLiveNodes int
ch := ChainInfo{
TotalDifficulty: big.NewInt(0),
}
for _, n := range c.primaryNodes {
if s, nodeChainInfo := n.StateAndLatest(); s == nodeStateAlive {
nLiveNodes++
if n.Name() != callerName {
nLiveNodes++
}
ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber)
ch.FinalizedBlockNumber = max(ch.FinalizedBlockNumber, nodeChainInfo.FinalizedBlockNumber)
ch.TotalDifficulty = MaxTotalDifficulty(ch.TotalDifficulty, nodeChainInfo.TotalDifficulty)
Expand Down
5 changes: 3 additions & 2 deletions multinode/multi_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,14 +602,15 @@ func TestMultiNode_ChainInfo(t *testing.T) {
for i := range testCases {
tc := testCases[i]
t.Run(tc.Name, func(t *testing.T) {
for _, params := range tc.NodeParams {
for i, params := range tc.NodeParams {
node := newMockNode[ID, multiNodeRPCClient](t)
mn.primaryNodes = append(mn.primaryNodes, node)
node.On("Name").Return(fmt.Sprintf("node_%d", i)).Maybe()
node.On("StateAndLatest").Return(params.State, params.LatestChainInfo)
node.On("HighestUserObservations").Return(params.HighestUserObservations)
}

nNodes, latestChainInfo := mn.LatestChainInfo()
nNodes, latestChainInfo := mn.LatestChainInfo("")
assert.Equal(t, tc.ExpectedNLiveNodes, nNodes)
assert.Equal(t, tc.ExpectedLatestChainInfo, latestChainInfo)

Expand Down
16 changes: 8 additions & 8 deletions multinode/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,17 @@
if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
if n.poolInfoProvider != nil {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState)
continue
}
}
n.declareUnreachable()
return
}
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {
// note: there must be another live node for us to be out of sync
if liveNodes < 2 && !n.isLoadBalancedRPC {
if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync {

Check failure on line 139 in multinode/node_lifecycle.go

View workflow job for this annotation

GitHub Actions / golangci-lint (multinode)

File is not properly formatted (goimports)
// note: there must be another live node for us to be out of sync
if liveNodes < 1 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)
continue
}
Expand Down Expand Up @@ -166,7 +166,7 @@
if n.poolInfoProvider != nil {
// if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the comments?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed — restored all removed explanatory comments in aliveLoop (zombie guards for no-new-heads and no-new-finalized-heads sections, including proxy notes and check interval notes) and in outOfSyncLoop (load-balanced RPC note).

// if it's a proxy, then declare out of sync and try reconnecting because the proxy might return a healthier rpc
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
Expand Down Expand Up @@ -194,7 +194,7 @@
if n.poolInfoProvider != nil {
// if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo)
// if it's a proxy, then declare out of sync and try reconnecting because the proxy might return a healthier rpc
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC {
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC {
lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)
// We don't necessarily want to wait the full timeout to check again, we should
// check regularly and log noisily in this state
Expand Down Expand Up @@ -342,7 +342,7 @@
return // disabled
}
// Check against best node
ln, ci := n.poolInfoProvider.LatestChainInfo()
ln, ci := n.poolInfoProvider.LatestChainInfo(n.name)
localChainInfo, _ := n.rpc.GetInterceptedChainInfo()
mode := n.nodePoolCfg.SelectionMode()
switch mode {
Expand Down Expand Up @@ -459,7 +459,7 @@
lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "syncIssues", syncIssues)
case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)):
if n.poolInfoProvider != nil {
if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 {
if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 {
if n.isLoadBalancedRPC {
// in case all rpcs behind a load balanced rpc are out of sync, we need to declare out of sync to prevent false transition to alive
n.declareOutOfSync(syncIssues)
Expand Down
31 changes: 15 additions & 16 deletions multinode/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 20,
}).Once()
node.SetPoolChainInfoProvider(poolInfo)
Expand All @@ -218,7 +218,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 20,
}).Once()
node.SetPoolChainInfoProvider(poolInfo)
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30})
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(10, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(10, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
})
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30})
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
})
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
const mostRecentBlock = 20
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: syncThreshold + mostRecentBlock + 1,
TotalDifficulty: big.NewInt(10),
})
Expand Down Expand Up @@ -389,7 +389,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 20,
TotalDifficulty: big.NewInt(10),
}).Once()
Expand All @@ -414,7 +414,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 20,
TotalDifficulty: big.NewInt(10),
}).Once()
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 20,
TotalDifficulty: big.NewInt(10),
}).Once()
Expand Down Expand Up @@ -668,13 +668,12 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 20,
TotalDifficulty: big.NewInt(10),
}).Once()
node.SetPoolChainInfoProvider(poolInfo)
// tries to redial in outOfSync
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) {
assert.Equal(t, nodeStateOutOfSync, node.State())
}).Once()
Expand Down Expand Up @@ -1043,7 +1042,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(0, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 100,
TotalDifficulty: big.NewInt(200),
})
Expand Down Expand Up @@ -1081,7 +1080,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
})
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(0, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{
BlockNumber: 100,
TotalDifficulty: big.NewInt(200),
})
Expand Down Expand Up @@ -1121,7 +1120,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) {
defer func() { assert.NoError(t, node.close()) }()
poolInfo := newMockPoolChainInfoProvider(t)
const highestBlock = 20
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(1, ChainInfo{
BlockNumber: highestBlock * 2,
TotalDifficulty: big.NewInt(200),
})
Expand Down Expand Up @@ -1774,7 +1773,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
config: testNodeConfig{syncThreshold: 1},
})
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once()
poolInfo.On("LatestChainInfo", mock.Anything).Return(1, ChainInfo{}).Once()
node.SetPoolChainInfoProvider(poolInfo)
assert.Panics(t, func() {
_, _ = node.isOutOfSyncWithPool()
Expand Down Expand Up @@ -1820,7 +1819,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
},
})
poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{
BlockNumber: highestBlock,
TotalDifficulty: big.NewInt(totalDifficulty),
})
Expand Down Expand Up @@ -1880,7 +1879,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) {
})

poolInfo := newMockPoolChainInfoProvider(t)
poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{
poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{
BlockNumber: highestBlock,
TotalDifficulty: big.NewInt(totalDifficulty),
})
Expand Down
5 changes: 3 additions & 2 deletions multinode/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ type Head interface {

// PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo
type PoolChainInfoProvider interface {
// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being
// LatestChainInfo returns the number of live nodes available in the pool (excluding the node identified by
// callerName from the count), so we can prevent the last alive node in a pool from being
// moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all.
// Returns highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number
// observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12.
LatestChainInfo() (int, ChainInfo)
LatestChainInfo(callerName string) (int, ChainInfo)
// HighestUserObservations - returns highest ChainInfo ever observed by any user of MultiNode.
HighestUserObservations() ChainInfo
}
Expand Down
Loading