From fbda561ee3c15a7f94e8f1e915dea3bc1c30495f Mon Sep 17 00:00:00 2001 From: Anjor Kanekar Date: Wed, 18 Feb 2026 11:10:08 +0000 Subject: [PATCH 1/2] Add PDP config/ops layer - Add PDPConfig struct with batch size, gas cap, confirmation depth, and polling interval - Wire config into PDPTracker constructor and use polling interval in tracking loop - Add start/stop logging and proof set status change tracking - Add proof set count metrics per run - Add CLI flags: --pdp-batch-size (default 100), --pdp-confirmation-depth (default 5), --pdp-poll-interval (default 30s) - Add config validation with tests - Update all PDPTracker call sites to use new config This is independent config/ops layer work that doesn't touch pdpclient.go integration. --- cmd/run/pdptracker.go | 31 ++++++++++++++++---- service/pdptracker/config.go | 33 +++++++++++++++++++++ service/pdptracker/config_test.go | 41 +++++++++++++++++++++++++++ service/pdptracker/pdptracker.go | 28 ++++++++++++++---- service/pdptracker/pdptracker_test.go | 8 +++--- 5 files changed, 126 insertions(+), 15 deletions(-) create mode 100644 service/pdptracker/config.go create mode 100644 service/pdptracker/config_test.go diff --git a/cmd/run/pdptracker.go b/cmd/run/pdptracker.go index c2055595..995b0247 100644 --- a/cmd/run/pdptracker.go +++ b/cmd/run/pdptracker.go @@ -22,17 +22,27 @@ This tracker: - Updates deal status based on on-chain proof set state - Tracks challenge epochs and live status`, Flags: []cli.Flag{ - &cli.DurationFlag{ - Name: "interval", - Usage: "How often to check for PDP deal updates", - Value: 10 * time.Minute, - }, &cli.StringFlag{ Name: "eth-rpc", Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)", EnvVars: []string{"ETH_RPC_URL"}, Required: true, }, + &cli.IntFlag{ + Name: "pdp-batch-size", + Usage: "Max pieces per AddRoots transaction", + Value: 100, + }, + &cli.Uint64Flag{ + Name: "pdp-confirmation-depth", + Usage: "Blocks to wait for PDP transaction confirmation", + Value: 5, + }, + &cli.DurationFlag{ + Name: "pdp-poll-interval", + Usage: "Polling interval for PDP transaction status", + Value: 30 * time.Second, + }, }, Action: func(c *cli.Context) error { rpcURL := c.String("eth-rpc") @@ -52,9 +62,18 @@ This tracker: } defer pdpClient.Close() + cfg := pdptracker.PDPConfig{ + BatchSize: c.Int("pdp-batch-size"), + ConfirmationDepth: c.Uint64("pdp-confirmation-depth"), + PollingInterval: c.Duration("pdp-poll-interval"), + } + if err := cfg.Validate(); err != nil { + return err + } + tracker := pdptracker.NewPDPTracker( db, - c.Duration("interval"), + cfg, rpcURL, pdpClient, false, diff --git a/service/pdptracker/config.go b/service/pdptracker/config.go new file mode 100644 index 00000000..570e4092 --- /dev/null +++ b/service/pdptracker/config.go @@ -0,0 +1,33 @@ +package pdptracker + +import ( + "math/big" + "time" + + "github.com/cockroachdb/errors" +) + +// PDPConfig configures the PDP tracker operations layer. +type PDPConfig struct { + BatchSize int + GasCap *big.Int + ConfirmationDepth uint64 + PollingInterval time.Duration +} + +// Validate ensures the PDPConfig values are sane. +func (c PDPConfig) Validate() error { + if c.BatchSize <= 0 { + return errors.New("pdp batch size must be greater than 0") + } + if c.GasCap != nil && c.GasCap.Sign() <= 0 { + return errors.New("pdp gas cap must be greater than 0") + } + if c.ConfirmationDepth == 0 { + return errors.New("pdp confirmation depth must be greater than 0") + } + if c.PollingInterval <= 0 { + return errors.New("pdp polling interval must be greater than 0") + } + return nil +} diff --git a/service/pdptracker/config_test.go b/service/pdptracker/config_test.go new file mode 100644 index 00000000..829782fd --- /dev/null +++ b/service/pdptracker/config_test.go @@ -0,0 +1,41 @@ +package pdptracker + +import ( + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPDPConfigValidate(t *testing.T) { + t.Run("valid", func(t *testing.T) { + cfg := PDPConfig{ + BatchSize: 1, + GasCap: big.NewInt(1), + ConfirmationDepth: 1, + PollingInterval: time.Second, + } + require.NoError(t, cfg.Validate()) + }) + + t.Run("invalid batch size", func(t *testing.T) { + cfg := PDPConfig{BatchSize: 0, ConfirmationDepth: 1, PollingInterval: time.Second} + require.Error(t, cfg.Validate()) + }) + + t.Run("invalid gas cap", func(t *testing.T) { + cfg := PDPConfig{BatchSize: 1, GasCap: big.NewInt(0), ConfirmationDepth: 1, PollingInterval: time.Second} + require.Error(t, cfg.Validate()) + }) + + t.Run("invalid confirmation depth", func(t *testing.T) { + cfg := PDPConfig{BatchSize: 1, ConfirmationDepth: 0, PollingInterval: time.Second} + require.Error(t, cfg.Validate()) + }) + + t.Run("invalid polling interval", func(t *testing.T) { + cfg := PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: 0} + require.Error(t, cfg.Validate()) + }) +} diff --git a/service/pdptracker/pdptracker.go b/service/pdptracker/pdptracker.go index 216242c9..f318037a 100644 --- a/service/pdptracker/pdptracker.go +++ b/service/pdptracker/pdptracker.go @@ -63,7 +63,7 @@ type PDPBulkClient interface { type PDPTracker struct { workerID uuid.UUID dbNoContext *gorm.DB - interval time.Duration + config PDPConfig pdpClient PDPClient rpcURL string once bool @@ -79,7 +79,7 @@ type PDPTracker struct { // - once: If true, run only once instead of continuously func NewPDPTracker( db *gorm.DB, - interval time.Duration, + config PDPConfig, rpcURL string, pdpClient PDPClient, once bool, @@ -87,7 +87,7 @@ func NewPDPTracker( return PDPTracker{ workerID: uuid.New(), dbNoContext: db, - interval: interval, + config: config, rpcURL: rpcURL, pdpClient: pdpClient, once: once, @@ -100,6 +100,12 @@ func (*PDPTracker) Name() string { // Start begins the PDP tracker service. func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { + Logger.Infow("PDP tracker started", + "batchSize", p.config.BatchSize, + "confirmationDepth", p.config.ConfirmationDepth, + "pollInterval", p.config.PollingInterval, + "gasCap", p.config.GasCap, + ) var regTimer *time.Timer for { alreadyRunning, err := healthcheck.Register(ctx, p.dbNoContext, p.workerID, model.PDPTracker, false) @@ -158,10 +164,10 @@ func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { break } if timer == nil { - timer = time.NewTimer(p.interval) + timer = time.NewTimer(p.config.PollingInterval) defer timer.Stop() } else { - timer.Reset(p.interval) + timer.Reset(p.config.PollingInterval) } var stopped bool @@ -193,6 +199,7 @@ func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { if exitErr != nil { exitErr <- runErr } + Logger.Info("PDP tracker stopped") }() return nil @@ -218,6 +225,7 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { now := time.Now() var updated, inserted int64 + trackedProofSets := make(map[uint64]struct{}) processProofSet := func(wallet model.Wallet, ps ProofSetInfo) { for _, pieceCID := range ps.PieceCIDs { @@ -240,6 +248,13 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { "state": p.getPDPDealState(ps), "last_verified_at": now, } + if existingDeal.ProofSetLive == nil || *existingDeal.ProofSetLive != ps.IsLive { + Logger.Infow("PDP proof set status changed", + "proofSetID", ps.ProofSetID, + "previousLive", existingDeal.ProofSetLive, + "currentLive", ps.IsLive, + ) + } err = database.DoRetry(ctx, func() error { return db.Model(&model.Deal{}).Where("id = ?", existingDeal.ID).Updates(updates).Error }) @@ -296,6 +311,7 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { return errors.Wrap(err, "failed to get PDP proof sets") } for _, ps := range proofSets { + trackedProofSets[ps.ProofSetID] = struct{}{} for _, wallet := range walletsByAddress[ps.ClientAddress.String()] { processProofSet(wallet, ps) } @@ -317,12 +333,14 @@ func (p *PDPTracker) runOnce(ctx context.Context) error { } for _, ps := range proofSets { + trackedProofSets[ps.ProofSetID] = struct{}{} processProofSet(wallet, ps) } } } Logger.Infof("PDP tracker: updated %d deals, inserted %d deals", updated, inserted) + Logger.Infof("PDP tracker: tracked %d proof sets", len(trackedProofSets)) return nil } diff --git a/service/pdptracker/pdptracker_test.go b/service/pdptracker/pdptracker_test.go index 6328c8c2..bda0c0bf 100644 --- a/service/pdptracker/pdptracker_test.go +++ b/service/pdptracker/pdptracker_test.go @@ -44,7 +44,7 @@ func (m *mockPDPClient) GetNextChallengeEpoch(_ context.Context, _ uint64) (int3 } func TestPDPTracker_Name(t *testing.T) { - tracker := NewPDPTracker(nil, time.Minute, "", nil, true) + tracker := NewPDPTracker(nil, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", nil, true) require.Equal(t, "PDPTracker", tracker.Name()) } @@ -84,7 +84,7 @@ func TestPDPTracker_RunOnce_UpsertByParsedPieceCID(t *testing.T) { }, } - tracker := NewPDPTracker(db, time.Minute, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", client, true) require.NoError(t, tracker.runOnce(ctx)) var first model.Deal @@ -148,7 +148,7 @@ func TestPDPTracker_RunOnce_SkipsInvalidPieceCID(t *testing.T) { }, }, } - tracker := NewPDPTracker(db, time.Minute, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", client, true) require.NoError(t, tracker.runOnce(ctx)) var count int64 @@ -194,7 +194,7 @@ func TestPDPTracker_RunOnce_UsesBulkFetchWhenAvailable(t *testing.T) { }, } - tracker := NewPDPTracker(db, time.Minute, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", client, true) require.NoError(t, tracker.runOnce(ctx)) require.Equal(t, 1, client.bulkCalls) }) From d47131acf2ea4b181850f7d4748fcbc90ae9c659 Mon Sep 17 00:00:00 2001 From: anjor Date: Wed, 18 Feb 2026 14:56:07 +0000 Subject: [PATCH 2/2] Trim PDP tracker config to active polling settings --- cmd/run/pdptracker.go | 21 ++++----------------- service/pdptracker/config.go | 15 +-------------- service/pdptracker/config_test.go | 23 ++--------------------- service/pdptracker/pdptracker.go | 12 ++---------- service/pdptracker/pdptracker_test.go | 8 ++++---- 5 files changed, 13 insertions(+), 66 deletions(-) diff --git a/cmd/run/pdptracker.go b/cmd/run/pdptracker.go index 995b0247..5d8cfffb 100644 --- a/cmd/run/pdptracker.go +++ b/cmd/run/pdptracker.go @@ -23,21 +23,11 @@ This tracker: - Tracks challenge epochs and live status`, Flags: []cli.Flag{ &cli.StringFlag{ - Name: "eth-rpc", - Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)", - EnvVars: []string{"ETH_RPC_URL"}, + Name: "eth-rpc", + Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)", + EnvVars: []string{"ETH_RPC_URL"}, Required: true, }, - &cli.IntFlag{ - Name: "pdp-batch-size", - Usage: "Max pieces per AddRoots transaction", - Value: 100, - }, - &cli.Uint64Flag{ - Name: "pdp-confirmation-depth", - Usage: "Blocks to wait for PDP transaction confirmation", - Value: 5, - }, &cli.DurationFlag{ Name: "pdp-poll-interval", Usage: "Polling interval for PDP transaction status", @@ -63,9 +53,7 @@ This tracker: defer pdpClient.Close() cfg := pdptracker.PDPConfig{ - BatchSize: c.Int("pdp-batch-size"), - ConfirmationDepth: c.Uint64("pdp-confirmation-depth"), - PollingInterval: c.Duration("pdp-poll-interval"), + PollingInterval: c.Duration("pdp-poll-interval"), } if err := cfg.Validate(); err != nil { return err @@ -74,7 +62,6 @@ This tracker: tracker := pdptracker.NewPDPTracker( db, cfg, - rpcURL, pdpClient, false, ) diff --git a/service/pdptracker/config.go b/service/pdptracker/config.go index 570e4092..7d411514 100644 --- a/service/pdptracker/config.go +++ b/service/pdptracker/config.go @@ -1,7 +1,6 @@ package pdptracker import ( - "math/big" "time" "github.com/cockroachdb/errors" @@ -9,23 +8,11 @@ import ( // PDPConfig configures the PDP tracker operations layer. type PDPConfig struct { - BatchSize int - GasCap *big.Int - ConfirmationDepth uint64 - PollingInterval time.Duration + PollingInterval time.Duration } // Validate ensures the PDPConfig values are sane. func (c PDPConfig) Validate() error { - if c.BatchSize <= 0 { - return errors.New("pdp batch size must be greater than 0") - } - if c.GasCap != nil && c.GasCap.Sign() <= 0 { - return errors.New("pdp gas cap must be greater than 0") - } - if c.ConfirmationDepth == 0 { - return errors.New("pdp confirmation depth must be greater than 0") - } if c.PollingInterval <= 0 { return errors.New("pdp polling interval must be greater than 0") } diff --git a/service/pdptracker/config_test.go b/service/pdptracker/config_test.go index 829782fd..c6a10445 100644 --- a/service/pdptracker/config_test.go +++ b/service/pdptracker/config_test.go @@ -1,7 +1,6 @@ package pdptracker import ( - "math/big" "testing" "time" @@ -11,31 +10,13 @@ import ( func TestPDPConfigValidate(t *testing.T) { t.Run("valid", func(t *testing.T) { cfg := PDPConfig{ - BatchSize: 1, - GasCap: big.NewInt(1), - ConfirmationDepth: 1, - PollingInterval: time.Second, + PollingInterval: time.Second, } require.NoError(t, cfg.Validate()) }) - t.Run("invalid batch size", func(t *testing.T) { - cfg := PDPConfig{BatchSize: 0, ConfirmationDepth: 1, PollingInterval: time.Second} - require.Error(t, cfg.Validate()) - }) - - t.Run("invalid gas cap", func(t *testing.T) { - cfg := PDPConfig{BatchSize: 1, GasCap: big.NewInt(0), ConfirmationDepth: 1, PollingInterval: time.Second} - require.Error(t, cfg.Validate()) - }) - - t.Run("invalid confirmation depth", func(t *testing.T) { - cfg := PDPConfig{BatchSize: 1, ConfirmationDepth: 0, PollingInterval: time.Second} - require.Error(t, cfg.Validate()) - }) - t.Run("invalid polling interval", func(t *testing.T) { - cfg := PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: 0} + cfg := PDPConfig{PollingInterval: 0} require.Error(t, cfg.Validate()) }) } diff --git a/service/pdptracker/pdptracker.go b/service/pdptracker/pdptracker.go index f318037a..8065966c 100644 --- a/service/pdptracker/pdptracker.go +++ b/service/pdptracker/pdptracker.go @@ -3,7 +3,6 @@ // // PDP deals use proof sets managed through the PDPVerifier contract, where data is verified // through cryptographic challenges rather than the traditional sector sealing process. -// package pdptracker import ( @@ -65,7 +64,6 @@ type PDPTracker struct { dbNoContext *gorm.DB config PDPConfig pdpClient PDPClient - rpcURL string once bool } @@ -73,14 +71,12 @@ type PDPTracker struct { // // Parameters: // - db: Database connection for storing deal information -// - interval: How often to check for updates -// - rpcURL: Filecoin RPC endpoint URL -// - pdpClient: Client for interacting with PDP contracts (can be nil to disable tracking) +// - config: Tracker runtime configuration +// - pdpClient: Client for interacting with PDP contracts // - once: If true, run only once instead of continuously func NewPDPTracker( db *gorm.DB, config PDPConfig, - rpcURL string, pdpClient PDPClient, once bool, ) PDPTracker { @@ -88,7 +84,6 @@ func NewPDPTracker( workerID: uuid.New(), dbNoContext: db, config: config, - rpcURL: rpcURL, pdpClient: pdpClient, once: once, } @@ -101,10 +96,7 @@ func (*PDPTracker) Name() string { // Start begins the PDP tracker service. func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { Logger.Infow("PDP tracker started", - "batchSize", p.config.BatchSize, - "confirmationDepth", p.config.ConfirmationDepth, "pollInterval", p.config.PollingInterval, - "gasCap", p.config.GasCap, ) var regTimer *time.Timer for { diff --git a/service/pdptracker/pdptracker_test.go b/service/pdptracker/pdptracker_test.go index bda0c0bf..31e7b221 100644 --- a/service/pdptracker/pdptracker_test.go +++ b/service/pdptracker/pdptracker_test.go @@ -44,7 +44,7 @@ func (m *mockPDPClient) GetNextChallengeEpoch(_ context.Context, _ uint64) (int3 } func TestPDPTracker_Name(t *testing.T) { - tracker := NewPDPTracker(nil, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", nil, true) + tracker := NewPDPTracker(nil, PDPConfig{PollingInterval: time.Minute}, nil, true) require.Equal(t, "PDPTracker", tracker.Name()) } @@ -84,7 +84,7 @@ func TestPDPTracker_RunOnce_UpsertByParsedPieceCID(t *testing.T) { }, } - tracker := NewPDPTracker(db, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) require.NoError(t, tracker.runOnce(ctx)) var first model.Deal @@ -148,7 +148,7 @@ func TestPDPTracker_RunOnce_SkipsInvalidPieceCID(t *testing.T) { }, }, } - tracker := NewPDPTracker(db, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) require.NoError(t, tracker.runOnce(ctx)) var count int64 @@ -194,7 +194,7 @@ func TestPDPTracker_RunOnce_UsesBulkFetchWhenAvailable(t *testing.T) { }, } - tracker := NewPDPTracker(db, PDPConfig{BatchSize: 1, ConfirmationDepth: 1, PollingInterval: time.Minute}, "", client, true) + tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) require.NoError(t, tracker.runOnce(ctx)) require.Equal(t, 1, client.bulkCalls) })