Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions cmd/run/pdptracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ This tracker:
- Updates deal status based on on-chain proof set state
- Tracks challenge epochs and live status`,
Flags: []cli.Flag{
&cli.DurationFlag{
Name: "interval",
Usage: "How often to check for PDP deal updates",
Value: 10 * time.Minute,
},
&cli.StringFlag{
Name: "eth-rpc",
Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)",
EnvVars: []string{"ETH_RPC_URL"},
Name: "eth-rpc",
Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)",
EnvVars: []string{"ETH_RPC_URL"},
Required: true,
},
&cli.DurationFlag{
Name: "pdp-poll-interval",
Usage: "Polling interval for PDP transaction status",
Value: 30 * time.Second,
},
},
Action: func(c *cli.Context) error {
rpcURL := c.String("eth-rpc")
Expand All @@ -52,10 +52,16 @@ This tracker:
}
defer pdpClient.Close()

cfg := pdptracker.PDPConfig{
PollingInterval: c.Duration("pdp-poll-interval"),
}
if err := cfg.Validate(); err != nil {
return err
}

tracker := pdptracker.NewPDPTracker(
db,
c.Duration("interval"),
rpcURL,
cfg,
pdpClient,
false,
)
Expand Down
20 changes: 20 additions & 0 deletions service/pdptracker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package pdptracker

import (
"time"

"github.com/cockroachdb/errors"
)

// PDPConfig configures the PDP tracker operations layer.
type PDPConfig struct {
PollingInterval time.Duration
}

// Validate ensures the PDPConfig values are sane.
func (c PDPConfig) Validate() error {
if c.PollingInterval <= 0 {
return errors.New("pdp polling interval must be greater than 0")
}
return nil
}
22 changes: 22 additions & 0 deletions service/pdptracker/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package pdptracker

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPDPConfigValidate(t *testing.T) {
t.Run("valid", func(t *testing.T) {
cfg := PDPConfig{
PollingInterval: time.Second,
}
require.NoError(t, cfg.Validate())
})

t.Run("invalid polling interval", func(t *testing.T) {
cfg := PDPConfig{PollingInterval: 0}
require.Error(t, cfg.Validate())
})
}
34 changes: 22 additions & 12 deletions service/pdptracker/pdptracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
//
// PDP deals use proof sets managed through the PDPVerifier contract, where data is verified
// through cryptographic challenges rather than the traditional sector sealing process.
//
package pdptracker

import (
Expand Down Expand Up @@ -63,32 +62,28 @@ type PDPBulkClient interface {
type PDPTracker struct {
workerID uuid.UUID
dbNoContext *gorm.DB
interval time.Duration
config PDPConfig
pdpClient PDPClient
rpcURL string
once bool
}

// NewPDPTracker creates a new PDP deal tracker.
//
// Parameters:
// - db: Database connection for storing deal information
// - interval: How often to check for updates
// - rpcURL: Filecoin RPC endpoint URL
// - pdpClient: Client for interacting with PDP contracts (can be nil to disable tracking)
// - config: Tracker runtime configuration
// - pdpClient: Client for interacting with PDP contracts
// - once: If true, run only once instead of continuously
func NewPDPTracker(
db *gorm.DB,
interval time.Duration,
rpcURL string,
config PDPConfig,
pdpClient PDPClient,
once bool,
) PDPTracker {
return PDPTracker{
workerID: uuid.New(),
dbNoContext: db,
interval: interval,
rpcURL: rpcURL,
config: config,
pdpClient: pdpClient,
once: once,
}
Expand All @@ -100,6 +95,9 @@ func (*PDPTracker) Name() string {

// Start begins the PDP tracker service.
func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error {
Logger.Infow("PDP tracker started",
"pollInterval", p.config.PollingInterval,
)
var regTimer *time.Timer
for {
alreadyRunning, err := healthcheck.Register(ctx, p.dbNoContext, p.workerID, model.PDPTracker, false)
Expand Down Expand Up @@ -158,10 +156,10 @@ func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error {
break
}
if timer == nil {
timer = time.NewTimer(p.interval)
timer = time.NewTimer(p.config.PollingInterval)
defer timer.Stop()
} else {
timer.Reset(p.interval)
timer.Reset(p.config.PollingInterval)
}

var stopped bool
Expand Down Expand Up @@ -193,6 +191,7 @@ func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error {
if exitErr != nil {
exitErr <- runErr
}
Logger.Info("PDP tracker stopped")
}()

return nil
Expand All @@ -218,6 +217,7 @@ func (p *PDPTracker) runOnce(ctx context.Context) error {

now := time.Now()
var updated, inserted int64
trackedProofSets := make(map[uint64]struct{})

processProofSet := func(wallet model.Wallet, ps ProofSetInfo) {
for _, pieceCID := range ps.PieceCIDs {
Expand All @@ -240,6 +240,13 @@ func (p *PDPTracker) runOnce(ctx context.Context) error {
"state": p.getPDPDealState(ps),
"last_verified_at": now,
}
if existingDeal.ProofSetLive == nil || *existingDeal.ProofSetLive != ps.IsLive {
Logger.Infow("PDP proof set status changed",
"proofSetID", ps.ProofSetID,
"previousLive", existingDeal.ProofSetLive,
"currentLive", ps.IsLive,
)
}
err = database.DoRetry(ctx, func() error {
return db.Model(&model.Deal{}).Where("id = ?", existingDeal.ID).Updates(updates).Error
})
Expand Down Expand Up @@ -296,6 +303,7 @@ func (p *PDPTracker) runOnce(ctx context.Context) error {
return errors.Wrap(err, "failed to get PDP proof sets")
}
for _, ps := range proofSets {
trackedProofSets[ps.ProofSetID] = struct{}{}
for _, wallet := range walletsByAddress[ps.ClientAddress.String()] {
processProofSet(wallet, ps)
}
Expand All @@ -317,12 +325,14 @@ func (p *PDPTracker) runOnce(ctx context.Context) error {
}

for _, ps := range proofSets {
trackedProofSets[ps.ProofSetID] = struct{}{}
processProofSet(wallet, ps)
}
}
}

Logger.Infof("PDP tracker: updated %d deals, inserted %d deals", updated, inserted)
Logger.Infof("PDP tracker: tracked %d proof sets", len(trackedProofSets))
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions service/pdptracker/pdptracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *mockPDPClient) GetNextChallengeEpoch(_ context.Context, _ uint64) (int3
}

func TestPDPTracker_Name(t *testing.T) {
tracker := NewPDPTracker(nil, time.Minute, "", nil, true)
tracker := NewPDPTracker(nil, PDPConfig{PollingInterval: time.Minute}, nil, true)
require.Equal(t, "PDPTracker", tracker.Name())
}

Expand Down Expand Up @@ -84,7 +84,7 @@ func TestPDPTracker_RunOnce_UpsertByParsedPieceCID(t *testing.T) {
},
}

tracker := NewPDPTracker(db, time.Minute, "", client, true)
tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true)
require.NoError(t, tracker.runOnce(ctx))

var first model.Deal
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestPDPTracker_RunOnce_SkipsInvalidPieceCID(t *testing.T) {
},
},
}
tracker := NewPDPTracker(db, time.Minute, "", client, true)
tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true)
require.NoError(t, tracker.runOnce(ctx))

var count int64
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestPDPTracker_RunOnce_UsesBulkFetchWhenAvailable(t *testing.T) {
},
}

tracker := NewPDPTracker(db, time.Minute, "", client, true)
tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true)
require.NoError(t, tracker.runOnce(ctx))
require.Equal(t, 1, client.bulkCalls)
})
Expand Down