Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/worker/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ func (bw *BaseWorker) run(job func() error) {
start := time.Now()

// Use Exponential retry for the job
if err := retry.Exponential(job, retry.ExponentialConfig{
if err := retry.Exponential(func() error {
return bw.executeRecoverable("worker job", job)
}, retry.ExponentialConfig{
InitialInterval: retryInterval,
MaxElapsedTime: bw.config.PollInterval * 4,
OnRetry: func(err error, next time.Duration) {
Expand Down
14 changes: 12 additions & 2 deletions internal/worker/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
MAX_RANGE_SIZE = 20
CATCHUP_WORKERS = 3 // Number of parallel workers
PROGRESS_SAVE_INTERVAL = 1 // Save progress every N batches
catchupPanicRetryDelay = time.Second
)

type CatchupWorker struct {
Expand Down Expand Up @@ -69,7 +70,7 @@ func (cw *CatchupWorker) Start() {
"total_blocks", totalBlocks,
"parallel_workers", CATCHUP_WORKERS,
)
go cw.runCatchup()
cw.goWithRecovery("catchup loop", cw.runCatchup)
}

// runCatchup is a tight loop that processes catchup ranges without PollInterval delays.
Expand All @@ -83,9 +84,17 @@ func (cw *CatchupWorker) runCatchup() {
default:
}

if err := cw.processCatchupBlocksParallel(); err != nil {
err := cw.executeRecoverable("catchup pass", cw.processCatchupBlocksParallel)
if err != nil {
cw.logger.Error("Catchup job error", "err", err)
_ = cw.emitter.EmitError(cw.chain.GetName(), err)
if _, ok := err.(*recoveredPanicError); ok {
select {
case <-cw.ctx.Done():
return
case <-time.After(catchupPanicRetryDelay):
}
}
continue
}

Expand Down Expand Up @@ -206,6 +215,7 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
defer cw.recoverPanic(fmt.Sprintf("catchup range worker %d", workerID))
cw.logger.Debug("Starting catchup worker", "worker_id", workerID)

for r := range rangeChan {
Expand Down
6 changes: 3 additions & 3 deletions internal/worker/manual.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (mw *ManualWorker) Start() {
mw.logger.Info("Starting manual worker", "chain", mw.chain.GetName())

// Periodic metrics
go func() {
mw.goWithRecovery("manual metrics", func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
Expand All @@ -74,9 +74,9 @@ func (mw *ManualWorker) Start() {
mw.logMissingRangesMetric()
}
}
}()
})

go mw.loop()
mw.goWithRecovery("manual loop", mw.loop)
}

func (mw *ManualWorker) loop() {
Expand Down
51 changes: 51 additions & 0 deletions internal/worker/recovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package worker

import (
"fmt"
"runtime/debug"
)

// recoveredPanicError is the error produced when a panic raised by a worker
// task is recovered and turned into an ordinary error value.
type recoveredPanicError struct {
	// task is the human-readable label of the work that panicked.
	task string
	// recovered is the raw value returned by recover().
	recovered any
}

// Error renders the failure as "<task> panic: <recovered value>".
func (e *recoveredPanicError) Error() string {
	return fmt.Sprintf("%s panic: %v", e.task, e.recovered)
}

// Unwrap exposes the recovered value to errors.Is / errors.As when that value
// is itself an error (for example a runtime.Error or a sentinel passed to
// panic). It returns nil for any non-error panic value.
func (e *recoveredPanicError) Unwrap() error {
	if cause, ok := e.recovered.(error); ok {
		return cause
	}
	return nil
}

// executeRecoverable calls fn and returns its result. If fn panics, the
// deferred recoverPanicAsError logs the panic and overwrites the named result
// with a *recoveredPanicError, so the caller sees a normal error instead of a
// crash.
func (bw *BaseWorker) executeRecoverable(task string, fn func() error) (err error) {
	// The method must be deferred directly (not wrapped in a closure):
	// recover() only has an effect when called by the deferred function itself.
	defer bw.recoverPanicAsError(task, &err)

	err = fn()
	return err
}

// goWithRecovery launches fn on a new goroutine. A panic escaping fn is
// caught and logged by recoverPanic, after which that goroutine ends; fn is
// not restarted.
func (bw *BaseWorker) goWithRecovery(task string, fn func()) {
	guarded := func() {
		// Deferred directly so recover() inside recoverPanic is effective.
		defer bw.recoverPanic(task)
		fn()
	}
	go guarded()
}

// recoverPanic stops an in-flight panic and logs it with the task label, the
// panic value and a stack trace. It is only effective when used as the
// deferred call itself (`defer bw.recoverPanic(task)`); when no panic is in
// progress it does nothing.
func (bw *BaseWorker) recoverPanic(task string) {
	recovered := recover()
	if recovered == nil {
		return
	}

	bw.logger.Error(
		"Recovered panic",
		"task", task,
		"panic", recovered,
		"stack", string(debug.Stack()),
	)
}

// recoverPanicAsError stops an in-flight panic, logs it with the task label,
// the panic value and a stack trace, and stores a *recoveredPanicError in
// *errp. It is only effective when used as the deferred call itself; when no
// panic is in progress *errp is left untouched.
func (bw *BaseWorker) recoverPanicAsError(task string, errp *error) {
	recovered := recover()
	if recovered == nil {
		return
	}

	bw.logger.Error(
		"Recovered panic",
		"task", task,
		"panic", recovered,
		"stack", string(debug.Stack()),
	)

	// Hand the panic back to the caller as an ordinary error value.
	panicErr := &recoveredPanicError{task: task, recovered: recovered}
	*errp = panicErr
}
Loading