diff --git a/multinode/mock_pool_chain_info_provider_test.go b/multinode/mock_pool_chain_info_provider_test.go index 3dcd76c..770a54a 100644 --- a/multinode/mock_pool_chain_info_provider_test.go +++ b/multinode/mock_pool_chain_info_provider_test.go @@ -62,9 +62,9 @@ func (_c *mockPoolChainInfoProvider_HighestUserObservations_Call) RunAndReturn(r return _c } -// LatestChainInfo provides a mock function with no fields -func (_m *mockPoolChainInfoProvider) LatestChainInfo() (int, ChainInfo) { - ret := _m.Called() +// LatestChainInfo provides a mock function with given fields: callerName +func (_m *mockPoolChainInfoProvider) LatestChainInfo(callerName string) (int, ChainInfo) { + ret := _m.Called(callerName) if len(ret) == 0 { panic("no return value specified for LatestChainInfo") @@ -72,17 +72,17 @@ func (_m *mockPoolChainInfoProvider) LatestChainInfo() (int, ChainInfo) { var r0 int var r1 ChainInfo - if rf, ok := ret.Get(0).(func() (int, ChainInfo)); ok { - return rf() + if rf, ok := ret.Get(0).(func(string) (int, ChainInfo)); ok { + return rf(callerName) } - if rf, ok := ret.Get(0).(func() int); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(string) int); ok { + r0 = rf(callerName) } else { r0 = ret.Get(0).(int) } - if rf, ok := ret.Get(1).(func() ChainInfo); ok { - r1 = rf() + if rf, ok := ret.Get(1).(func(string) ChainInfo); ok { + r1 = rf(callerName) } else { r1 = ret.Get(1).(ChainInfo) } @@ -96,13 +96,14 @@ type mockPoolChainInfoProvider_LatestChainInfo_Call struct { } // LatestChainInfo is a helper method to define mock.On call -func (_e *mockPoolChainInfoProvider_Expecter) LatestChainInfo() *mockPoolChainInfoProvider_LatestChainInfo_Call { - return &mockPoolChainInfoProvider_LatestChainInfo_Call{Call: _e.mock.On("LatestChainInfo")} +// - callerName string +func (_e *mockPoolChainInfoProvider_Expecter) LatestChainInfo(callerName interface{}) *mockPoolChainInfoProvider_LatestChainInfo_Call { + return &mockPoolChainInfoProvider_LatestChainInfo_Call{Call: _e.mock.On("LatestChainInfo", callerName)} } -func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) Run(run func()) *mockPoolChainInfoProvider_LatestChainInfo_Call { +func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) Run(run func(callerName string)) *mockPoolChainInfoProvider_LatestChainInfo_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(string)) }) return _c } @@ -112,7 +113,7 @@ func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) Return(_a0 int, _a1 Ch return _c } -func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) RunAndReturn(run func() (int, ChainInfo)) *mockPoolChainInfoProvider_LatestChainInfo_Call { +func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) RunAndReturn(run func(string) (int, ChainInfo)) *mockPoolChainInfoProvider_LatestChainInfo_Call { _c.Call.Return(run) return _c } diff --git a/multinode/multi_node.go b/multinode/multi_node.go index 3f8007c..2cca96c 100644 --- a/multinode/multi_node.go +++ b/multinode/multi_node.go @@ -222,7 +222,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH return nil, err } - c.lggr.Debugw("Switched to a new active node due to prev node heath issues", "prevNode", prevNodeName, "newNode", c.activeNode.String()) + c.lggr.Debugw("Switched to a new active node due to prev node health issues", "prevNode", prevNodeName, "newNode", c.activeNode.String()) return c.activeNode, err } @@ -250,17 +250,19 @@ func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node } } -// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync. -// Return highest ChainInfo most recently received by the alive nodes. +// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName +// from the count) and the highest ChainInfo most recently received by alive nodes. // E.g. If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12. -func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo) { +func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo(callerName string) (int, ChainInfo) { var nLiveNodes int ch := ChainInfo{ TotalDifficulty: big.NewInt(0), } for _, n := range c.primaryNodes { if s, nodeChainInfo := n.StateAndLatest(); s == nodeStateAlive { - nLiveNodes++ + if n.Name() != callerName { + nLiveNodes++ + } ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber) ch.FinalizedBlockNumber = max(ch.FinalizedBlockNumber, nodeChainInfo.FinalizedBlockNumber) ch.TotalDifficulty = MaxTotalDifficulty(ch.TotalDifficulty, nodeChainInfo.TotalDifficulty) diff --git a/multinode/multi_node_test.go b/multinode/multi_node_test.go index efde024..fea7fe6 100644 --- a/multinode/multi_node_test.go +++ b/multinode/multi_node_test.go @@ -602,14 +602,15 @@ func TestMultiNode_ChainInfo(t *testing.T) { for i := range testCases { tc := testCases[i] t.Run(tc.Name, func(t *testing.T) { - for _, params := range tc.NodeParams { + for i, params := range tc.NodeParams { node := newMockNode[ID, multiNodeRPCClient](t) mn.primaryNodes = append(mn.primaryNodes, node) + node.On("Name").Return(fmt.Sprintf("node_%d", i)).Maybe() node.On("StateAndLatest").Return(params.State, params.LatestChainInfo) node.On("HighestUserObservations").Return(params.HighestUserObservations) } - nNodes, latestChainInfo := mn.LatestChainInfo() + nNodes, latestChainInfo := mn.LatestChainInfo("") assert.Equal(t, tc.ExpectedNLiveNodes, nNodes) assert.Equal(t, tc.ExpectedLatestChainInfo, latestChainInfo) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index e2974c0..8fd4e9d 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -128,7 +128,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -138,7 +138,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { // note: there must be another live node for us to be out of sync - if liveNodes < 2 && !n.isLoadBalancedRPC { + if liveNodes < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -166,7 +166,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { if n.poolInfoProvider != nil { // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state @@ -194,7 +194,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { if n.poolInfoProvider != nil { // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) // We don't necessarily want to wait the full timeout to check again, we should // check regularly and log noisily in this state @@ -342,7 +342,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN return // disabled } // Check against best node - ln, ci := n.poolInfoProvider.LatestChainInfo() + ln, ci := n.poolInfoProvider.LatestChainInfo(n.name) localChainInfo, _ := n.rpc.GetInterceptedChainInfo() mode := n.nodePoolCfg.SelectionMode() switch mode { @@ -459,7 +459,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "syncIssues", syncIssues) case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)): if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 { if n.isLoadBalancedRPC { // in case all rpcs behind a load balanced rpc are out of sync, we need to declare out of sync to prevent false transition to alive n.declareOutOfSync(syncIssues) diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 684d0c7..f1eb250 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -191,7 +191,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 20, }).Once() node.SetPoolChainInfoProvider(poolInfo) @@ -218,7 +218,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 20, }).Once() node.SetPoolChainInfoProvider(poolInfo) @@ -250,7 +250,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(10, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(10, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), }) @@ -285,7 +285,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), }) @@ -313,7 +313,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), }) @@ -389,7 +389,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 20, TotalDifficulty: big.NewInt(10), }).Once() @@ -414,7 +414,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 20, TotalDifficulty: big.NewInt(10), }).Once() @@ -640,7 +640,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 20, TotalDifficulty: big.NewInt(10), }).Once() @@ -668,13 +668,12 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 20, TotalDifficulty: big.NewInt(10), }).Once() node.SetPoolChainInfoProvider(poolInfo) // tries to redial in outOfSync - // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() @@ -1043,7 +1042,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(0, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 100, TotalDifficulty: big.NewInt(200), }) @@ -1081,7 +1080,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(0, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: 100, TotalDifficulty: big.NewInt(200), }) @@ -1121,7 +1120,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) const highestBlock = 20 - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(1, ChainInfo{ BlockNumber: highestBlock * 2, TotalDifficulty: big.NewInt(200), }) @@ -1774,7 +1773,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { config: testNodeConfig{syncThreshold: 1}, }) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once() + poolInfo.On("LatestChainInfo", mock.Anything).Return(1, ChainInfo{}).Once() node.SetPoolChainInfoProvider(poolInfo) assert.Panics(t, func() { _, _ = node.isOutOfSyncWithPool() @@ -1820,7 +1819,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { }, }) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{ BlockNumber: highestBlock, TotalDifficulty: big.NewInt(totalDifficulty), }) @@ -1880,7 +1879,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { }) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{ BlockNumber: highestBlock, TotalDifficulty: big.NewInt(totalDifficulty), }) diff --git a/multinode/types.go b/multinode/types.go index b31c6ca..6400b2d 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -89,11 +89,12 @@ type Head interface { // PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo type PoolChainInfoProvider interface { - // LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being + // LatestChainInfo returns the number of live nodes available in the pool (excluding the node identified by + // callerName from the count), so we can prevent the last alive node in a pool from being // moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all. // Returns highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number // observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12. - LatestChainInfo() (int, ChainInfo) + LatestChainInfo(callerName string) (int, ChainInfo) // HighestUserObservations - returns highest ChainInfo ever observed by any user of MultiNode. HighestUserObservations() ChainInfo }