diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index 24c3b072fd..d9d552460c 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -176,16 +176,6 @@ func (th *TestHarness) assertDontHave(t *testing.T, start, end int) { } } -func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) { - for i := start; i < end; i++ { - blk, err := th.ORM.SelectBlockByNumber(testutils.Context(t), int64(i)) - require.NoError(t, err, "block %v", i) - chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i))) - require.NoError(t, err) - assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i) - } -} - // Simulates an RPC failover event to an alternate rpc server. This can also be used to // simulate switching back to the primary rpc after it recovers. func (th *TestHarness) SetActiveClient(backend evmtypes.Backend, chainType chaintype.ChainType) { diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index ec0d4cfc57..ee6102422f 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -1081,7 +1081,7 @@ func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Hea // There can be another reorg while we're finding the LCA. // That is ok, since we'll detect it on the next iteration. // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. - blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber) + blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock.Number, latestBlock.FinalizedBlockNumber) if err2 != nil { return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) } @@ -1231,10 +1231,15 @@ func (e *reorgError) Error() string { return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number) } -func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) { +func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) (blocks []Block, logs []Log, err error) { const maxUnfinalizedBlocks = 2000 - var logs []Log - var blocks []Block + var block *Block + defer func() { + // ensure that we always include the last block even if it's empty to use it as check point for next poll. + if block != nil && (len(blocks) == 0 || blocks[len(blocks)-1].BlockNumber != block.BlockNumber) { + blocks = append(blocks, *block) + } + }() for { h := currentBlock.Hash rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h)) @@ -1243,15 +1248,18 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty return blocks, logs, nil } lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp) - block := Block{ + block = &Block{ BlockHash: h, BlockNumber: currentBlock.Number, BlockTimestamp: currentBlock.Timestamp, FinalizedBlockNumber: finalized, SafeBlockNumber: safe, } - logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...) - blocks = append(blocks, block) + logs = append(logs, convertLogs(rpcLogs, []Block{*block}, lp.lggr, lp.ec.ConfiguredChainID())...) + // Always save the block with logs, to know an impact of finality violation and for better observability. + if len(rpcLogs) > 0 { + blocks = append(blocks, *block) + } if currentBlock.Number >= latest { return blocks, logs, nil @@ -1328,39 +1336,50 @@ func (lp *logPoller) latestSafeBlock(ctx context.Context, latestFinalizedBlockNu // Find the first place where our chain and their chain have the same block, // that block number is the LCA. Return the block after that, where we want to resume polling. -func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.Head, latestFinalizedBlockNumber int64) (*evmtypes.Head, error) { - // Current is where the mismatch starts. - // Check its parent to see if its the same as ours saved. - parent, err := lp.latencyMonitor.HeadByHash(ctx, current.ParentHash) - if err != nil { - return nil, err +func (lp *logPoller) findBlockAfterLCA(ctx context.Context, currentHeadNumber int64, dbLatestFinalizedBlockNumber int64) (*evmtypes.Head, error) { + if currentHeadNumber < dbLatestFinalizedBlockNumber { + lp.lggr.Criticalw("Unexpected state. Current head number is lower than latest finalized block number", "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber) + return nil, fmt.Errorf("current head number %d is lower than latest finalized block number %d: %w", currentHeadNumber, dbLatestFinalizedBlockNumber, commontypes.ErrFinalityViolated) } - blockAfterLCA := current + // We expect reorgs up to the block after latestFinalizedBlock - // We loop via parent instead of current so current always holds the LCA+1. // If the parent block number becomes < the first finalized block our reorg is too deep. - // This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config. - var ourParentBlockHash common.Hash - for parent.Number >= latestFinalizedBlockNumber { - outParentBlock, err := lp.orm.SelectBlockByNumber(ctx, parent.Number) + // This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config or chain violates finality guarantees. + for { + // Since we do not store all blocks in the db, it's possible that we do not have the parent block in our db. + // Find the nearest ancestor that we have in our db and check if it still belongs to canonical chain. + ourParent, err := lp.orm.SelectNewestBlock(ctx, currentHeadNumber-1) if err != nil { - return nil, err - } - ourParentBlockHash = outParentBlock.BlockHash - if parent.Hash == ourParentBlockHash { - // If we do have the blockhash, return blockAfterLCA - return blockAfterLCA, nil + if errors.Is(err, sql.ErrNoRows) { + lp.lggr.Warnw("No ancestor block found in db, this means that the reorg is deeper than the number of blocks we have in the db.", "err", err, "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber) + // we should return currentHeadNumber as the block after LCA, to avoid drifting too far back. + return lp.headerByNumber(ctx, currentHeadNumber) + } + return nil, fmt.Errorf("failed to select ancestor for current block %d: %w", currentHeadNumber-1, err) } - // Otherwise get a new parent and update blockAfterLCA. - blockAfterLCA = parent - parent, err = lp.latencyMonitor.HeadByHash(ctx, parent.ParentHash) + + // Since we are looking for block after LCA, fetch child of ourParent. + // If new current points to ourParent, we found the LCA and can return block after it. Otherwise, keep looking for ancestors. + rpcChild, err := lp.headerByNumber(ctx, ourParent.BlockNumber+1) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fetch block after ancestor %d: %w", ourParent.BlockNumber+1, err) + } + if ourParent.BlockHash == rpcChild.ParentHash { + return rpcChild, nil + } + + if ourParent.BlockNumber <= dbLatestFinalizedBlockNumber { + lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, + "current", rpcChild.Number, + "latestFinalized", dbLatestFinalizedBlockNumber, + "ourParentHash", ourParent.BlockHash, + "expectedParentHash", rpcChild.ParentHash, + "childHash", rpcChild.Hash) + return nil, fmt.Errorf("%w: finalized block with hash %s is not parent of canonical block at height %d, with parent hash %s", commontypes.ErrFinalityViolated, ourParent.BlockHash, rpcChild.Number, rpcChild.ParentHash) } - } - lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber) - return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number) + currentHeadNumber = ourParent.BlockNumber + } } // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block. diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 64c3512107..21e4763d8c 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -958,6 +958,202 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { }) } +func Test_FindBlockAfterLCA(t *testing.T) { + testCases := []struct { + Name string + CurrentBlockNumber int64 + DBLatestFinalized int64 + DBBlocks []int64 + Setup func(t *testing.T, ec *clienttest.Client) + ExpectedError error + ExpectedHead *evmtypes.Head + }{ + { + Name: "current head lower than DB finalized", + CurrentBlockNumber: 3, + DBLatestFinalized: 5, + DBBlocks: nil, + Setup: nil, + ExpectedError: commontypes.ErrFinalityViolated, + ExpectedHead: nil, + }, + { + Name: "no reorg - chains match on first iteration", + CurrentBlockNumber: 5, + DBLatestFinalized: 3, + DBBlocks: []int64{4}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + return newHead(n.Int64()), nil + }) + }, + ExpectedError: nil, + ExpectedHead: newHead(5), + }, + { + Name: "reorg - LCA found after walking back", + CurrentBlockNumber: 5, + DBLatestFinalized: 1, + DBBlocks: []int64{2, 3}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + num := n.Int64() + if num == 4 { + return &evmtypes.Head{Number: 4, Hash: common.BigToHash(big.NewInt(4)), ParentHash: common.HexToHash("0xdead")}, nil + } + return newHead(num), nil + }).Maybe() + }, + ExpectedError: nil, + ExpectedHead: newHead(3), + }, + { + Name: "reorg too deep", + CurrentBlockNumber: 5, + DBLatestFinalized: 2, + DBBlocks: []int64{1}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + num := n.Int64() + return &evmtypes.Head{Number: num, Hash: common.BigToHash(big.NewInt(num)), ParentHash: common.HexToHash("0xbeef")}, nil + }) + }, + ExpectedError: commontypes.ErrFinalityViolated, + ExpectedHead: nil, + }, + { + Name: "RPC HeadByNumber returns error", + CurrentBlockNumber: 5, + DBLatestFinalized: 3, + DBBlocks: []int64{4}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + if n.Int64() == 5 { + return nil, errors.New("rpc error") + } + return newHead(n.Int64()), nil + }) + }, + ExpectedError: errors.New("rpc error"), + ExpectedHead: nil, + }, + { + Name: "All blocks in DB are on a different chain", + CurrentBlockNumber: 100, + DBLatestFinalized: 10, + DBBlocks: []int64{90, 80, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() < 20 { + return head, nil + } + head.ParentHash = common.HexToHash("0xdead") + head.Hash = common.HexToHash("0xdead") + return head, nil + }) + }, + ExpectedError: nil, + ExpectedHead: &evmtypes.Head{ + Number: 20, + Hash: common.HexToHash("0xdead"), + ParentHash: common.HexToHash("0xdead"), + }, + }, + { + Name: "Sparse DB blocks - LCA found successfully", + CurrentBlockNumber: 100, + DBLatestFinalized: 10, + DBBlocks: []int64{90, 80, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() > 21 { + head.ParentHash = common.HexToHash("0xdead") + return head, nil + } + return head, nil + }) + }, + ExpectedError: nil, + ExpectedHead: newHead(21), + }, + { + Name: "Child of latest finalized is not canonical - finality violation", + CurrentBlockNumber: 100, + DBLatestFinalized: 80, + DBBlocks: []int64{90, 80, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() > 80 { + head.ParentHash = common.HexToHash("0xdead") + return head, nil + } + return head, nil + }) + }, + ExpectedError: commontypes.ErrFinalityViolated, + ExpectedHead: nil, + }, + { + // Such case is possible, since DBLatestFinalized is defined by FinalizedBlockNumber of the latest block. + Name: "Latest finalized DB block is in canonical but much older than DBLatestFinalized", + CurrentBlockNumber: 100, + DBLatestFinalized: 80, + DBBlocks: []int64{90, 70, 55, 20}, + Setup: func(t *testing.T, ec *clienttest.Client) { + ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) { + head := newHead(n.Int64()) + if n.Int64() > 80 { + head.ParentHash = common.HexToHash("0xdead") + return head, nil + } + return head, nil + }) + }, + ExpectedError: nil, + ExpectedHead: newHead(71), + }, + } + for _, tc := range testCases { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + db := testutils.NewSqlxDB(t) + lggr, _ := logger.TestObserved(t, zapcore.DebugLevel) + orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr) + headTracker := headstest.NewTracker[*evmtypes.Head, common.Hash](t) + ec := clienttest.NewClient(t) + ctx := testutils.Context(t) + for _, blockNum := range tc.DBBlocks { + hash := common.BigToHash(big.NewInt(blockNum)) + require.NoError(t, orm.InsertBlock(ctx, hash, blockNum, time.Now(), blockNum, blockNum)) + } + if tc.Setup != nil { + tc.Setup(t, ec) + } + lpOpts := Opts{ + PollPeriod: time.Second, + FinalityDepth: 3, + BackfillBatchSize: 3, + RPCBatchSize: 3, + KeepFinalizedBlocksDepth: 20, + BackupPollerBlockDelay: 0, + } + lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) + blockAfterLCA, err := lp.findBlockAfterLCA(ctx, tc.CurrentBlockNumber, tc.DBLatestFinalized) + if tc.ExpectedError != nil { + require.ErrorContains(t, err, tc.ExpectedError.Error()) + require.Nil(t, blockAfterLCA) + } else { + require.NoError(t, err) + require.Equal(t, tc.ExpectedHead, blockAfterLCA) + } + }) + } +} + func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) { lggr := logger.Test(b) lpOpts := Opts{ diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 72cefae5b0..6792db5b31 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -723,23 +723,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) currentBlock, err := lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) - matchesGeth := func() bool { - // Check every block is identical - latest, err1 := ec.BlockByNumber(testutils.Context(t), nil) - require.NoError(t, err1) - for i := 1; i < int(latest.NumberU64()); i++ { - ourBlock, err1 := lp.BlockByNumber(testutils.Context(t), int64(i)) - require.NoError(t, err1) - gethBlock, err1 := ec.BlockByNumber(testutils.Context(t), big.NewInt(int64(i))) - require.NoError(t, err1) - if ourBlock.BlockHash != gethBlock.Hash() { - t.Logf("Initial poll our block differs at height %d got %x want %x\n", i, ourBlock.BlockHash, gethBlock.Hash()) - return false - } - } - return true - } - if !matchesGeth() { + if !checkDBMatchesGeth(t, orm, simulatedClient) { return false } // Randomly pick to mine or reorg @@ -776,7 +760,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } - return matchesGeth() + return checkDBMatchesGeth(t, orm, simulatedClient) }, gen.SliceOfN(numChainInserts, gen.UInt64Range(1, uint64(finalityDepth-1))))) // Max reorg depth is finality depth - 1 p.TestingRun(t) } @@ -841,7 +825,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 1, 1) require.NoError(t, err) assert.Empty(t, lgs) - th.assertHaveCanonical(t, 1, 1) + requireDBMatchesGeth(t, th.ORM, th.Client) // Polling again should be a noop, since we are at the latest. newStart = th.PollAndSaveLogs(testutils.Context(t), newStart) @@ -849,7 +833,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { latest, err := th.ORM.SelectLatestBlock(testutils.Context(t)) require.NoError(t, err) assert.Equal(t, int64(1), latest.BlockNumber) - th.assertHaveCanonical(t, 1, 1) + requireDBMatchesGeth(t, th.ORM, th.Client) // Test scenario: one log 2 block chain. // Chain gen <- 1 <- 2 (L1) @@ -901,7 +885,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { require.NoError(t, err) require.Len(t, lgs, 1) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000002`), lgs[0].Data) - th.assertHaveCanonical(t, 1, 3) + requireDBMatchesGeth(t, th.ORM, th.Client) parent, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(1)) require.NoError(t, err) @@ -936,8 +920,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000001`), lgs[0].Data) assert.Equal(t, int64(3), lgs[1].BlockNumber) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000003`), lgs[1].Data) - th.assertHaveCanonical(t, 1, 1) - th.assertHaveCanonical(t, 3, 4) + requireDBMatchesGeth(t, th.ORM, th.Client) th.assertDontHave(t, 2, 2) // 2 gets backfilled // Test scenario: multiple logs per block for many blocks (also after reorg). @@ -968,9 +951,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { assert.Equal(t, th.EmitterAddress2, lgs[1].Address) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000006`), lgs[2].Data) assert.Equal(t, th.EmitterAddress1, lgs[2].Address) - th.assertHaveCanonical(t, 1, 1) - th.assertDontHave(t, 2, 2) // 2 gets backfilled - th.assertHaveCanonical(t, 3, 6) + requireDBMatchesGeth(t, th.ORM, th.Client) // Test scenario: node down for exactly finality + 2 blocks // Note we only backfill up to finalized - 1 blocks, because we need to save the @@ -996,7 +977,7 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { assert.Equal(t, int64(8), lgs[1].BlockNumber) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000009`), lgs[2].Data) assert.Equal(t, int64(9), lgs[2].BlockNumber) - th.assertHaveCanonical(t, 8, 10) + requireDBMatchesGeth(t, th.ORM, th.Client) // Test scenario large backfill (multiple batches) // Chain gen <- 1 <- 2 (L1_1) <- 3' L1_3 <- 4 <- 5 (L1_4, L2_5) <- 6 (L1_6) <- 7 (L1_7) <- 8 (L1_8) <- 9 (L1_9) <- 10..32 @@ -1017,9 +998,9 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { lgs, err = th.ORM.SelectLogsByBlockRange(testutils.Context(t), 11, 36) require.NoError(t, err) assert.Len(t, lgs, 25) - th.assertHaveCanonical(t, 32, 36) // Should have last finalized block plus unfinalized blocks - th.assertDontHave(t, 11, 13) // Should not have older finalized blocks - th.assertDontHave(t, 14, 16) // Should not have older finalized blocks + requireDBMatchesGeth(t, th.ORM, th.Client) + th.assertDontHave(t, 11, 13) // Should not have older finalized blocks + th.assertDontHave(t, 14, 16) // Should not have older finalized blocks // Verify that a custom block timestamp will get written to db correctly also b, err = th.Client.BlockByNumber(testutils.Context(t), nil) @@ -1206,9 +1187,8 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { require.NoError(t, err) require.Len(t, lgs, 30) assert.Equal(t, hexutil.MustDecode(`0x0000000000000000000000000000000000000000000000000000000000000002`), lgs[0].Data) - th.assertHaveCanonical(t, 1, 2) - th.assertDontHave(t, 2, 31) // These blocks are backfilled - th.assertHaveCanonical(t, 32, 36) + requireLBBlockIsFinalized(t, th.ORM, 30) + requireDBMatchesGeth(t, th.ORM, th.Client) }) } } @@ -1557,27 +1537,25 @@ func TestTooManyLogResults(t *testing.T) { } var filterLogsCall *mock.Call - head := &evmtypes.Head{} - finalized := &evmtypes.Head{} ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockNumber *big.Int) (*evmtypes.Head, error) { if blockNumber == nil { require.FailNow(t, "unexpected call to get current head") } - return &evmtypes.Head{Number: blockNumber.Int64(), ParentHash: common.HexToHash(fmt.Sprintf("0x%x", blockNumber.Int64()-1))}, nil + return newHead(blockNumber.Int64()), nil }) t.Run("halves size until small enough, then succeeds", func(t *testing.T) { // Simulate latestBlock = 300 - head.Number = 300 - head.Hash = common.HexToHash("0x1234") // needed to satisfy validation in fetchBlocks() - finalized.Number = head.Number - lpOpts.FinalityDepth + head := newHead(300) + finalized := newHead(head.Number - lpOpts.FinalityDepth) headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() headByHash := ec.On("HeadByHash", mock.Anything, mock.Anything).Return(func(ctx context.Context, blockHash common.Hash) (*evmtypes.Head, error) { - return &evmtypes.Head{Hash: blockHash}, nil + num := new(big.Int).SetBytes(blockHash.Bytes()).Int64() + return newHead(num), nil }) batchCallContext := ec.On("BatchCallContext", mock.Anything, mock.Anything).Return( @@ -1591,11 +1569,7 @@ func TestTooManyLogResults(t *testing.T) { blockNumber, ok := new(big.Int).SetString(blockNumberHex[2:], 16) require.True(t, ok, blockNumberHex) - calls[i].Result = &evmtypes.Head{ - Number: blockNumber.Int64(), - Hash: common.HexToHash(fmt.Sprintf("0x%x", blockNumber.Int64())), - ParentHash: common.HexToHash(fmt.Sprintf("0x%x", blockNumber.Int64()-1)), - } + calls[i].Result = newHead(blockNumber.Int64()) } return nil }, @@ -1629,7 +1603,7 @@ func TestTooManyLogResults(t *testing.T) { lp.PollAndSaveLogs(ctx, 5, false) block, err2 := o.SelectLatestBlock(ctx) require.NoError(t, err2) - assert.Equal(t, int64(298), block.BlockNumber) + assert.Equal(t, int64(300), block.BlockNumber) logs := obs.FilterLevelExact(zapcore.WarnLevel).FilterMessageSnippet("halving block range batch size").FilterFieldKey("newBatchSize").All() // Should have tried again 3 times--first reducing batch size to 10, then 5, then 2 @@ -1647,8 +1621,8 @@ func TestTooManyLogResults(t *testing.T) { // Now jump to block 500, but return error no matter how small the block range gets. // Should exit the loop with a critical error instead of hanging. - head.Number = 500 - finalized.Number = head.Number - lpOpts.FinalityDepth + head := newHead(500) + finalized := newHead(head.Number - lpOpts.FinalityDepth) headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() filterLogsCall = ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { @@ -1663,7 +1637,7 @@ func TestTooManyLogResults(t *testing.T) { if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself } else { - assert.Equal(t, int64(298), block.BlockNumber) + assert.Equal(t, int64(300), block.BlockNumber) } warns := obs.FilterMessageSnippet("halving block range").FilterLevelExact(zapcore.WarnLevel).All() crit := obs.FilterMessageSnippet("failed to retrieve logs").FilterLevelExact(zapcore.DPanicLevel).All() @@ -1679,8 +1653,8 @@ func TestTooManyLogResults(t *testing.T) { t.Run("Unrelated error are retried without adjusting size", func(t *testing.T) { unrelatedError := errors.New("Unrelated to the size of the request") - head.Number = 500 - finalized.Number = head.Number - lpOpts.FinalityDepth + head := newHead(500) + finalized := newHead(head.Number - lpOpts.FinalityDepth) obs.TakeAll() filterLogsCall = ec.On("FilterLogs", mock.Anything, mock.Anything).Return(func(ctx context.Context, fq ethereum.FilterQuery) (logs []types.Log, err error) { @@ -1697,7 +1671,7 @@ func TestTooManyLogResults(t *testing.T) { if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself } else { - assert.Equal(t, int64(298), block.BlockNumber) + assert.Equal(t, int64(300), block.BlockNumber) } crit := obs.FilterLevelExact(zapcore.DPanicLevel).All() errors := obs.FilterLevelExact(zapcore.ErrorLevel).All() @@ -2261,10 +2235,59 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { require.Len(t, logs, 1) require.Equal(t, newLogData, big.NewInt(0).SetBytes(logs[0].Data).Int64(), "Log data should match the log from the new block, indicating that the old block's log was properly removed during replay") // Ensure reorged block was replaced by a new one - dbBlock, err := th.ORM.SelectBlockByNumber(testutils.Context(t), reorgedBlock.Number().Int64()) - require.NoError(t, err) - require.Equal(t, reorgedBlock.Number().Int64(), dbBlock.BlockNumber) - require.NotEqual(t, reorgedBlock.Hash(), dbBlock.BlockHash) + requireDBMatchesGeth(t, th.ORM, th.Client) }) } } + +func requireDBMatchesGeth(t *testing.T, orm logpoller.ORM, client logpoller.Client) { + require.True(t, checkDBMatchesGeth(t, orm, client), "DB state does not match geth canonical chain") +} + +func checkDBMatchesGeth(t *testing.T, orm logpoller.ORM, client logpoller.Client) bool { + // Check every block is identical + latest, err1 := client.HeadByNumber(testutils.Context(t), nil) + require.NoError(t, err1) + dbBlocks, err := orm.SelectLogsByBlockRange(t.Context(), 0, latest.Number) + require.NoError(t, err) + // ensure all blocks present in db are on geth canonical chain + for _, ourBlock := range dbBlocks { + gethBlock, err1 := client.HeadByNumber(testutils.Context(t), big.NewInt(ourBlock.BlockNumber)) + require.NoError(t, err1) + if ourBlock.BlockHash != gethBlock.Hash { + t.Logf("Initial poll our block differs at height %d got %x want %x\n", ourBlock.BlockNumber, ourBlock.BlockHash, gethBlock.Hash) + return false + } + } + + latestDB, err := orm.SelectLatestBlock(t.Context()) + require.NoError(t, err) + require.Equal(t, latest.Number, latestDB.BlockNumber, "latest block number in db should match geth") + + // ensure all logs present in db are on geth canonical chain + logs, err1 := orm.SelectLogsByBlockRange(t.Context(), 0, latest.Number) + require.NoError(t, err1) + for _, log := range logs { + gethBlock, err1 := client.HeadByNumber(testutils.Context(t), big.NewInt(log.BlockNumber)) + require.NoError(t, err1) + if log.BlockHash != gethBlock.Hash { + t.Logf("Log present in db, is not present in canonical chain. Log block number %d, Log block Hash: %s, block hash %s\n", log.BlockNumber, log.BlockHash, gethBlock.Hash) + return false + } + } + return true +} + +func requireLBBlockIsFinalized(t *testing.T, orm logpoller.ORM, block int64) { + latest, err := orm.SelectLatestBlock(t.Context()) + require.NoError(t, err) + require.GreaterOrEqual(t, latest.FinalizedBlockNumber, block, "specified block should be finalized") +} + +func newHead(num int64) *evmtypes.Head { + return &evmtypes.Head{ + Number: num, + Hash: common.BigToHash(big.NewInt(num)), + ParentHash: common.BigToHash(big.NewInt(num - 1)), + } +} diff --git a/pkg/logpoller/observability.go b/pkg/logpoller/observability.go index ffa1b47456..a1d4fdd3a3 100644 --- a/pkg/logpoller/observability.go +++ b/pkg/logpoller/observability.go @@ -138,6 +138,12 @@ func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumb }) } +func (o *ObservedORM) SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) { + return withObservedQuery(ctx, o, "SelectNewestBlock", func() (*Block, error) { + return o.ORM.SelectNewestBlock(ctx, maxAllowedBlockNumber) + }) +} + func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) { return withObservedQuery(ctx, o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) { return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs) diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go index 012c450f3a..f0792ea309 100644 --- a/pkg/logpoller/orm.go +++ b/pkg/logpoller/orm.go @@ -43,6 +43,7 @@ type ORM interface { SelectBlockByHash(ctx context.Context, hash common.Hash) (*Block, error) SelectLatestBlock(ctx context.Context) (*Block, error) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*Block, error) + SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) SelectLatestFinalizedBlock(ctx context.Context) (*Block, error) SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error) @@ -295,6 +296,17 @@ func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int return &b, nil } +func (o *DSORM) SelectNewestBlock(ctx context.Context, maxAllowedBlockNumber int64) (*Block, error) { + var b Block + if err := o.ds.GetContext(ctx, &b, + blocksQuery(`WHERE evm_chain_id = $1 AND block_number <= $2 ORDER BY block_number DESC LIMIT 1`), + sqlutil.New(o.chainID), maxAllowedBlockNumber, + ); err != nil { + return nil, err + } + return &b, nil +} + func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs evmtypes.Confirmations) (*Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withConfs(confs).