Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions pkg/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,6 @@ func (th *TestHarness) assertDontHave(t *testing.T, start, end int) {
}
}

func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) {
for i := start; i < end; i++ {
blk, err := th.ORM.SelectBlockByNumber(testutils.Context(t), int64(i))
require.NoError(t, err, "block %v", i)
chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i)))
require.NoError(t, err)
assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i)
}
}

// Simulates an RPC failover event to an alternate rpc server. This can also be used to
// simulate switching back to the primary rpc after it recovers.
func (th *TestHarness) SetActiveClient(backend evmtypes.Backend, chainType chaintype.ChainType) {
Expand Down
83 changes: 51 additions & 32 deletions pkg/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1081,7 +1081,7 @@ func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Hea
// There can be another reorg while we're finding the LCA.
// That is ok, since we'll detect it on the next iteration.
// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber)
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock.Number, latestBlock.FinalizedBlockNumber)
if err2 != nil {
return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
}
Expand Down Expand Up @@ -1231,10 +1231,15 @@ func (e *reorgError) Error() string {
return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number)
}

func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) {
func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) (blocks []Block, logs []Log, err error) {
const maxUnfinalizedBlocks = 2000
var logs []Log
var blocks []Block
var block *Block
defer func() {
// ensure that we always include the last block even if it's empty to use it as check point for next poll.
if block != nil && (len(blocks) == 0 || blocks[len(blocks)-1].BlockNumber != block.BlockNumber) {
blocks = append(blocks, *block)
}
}()
for {
h := currentBlock.Hash
rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
Expand All @@ -1243,15 +1248,18 @@ func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmty
return blocks, logs, nil
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
block := Block{
block = &Block{
BlockHash: h,
BlockNumber: currentBlock.Number,
BlockTimestamp: currentBlock.Timestamp,
FinalizedBlockNumber: finalized,
SafeBlockNumber: safe,
}
logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...)
blocks = append(blocks, block)
logs = append(logs, convertLogs(rpcLogs, []Block{*block}, lp.lggr, lp.ec.ConfiguredChainID())...)
// Always save the block with logs, to know an impact of finality violation and for better observability.
if len(rpcLogs) > 0 {
blocks = append(blocks, *block)
}

if currentBlock.Number >= latest {
return blocks, logs, nil
Expand Down Expand Up @@ -1328,39 +1336,50 @@ func (lp *logPoller) latestSafeBlock(ctx context.Context, latestFinalizedBlockNu

// Find the first place where our chain and their chain have the same block,
// that block number is the LCA. Return the block after that, where we want to resume polling.
func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.Head, latestFinalizedBlockNumber int64) (*evmtypes.Head, error) {
// Current is where the mismatch starts.
// Check its parent to see if its the same as ours saved.
parent, err := lp.latencyMonitor.HeadByHash(ctx, current.ParentHash)
if err != nil {
return nil, err
func (lp *logPoller) findBlockAfterLCA(ctx context.Context, currentHeadNumber int64, dbLatestFinalizedBlockNumber int64) (*evmtypes.Head, error) {
if currentHeadNumber < dbLatestFinalizedBlockNumber {
lp.lggr.Criticalw("Unexpected state. Current head number is lower than latest finalized block number", "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber)
return nil, fmt.Errorf("current head number %d is lower than latest finalized block number %d: %w", currentHeadNumber, dbLatestFinalizedBlockNumber, commontypes.ErrFinalityViolated)
}
blockAfterLCA := current

// We expect reorgs up to the block after latestFinalizedBlock
// We loop via parent instead of current so current always holds the LCA+1.
// If the parent block number becomes < the first finalized block our reorg is too deep.
// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config.
var ourParentBlockHash common.Hash
for parent.Number >= latestFinalizedBlockNumber {
outParentBlock, err := lp.orm.SelectBlockByNumber(ctx, parent.Number)
// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config or chain violates finality guarantees.
for {
// Since we do not store all blocks in the db, it's possible that we do not have the parent block in our db.
// Find the nearest ancestor that we have in our db and check if it still belongs to canonical chain.
ourParent, err := lp.orm.SelectNewestBlock(ctx, currentHeadNumber-1)
if err != nil {
return nil, err
}
ourParentBlockHash = outParentBlock.BlockHash
if parent.Hash == ourParentBlockHash {
// If we do have the blockhash, return blockAfterLCA
return blockAfterLCA, nil
if errors.Is(err, sql.ErrNoRows) {
lp.lggr.Warnw("No ancestor block found in db, this means that the reorg is deeper than the number of blocks we have in the db.", "err", err, "currentHeadNumber", currentHeadNumber, "dbLatestFinalizedBlockNumber", dbLatestFinalizedBlockNumber)
// we should return currentHeadNumber as the block after LCA, to avoid drifting too far back.
return lp.headerByNumber(ctx, currentHeadNumber)
}
return nil, fmt.Errorf("failed to select ancestor for current block %d: %w", currentHeadNumber-1, err)
}
// Otherwise get a new parent and update blockAfterLCA.
blockAfterLCA = parent
parent, err = lp.latencyMonitor.HeadByHash(ctx, parent.ParentHash)

// Since we are looking for block after LCA, fetch child of ourParent.
// If new current points to ourParent, we found the LCA and can return block after it. Otherwise, keep looking for ancestors.
rpcChild, err := lp.headerByNumber(ctx, ourParent.BlockNumber+1)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to fetch block after ancestor %d: %w", ourParent.BlockNumber+1, err)
}
if ourParent.BlockHash == rpcChild.ParentHash {
return rpcChild, nil
}

if ourParent.BlockNumber <= dbLatestFinalizedBlockNumber {
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag,
"current", rpcChild.Number,
"latestFinalized", dbLatestFinalizedBlockNumber,
"ourParentHash", ourParent.BlockHash,
"expectedParentHash", rpcChild.ParentHash,
"childHash", rpcChild.Hash)
return nil, fmt.Errorf("%w: finalized block with hash %s is not parent of canonical block at height %d, with parent hash %s", commontypes.ErrFinalityViolated, ourParent.BlockHash, rpcChild.Number, rpcChild.ParentHash)
}
}

lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number)
currentHeadNumber = ourParent.BlockNumber
}
}

// PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
Expand Down
196 changes: 196 additions & 0 deletions pkg/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,202 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
})
}

func Test_FindBlockAfterLCA(t *testing.T) {
testCases := []struct {
Name string
CurrentBlockNumber int64
DBLatestFinalized int64
DBBlocks []int64
Setup func(t *testing.T, ec *clienttest.Client)
ExpectedError error
ExpectedHead *evmtypes.Head
}{
{
Name: "current head lower than DB finalized",
CurrentBlockNumber: 3,
DBLatestFinalized: 5,
DBBlocks: nil,
Setup: nil,
ExpectedError: commontypes.ErrFinalityViolated,
ExpectedHead: nil,
},
{
Name: "no reorg - chains match on first iteration",
CurrentBlockNumber: 5,
DBLatestFinalized: 3,
DBBlocks: []int64{4},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
return newHead(n.Int64()), nil
})
},
ExpectedError: nil,
ExpectedHead: newHead(5),
},
{
Name: "reorg - LCA found after walking back",
CurrentBlockNumber: 5,
DBLatestFinalized: 1,
DBBlocks: []int64{2, 3},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
num := n.Int64()
if num == 4 {
return &evmtypes.Head{Number: 4, Hash: common.BigToHash(big.NewInt(4)), ParentHash: common.HexToHash("0xdead")}, nil
}
return newHead(num), nil
}).Maybe()
},
ExpectedError: nil,
ExpectedHead: newHead(3),
},
{
Name: "reorg too deep",
CurrentBlockNumber: 5,
DBLatestFinalized: 2,
DBBlocks: []int64{1},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
num := n.Int64()
return &evmtypes.Head{Number: num, Hash: common.BigToHash(big.NewInt(num)), ParentHash: common.HexToHash("0xbeef")}, nil
})
},
ExpectedError: commontypes.ErrFinalityViolated,
ExpectedHead: nil,
},
{
Name: "RPC HeadByNumber returns error",
CurrentBlockNumber: 5,
DBLatestFinalized: 3,
DBBlocks: []int64{4},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
if n.Int64() == 5 {
return nil, errors.New("rpc error")
}
return newHead(n.Int64()), nil
})
},
ExpectedError: errors.New("rpc error"),
ExpectedHead: nil,
},
{
Name: "All blocks in DB are on a different chain",
CurrentBlockNumber: 100,
DBLatestFinalized: 10,
DBBlocks: []int64{90, 80, 55, 20},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
head := newHead(n.Int64())
if n.Int64() < 20 {
return head, nil
}
head.ParentHash = common.HexToHash("0xdead")
head.Hash = common.HexToHash("0xdead")
return head, nil
})
},
ExpectedError: nil,
ExpectedHead: &evmtypes.Head{
Number: 20,
Hash: common.HexToHash("0xdead"),
ParentHash: common.HexToHash("0xdead"),
},
},
{
Name: "Sparse DB blocks - LCA found successfully",
CurrentBlockNumber: 100,
DBLatestFinalized: 10,
DBBlocks: []int64{90, 80, 55, 20},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
head := newHead(n.Int64())
if n.Int64() > 21 {
head.ParentHash = common.HexToHash("0xdead")
return head, nil
}
return head, nil
})
},
ExpectedError: nil,
ExpectedHead: newHead(21),
},
{
Name: "Child of latest finalized is not canonical - finality violation",
CurrentBlockNumber: 100,
DBLatestFinalized: 80,
DBBlocks: []int64{90, 80, 55, 20},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
head := newHead(n.Int64())
if n.Int64() > 80 {
head.ParentHash = common.HexToHash("0xdead")
return head, nil
}
return head, nil
})
},
ExpectedError: commontypes.ErrFinalityViolated,
ExpectedHead: nil,
},
{
// Such case is possible, since DBLatestFinalized is defined by FinalizedBlockNumber of the latest block.
Name: "Latest finalized DB block is in canonical but much older than DBLatestFinalized",
CurrentBlockNumber: 100,
DBLatestFinalized: 80,
DBBlocks: []int64{90, 70, 55, 20},
Setup: func(t *testing.T, ec *clienttest.Client) {
ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(_ context.Context, n *big.Int) (*evmtypes.Head, error) {
head := newHead(n.Int64())
if n.Int64() > 80 {
head.ParentHash = common.HexToHash("0xdead")
return head, nil
}
return head, nil
})
},
ExpectedError: nil,
ExpectedHead: newHead(71),
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
db := testutils.NewSqlxDB(t)
lggr, _ := logger.TestObserved(t, zapcore.DebugLevel)
orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
headTracker := headstest.NewTracker[*evmtypes.Head, common.Hash](t)
ec := clienttest.NewClient(t)
ctx := testutils.Context(t)
for _, blockNum := range tc.DBBlocks {
hash := common.BigToHash(big.NewInt(blockNum))
require.NoError(t, orm.InsertBlock(ctx, hash, blockNum, time.Now(), blockNum, blockNum))
}
if tc.Setup != nil {
tc.Setup(t, ec)
}
lpOpts := Opts{
PollPeriod: time.Second,
FinalityDepth: 3,
BackfillBatchSize: 3,
RPCBatchSize: 3,
KeepFinalizedBlocksDepth: 20,
BackupPollerBlockDelay: 0,
}
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
blockAfterLCA, err := lp.findBlockAfterLCA(ctx, tc.CurrentBlockNumber, tc.DBLatestFinalized)
if tc.ExpectedError != nil {
require.ErrorContains(t, err, tc.ExpectedError.Error())
require.Nil(t, blockAfterLCA)
} else {
require.NoError(t, err)
require.Equal(t, tc.ExpectedHead, blockAfterLCA)
}
})
}
}

func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) {
lggr := logger.Test(b)
lpOpts := Opts{
Expand Down
Loading
Loading