diff --git a/internal/worker/base.go b/internal/worker/base.go
index 8707322..1fe772d 100644
--- a/internal/worker/base.go
+++ b/internal/worker/base.go
@@ -90,7 +90,9 @@ func (bw *BaseWorker) run(job func() error) {
 	start := time.Now()
 
 	// Use Exponential retry for the job
-	if err := retry.Exponential(job, retry.ExponentialConfig{
+	if err := retry.Exponential(func() error {
+		return bw.executeRecoverable("worker job", job)
+	}, retry.ExponentialConfig{
 		InitialInterval: retryInterval,
 		MaxElapsedTime:  bw.config.PollInterval * 4,
 		OnRetry: func(err error, next time.Duration) {
diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go
index 1a7857e..4435f8e 100644
--- a/internal/worker/catchup.go
+++ b/internal/worker/catchup.go
@@ -19,6 +19,7 @@ const (
 	MAX_RANGE_SIZE         = 20
 	CATCHUP_WORKERS        = 3 // Number of parallel workers
 	PROGRESS_SAVE_INTERVAL = 1 // Save progress every N batches
+	catchupPanicRetryDelay = time.Second
 )
 
 type CatchupWorker struct {
@@ -69,7 +70,7 @@ func (cw *CatchupWorker) Start() {
 		"total_blocks", totalBlocks,
 		"parallel_workers", CATCHUP_WORKERS,
 	)
-	go cw.runCatchup()
+	cw.goWithRecovery("catchup loop", cw.runCatchup)
 }
 
 // runCatchup is a tight loop that processes catchup ranges without PollInterval delays.
@@ -83,9 +84,17 @@
 		default:
 		}
 
-		if err := cw.processCatchupBlocksParallel(); err != nil {
+		err := cw.executeRecoverable("catchup pass", cw.processCatchupBlocksParallel)
+		if err != nil {
 			cw.logger.Error("Catchup job error", "err", err)
 			_ = cw.emitter.EmitError(cw.chain.GetName(), err)
+			if _, ok := err.(*recoveredPanicError); ok {
+				select {
+				case <-cw.ctx.Done():
+					return
+				case <-time.After(catchupPanicRetryDelay):
+				}
+			}
 			continue
 		}
 
@@ -206,6 +215,7 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
 		wg.Add(1)
 		go func(workerID int) {
 			defer wg.Done()
+			defer cw.recoverPanic(fmt.Sprintf("catchup range worker %d", workerID))
 
 			cw.logger.Debug("Starting catchup worker", "worker_id", workerID)
 			for r := range rangeChan {
diff --git a/internal/worker/manual.go b/internal/worker/manual.go
index e7a443c..b9756b3 100644
--- a/internal/worker/manual.go
+++ b/internal/worker/manual.go
@@ -63,7 +63,7 @@ func (mw *ManualWorker) Start() {
 	mw.logger.Info("Starting manual worker", "chain", mw.chain.GetName())
 
 	// Periodic metrics
-	go func() {
+	mw.goWithRecovery("manual metrics", func() {
 		ticker := time.NewTicker(1 * time.Minute)
 		defer ticker.Stop()
 		for {
@@ -74,9 +74,9 @@
 				mw.logMissingRangesMetric()
 			}
 		}
-	}()
+	})
 
-	go mw.loop()
+	mw.goWithRecovery("manual loop", mw.loop)
 }
 
 func (mw *ManualWorker) loop() {
diff --git a/internal/worker/recovery.go b/internal/worker/recovery.go
new file mode 100644
index 0000000..893ebd7
--- /dev/null
+++ b/internal/worker/recovery.go
@@ -0,0 +1,58 @@
+package worker
+
+import (
+	"fmt"
+	"runtime/debug"
+)
+
+// recoveredPanicError wraps a recovered panic value so callers can
+// distinguish panics from ordinary job errors.
+type recoveredPanicError struct {
+	task      string
+	recovered any
+}
+
+func (e *recoveredPanicError) Error() string {
+	return fmt.Sprintf("%s panic: %v", e.task, e.recovered)
+}
+
+// executeRecoverable runs fn and converts a panic into a *recoveredPanicError.
+func (bw *BaseWorker) executeRecoverable(task string, fn func() error) (err error) {
+	defer bw.recoverPanicAsError(task, &err)
+	return fn()
+}
+
+// goWithRecovery starts fn on a goroutine that logs, rather than crashes on, panics.
+func (bw *BaseWorker) goWithRecovery(task string, fn func()) {
+	go func() {
+		defer bw.recoverPanic(task)
+		fn()
+	}()
+}
+
+// recoverPanic logs a recovered panic together with its stack trace.
+func (bw *BaseWorker) recoverPanic(task string) {
+	if recovered := recover(); recovered != nil {
+		bw.logger.Error("Recovered panic",
+			"task", task,
+			"panic", recovered,
+			"stack", string(debug.Stack()),
+		)
+	}
+}
+
+// recoverPanicAsError logs a recovered panic and stores it in *errp as a
+// *recoveredPanicError so the caller can surface it as an ordinary error.
+func (bw *BaseWorker) recoverPanicAsError(task string, errp *error) {
+	if recovered := recover(); recovered != nil {
+		bw.logger.Error("Recovered panic",
+			"task", task,
+			"panic", recovered,
+			"stack", string(debug.Stack()),
+		)
+		*errp = &recoveredPanicError{
+			task:      task,
+			recovered: recovered,
+		}
+	}
+}
diff --git a/internal/worker/regular.go b/internal/worker/regular.go
index 5e84aef..eb23dde 100644
--- a/internal/worker/regular.go
+++ b/internal/worker/regular.go
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"time"
 
@@ -16,9 +17,13 @@
 )
 
 const (
-	MaxBlockHashSize = 20
+	MaxBlockHashSize        = 20
+	regularGapRetryAttempts = 2
+	regularGapRetryDelay    = time.Second
 )
 
+var errRegularRecoveryReorgHandled = errors.New("regular recovery reorg handled")
+
 type RegularWorker struct {
 	*BaseWorker
 	currentBlock uint64
@@ -141,35 +146,23 @@ func (rw *RegularWorker) processRegularBlocks() error {
 
 	lastSuccess := rw.currentBlock - 1
 	var lastSuccessHash string
-
-	for i, res := range results {
-		if res.Block == nil || res.Error != nil {
-			continue
-		}
-		// cross-batch reorg detection
-		if rw.isReorgCheckRequired() {
-			if reorg, err := rw.detectAndHandleReorg(&res); err != nil {
-				return err
-			} else if reorg {
-				return nil // rollback done, stop this tick
-			}
-		}
-		// intra-batch continuity
-		if i > 0 && rw.isReorgCheckRequired() {
-			if !checkContinuity(results[i-1], res) {
-				rw.logger.Warn("Batch continuity broken, will retry next tick",
-					"prev", results[i-1].Block.Number,
-					"prev_hash", results[i-1].Block.Hash,
-					"curr", res.Block.Number,
-					"curr_parent", res.Block.ParentHash,
-				)
-				break
+	var (
+		processErr error
+		stopTick   bool
+	)
+	if rw.isReorgCheckRequired() {
+		stopTick, processErr = rw.processReorgCheckedBatch(results, end, &lastSuccess, &lastSuccessHash)
+	} else {
+		for _, res := range results {
+			if rw.handleBlockResult(res) {
+				lastSuccess = res.Number
+				lastSuccessHash = res.Block.Hash
 			}
 		}
-		if rw.handleBlockResult(res) {
-			lastSuccess = res.Number
-			lastSuccessHash = res.Block.Hash
-		}
-	}
+	}
+
+	if stopTick {
+		return nil
+	}
 
 	if lastSuccess >= rw.currentBlock {
@@ -190,7 +183,7 @@
 		"expected", originalEnd-originalStart+1,
 		"got", len(results),
 	)
-	return nil
+	return processErr
 }
 
 func (rw *RegularWorker) determineStartingBlock() uint64 {
@@ -315,10 +308,209 @@ func (rw *RegularWorker) clearBlockHashes() {
 	rw.blockHashes = rw.blockHashes[:0] // Clear slice but keep capacity
 }
 
+// processReorgCheckedBatch walks batch results in strict block order,
+// advancing lastSuccess/lastSuccessHash as blocks are handled; it returns
+// stopTick=true after a handled reorg rollback, and falls back to
+// recoverRegularGap on any gap, unresolved result, or continuity break.
+func (rw *RegularWorker) processReorgCheckedBatch(
+	results []indexer.BlockResult,
+	end uint64,
+	lastSuccess *uint64,
+	lastSuccessHash *string,
+) (bool, error) {
+	expected := rw.currentBlock
+
+	for _, res := range results {
+		if expected > end {
+			break
+		}
+
+		if res.Number != expected {
+			rw.logger.Warn("Batch result out of order, switching to single-block recovery",
+				"expected", expected,
+				"actual", res.Number,
+			)
+			return rw.recoverRegularGap(expected, end, lastSuccess, lastSuccessHash)
+		}
+
+		if res.Error != nil || res.Block == nil {
+			rw.logger.Warn("Batch result unresolved, switching to single-block recovery",
+				"block", expected,
+				"batch_error", blockResultError(res),
+			)
+			return rw.recoverRegularGap(expected, end, lastSuccess, lastSuccessHash)
+		}
+
+		reorg, err := rw.detectAndHandleReorg(&res)
+		if err != nil {
+			return false, err
+		}
+		if reorg {
+			return true, nil
+		}
+
+		if *lastSuccessHash != "" && !matchesParentHash(*lastSuccessHash, res.Block.ParentHash) {
+			
rw.logger.Warn("Batch continuity broken, switching to single-block recovery", + "expected", expected, + "prev_hash", *lastSuccessHash, + "curr_parent", res.Block.ParentHash, + ) + return rw.recoverRegularGap(expected, end, lastSuccess, lastSuccessHash) + } + + if rw.handleBlockResult(res) { + *lastSuccess = res.Number + *lastSuccessHash = res.Block.Hash + expected = res.Number + 1 + } + } + + if expected <= end { + rw.logger.Warn("Batch returned incomplete results, switching to single-block recovery", + "expected", expected, + "end", end, + "received", len(results), + ) + return rw.recoverRegularGap(expected, end, lastSuccess, lastSuccessHash) + } + + return false, nil +} + +func (rw *RegularWorker) recoverRegularGap( + start, end uint64, + lastSuccess *uint64, + lastSuccessHash *string, +) (bool, error) { + rw.logger.Warn("Recovering regular gap with failover-aware single-block fetch", + "start", start, + "end", end, + "attempts", regularGapRetryAttempts, + ) + + for blockNumber := start; blockNumber <= end; blockNumber++ { + if err := rw.recoverRegularBlock(blockNumber, lastSuccess, lastSuccessHash); err != nil { + if errors.Is(err, errRegularRecoveryReorgHandled) { + return true, nil + } + + rw.handleBlockResult(indexer.BlockResult{ + Number: blockNumber, + Error: &indexer.Error{ + ErrorType: indexer.ErrorTypeUnknown, + Message: err.Error(), + }, + }) + return false, fmt.Errorf("recover block %d: %w", blockNumber, err) + } + } + + return false, nil +} + +func (rw *RegularWorker) recoverRegularBlock( + blockNumber uint64, + lastSuccess *uint64, + lastSuccessHash *string, +) error { + var lastErr error + + for attempt := 1; attempt <= regularGapRetryAttempts; attempt++ { + res, err := rw.fetchRegularBlock(blockNumber) + if err == nil { + reorg, reorgErr := rw.detectAndHandleReorg(&res) + if reorgErr != nil { + return reorgErr + } + if reorg { + return errRegularRecoveryReorgHandled + } + + if *lastSuccessHash != "" && !matchesParentHash(*lastSuccessHash, res.Block.ParentHash) { + err = fmt.Errorf( + "continuity mismatch for block %d: expected parent %s, got %s", + blockNumber, + *lastSuccessHash, + res.Block.ParentHash, + ) + } else if rw.handleBlockResult(res) { + *lastSuccess = res.Number + *lastSuccessHash = res.Block.Hash + return nil + } else { + err = fmt.Errorf("failed to process recovered block %d", blockNumber) + } + } + + lastErr = err + rw.logger.Warn("Single-block recovery failed", + "block", blockNumber, + "attempt", attempt, + "max_attempts", regularGapRetryAttempts, + "error", err, + ) + + if attempt < regularGapRetryAttempts { + select { + case <-rw.ctx.Done(): + return rw.ctx.Err() + case <-time.After(regularGapRetryDelay): + } + } + } + + return lastErr +} + +func (rw *RegularWorker) fetchRegularBlock(blockNumber uint64) (indexer.BlockResult, error) { + block, err := rw.chain.GetBlock(rw.ctx, blockNumber) + if err != nil { + return indexer.BlockResult{Number: blockNumber}, err + } + if block == nil { + return indexer.BlockResult{Number: blockNumber}, fmt.Errorf("nil block result for %d", blockNumber) + } + return indexer.BlockResult{ + Number: blockNumber, + Block: block, + }, nil +} + func checkContinuity(prev, curr indexer.BlockResult) bool { + if prev.Error != nil || curr.Error != nil || prev.Block == nil || curr.Block == nil { + return false + } return prev.Block.Hash == curr.Block.ParentHash } +func blockResultNumber(res indexer.BlockResult) uint64 { + if res.Block != nil { + return res.Block.Number + } + return res.Number +} + +func blockResultHash(res 
indexer.BlockResult) string { + if res.Block != nil { + return res.Block.Hash + } + return "" +} + +func blockResultError(res indexer.BlockResult) string { + if res.Error != nil { + return res.Error.Message + } + if res.Block == nil { + return "nil block" + } + return "" +} + +func matchesParentHash(prevHash, parentHash string) bool { + return prevHash == parentHash +} + // splitCatchupRange splits a large catchup range into smaller, manageable chunks func splitCatchupRange(r blockstore.CatchupRange, maxSize uint64) []blockstore.CatchupRange { if r.End <= r.Start { diff --git a/internal/worker/regular_test.go b/internal/worker/regular_test.go new file mode 100644 index 0000000..f6f04d6 --- /dev/null +++ b/internal/worker/regular_test.go @@ -0,0 +1,269 @@ +package worker + +import ( + "context" + "errors" + "io" + "log/slog" + "testing" + "time" + + "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" + "github.com/stretchr/testify/require" +) + +func TestRegularWorkerProcessRegularBlocksRecoversGapViaGetBlock(t *testing.T) { + t.Parallel() + + chain := &stubIndexer{ + name: "ethereum", + internalCode: "eth", + networkType: enum.NetworkTypeEVM, + latest: 102, + getBlocksFunc: func(context.Context, uint64, uint64, bool) ([]indexer.BlockResult, error) { + return []indexer.BlockResult{ + { + Number: 100, + Block: &types.Block{ + Number: 100, + Hash: "0x100", + ParentHash: "0x099", + }, + }, + { + Number: 101, + Error: &indexer.Error{ErrorType: indexer.ErrorTypeUnknown, Message: "rpc timeout"}, + }, + { + Number: 102, + Block: &types.Block{ + Number: 102, + Hash: "0x102", + ParentHash: "0x101", + }, + }, + }, nil + }, + getBlockFunc: func(_ context.Context, number uint64) (*types.Block, error) { + switch number { + case 101: + return &types.Block{ + Number: 101, + Hash: "0x101", + ParentHash: "0x100", + }, nil + case 102: + return &types.Block{ + Number: 102, + Hash: "0x102", + ParentHash: "0x101", + }, nil + default: + return nil, errors.New("unexpected block") + } + }, + } + store := &stubBlockStore{} + rw := newTestRegularWorker(chain, store, 100, 3) + + err := rw.processRegularBlocks() + require.NoError(t, err) + require.Equal(t, uint64(103), rw.currentBlock) + require.Equal(t, []uint64{102}, store.savedLatest) + require.Empty(t, store.failedBlocks) + require.Equal(t, []uint64{101, 102}, chain.getBlockCalls) +} + +func TestRegularWorkerProcessRegularBlocksMarksUnresolvedGapFailed(t *testing.T) { + t.Parallel() + + chain := &stubIndexer{ + name: "ethereum", + internalCode: "eth", + networkType: enum.NetworkTypeEVM, + latest: 101, + getBlocksFunc: func(context.Context, uint64, uint64, bool) ([]indexer.BlockResult, error) { + return []indexer.BlockResult{ + { + Number: 100, + Error: &indexer.Error{ErrorType: indexer.ErrorTypeUnknown, Message: "quota exceeded"}, + }, + { + Number: 101, + Block: &types.Block{ + Number: 101, + Hash: "0x101", + ParentHash: "0x100", + }, + }, + }, nil + }, + getBlockFunc: func(context.Context, uint64) (*types.Block, error) { + return nil, errors.New("429 too many requests") + }, + } + store := &stubBlockStore{} + rw := newTestRegularWorker(chain, store, 100, 2) + + err := rw.processRegularBlocks() + require.Error(t, err) + require.Equal(t, uint64(100), rw.currentBlock) + require.Empty(t, store.savedLatest) + require.Equal(t, 
[]uint64{100}, store.failedBlocks) + require.Equal(t, []uint64{100, 100}, chain.getBlockCalls) +} + +func TestCheckContinuityReturnsFalseForNilBlocks(t *testing.T) { + t.Parallel() + + require.False(t, checkContinuity(indexer.BlockResult{}, indexer.BlockResult{})) + require.False(t, checkContinuity( + indexer.BlockResult{Block: &types.Block{Hash: "0x100"}}, + indexer.BlockResult{}, + )) +} + +func TestBaseWorkerExecuteRecoverableConvertsPanicToError(t *testing.T) { + t.Parallel() + + bw := &BaseWorker{ + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + } + + err := bw.executeRecoverable("test panic", func() error { + panic("boom") + }) + require.Error(t, err) + + var panicErr *recoveredPanicError + require.ErrorAs(t, err, &panicErr) + require.Equal(t, "test panic panic: boom", err.Error()) +} + +func testChainConfig() config.ChainConfig { + return config.ChainConfig{ + PollInterval: time.Millisecond, + Throttle: config.Throttle{ + BatchSize: 2, + }, + } +} + +func newTestRegularWorker(chain *stubIndexer, store *stubBlockStore, currentBlock uint64, batchSize int) *RegularWorker { + cfg := testChainConfig() + cfg.Throttle.BatchSize = batchSize + + return &RegularWorker{ + BaseWorker: &BaseWorker{ + ctx: context.Background(), + cancel: func() {}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + config: cfg, + chain: chain, + blockStore: store, + failedChan: make(chan FailedBlockEvent, 1), + }, + currentBlock: currentBlock, + blockHashes: make([]BlockHashEntry, 0, MaxBlockHashSize), + } +} + +type stubIndexer struct { + name string + internalCode string + networkType enum.NetworkType + latest uint64 + getBlocksFunc func(ctx context.Context, from, to uint64, isParallel bool) ([]indexer.BlockResult, error) + getBlockFunc func(ctx context.Context, number uint64) (*types.Block, error) + getBlockCalls []uint64 +} + +func (s *stubIndexer) GetName() string { + return s.name +} + +func (s *stubIndexer) GetNetworkType() enum.NetworkType { + return s.networkType +} + +func (s *stubIndexer) GetNetworkInternalCode() string { + return s.internalCode +} + +func (s *stubIndexer) GetLatestBlockNumber(context.Context) (uint64, error) { + return s.latest, nil +} + +func (s *stubIndexer) GetBlock(ctx context.Context, number uint64) (*types.Block, error) { + s.getBlockCalls = append(s.getBlockCalls, number) + if s.getBlockFunc != nil { + return s.getBlockFunc(ctx, number) + } + return nil, errors.New("not implemented") +} + +func (s *stubIndexer) GetBlocks(ctx context.Context, from, to uint64, isParallel bool) ([]indexer.BlockResult, error) { + if s.getBlocksFunc != nil { + return s.getBlocksFunc(ctx, from, to, isParallel) + } + return nil, errors.New("not implemented") +} + +func (s *stubIndexer) GetBlocksByNumbers(context.Context, []uint64) ([]indexer.BlockResult, error) { + return nil, errors.New("not implemented") +} + +func (s *stubIndexer) IsHealthy() bool { + return true +} + +type stubBlockStore struct { + savedLatest []uint64 + failedBlocks []uint64 +} + +func (s *stubBlockStore) GetLatestBlock(string) (uint64, error) { + return 0, errors.New("not found") +} + +func (s *stubBlockStore) SaveLatestBlock(_ string, blockNumber uint64) error { + s.savedLatest = append(s.savedLatest, blockNumber) + return nil +} + +func (s *stubBlockStore) GetFailedBlocks(string) ([]uint64, error) { + return s.failedBlocks, nil +} + +func (s *stubBlockStore) SaveFailedBlock(_ string, blockNumber uint64) error { + s.failedBlocks = append(s.failedBlocks, blockNumber) + return nil +} + +func (s 
*stubBlockStore) SaveFailedBlocks(string, []uint64) error { + return nil +} + +func (s *stubBlockStore) RemoveFailedBlocks(string, []uint64) error { + return nil +} + +func (s *stubBlockStore) SaveCatchupProgress(string, uint64, uint64, uint64) error { + return nil +} + +func (s *stubBlockStore) GetCatchupProgress(string) ([]blockstore.CatchupRange, error) { + return nil, nil +} + +func (s *stubBlockStore) DeleteCatchupRange(string, uint64, uint64) error { + return nil +} + +func (s *stubBlockStore) Close() error { + return nil +} diff --git a/internal/worker/rescanner.go b/internal/worker/rescanner.go index f829390..b74c5fa 100644 --- a/internal/worker/rescanner.go +++ b/internal/worker/rescanner.go @@ -74,17 +74,17 @@ func (rw *RescannerWorker) Start() { ) // listen failedChan - go func() { + rw.goWithRecovery("rescanner failed listener", func() { for evt := range rw.failedChan { rw.addFailedBlock(evt.Block, fmt.Sprintf("from failedChan attempt %d", evt.Attempt)) } - }() + }) // periodic rescan go rw.run(rw.processRescan) // periodic flush - go rw.periodicBatchFlush() + rw.goWithRecovery("rescanner flush loop", rw.periodicBatchFlush) } func (rw *RescannerWorker) Stop() {
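
Reviewer note — the recover-to-error contract introduced in recovery.go, reduced to a self-contained sketch (stdlib only; runJob and panicError are illustrative stand-ins for executeRecoverable and recoveredPanicError, not names from this PR). The two load-bearing details are the named return value, which lets the deferred closure overwrite the error after fn panics, and calling debug.Stack() inside the deferred function so the panicking goroutine's stack is the one captured:

package main

import (
	"errors"
	"fmt"
	"runtime/debug"
)

// panicError plays the role of recoveredPanicError: it keeps the recovered
// value so callers can tell panics apart from ordinary job errors.
type panicError struct {
	task      string
	recovered any
}

func (e *panicError) Error() string {
	return fmt.Sprintf("%s panic: %v", e.task, e.recovered)
}

// runJob plays the role of executeRecoverable: the named return is what the
// deferred closure overwrites when fn panics.
func runJob(task string, fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// debug.Stack() must run here, inside the deferred function,
			// to capture the panicking goroutine's stack.
			fmt.Printf("recovered %q panic: %v\n%s", task, r, debug.Stack())
			err = &panicError{task: task, recovered: r}
		}
	}()
	return fn()
}

func main() {
	err := runJob("demo job", func() error { panic("boom") })

	var pe *panicError
	fmt.Println(err, "| is panicError:", errors.As(err, &pe)) // demo job panic: boom | is panicError: true
}

Because the panic surfaces as an ordinary error, it can flow through retry.Exponential in base.go like any transient failure, which is what lets the catchup loop special-case *recoveredPanicError with its own retry delay.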
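Reviewer note — the ordered-batch walk plus single-block fallback in processReorgCheckedBatch/recoverRegularGap, as a sketch under assumed types (result and fetchOne stand in for indexer.BlockResult and the failover-aware chain.GetBlock; reorg detection and retry backoff from the real code are omitted). Batch results are consumed only while they arrive at the next expected height and chain onto the previous hash; the first gap, failed result, or parent-hash mismatch abandons the batch and fetches the remainder of the range block by block under the same continuity rule:

package main

import "fmt"

// result stands in for indexer.BlockResult: a height, its hash/parent
// link, and an optional per-block error from the batch RPC.
type result struct {
	number       uint64
	hash, parent string
	err          error
}

// walkBatch consumes batch results in order; on the first gap, failed
// result, or broken parent link it falls back to per-block recovery for
// every height still missing from [expected, end].
func walkBatch(
	batch []result,
	start, end uint64,
	prevHash string,
	fetchOne func(uint64) (result, error),
) (uint64, string, error) {
	expected, lastHash := start, prevHash

	for _, res := range batch {
		if expected > end {
			break
		}
		if res.number != expected || res.err != nil ||
			(lastHash != "" && res.parent != lastHash) {
			return recoverGap(expected, end, lastHash, fetchOne)
		}
		expected, lastHash = res.number+1, res.hash
	}
	if expected <= end { // batch came back short
		return recoverGap(expected, end, lastHash, fetchOne)
	}
	return end, lastHash, nil
}

// recoverGap fetches one block at a time, enforcing the same continuity
// rule; the first unrecoverable height stops progress, as in the real code.
func recoverGap(
	start, end uint64,
	lastHash string,
	fetchOne func(uint64) (result, error),
) (uint64, string, error) {
	for n := start; n <= end; n++ {
		res, err := fetchOne(n)
		if err != nil {
			return n - 1, lastHash, fmt.Errorf("recover block %d: %w", n, err)
		}
		if lastHash != "" && res.parent != lastHash {
			return n - 1, lastHash, fmt.Errorf("block %d parent %s, want %s", n, res.parent, lastHash)
		}
		lastHash = res.hash
	}
	return end, lastHash, nil
}

func main() {
	// Mirrors the first test above: block 101 fails in the batch, then both
	// 101 and 102 are recovered through single-block fetches.
	batch := []result{
		{number: 100, hash: "0x100", parent: "0x099"},
		{number: 101, err: fmt.Errorf("rpc timeout")},
		{number: 102, hash: "0x102", parent: "0x101"},
	}
	fetch := func(n uint64) (result, error) {
		return result{number: n, hash: fmt.Sprintf("0x%d", n), parent: fmt.Sprintf("0x%d", n-1)}, nil
	}

	last, hash, err := walkBatch(batch, 100, 102, "0x099", fetch)
	fmt.Println(last, hash, err) // 102 0x102 <nil>
}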