diff --git a/cmd/run/pdptracker.go b/cmd/run/pdptracker.go index c2055595..5d8cfffb 100644 --- a/cmd/run/pdptracker.go +++ b/cmd/run/pdptracker.go @@ -22,17 +22,17 @@ This tracker: - Updates deal status based on on-chain proof set state - Tracks challenge epochs and live status`, Flags: []cli.Flag{ - &cli.DurationFlag{ - Name: "interval", - Usage: "How often to check for PDP deal updates", - Value: 10 * time.Minute, - }, &cli.StringFlag{ - Name: "eth-rpc", - Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)", - EnvVars: []string{"ETH_RPC_URL"}, + Name: "eth-rpc", + Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)", + EnvVars: []string{"ETH_RPC_URL"}, Required: true, }, + &cli.DurationFlag{ + Name: "pdp-poll-interval", + Usage: "Polling interval for PDP transaction status", + Value: 30 * time.Second, + }, }, Action: func(c *cli.Context) error { rpcURL := c.String("eth-rpc") @@ -52,10 +52,16 @@ This tracker: } defer pdpClient.Close() + cfg := pdptracker.PDPConfig{ + PollingInterval: c.Duration("pdp-poll-interval"), + } + if err := cfg.Validate(); err != nil { + return err + } + tracker := pdptracker.NewPDPTracker( db, - c.Duration("interval"), - rpcURL, + cfg, pdpClient, false, ) diff --git a/service/pdptracker/config.go b/service/pdptracker/config.go new file mode 100644 index 00000000..7d411514 --- /dev/null +++ b/service/pdptracker/config.go @@ -0,0 +1,20 @@ +package pdptracker + +import ( + "time" + + "github.com/cockroachdb/errors" +) + +// PDPConfig configures the PDP tracker operations layer. +type PDPConfig struct { + PollingInterval time.Duration +} + +// Validate ensures the PDPConfig values are sane. +func (c PDPConfig) Validate() error { + if c.PollingInterval <= 0 { + return errors.New("pdp polling interval must be greater than 0") + } + return nil +} diff --git a/service/pdptracker/config_test.go b/service/pdptracker/config_test.go new file mode 100644 index 00000000..c6a10445 --- /dev/null +++ b/service/pdptracker/config_test.go @@ -0,0 +1,22 @@ +package pdptracker + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPDPConfigValidate(t *testing.T) { + t.Run("valid", func(t *testing.T) { + cfg := PDPConfig{ + PollingInterval: time.Second, + } + require.NoError(t, cfg.Validate()) + }) + + t.Run("invalid polling interval", func(t *testing.T) { + cfg := PDPConfig{PollingInterval: 0} + require.Error(t, cfg.Validate()) + }) +} diff --git a/service/pdptracker/pdptracker.go b/service/pdptracker/pdptracker.go index 216242c9..8065966c 100644 --- a/service/pdptracker/pdptracker.go +++ b/service/pdptracker/pdptracker.go @@ -3,7 +3,6 @@ // // PDP deals use proof sets managed through the PDPVerifier contract, where data is verified // through cryptographic challenges rather than the traditional sector sealing process. -// package pdptracker import ( @@ -63,9 +62,8 @@ type PDPBulkClient interface { type PDPTracker struct { workerID uuid.UUID dbNoContext *gorm.DB - interval time.Duration + config PDPConfig pdpClient PDPClient - rpcURL string once bool } @@ -73,22 +71,19 @@ type PDPTracker struct { // // Parameters: // - db: Database connection for storing deal information -// - interval: How often to check for updates -// - rpcURL: Filecoin RPC endpoint URL -// - pdpClient: Client for interacting with PDP contracts (can be nil to disable tracking) +// - config: Tracker runtime configuration +// - pdpClient: Client for interacting with PDP contracts // - once: If true, run only once instead of continuously func NewPDPTracker( db *gorm.DB, - interval time.Duration, - rpcURL string, + config PDPConfig, pdpClient PDPClient, once bool, ) PDPTracker { return PDPTracker{ workerID: uuid.New(), dbNoContext: db, - interval: interval, - rpcURL: rpcURL, + config: config, pdpClient: pdpClient, once: once, } @@ -100,6 +95,9 @@ func (*PDPTracker) Name() string { // Start begins the PDP tracker service. func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { + Logger.Infow("PDP tracker started", + "pollInterval", p.config.PollingInterval, + ) var regTimer *time.Timer for { alreadyRunning, err := healthcheck.Register(ctx, p.dbNoContext, p.workerID, model.PDPTracker, false) @@ -158,10 +156,10 @@ func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { break } if timer == nil { - timer = time.NewTimer(p.interval) + timer = time.NewTimer(p.config.PollingInterval) defer timer.Stop() } else { - timer.Reset(p.interval) + timer.Reset(p.config.PollingInterval) } var stopped bool @@ -193,6 +191,7 @@ func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { if exitErr != nil { exitErr <- runErr } + Logger.Info("PDP tracker stopped") }() return nil @@ -218,6 +217,7 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { now := time.Now() var updated, inserted int64 + trackedProofSets := make(map[uint64]struct{}) processProofSet := func(wallet model.Wallet, ps ProofSetInfo) { for _, pieceCID := range ps.PieceCIDs { @@ -240,6 +240,13 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { "state": p.getPDPDealState(ps), "last_verified_at": now, } + if existingDeal.ProofSetLive == nil || *existingDeal.ProofSetLive != ps.IsLive { + Logger.Infow("PDP proof set status changed", + "proofSetID", ps.ProofSetID, + "previousLive", existingDeal.ProofSetLive, + "currentLive", ps.IsLive, + ) + } err = database.DoRetry(ctx, func() error { return db.Model(&model.Deal{}).Where("id = ?", existingDeal.ID).Updates(updates).Error }) @@ -296,6 +303,7 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { return errors.Wrap(err, "failed to get PDP proof sets") } for _, ps := range proofSets { + trackedProofSets[ps.ProofSetID] = struct{}{} for _, wallet := range walletsByAddress[ps.ClientAddress.String()] { processProofSet(wallet, ps) } @@ -317,12 +325,14 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { } for _, ps := range proofSets { + trackedProofSets[ps.ProofSetID] = struct{}{} processProofSet(wallet, ps) } } } Logger.Infof("PDP tracker: updated %d deals, inserted %d deals", updated, inserted) + Logger.Infof("PDP tracker: tracked %d proof sets", len(trackedProofSets)) return nil } diff --git a/service/pdptracker/pdptracker_test.go b/service/pdptracker/pdptracker_test.go index 6328c8c2..31e7b221 100644 --- a/service/pdptracker/pdptracker_test.go +++ b/service/pdptracker/pdptracker_test.go @@ -44,7 +44,7 @@ func (m *mockPDPClient) GetNextChallengeEpoch(_ context.Context, _ uint64) (int3 } func TestPDPTracker_Name(t *testing.T) { - tracker := NewPDPTracker(nil, time.Minute, "", nil, true) + tracker := NewPDPTracker(nil, PDPConfig{PollingInterval: time.Minute}, nil, true) require.Equal(t, "PDPTracker", tracker.Name()) } @@ -84,7 +84,7 @@ func TestPDPTracker_RunOnce_UpsertByParsedPieceCID(t *testing.T) { }, } - tracker := NewPDPTracker(db, time.Minute, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) require.NoError(t, tracker.runOnce(ctx)) var first model.Deal @@ -148,7 +148,7 @@ func TestPDPTracker_RunOnce_SkipsInvalidPieceCID(t *testing.T) { }, }, } - tracker := NewPDPTracker(db, time.Minute, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) require.NoError(t, tracker.runOnce(ctx)) var count int64 @@ -194,7 +194,7 @@ func TestPDPTracker_RunOnce_UsesBulkFetchWhenAvailable(t *testing.T) { }, } - tracker := NewPDPTracker(db, time.Minute, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) require.NoError(t, tracker.runOnce(ctx)) require.Equal(t, 1, client.bulkCalls) })