From 1d3921980c40d685da97eb8cc934c09aecfebf1a Mon Sep 17 00:00:00 2001 From: James Walker Date: Wed, 25 Feb 2026 18:38:02 -0500 Subject: [PATCH 1/5] zombie nodes --- multinode/multi_node.go | 64 +++++++++- multinode/multi_node_test.go | 133 +++++++++++++++++++- multinode/node_lifecycle.go | 82 ++---------- multinode/node_lifecycle_test.go | 207 +++---------------------------- 4 files changed, 220 insertions(+), 266 deletions(-) diff --git a/multinode/multi_node.go b/multinode/multi_node.go index 3f8007c..dc7b368 100644 --- a/multinode/multi_node.go +++ b/multinode/multi_node.go @@ -3,6 +3,7 @@ package multinode import ( "context" "fmt" + "math" "math/big" "slices" "sync" @@ -191,6 +192,8 @@ func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err } // selectNode returns the active Node, if it is still nodeStateAlive, otherwise it selects a new one from the NodeSelector. +// If no alive node is available, it falls back to an out-of-sync node. If the current active node is out-of-sync, +// it will try to upgrade to an alive node when one becomes available. 
func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CHAIN_ID, RPC], err error) { if c.selectionMode == NodeSelectionModeRandomRPC { return c.awaitNodeSelection(ctx) @@ -211,6 +214,17 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH return // another goroutine beat us here } + // If the current active node is out-of-sync, try to find an alive one first + if node != nil && isUsableState(node.State()) { + if aliveNode := c.nodeSelector.Select(); aliveNode != nil { + c.activeNode.UnsubscribeAllExceptAliveLoop() + c.activeNode = aliveNode + c.lggr.Debugw("Upgraded from out-of-sync to alive node", "prevNode", node.String(), "newNode", aliveNode.String()) + return c.activeNode, nil + } + return // keep using the out-of-sync node + } + var prevNodeName string if c.activeNode != nil { prevNodeName = c.activeNode.String() @@ -222,12 +236,13 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH return nil, err } - c.lggr.Debugw("Switched to a new active node due to prev node heath issues", "prevNode", prevNodeName, "newNode", c.activeNode.String()) + c.lggr.Debugw("Switched to a new active node due to prev node health issues", "prevNode", prevNodeName, "newNode", c.activeNode.String()) return c.activeNode, err } // awaitNodeSelection blocks until nodeSelector returns a live node or all nodes -// finish initializing. Returns ErrNodeError when no live nodes are available. +// finish initializing. If no alive nodes are available, falls back to an out-of-sync node. +// Returns ErrNodeError when no usable nodes are available. 
func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node[CHAIN_ID, RPC], error) { for { node := c.nodeSelector.Select() @@ -244,14 +259,39 @@ func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node continue } } + if fallback := c.selectOutOfSyncNode(); fallback != nil { + c.lggr.Warnw("No alive RPC nodes available, falling back to out-of-sync node", "node", fallback.String()) + return fallback, nil + } c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name()) c.eng.EmitHealthErr(fmt.Errorf("no live nodes available for chain %s", c.chainID.String())) return nil, ErrNodeError } } -// LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being marked as out-of-sync. -// Return highest ChainInfo most recently received by the alive nodes. +// selectOutOfSyncNode picks the best out-of-sync node by highest block number. +// Returns nil if no out-of-sync nodes are available. +func (c *MultiNode[CHAIN_ID, RPC]) selectOutOfSyncNode() Node[CHAIN_ID, RPC] { + var bestNode Node[CHAIN_ID, RPC] + var bestBlock int64 = math.MinInt64 + for _, n := range c.primaryNodes { + if isUsableState(n.State()) { + _, ci := n.StateAndLatest() + if ci.BlockNumber > bestBlock { + bestBlock = ci.BlockNumber + bestNode = n + } + } + } + return bestNode +} + +// isUsableState returns true for out-of-sync states that can still serve requests as a fallback. +func isUsableState(s nodeState) bool { + return s == nodeStateOutOfSync || s == nodeStateFinalizedBlockOutOfSync +} + +// LatestChainInfo returns the number of alive nodes in the pool and the highest ChainInfo most recently received by those nodes. // E.g. If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12. 
func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo) { var nLiveNodes int @@ -296,6 +336,22 @@ func (c *MultiNode[CHAIN_ID, RPC]) checkLease() { c.activeMu.Lock() defer c.activeMu.Unlock() + + if bestNode == nil { + // No alive node available; if the current active is still usable (out-of-sync), keep it + if c.activeNode != nil && isUsableState(c.activeNode.State()) { + return + } + // Try out-of-sync fallback + if fallback := c.selectOutOfSyncNode(); fallback != nil && fallback != c.activeNode { + if c.activeNode != nil { + c.activeNode.UnsubscribeAllExceptAliveLoop() + } + c.activeNode = fallback + } + return + } + if bestNode != c.activeNode { if c.activeNode != nil { c.activeNode.UnsubscribeAllExceptAliveLoop() diff --git a/multinode/multi_node_test.go b/multinode/multi_node_test.go index efde024..81dea7b 100644 --- a/multinode/multi_node_test.go +++ b/multinode/multi_node_test.go @@ -378,8 +378,8 @@ func TestMultiNode_selectNode(t *testing.T) { activeNode, err := mn.selectNode(ctx) require.NoError(t, err) require.Equal(t, oldBest.String(), activeNode.String()) - // old best died, so we should replace it - oldBest.On("State").Return(nodeStateOutOfSync).Twice() + // old best is out-of-sync, a new alive node is available via selector + oldBest.On("State").Return(nodeStateOutOfSync).Maybe() nodeSelector.On("Select").Return(newBest).Once() newActiveNode, err := mn.selectNode(ctx) require.NoError(t, err) @@ -404,6 +404,135 @@ func TestMultiNode_selectNode(t *testing.T) { require.Nil(t, node) tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available") }) + t.Run("Falls back to out-of-sync node when no alive nodes available", func(t *testing.T) { + t.Parallel() + ctx := tests.Context(t) + chainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) + oosNode := newMockNode[ID, multiNodeRPCClient](t) + oosNode.On("State").Return(nodeStateOutOfSync).Maybe() + oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, 
ChainInfo{BlockNumber: 50}).Maybe() + oosNode.On("String").Return("oosNode").Maybe() + mn := newTestMultiNode(t, multiNodeOpts{ + selectionMode: NodeSelectionModeRoundRobin, + chainID: chainID, + nodes: []Node[ID, multiNodeRPCClient]{oosNode}, + logger: lggr, + }) + nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) + nodeSelector.On("Select").Return(nil) + mn.nodeSelector = nodeSelector + selected, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, oosNode, selected) + tests.RequireLogMessage(t, observedLogs, "No alive RPC nodes available, falling back to out-of-sync node") + }) + t.Run("Falls back to FinalizedBlockOutOfSync node when no alive nodes available", func(t *testing.T) { + t.Parallel() + ctx := tests.Context(t) + chainID := RandomID() + lggr, _ := logger.TestObserved(t, zap.WarnLevel) + fbOosNode := newMockNode[ID, multiNodeRPCClient](t) + fbOosNode.On("State").Return(nodeStateFinalizedBlockOutOfSync).Maybe() + fbOosNode.On("StateAndLatest").Return(nodeStateFinalizedBlockOutOfSync, ChainInfo{BlockNumber: 80}).Maybe() + fbOosNode.On("String").Return("fbOosNode").Maybe() + mn := newTestMultiNode(t, multiNodeOpts{ + selectionMode: NodeSelectionModeRoundRobin, + chainID: chainID, + nodes: []Node[ID, multiNodeRPCClient]{fbOosNode}, + logger: lggr, + }) + nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) + nodeSelector.On("Select").Return(nil) + mn.nodeSelector = nodeSelector + selected, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, fbOosNode, selected) + }) + t.Run("Selects best out-of-sync node by highest block number", func(t *testing.T) { + t.Parallel() + ctx := tests.Context(t) + chainID := RandomID() + lggr, _ := logger.TestObserved(t, zap.WarnLevel) + oosLow := newMockNode[ID, multiNodeRPCClient](t) + oosLow.On("State").Return(nodeStateOutOfSync).Maybe() + oosLow.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 30}).Maybe() + 
oosLow.On("String").Return("oosLow").Maybe() + oosHigh := newMockNode[ID, multiNodeRPCClient](t) + oosHigh.On("State").Return(nodeStateOutOfSync).Maybe() + oosHigh.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 90}).Maybe() + oosHigh.On("String").Return("oosHigh").Maybe() + mn := newTestMultiNode(t, multiNodeOpts{ + selectionMode: NodeSelectionModeRoundRobin, + chainID: chainID, + nodes: []Node[ID, multiNodeRPCClient]{oosLow, oosHigh}, + logger: lggr, + }) + nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) + nodeSelector.On("Select").Return(nil) + mn.nodeSelector = nodeSelector + selected, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, oosHigh, selected) + }) + t.Run("Keeps out-of-sync active node when no alive node becomes available", func(t *testing.T) { + t.Parallel() + ctx := tests.Context(t) + chainID := RandomID() + oosNode := newMockNode[ID, multiNodeRPCClient](t) + oosNode.On("State").Return(nodeStateOutOfSync).Maybe() + oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() + oosNode.On("String").Return("oosNode").Maybe() + mn := newTestMultiNode(t, multiNodeOpts{ + selectionMode: NodeSelectionModeRoundRobin, + chainID: chainID, + nodes: []Node[ID, multiNodeRPCClient]{oosNode}, + }) + nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) + nodeSelector.On("Select").Return(nil) + mn.nodeSelector = nodeSelector + // First call falls back to out-of-sync + first, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, oosNode, first) + // Second call: still no alive, keeps the same out-of-sync node + second, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, oosNode, second) + }) + t.Run("Upgrades from out-of-sync active to alive node when one becomes available", func(t *testing.T) { + t.Parallel() + ctx := tests.Context(t) + chainID := RandomID() + lggr, _ := logger.TestObserved(t, zap.DebugLevel) + oosNode := 
newMockNode[ID, multiNodeRPCClient](t) + oosNode.On("State").Return(nodeStateOutOfSync).Maybe() + oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() + oosNode.On("String").Return("oosNode").Maybe() + oosNode.On("UnsubscribeAllExceptAliveLoop").Maybe() + aliveNode := newMockNode[ID, multiNodeRPCClient](t) + aliveNode.On("State").Return(nodeStateAlive).Maybe() + aliveNode.On("String").Return("aliveNode").Maybe() + mn := newTestMultiNode(t, multiNodeOpts{ + selectionMode: NodeSelectionModeRoundRobin, + chainID: chainID, + nodes: []Node[ID, multiNodeRPCClient]{oosNode, aliveNode}, + logger: lggr, + }) + nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) + // First select returns nil (no alive), second returns alive + nodeSelector.On("Select").Return(nil).Once() + mn.nodeSelector = nodeSelector + // First selection falls back to out-of-sync + first, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, oosNode, first) + // Now an alive node becomes available + nodeSelector.On("Select").Return(aliveNode).Once() + second, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, aliveNode, second) + }) } func TestMultiNode_RandomRPC(t *testing.T) { diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index e2974c0..2052d17 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -8,27 +8,9 @@ import ( "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/utils" bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" ) -// zombieNodeCheckInterval controls how often to re-check to see if we need to -// state change in case we have to force a state transition due to no available -// nodes. 
-// NOTE: This only applies to out-of-sync nodes if they are the last available node -func zombieNodeCheckInterval(noNewHeadsThreshold time.Duration) time.Duration { - interval := noNewHeadsThreshold - if interval <= 0 || interval > QueryTimeout { - interval = QueryTimeout - } - return utils.WithJitter(interval) -} - -const ( - msgCannotDisable = "but cannot disable this connection because there are no other RPC endpoints, or all other RPC endpoints are dead." - msgDegradedState = "Chainlink is now operating in a degraded state and urgent action is required to resolve the issue" -) - // Node is a FSM // Each state has a loop that goes with it, which monitors the node and moves it into another state as necessary. // Only one loop must run at a time. @@ -125,26 +107,15 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.metrics.IncrementPollsSuccess(ctx, n.name) pollFailures = 0 } - if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { - lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) - if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { - lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState) - continue - } - } - n.declareUnreachable() - return - } - if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { - // note: there must be another live node for us to be out of sync - if liveNodes < 2 && !n.isLoadBalancedRPC { - lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) - continue - } - n.declareOutOfSync(syncStatusNotInSyncWithPool) - return - } + if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { + lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) + 
n.declareUnreachable() + return + } + if outOfSync, _ := n.isOutOfSyncWithPool(); outOfSync { + n.declareOutOfSync(syncStatusNotInSyncWithPool) + return + } case bh, open := <-headsSub.Heads: if !open { lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) @@ -163,17 +134,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // We haven't received a head on the channel for at least the // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) - if n.poolInfoProvider != nil { - // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) - // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { - lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) - // We don't necessarily want to wait the full timeout to check again, we should - // check regularly and log noisily in this state - headsSub.ResetTimer(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)) - continue - } - } n.declareOutOfSync(syncStatusNoNewHead) return case latestFinalized, open := <-finalizedHeadsSub.Heads: @@ -191,17 +151,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // We haven't received a finalized head on the channel for at least the // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", 
localHighestChainInfo.BlockNumber) - if n.poolInfoProvider != nil { - // if its the only node and its not a proxy, keep waiting for sync (check LatestChainInfo) - // if its a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 2 && !n.isLoadBalancedRPC { - lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) - // We don't necessarily want to wait the full timeout to check again, we should - // check regularly and log noisily in this state - finalizedHeadsSub.ResetTimer(zombieNodeCheckInterval(noNewFinalizedBlocksTimeoutThreshold)) - continue - } - } n.declareOutOfSync(syncStatusNoNewFinalizedHead) return case <-finalizedHeadsSub.Errors: @@ -457,19 +406,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { } lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "syncIssues", syncIssues) - case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)): - if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(); l < 1 { - if n.isLoadBalancedRPC { - // in case all rpcs behind a load balanced rpc are out of sync, we need to declare out of sync to prevent false transition to alive - n.declareOutOfSync(syncIssues) - return - } - lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. 
This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues) - n.declareInSync() - return - } - } case err := <-headsSub.Errors: lggr.Errorw("Subscription was terminated", "err", err) n.declareUnreachable() diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 684d0c7..6813c87 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -176,7 +176,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return nodeStateUnreachable == node.State() }) }) - t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) { + t.Run("with threshold poll failures and last node alive, transitions to unreachable", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) @@ -190,41 +190,10 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { lggr: lggr, }) defer func() { assert.NoError(t, node.close()) }() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ - BlockNumber: 20, - }).Once() - node.SetPoolChainInfoProvider(poolInfo) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) - pollError := errors.New("failed to get ClientVersion") - rpc.On("ClientVersion", mock.Anything).Return("", pollError) - node.declareAlive() - tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) - assert.Equal(t, nodeStateAlive, node.State()) - }) - t.Run("with threshold poll failures, we are the last node alive, but is a proxy, transitions to unreachable", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) - const pollFailureThreshold = 3 - node := newSubscribedNode(t, testNodeOpts{ - config: 
testNodeConfig{ - pollFailureThreshold: pollFailureThreshold, - pollInterval: tests.TestInterval, - }, - rpc: rpc, - lggr: lggr, - isLoadBalancedRPC: true, - }) - defer func() { assert.NoError(t, node.close()) }() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ - BlockNumber: 20, - }).Once() - node.SetPoolChainInfoProvider(poolInfo) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) tests.AssertEventually(t, func() bool { @@ -266,7 +235,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { node.declareAlive() tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") }) - t.Run("when behind more than SyncThreshold but we are the last live node, forcibly stays alive", func(t *testing.T) { + t.Run("when behind more than SyncThreshold and last live node, transitions to out of sync", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) @@ -290,44 +259,15 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { TotalDifficulty: big.NewInt(10), }) node.SetPoolChainInfoProvider(poolInfo) - node.declareAlive() - tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)) - }) - t.Run("when behind more than SyncThreshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) - const 
syncThreshold = 10 - node := newSubscribedNode(t, testNodeOpts{ - config: testNodeConfig{ - pollInterval: tests.TestInterval, - syncThreshold: syncThreshold, - selectionMode: NodeSelectionModeRoundRobin, - }, - rpc: rpc, - lggr: lggr, - isLoadBalancedRPC: true, - }) - defer func() { assert.NoError(t, node.close()) }() - rpc.On("ClientVersion", mock.Anything).Return("", nil) - const mostRecentBlock = 20 - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ - BlockNumber: syncThreshold + mostRecentBlock + 1, - TotalDifficulty: big.NewInt(10), - }) - node.SetPoolChainInfoProvider(poolInfo) - // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() - rpc.On("Dial", mock.Anything).Run(func(_ mock.Arguments) { - require.Equal(t, nodeStateOutOfSync, node.State()) - }).Return(errors.New("failed to dial")).Maybe() + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") - assert.Equal(t, nodeStateUnreachable, node.State()) + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateUnreachable + }) }) t.Run("when behind but SyncThreshold=0, stay alive", func(t *testing.T) { t.Parallel() @@ -374,7 +314,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return node.State() == nodeStateUnreachable }) }) - t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { + t.Run("when no new heads received for threshold and last live node, transitions to out of sync", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, 
ChainInfo{}) @@ -388,45 +328,15 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc: rpc, }) defer func() { assert.NoError(t, node.close()) }() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ - BlockNumber: 20, - TotalDifficulty: big.NewInt(10), - }).Once() - node.SetPoolChainInfoProvider(poolInfo) - node.declareAlive() - tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)) - assert.Equal(t, nodeStateAlive, node.State()) - }) - t.Run("when no new heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) - lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) - node := newSubscribedNode(t, testNodeOpts{ - config: testNodeConfig{}, - lggr: lggr, - chainConfig: clientMocks.ChainConfig{ - NoNewHeadsThresholdVal: tests.TestInterval, - }, - rpc: rpc, - isLoadBalancedRPC: true, - }) - defer func() { assert.NoError(t, node.close()) }() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ - BlockNumber: 20, - TotalDifficulty: big.NewInt(10), - }).Once() - node.SetPoolChainInfoProvider(poolInfo) - // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() - tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") - assert.Equal(t, nodeStateUnreachable, node.State()) + tests.AssertLogEventually(t, observedLogs, "RPC endpoint detected out of sync") + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateUnreachable + }) }) 
t.Run("rpc closed head channel", func(t *testing.T) { t.Parallel() @@ -622,7 +532,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return node.State() == nodeStateUnreachable }) }) - t.Run("when no new finalized heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { + t.Run("when no new finalized heads received for threshold and last live node, transitions to out of sync", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() @@ -639,49 +549,15 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { lggr: lggr, }) defer func() { assert.NoError(t, node.close()) }() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ - BlockNumber: 20, - TotalDifficulty: big.NewInt(10), - }).Once() - node.SetPoolChainInfoProvider(poolInfo) - node.declareAlive() - tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)) - assert.Equal(t, nodeStateAlive, node.State()) - }) - t.Run("when no new finalized heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() - rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() - lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) - noNewFinalizedHeadsThreshold := tests.TestInterval - node := newSubscribedNode(t, testNodeOpts{ - config: testNodeConfig{}, - chainConfig: clientMocks.ChainConfig{ - NoNewFinalizedHeadsThresholdVal: noNewFinalizedHeadsThreshold, - IsFinalityTagEnabled: true, - }, - rpc: rpc, - lggr: lggr, - isLoadBalancedRPC: true, - }) - defer func() { assert.NoError(t, node.close()) }() - poolInfo := 
newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ - BlockNumber: 20, - TotalDifficulty: big.NewInt(10), - }).Once() - node.SetPoolChainInfoProvider(poolInfo) - // tries to redial in outOfSync - // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() - tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") - assert.Equal(t, nodeStateUnreachable, node.State()) + tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s", noNewFinalizedHeadsThreshold)) + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateUnreachable + }) }) t.Run("If finalized subscription returns an error, transitions to unreachable", func(t *testing.T) { t.Parallel() @@ -1028,7 +904,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { return node.State() == nodeStateAlive }) }) - t.Run("becomes alive if there is no other nodes", func(t *testing.T) { + t.Run("stays out of sync when no other nodes are available", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) nodeChainID := RandomID() @@ -1042,12 +918,6 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { lggr: lggr, }) defer func() { assert.NoError(t, node.close()) }() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(0, ChainInfo{ - BlockNumber: 100, - TotalDifficulty: big.NewInt(200), - }) - node.SetPoolChainInfoProvider(poolInfo) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) rpc.On("Dial", mock.Anything).Return(nil).Once() @@ -1057,48 +927,11 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) 
outOfSyncSubscription.On("Unsubscribe").Once() rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), outOfSyncSubscription, nil).Once() - setupRPCForAliveLoop(t, rpc) node.declareOutOfSync(syncStatusNoNewHead) - tests.AssertLogEventually(t, observedLogs, "RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state") - tests.AssertEventually(t, func() bool { - return node.State() == nodeStateAlive - }) - }) - t.Run("becomes alive if there is no other nodes, unless proxy", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - nodeChainID := RandomID() - lggr, _ := logger.TestObserved(t, zap.DebugLevel) - node := newAliveNode(t, testNodeOpts{ - chainConfig: clientMocks.ChainConfig{ - NoNewHeadsThresholdVal: tests.TestInterval, - }, - rpc: rpc, - chainID: nodeChainID, - lggr: lggr, - isLoadBalancedRPC: true, - }) - defer func() { assert.NoError(t, node.close()) }() - poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(0, ChainInfo{ - BlockNumber: 100, - TotalDifficulty: big.NewInt(200), - }) - node.SetPoolChainInfoProvider(poolInfo) - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) - - rpc.On("Dial", mock.Anything).Return(nil).Once() - rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once() - - outOfSyncSubscription := newMockSubscription(t) - outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) - outOfSyncSubscription.On("Unsubscribe").Once() - rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), outOfSyncSubscription, nil).Once() - rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe() - node.declareOutOfSync(syncStatusNoNewHead) + tests.AssertLogEventually(t, observedLogs, "No new heads received for") tests.AssertEventually(t, func() bool { - return node.State() == nodeStateUnreachable + return node.State() == nodeStateOutOfSync }) 
}) t.Run("Stays out-of-sync if received new head, but lags behind pool", func(t *testing.T) { From e083737dbbbaa8263daa3d5eb305db8aad3afdb8 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 26 Feb 2026 11:30:45 -0500 Subject: [PATCH 2/5] update --- .../mock_pool_chain_info_provider_test.go | 29 +-- multinode/multi_node.go | 8 +- multinode/multi_node_test.go | 5 +- multinode/node_lifecycle.go | 74 ++++++- multinode/node_lifecycle_test.go | 180 +++++++++++++++--- multinode/types.go | 6 +- 6 files changed, 247 insertions(+), 55 deletions(-) diff --git a/multinode/mock_pool_chain_info_provider_test.go b/multinode/mock_pool_chain_info_provider_test.go index 3dcd76c..770a54a 100644 --- a/multinode/mock_pool_chain_info_provider_test.go +++ b/multinode/mock_pool_chain_info_provider_test.go @@ -62,9 +62,9 @@ func (_c *mockPoolChainInfoProvider_HighestUserObservations_Call) RunAndReturn(r return _c } -// LatestChainInfo provides a mock function with no fields -func (_m *mockPoolChainInfoProvider) LatestChainInfo() (int, ChainInfo) { - ret := _m.Called() +// LatestChainInfo provides a mock function with given fields: callerName +func (_m *mockPoolChainInfoProvider) LatestChainInfo(callerName string) (int, ChainInfo) { + ret := _m.Called(callerName) if len(ret) == 0 { panic("no return value specified for LatestChainInfo") @@ -72,17 +72,17 @@ func (_m *mockPoolChainInfoProvider) LatestChainInfo() (int, ChainInfo) { var r0 int var r1 ChainInfo - if rf, ok := ret.Get(0).(func() (int, ChainInfo)); ok { - return rf() + if rf, ok := ret.Get(0).(func(string) (int, ChainInfo)); ok { + return rf(callerName) } - if rf, ok := ret.Get(0).(func() int); ok { - r0 = rf() + if rf, ok := ret.Get(0).(func(string) int); ok { + r0 = rf(callerName) } else { r0 = ret.Get(0).(int) } - if rf, ok := ret.Get(1).(func() ChainInfo); ok { - r1 = rf() + if rf, ok := ret.Get(1).(func(string) ChainInfo); ok { + r1 = rf(callerName) } else { r1 = ret.Get(1).(ChainInfo) } @@ -96,13 +96,14 @@ type 
mockPoolChainInfoProvider_LatestChainInfo_Call struct { } // LatestChainInfo is a helper method to define mock.On call -func (_e *mockPoolChainInfoProvider_Expecter) LatestChainInfo() *mockPoolChainInfoProvider_LatestChainInfo_Call { - return &mockPoolChainInfoProvider_LatestChainInfo_Call{Call: _e.mock.On("LatestChainInfo")} +// - callerName string +func (_e *mockPoolChainInfoProvider_Expecter) LatestChainInfo(callerName interface{}) *mockPoolChainInfoProvider_LatestChainInfo_Call { + return &mockPoolChainInfoProvider_LatestChainInfo_Call{Call: _e.mock.On("LatestChainInfo", callerName)} } -func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) Run(run func()) *mockPoolChainInfoProvider_LatestChainInfo_Call { +func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) Run(run func(callerName string)) *mockPoolChainInfoProvider_LatestChainInfo_Call { _c.Call.Run(func(args mock.Arguments) { - run() + run(args[0].(string)) }) return _c } @@ -112,7 +113,7 @@ func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) Return(_a0 int, _a1 Ch return _c } -func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) RunAndReturn(run func() (int, ChainInfo)) *mockPoolChainInfoProvider_LatestChainInfo_Call { +func (_c *mockPoolChainInfoProvider_LatestChainInfo_Call) RunAndReturn(run func(string) (int, ChainInfo)) *mockPoolChainInfoProvider_LatestChainInfo_Call { _c.Call.Return(run) return _c } diff --git a/multinode/multi_node.go b/multinode/multi_node.go index dc7b368..da2a9fa 100644 --- a/multinode/multi_node.go +++ b/multinode/multi_node.go @@ -291,14 +291,18 @@ func isUsableState(s nodeState) bool { return s == nodeStateOutOfSync || s == nodeStateFinalizedBlockOutOfSync } -// LatestChainInfo returns the number of alive nodes in the pool and the highest ChainInfo most recently received by those nodes. 
+// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName) +// and the highest ChainInfo most recently received by those nodes. // E.g. If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12. -func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo() (int, ChainInfo) { +func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo(callerName string) (int, ChainInfo) { var nLiveNodes int ch := ChainInfo{ TotalDifficulty: big.NewInt(0), } for _, n := range c.primaryNodes { + if n.Name() == callerName { + continue + } if s, nodeChainInfo := n.StateAndLatest(); s == nodeStateAlive { nLiveNodes++ ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber) diff --git a/multinode/multi_node_test.go b/multinode/multi_node_test.go index 81dea7b..75a7703 100644 --- a/multinode/multi_node_test.go +++ b/multinode/multi_node_test.go @@ -731,14 +731,15 @@ func TestMultiNode_ChainInfo(t *testing.T) { for i := range testCases { tc := testCases[i] t.Run(tc.Name, func(t *testing.T) { - for _, params := range tc.NodeParams { + for i, params := range tc.NodeParams { node := newMockNode[ID, multiNodeRPCClient](t) mn.primaryNodes = append(mn.primaryNodes, node) + node.On("Name").Return(fmt.Sprintf("node_%d", i)).Maybe() node.On("StateAndLatest").Return(params.State, params.LatestChainInfo) node.On("HighestUserObservations").Return(params.HighestUserObservations) } - nNodes, latestChainInfo := mn.LatestChainInfo() + nNodes, latestChainInfo := mn.LatestChainInfo("") assert.Equal(t, tc.ExpectedNLiveNodes, nNodes) assert.Equal(t, tc.ExpectedLatestChainInfo, latestChainInfo) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 2052d17..a988b3a 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -8,9 +8,27 @@ import ( "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils" 
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" ) +// zombieNodeCheckInterval controls how often to re-check to see if we need to +// state change in case we have to force a state transition due to no available +// nodes. +// NOTE: This only applies to out-of-sync nodes if they are the last available node +func zombieNodeCheckInterval(noNewHeadsThreshold time.Duration) time.Duration { + interval := noNewHeadsThreshold + if interval <= 0 || interval > QueryTimeout { + interval = QueryTimeout + } + return utils.WithJitter(interval) +} + +const ( + msgCannotDisable = "but cannot disable this connection because there are no other RPC endpoints, or all other RPC endpoints are dead." + msgDegradedState = "Chainlink is now operating in a degraded state and urgent action is required to resolve the issue" +) + // Node is a FSM // Each state has a loop that goes with it, which monitors the node and moves it into another state as necessary. // Only one loop must run at a time. 
@@ -107,15 +125,25 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.metrics.IncrementPollsSuccess(ctx, n.name) pollFailures = 0 } - if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { - lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) - n.declareUnreachable() - return - } - if outOfSync, _ := n.isOutOfSyncWithPool(); outOfSync { - n.declareOutOfSync(syncStatusNotInSyncWithPool) - return - } + if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { + lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC endpoint failed to respond to polls; %s %s", msgCannotDisable, msgDegradedState) + continue + } + } + n.declareUnreachable() + return + } + if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { + if liveNodes < 1 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) + continue + } + n.declareOutOfSync(syncStatusNotInSyncWithPool) + return + } case bh, open := <-headsSub.Heads: if !open { lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) @@ -134,6 +162,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // We haven't received a head on the channel for at least the // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) + if 
n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) + headsSub.ResetTimer(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)) + continue + } + } n.declareOutOfSync(syncStatusNoNewHead) return case latestFinalized, open := <-finalizedHeadsSub.Heads: @@ -151,6 +186,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // We haven't received a finalized head on the channel for at least the // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) + finalizedHeadsSub.ResetTimer(zombieNodeCheckInterval(noNewFinalizedBlocksTimeoutThreshold)) + continue + } + } n.declareOutOfSync(syncStatusNoNewFinalizedHead) return case <-finalizedHeadsSub.Errors: @@ -291,7 +333,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN return // disabled } // Check against best node - ln, ci := n.poolInfoProvider.LatestChainInfo() + ln, ci := n.poolInfoProvider.LatestChainInfo(n.name) localChainInfo, _ := n.rpc.GetInterceptedChainInfo() mode := n.nodePoolCfg.SelectionMode() switch mode { @@ -406,6 +448,18 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { } lggr.Debugw(msgReceivedBlock, "blockNumber", head.BlockNumber(), "blockDifficulty", head.BlockDifficulty(), "syncIssues", syncIssues) + case <-time.After(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)): + if n.poolInfoProvider 
!= nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 { + if n.isLoadBalancedRPC { + n.declareOutOfSync(syncIssues) + return + } + lggr.Criticalw("RPC endpoint is still out of sync, but there are no other available nodes. This RPC node will be forcibly moved back into the live pool in a degraded state", "syncIssues", syncIssues) + n.declareInSync() + return + } + } case err := <-headsSub.Errors: lggr.Errorw("Subscription was terminated", "err", err) n.declareUnreachable() diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 6813c87..3cd8227 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -219,7 +219,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(10, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(10, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), }) @@ -235,7 +235,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { node.declareAlive() tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") }) - t.Run("when behind more than SyncThreshold and last live node, transitions to out of sync", func(t *testing.T) { + t.Run("when behind more than SyncThreshold but we are the last live node, forcibly stays alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) @@ -254,7 +254,35 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + 
poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: syncThreshold + mostRecentBlock + 1, + TotalDifficulty: big.NewInt(10), + }) + node.SetPoolChainInfoProvider(poolInfo) + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState)) + }) + t.Run("when behind more than SyncThreshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + const syncThreshold = 10 + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollInterval: tests.TestInterval, + syncThreshold: syncThreshold, + selectionMode: NodeSelectionModeRoundRobin, + }, + rpc: rpc, + lggr: lggr, + isLoadBalancedRPC: true, + }) + defer func() { assert.NoError(t, node.close()) }() + rpc.On("ClientVersion", mock.Anything).Return("", nil) + const mostRecentBlock = 20 + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ BlockNumber: syncThreshold + mostRecentBlock + 1, TotalDifficulty: big.NewInt(10), }) @@ -262,12 +290,12 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() - rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + rpc.On("Dial", mock.Anything).Run(func(_ mock.Arguments) { + require.Equal(t, nodeStateOutOfSync, node.State()) + }).Return(errors.New("failed to dial")).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") - tests.AssertEventually(t, func() bool { - return 
node.State() == nodeStateUnreachable - }) + assert.Equal(t, nodeStateUnreachable, node.State()) }) t.Run("when behind but SyncThreshold=0, stay alive", func(t *testing.T) { t.Parallel() @@ -314,7 +342,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return node.State() == nodeStateUnreachable }) }) - t.Run("when no new heads received for threshold and last live node, transitions to out of sync", func(t *testing.T) { + t.Run("when no new heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) @@ -328,15 +356,44 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc: rpc, }) defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 20, + TotalDifficulty: big.NewInt(10), + }).Once() + node.SetPoolChainInfoProvider(poolInfo) + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState)) + assert.Equal(t, nodeStateAlive, node.State()) + }) + t.Run("when no new heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{}, + lggr: lggr, + chainConfig: clientMocks.ChainConfig{ + NoNewHeadsThresholdVal: tests.TestInterval, + }, + rpc: rpc, + isLoadBalancedRPC: true, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 
20, + TotalDifficulty: big.NewInt(10), + }).Once() + node.SetPoolChainInfoProvider(poolInfo) rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() - tests.AssertLogEventually(t, observedLogs, "RPC endpoint detected out of sync") - tests.AssertEventually(t, func() bool { - return node.State() == nodeStateUnreachable - }) + tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") + assert.Equal(t, nodeStateUnreachable, node.State()) }) t.Run("rpc closed head channel", func(t *testing.T) { t.Parallel() @@ -532,7 +589,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return node.State() == nodeStateUnreachable }) }) - t.Run("when no new finalized heads received for threshold and last live node, transitions to out of sync", func(t *testing.T) { + t.Run("when no new finalized heads received for threshold but we are the last live node, forcibly stays alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() @@ -549,15 +606,47 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { lggr: lggr, }) defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 20, + TotalDifficulty: big.NewInt(10), + }).Once() + node.SetPoolChainInfoProvider(poolInfo) + node.declareAlive() + tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState)) + assert.Equal(t, nodeStateAlive, node.State()) + }) + t.Run("when no new finalized heads received for threshold, we are the last live node, but is a proxy, transitions to out of sync -> unreachable", func(t *testing.T) { + t.Parallel() + 
rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Once() + rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), newSub(t), nil).Once() + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + noNewFinalizedHeadsThreshold := tests.TestInterval + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{}, + chainConfig: clientMocks.ChainConfig{ + NoNewFinalizedHeadsThresholdVal: noNewFinalizedHeadsThreshold, + IsFinalityTagEnabled: true, + }, + rpc: rpc, + lggr: lggr, + isLoadBalancedRPC: true, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 20, + TotalDifficulty: big.NewInt(10), + }).Once() + node.SetPoolChainInfoProvider(poolInfo) rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() - tests.AssertLogEventually(t, observed, fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s", noNewFinalizedHeadsThreshold)) - tests.AssertEventually(t, func() bool { - return node.State() == nodeStateUnreachable - }) + tests.AssertLogEventually(t, observedLogs, "Dial failed: Node is unreachable") + assert.Equal(t, nodeStateUnreachable, node.State()) }) t.Run("If finalized subscription returns an error, transitions to unreachable", func(t *testing.T) { t.Parallel() @@ -904,7 +993,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { return node.State() == nodeStateAlive }) }) - t.Run("stays out of sync when no other nodes are available", func(t *testing.T) { + t.Run("becomes alive if there is no other nodes", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) nodeChainID := RandomID() @@ 
-918,6 +1007,12 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { lggr: lggr, }) defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 100, + TotalDifficulty: big.NewInt(200), + }) + node.SetPoolChainInfoProvider(poolInfo) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) rpc.On("Dial", mock.Anything).Return(nil).Once() @@ -927,11 +1022,48 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) outOfSyncSubscription.On("Unsubscribe").Once() rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), outOfSyncSubscription, nil).Once() + setupRPCForAliveLoop(t, rpc) node.declareOutOfSync(syncStatusNoNewHead) - tests.AssertLogEventually(t, observedLogs, "No new heads received for") + tests.AssertLogEventually(t, observedLogs, "RPC endpoint is still out of sync, but there are no other available nodes. 
This RPC node will be forcibly moved back into the live pool in a degraded state") tests.AssertEventually(t, func() bool { - return node.State() == nodeStateOutOfSync + return node.State() == nodeStateAlive + }) + }) + t.Run("becomes alive if there is no other nodes, unless proxy", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, _ := logger.TestObserved(t, zap.DebugLevel) + node := newAliveNode(t, testNodeOpts{ + chainConfig: clientMocks.ChainConfig{ + NoNewHeadsThresholdVal: tests.TestInterval, + }, + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + isLoadBalancedRPC: true, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 100, + TotalDifficulty: big.NewInt(200), + }) + node.SetPoolChainInfoProvider(poolInfo) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + + rpc.On("Dial", mock.Anything).Return(nil).Once() + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once() + + outOfSyncSubscription := newMockSubscription(t) + outOfSyncSubscription.On("Err").Return((<-chan error)(nil)) + outOfSyncSubscription.On("Unsubscribe").Once() + rpc.On("SubscribeToHeads", mock.Anything).Return(make(<-chan Head), outOfSyncSubscription, nil).Once() + rpc.On("Dial", mock.Anything).Return(errors.New("failed to redial")).Maybe() + node.declareOutOfSync(syncStatusNoNewHead) + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateUnreachable }) }) t.Run("Stays out-of-sync if received new head, but lags behind pool", func(t *testing.T) { @@ -954,7 +1086,7 @@ func TestUnit_NodeLifecycle_outOfSyncLoop(t *testing.T) { defer func() { assert.NoError(t, node.close()) }() poolInfo := newMockPoolChainInfoProvider(t) const highestBlock = 20 - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(1, 
ChainInfo{ BlockNumber: highestBlock * 2, TotalDifficulty: big.NewInt(200), }) @@ -1607,7 +1739,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { config: testNodeConfig{syncThreshold: 1}, }) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(1, ChainInfo{}).Once() + poolInfo.On("LatestChainInfo", mock.Anything).Return(1, ChainInfo{}).Once() node.SetPoolChainInfoProvider(poolInfo) assert.Panics(t, func() { _, _ = node.isOutOfSyncWithPool() @@ -1653,7 +1785,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { }, }) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{ BlockNumber: highestBlock, TotalDifficulty: big.NewInt(totalDifficulty), }) @@ -1713,7 +1845,7 @@ func TestUnit_NodeLifecycle_outOfSyncWithPool(t *testing.T) { }) poolInfo := newMockPoolChainInfoProvider(t) - poolInfo.On("LatestChainInfo").Return(nodesNum, ChainInfo{ + poolInfo.On("LatestChainInfo", mock.Anything).Return(nodesNum, ChainInfo{ BlockNumber: highestBlock, TotalDifficulty: big.NewInt(totalDifficulty), }) diff --git a/multinode/types.go b/multinode/types.go index b31c6ca..be8feea 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -89,11 +89,11 @@ type Head interface { // PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo type PoolChainInfoProvider interface { - // LatestChainInfo - returns number of live nodes available in the pool, so we can prevent the last alive node in a pool from being - // moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all. + // LatestChainInfo returns the number of live nodes available in the pool (excluding the node identified by + // callerName), so we can prevent the last alive node from being moved to an unhealthy state. // Returns highest latest ChainInfo within the alive nodes. E.g. 
most recent block number and highest block number // observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12. - LatestChainInfo() (int, ChainInfo) + LatestChainInfo(callerName string) (int, ChainInfo) // HighestUserObservations - returns highest ChainInfo ever observed by any user of MultiNode. HighestUserObservations() ChainInfo } From 89eeb66ce4692a3622796c41284040823c6c0d09 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 26 Feb 2026 12:46:52 -0500 Subject: [PATCH 3/5] fix comments --- multinode/multi_node.go | 21 ++++---------- multinode/multi_node_test.go | 56 ++++++++++++++++++++++++++++++------ 2 files changed, 53 insertions(+), 24 deletions(-) diff --git a/multinode/multi_node.go b/multinode/multi_node.go index da2a9fa..7b65fe1 100644 --- a/multinode/multi_node.go +++ b/multinode/multi_node.go @@ -260,7 +260,8 @@ func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node } } if fallback := c.selectOutOfSyncNode(); fallback != nil { - c.lggr.Warnw("No alive RPC nodes available, falling back to out-of-sync node", "node", fallback.String()) + c.lggr.Criticalw("No alive RPC nodes available, falling back to out-of-sync node", "node", fallback.String()) + c.eng.EmitHealthErr(fmt.Errorf("no alive nodes available for chain %s, using out-of-sync fallback", c.chainID.String())) return fallback, nil } c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name()) @@ -287,8 +288,9 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectOutOfSyncNode() Node[CHAIN_ID, RPC] { } // isUsableState returns true for out-of-sync states that can still serve requests as a fallback. +// nodeStateFinalizedBlockOutOfSync is intentionally excluded to prevent local finality violations. 
func isUsableState(s nodeState) bool { - return s == nodeStateOutOfSync || s == nodeStateFinalizedBlockOutOfSync + return s == nodeStateOutOfSync } // LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName) @@ -342,21 +344,10 @@ func (c *MultiNode[CHAIN_ID, RPC]) checkLease() { defer c.activeMu.Unlock() if bestNode == nil { - // No alive node available; if the current active is still usable (out-of-sync), keep it - if c.activeNode != nil && isUsableState(c.activeNode.State()) { - return - } - // Try out-of-sync fallback - if fallback := c.selectOutOfSyncNode(); fallback != nil && fallback != c.activeNode { - if c.activeNode != nil { - c.activeNode.UnsubscribeAllExceptAliveLoop() - } - c.activeNode = fallback - } - return + bestNode = c.selectOutOfSyncNode() } - if bestNode != c.activeNode { + if bestNode != nil && bestNode != c.activeNode { if c.activeNode != nil { c.activeNode.UnsubscribeAllExceptAliveLoop() } diff --git a/multinode/multi_node_test.go b/multinode/multi_node_test.go index 75a7703..52b5249 100644 --- a/multinode/multi_node_test.go +++ b/multinode/multi_node_test.go @@ -413,10 +413,13 @@ func TestMultiNode_selectNode(t *testing.T) { oosNode.On("State").Return(nodeStateOutOfSync).Maybe() oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() oosNode.On("String").Return("oosNode").Maybe() + unreachableNode := newMockNode[ID, multiNodeRPCClient](t) + unreachableNode.On("State").Return(nodeStateUnreachable).Maybe() + unreachableNode.On("String").Return("unreachableNode").Maybe() mn := newTestMultiNode(t, multiNodeOpts{ selectionMode: NodeSelectionModeRoundRobin, chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{oosNode}, + nodes: []Node[ID, multiNodeRPCClient]{oosNode, unreachableNode}, logger: lggr, }) nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) @@ -427,14 +430,13 @@ func TestMultiNode_selectNode(t *testing.T) { assert.Equal(t, oosNode, 
selected) tests.RequireLogMessage(t, observedLogs, "No alive RPC nodes available, falling back to out-of-sync node") }) - t.Run("Falls back to FinalizedBlockOutOfSync node when no alive nodes available", func(t *testing.T) { + t.Run("Does not fall back to FinalizedBlockOutOfSync node", func(t *testing.T) { t.Parallel() ctx := tests.Context(t) chainID := RandomID() - lggr, _ := logger.TestObserved(t, zap.WarnLevel) + lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) fbOosNode := newMockNode[ID, multiNodeRPCClient](t) fbOosNode.On("State").Return(nodeStateFinalizedBlockOutOfSync).Maybe() - fbOosNode.On("StateAndLatest").Return(nodeStateFinalizedBlockOutOfSync, ChainInfo{BlockNumber: 80}).Maybe() fbOosNode.On("String").Return("fbOosNode").Maybe() mn := newTestMultiNode(t, multiNodeOpts{ selectionMode: NodeSelectionModeRoundRobin, @@ -444,10 +446,12 @@ func TestMultiNode_selectNode(t *testing.T) { }) nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) nodeSelector.On("Select").Return(nil) + nodeSelector.On("Name").Return("MockedNodeSelector") mn.nodeSelector = nodeSelector selected, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, fbOosNode, selected) + require.EqualError(t, err, ErrNodeError.Error()) + require.Nil(t, selected) + tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available") }) t.Run("Selects best out-of-sync node by highest block number", func(t *testing.T) { t.Parallel() @@ -483,19 +487,23 @@ func TestMultiNode_selectNode(t *testing.T) { oosNode.On("State").Return(nodeStateOutOfSync).Maybe() oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() oosNode.On("String").Return("oosNode").Maybe() + oosNode2 := newMockNode[ID, multiNodeRPCClient](t) + oosNode2.On("State").Return(nodeStateOutOfSync).Maybe() + oosNode2.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 40}).Maybe() + oosNode2.On("String").Return("oosNode2").Maybe() mn := 
newTestMultiNode(t, multiNodeOpts{ selectionMode: NodeSelectionModeRoundRobin, chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{oosNode}, + nodes: []Node[ID, multiNodeRPCClient]{oosNode, oosNode2}, }) nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) nodeSelector.On("Select").Return(nil) mn.nodeSelector = nodeSelector - // First call falls back to out-of-sync + // First call falls back to best out-of-sync (oosNode has higher block) first, err := mn.selectNode(ctx) require.NoError(t, err) assert.Equal(t, oosNode, first) - // Second call: still no alive, keeps the same out-of-sync node + // Second call: still no alive, keeps the same out-of-sync node (no switch to oosNode2) second, err := mn.selectNode(ctx) require.NoError(t, err) assert.Equal(t, oosNode, second) @@ -609,6 +617,36 @@ func TestMultiNode_RandomRPC(t *testing.T) { node1.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop") node2.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop") }) + t.Run("RandomRPC falls back to out-of-sync node when no alive nodes available", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + chainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) + oosNode := newMockNode[ID, multiNodeRPCClient](t) + oosNode.On("State").Return(nodeStateOutOfSync).Maybe() + oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() + oosNode.On("String").Return("oosNode").Maybe() + mn := newTestMultiNode(t, multiNodeOpts{ + selectionMode: NodeSelectionModeRandomRPC, + chainID: chainID, + nodes: []Node[ID, multiNodeRPCClient]{oosNode}, + logger: lggr, + }) + nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) + nodeSelector.On("Select").Return(nil) + mn.nodeSelector = nodeSelector + + first, err := mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, oosNode, first) + tests.RequireLogMessage(t, observedLogs, "No alive RPC nodes available, falling back to out-of-sync node") + + second, err := 
mn.selectNode(ctx) + require.NoError(t, err) + assert.Equal(t, oosNode, second) + + oosNode.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop") + }) t.Run("RandomRPC reports error when no nodes available", func(t *testing.T) { t.Parallel() ctx := t.Context() From d030e50195360052b41bca22e8b4de56beb33680 Mon Sep 17 00:00:00 2001 From: James Walker Date: Thu, 26 Feb 2026 16:07:29 -0500 Subject: [PATCH 4/5] update --- multinode/multi_node.go | 63 ++---------- multinode/multi_node_test.go | 171 +------------------------------ multinode/node_lifecycle.go | 14 ++- multinode/node_lifecycle_test.go | 38 ++++++- multinode/types.go | 3 +- 5 files changed, 59 insertions(+), 230 deletions(-) diff --git a/multinode/multi_node.go b/multinode/multi_node.go index 7b65fe1..2cca96c 100644 --- a/multinode/multi_node.go +++ b/multinode/multi_node.go @@ -3,7 +3,6 @@ package multinode import ( "context" "fmt" - "math" "math/big" "slices" "sync" @@ -192,8 +191,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) SelectRPC(ctx context.Context) (rpc RPC, err } // selectNode returns the active Node, if it is still nodeStateAlive, otherwise it selects a new one from the NodeSelector. -// If no alive node is available, it falls back to an out-of-sync node. If the current active node is out-of-sync, -// it will try to upgrade to an alive node when one becomes available. 
func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CHAIN_ID, RPC], err error) { if c.selectionMode == NodeSelectionModeRandomRPC { return c.awaitNodeSelection(ctx) @@ -214,17 +211,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH return // another goroutine beat us here } - // If the current active node is out-of-sync, try to find an alive one first - if node != nil && isUsableState(node.State()) { - if aliveNode := c.nodeSelector.Select(); aliveNode != nil { - c.activeNode.UnsubscribeAllExceptAliveLoop() - c.activeNode = aliveNode - c.lggr.Debugw("Upgraded from out-of-sync to alive node", "prevNode", node.String(), "newNode", aliveNode.String()) - return c.activeNode, nil - } - return // keep using the out-of-sync node - } - var prevNodeName string if c.activeNode != nil { prevNodeName = c.activeNode.String() @@ -241,8 +227,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) selectNode(ctx context.Context) (node Node[CH } // awaitNodeSelection blocks until nodeSelector returns a live node or all nodes -// finish initializing. If no alive nodes are available, falls back to an out-of-sync node. -// Returns ErrNodeError when no usable nodes are available. +// finish initializing. Returns ErrNodeError when no live nodes are available. 
func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node[CHAIN_ID, RPC], error) { for { node := c.nodeSelector.Select() @@ -259,42 +244,14 @@ func (c *MultiNode[CHAIN_ID, RPC]) awaitNodeSelection(ctx context.Context) (Node continue } } - if fallback := c.selectOutOfSyncNode(); fallback != nil { - c.lggr.Criticalw("No alive RPC nodes available, falling back to out-of-sync node", "node", fallback.String()) - c.eng.EmitHealthErr(fmt.Errorf("no alive nodes available for chain %s, using out-of-sync fallback", c.chainID.String())) - return fallback, nil - } c.lggr.Criticalw("No live RPC nodes available", "NodeSelectionMode", c.nodeSelector.Name()) c.eng.EmitHealthErr(fmt.Errorf("no live nodes available for chain %s", c.chainID.String())) return nil, ErrNodeError } } -// selectOutOfSyncNode picks the best out-of-sync node by highest block number. -// Returns nil if no out-of-sync nodes are available. -func (c *MultiNode[CHAIN_ID, RPC]) selectOutOfSyncNode() Node[CHAIN_ID, RPC] { - var bestNode Node[CHAIN_ID, RPC] - var bestBlock int64 = math.MinInt64 - for _, n := range c.primaryNodes { - if isUsableState(n.State()) { - _, ci := n.StateAndLatest() - if ci.BlockNumber > bestBlock { - bestBlock = ci.BlockNumber - bestNode = n - } - } - } - return bestNode -} - -// isUsableState returns true for out-of-sync states that can still serve requests as a fallback. -// nodeStateFinalizedBlockOutOfSync is intentionally excluded to prevent local finality violations. -func isUsableState(s nodeState) bool { - return s == nodeStateOutOfSync -} - -// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName) -// and the highest ChainInfo most recently received by those nodes. +// LatestChainInfo returns the number of alive nodes in the pool (excluding the node identified by callerName +// from the count) and the highest ChainInfo most recently received by alive nodes. // E.g. 
If Node A's the most recent block is 10 and highest 15 and for Node B it's - 12 and 14. This method will return 12. func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo(callerName string) (int, ChainInfo) { var nLiveNodes int @@ -302,11 +259,10 @@ func (c *MultiNode[CHAIN_ID, RPC]) LatestChainInfo(callerName string) (int, Chai TotalDifficulty: big.NewInt(0), } for _, n := range c.primaryNodes { - if n.Name() == callerName { - continue - } if s, nodeChainInfo := n.StateAndLatest(); s == nodeStateAlive { - nLiveNodes++ + if n.Name() != callerName { + nLiveNodes++ + } ch.BlockNumber = max(ch.BlockNumber, nodeChainInfo.BlockNumber) ch.FinalizedBlockNumber = max(ch.FinalizedBlockNumber, nodeChainInfo.FinalizedBlockNumber) ch.TotalDifficulty = MaxTotalDifficulty(ch.TotalDifficulty, nodeChainInfo.TotalDifficulty) @@ -342,12 +298,7 @@ func (c *MultiNode[CHAIN_ID, RPC]) checkLease() { c.activeMu.Lock() defer c.activeMu.Unlock() - - if bestNode == nil { - bestNode = c.selectOutOfSyncNode() - } - - if bestNode != nil && bestNode != c.activeNode { + if bestNode != c.activeNode { if c.activeNode != nil { c.activeNode.UnsubscribeAllExceptAliveLoop() } diff --git a/multinode/multi_node_test.go b/multinode/multi_node_test.go index 52b5249..fea7fe6 100644 --- a/multinode/multi_node_test.go +++ b/multinode/multi_node_test.go @@ -378,8 +378,8 @@ func TestMultiNode_selectNode(t *testing.T) { activeNode, err := mn.selectNode(ctx) require.NoError(t, err) require.Equal(t, oldBest.String(), activeNode.String()) - // old best is out-of-sync, a new alive node is available via selector - oldBest.On("State").Return(nodeStateOutOfSync).Maybe() + // old best died, so we should replace it + oldBest.On("State").Return(nodeStateOutOfSync).Twice() nodeSelector.On("Select").Return(newBest).Once() newActiveNode, err := mn.selectNode(ctx) require.NoError(t, err) @@ -404,143 +404,6 @@ func TestMultiNode_selectNode(t *testing.T) { require.Nil(t, node) tests.RequireLogMessage(t, observedLogs, "No live RPC 
nodes available") }) - t.Run("Falls back to out-of-sync node when no alive nodes available", func(t *testing.T) { - t.Parallel() - ctx := tests.Context(t) - chainID := RandomID() - lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - oosNode := newMockNode[ID, multiNodeRPCClient](t) - oosNode.On("State").Return(nodeStateOutOfSync).Maybe() - oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() - oosNode.On("String").Return("oosNode").Maybe() - unreachableNode := newMockNode[ID, multiNodeRPCClient](t) - unreachableNode.On("State").Return(nodeStateUnreachable).Maybe() - unreachableNode.On("String").Return("unreachableNode").Maybe() - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRoundRobin, - chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{oosNode, unreachableNode}, - logger: lggr, - }) - nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) - nodeSelector.On("Select").Return(nil) - mn.nodeSelector = nodeSelector - selected, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, oosNode, selected) - tests.RequireLogMessage(t, observedLogs, "No alive RPC nodes available, falling back to out-of-sync node") - }) - t.Run("Does not fall back to FinalizedBlockOutOfSync node", func(t *testing.T) { - t.Parallel() - ctx := tests.Context(t) - chainID := RandomID() - lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - fbOosNode := newMockNode[ID, multiNodeRPCClient](t) - fbOosNode.On("State").Return(nodeStateFinalizedBlockOutOfSync).Maybe() - fbOosNode.On("String").Return("fbOosNode").Maybe() - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRoundRobin, - chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{fbOosNode}, - logger: lggr, - }) - nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) - nodeSelector.On("Select").Return(nil) - nodeSelector.On("Name").Return("MockedNodeSelector") - mn.nodeSelector = nodeSelector 
- selected, err := mn.selectNode(ctx) - require.EqualError(t, err, ErrNodeError.Error()) - require.Nil(t, selected) - tests.RequireLogMessage(t, observedLogs, "No live RPC nodes available") - }) - t.Run("Selects best out-of-sync node by highest block number", func(t *testing.T) { - t.Parallel() - ctx := tests.Context(t) - chainID := RandomID() - lggr, _ := logger.TestObserved(t, zap.WarnLevel) - oosLow := newMockNode[ID, multiNodeRPCClient](t) - oosLow.On("State").Return(nodeStateOutOfSync).Maybe() - oosLow.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 30}).Maybe() - oosLow.On("String").Return("oosLow").Maybe() - oosHigh := newMockNode[ID, multiNodeRPCClient](t) - oosHigh.On("State").Return(nodeStateOutOfSync).Maybe() - oosHigh.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 90}).Maybe() - oosHigh.On("String").Return("oosHigh").Maybe() - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRoundRobin, - chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{oosLow, oosHigh}, - logger: lggr, - }) - nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) - nodeSelector.On("Select").Return(nil) - mn.nodeSelector = nodeSelector - selected, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, oosHigh, selected) - }) - t.Run("Keeps out-of-sync active node when no alive node becomes available", func(t *testing.T) { - t.Parallel() - ctx := tests.Context(t) - chainID := RandomID() - oosNode := newMockNode[ID, multiNodeRPCClient](t) - oosNode.On("State").Return(nodeStateOutOfSync).Maybe() - oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() - oosNode.On("String").Return("oosNode").Maybe() - oosNode2 := newMockNode[ID, multiNodeRPCClient](t) - oosNode2.On("State").Return(nodeStateOutOfSync).Maybe() - oosNode2.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 40}).Maybe() - 
oosNode2.On("String").Return("oosNode2").Maybe() - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRoundRobin, - chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{oosNode, oosNode2}, - }) - nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) - nodeSelector.On("Select").Return(nil) - mn.nodeSelector = nodeSelector - // First call falls back to best out-of-sync (oosNode has higher block) - first, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, oosNode, first) - // Second call: still no alive, keeps the same out-of-sync node (no switch to oosNode2) - second, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, oosNode, second) - }) - t.Run("Upgrades from out-of-sync active to alive node when one becomes available", func(t *testing.T) { - t.Parallel() - ctx := tests.Context(t) - chainID := RandomID() - lggr, _ := logger.TestObserved(t, zap.DebugLevel) - oosNode := newMockNode[ID, multiNodeRPCClient](t) - oosNode.On("State").Return(nodeStateOutOfSync).Maybe() - oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() - oosNode.On("String").Return("oosNode").Maybe() - oosNode.On("UnsubscribeAllExceptAliveLoop").Maybe() - aliveNode := newMockNode[ID, multiNodeRPCClient](t) - aliveNode.On("State").Return(nodeStateAlive).Maybe() - aliveNode.On("String").Return("aliveNode").Maybe() - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRoundRobin, - chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{oosNode, aliveNode}, - logger: lggr, - }) - nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) - // First select returns nil (no alive), second returns alive - nodeSelector.On("Select").Return(nil).Once() - mn.nodeSelector = nodeSelector - // First selection falls back to out-of-sync - first, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, oosNode, first) - // Now an alive node becomes available - 
nodeSelector.On("Select").Return(aliveNode).Once() - second, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, aliveNode, second) - }) } func TestMultiNode_RandomRPC(t *testing.T) { @@ -617,36 +480,6 @@ func TestMultiNode_RandomRPC(t *testing.T) { node1.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop") node2.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop") }) - t.Run("RandomRPC falls back to out-of-sync node when no alive nodes available", func(t *testing.T) { - t.Parallel() - ctx := t.Context() - chainID := RandomID() - lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) - oosNode := newMockNode[ID, multiNodeRPCClient](t) - oosNode.On("State").Return(nodeStateOutOfSync).Maybe() - oosNode.On("StateAndLatest").Return(nodeStateOutOfSync, ChainInfo{BlockNumber: 50}).Maybe() - oosNode.On("String").Return("oosNode").Maybe() - mn := newTestMultiNode(t, multiNodeOpts{ - selectionMode: NodeSelectionModeRandomRPC, - chainID: chainID, - nodes: []Node[ID, multiNodeRPCClient]{oosNode}, - logger: lggr, - }) - nodeSelector := newMockNodeSelector[ID, multiNodeRPCClient](t) - nodeSelector.On("Select").Return(nil) - mn.nodeSelector = nodeSelector - - first, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, oosNode, first) - tests.RequireLogMessage(t, observedLogs, "No alive RPC nodes available, falling back to out-of-sync node") - - second, err := mn.selectNode(ctx) - require.NoError(t, err) - assert.Equal(t, oosNode, second) - - oosNode.AssertNotCalled(t, "UnsubscribeAllExceptAliveLoop") - }) t.Run("RandomRPC reports error when no nodes available", func(t *testing.T) { t.Parallel() ctx := t.Context() diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index a988b3a..a2156ce 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -136,8 +136,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareUnreachable() return } - if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync 
{ - if liveNodes < 1 && !n.isLoadBalancedRPC { + if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { + // note: there must be another live node for us to be out of sync + if liveNodes < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -163,8 +164,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC endpoint detected out of sync; no new heads received for %s (last head received was %v)", noNewHeadsTimeoutThreshold, localHighestChainInfo.BlockNumber), "nodeState", n.getCachedState(), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber, "noNewHeadsTimeoutThreshold", noNewHeadsTimeoutThreshold) if n.poolInfoProvider != nil { + // if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo) + // if it's a proxy, then declare out of sync and try reconnecting because proxy might return a healthier rpc if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint detected out of sync; %s %s", msgCannotDisable, msgDegradedState) + // We don't necessarily want to wait the full timeout to check again, we should + // check regularly and log noisily in this state headsSub.ResetTimer(zombieNodeCheckInterval(noNewHeadsTimeoutThreshold)) continue } @@ -187,8 +192,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { // threshold amount of time, mark it broken lggr.Errorw(fmt.Sprintf("RPC's finalized state is out of sync; no new finalized heads received for %s (last finalized head received was %v)", noNewFinalizedBlocksTimeoutThreshold, localHighestChainInfo.FinalizedBlockNumber), "latestReceivedBlockNumber", localHighestChainInfo.BlockNumber) if n.poolInfoProvider != nil { + // if it's the only node and it's not a proxy, keep waiting for sync (check LatestChainInfo) + // if it's a proxy, then declare out of sync and try
reconnecting because proxy might return a healthier rpc if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC's finalized state is out of sync; %s %s", msgCannotDisable, msgDegradedState) + // We don't necessarily want to wait the full timeout to check again, we should + // check regularly and log noisily in this state finalizedHeadsSub.ResetTimer(zombieNodeCheckInterval(noNewFinalizedBlocksTimeoutThreshold)) continue } @@ -452,6 +461,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { if n.poolInfoProvider != nil { if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 1 { if n.isLoadBalancedRPC { + // in case all rpcs behind a load balanced rpc are out of sync, we need to declare out of sync to prevent false transition to alive n.declareOutOfSync(syncIssues) return } diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 3cd8227..f1eb250 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -176,7 +176,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return nodeStateUnreachable == node.State() }) }) - t.Run("with threshold poll failures and last node alive, transitions to unreachable", func(t *testing.T) { + t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) @@ -190,10 +190,41 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { lggr: lggr, }) defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 20, + }).Once() + node.SetPoolChainInfoProvider(poolInfo) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) + pollError := errors.New("failed to get 
ClientVersion") + rpc.On("ClientVersion", mock.Anything).Return("", pollError) + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) + assert.Equal(t, nodeStateAlive, node.State()) + }) + t.Run("with threshold poll failures, we are the last node alive, but is a proxy, transitions to unreachable", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + const pollFailureThreshold = 3 + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollFailureThreshold: pollFailureThreshold, + pollInterval: tests.TestInterval, + }, + rpc: rpc, + lggr: lggr, + isLoadBalancedRPC: true, + }) + defer func() { assert.NoError(t, node.close()) }() + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(0, ChainInfo{ + BlockNumber: 20, + }).Once() + node.SetPoolChainInfoProvider(poolInfo) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) - rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) tests.AssertEventually(t, func() bool { @@ -287,6 +318,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { TotalDifficulty: big.NewInt(10), }) node.SetPoolChainInfoProvider(poolInfo) + // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() @@ -387,6 +419,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { TotalDifficulty: big.NewInt(10), }).Once() 
node.SetPoolChainInfoProvider(poolInfo) + // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() @@ -640,6 +673,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { TotalDifficulty: big.NewInt(10), }).Once() node.SetPoolChainInfoProvider(poolInfo) + // tries to redial in outOfSync rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Run(func(_ mock.Arguments) { assert.Equal(t, nodeStateOutOfSync, node.State()) }).Once() diff --git a/multinode/types.go b/multinode/types.go index be8feea..6400b2d 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -90,7 +90,8 @@ type Head interface { // PoolChainInfoProvider - provides aggregation of nodes pool ChainInfo type PoolChainInfoProvider interface { // LatestChainInfo returns the number of live nodes available in the pool (excluding the node identified by - // callerName), so we can prevent the last alive node from being moved to an unhealthy state. + // callerName from the count), so we can prevent the last alive node in a pool from being + // moved to out-of-sync state. It is better to have one out-of-sync node than no nodes at all. // Returns highest latest ChainInfo within the alive nodes. E.g. most recent block number and highest block number // observed by Node A are 10 and 15; Node B - 12 and 14. This method will return 12. 
LatestChainInfo(callerName string) (int, ChainInfo) From a23bbb1411f5fa6f6ac51b685c4c2d165ca6f674 Mon Sep 17 00:00:00 2001 From: James Walker Date: Fri, 27 Feb 2026 11:35:24 -0500 Subject: [PATCH 5/5] update --- multinode/node_lifecycle.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index a2156ce..8fd4e9d 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -136,9 +136,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareUnreachable() return } - if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { - // note: there must be another live node for us to be out of sync - if liveNodes < 1 && !n.isLoadBalancedRPC { + if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { + // note: there must be another live node for us to be out of sync + if liveNodes < 1 && !n.isLoadBalancedRPC { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue }