From 17525062233cc12082d1b47b348d4cb0ca232eb1 Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 19 Feb 2026 12:34:51 +0000 Subject: [PATCH 1/2] Add PDP scheduling API boundary interfaces --- service/dealpusher/pdp_api.go | 60 ++++++++++++++++++++++++++++++ service/dealpusher/pdp_api_test.go | 25 +++++++++++++ 2 files changed, 85 insertions(+) create mode 100644 service/dealpusher/pdp_api.go create mode 100644 service/dealpusher/pdp_api_test.go diff --git a/service/dealpusher/pdp_api.go b/service/dealpusher/pdp_api.go new file mode 100644 index 00000000..da473f5e --- /dev/null +++ b/service/dealpusher/pdp_api.go @@ -0,0 +1,60 @@ +package dealpusher + +import ( + "context" + "errors" + "math/big" + "time" + + "github.com/data-preservation-programs/singularity/model" + "github.com/ipfs/go-cid" +) + +// PDPSchedulingConfig holds PDP-specific scheduling knobs for on-chain operations. +type PDPSchedulingConfig struct { + BatchSize int + GasLimit uint64 + ConfirmationDepth uint64 + PollingInterval time.Duration +} + +// Validate validates PDP scheduling configuration. +func (c PDPSchedulingConfig) Validate() error { + if c.BatchSize <= 0 { + return errors.New("pdp batch size must be greater than 0") + } + if c.ConfirmationDepth == 0 { + return errors.New("pdp confirmation depth must be greater than 0") + } + if c.PollingInterval <= 0 { + return errors.New("pdp polling interval must be greater than 0") + } + return nil +} + +// PDPProofSetManager defines proof set lifecycle operations needed by scheduling. +type PDPProofSetManager interface { + // EnsureProofSet returns an existing proof set ID or creates one for this client/provider pair. + EnsureProofSet(ctx context.Context, wallet model.Wallet, provider string) (uint64, error) + // QueueAddRoots submits root additions for a proof set and returns the queued tx reference. + QueueAddRoots(ctx context.Context, proofSetID uint64, pieceCIDs []cid.Cid, cfg PDPSchedulingConfig) (*PDPQueuedTx, error) +} + +// PDPTransactionConfirmer defines confirmation checks for queued on-chain transactions. +type PDPTransactionConfirmer interface { + WaitForConfirmations(ctx context.Context, txHash string, depth uint64, pollInterval time.Duration) (*PDPTransactionReceipt, error) +} + +// PDPQueuedTx represents an on-chain transaction submitted by PDP scheduling. +type PDPQueuedTx struct { + Hash string +} + +// PDPTransactionReceipt represents a confirmed on-chain transaction result. +type PDPTransactionReceipt struct { + Hash string + BlockNumber uint64 + GasUsed uint64 + Status uint64 + CostAttoFIL *big.Int +} diff --git a/service/dealpusher/pdp_api_test.go b/service/dealpusher/pdp_api_test.go new file mode 100644 index 00000000..34cbb3e7 --- /dev/null +++ b/service/dealpusher/pdp_api_test.go @@ -0,0 +1,25 @@ +package dealpusher + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPDPSchedulingConfigValidate(t *testing.T) { + t.Run("valid", func(t *testing.T) { + cfg := PDPSchedulingConfig{ + BatchSize: 100, + GasLimit: 5000000, + ConfirmationDepth: 5, + PollingInterval: 30 * time.Second, + } + require.NoError(t, cfg.Validate()) + }) + + t.Run("invalid", func(t *testing.T) { + cfg := PDPSchedulingConfig{} + require.Error(t, cfg.Validate()) + }) +} From 0d2e70fa2dfb681c4f0502f3e50586e8ca6191a9 Mon Sep 17 00:00:00 2001 From: anjor Date: Thu, 19 Feb 2026 12:38:32 +0000 Subject: [PATCH 2/2] Wire DealPusher PDP boundary with guarded hook --- service/dealpusher/dealpusher.go | 27 ++++++++++-- service/dealpusher/pdp_wiring_test.go | 60 +++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 3 deletions(-) create mode 100644 service/dealpusher/pdp_wiring_test.go diff --git a/service/dealpusher/dealpusher.go b/service/dealpusher/dealpusher.go index a9f1a037..4ce9e31d 100644 --- a/service/dealpusher/dealpusher.go +++ b/service/dealpusher/dealpusher.go @@ -35,9 +35,12 @@ var waitPendingInterval = time.Minute // DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process. type DealPusher struct { - dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection. - walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication. - dealMaker replication.DealMaker // Object responsible for making a deal in replication. + dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection. + walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication. + dealMaker replication.DealMaker // Object responsible for making a deal in replication. + pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager. + pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer. + scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType workerID uuid.UUID // UUID identifying the associated worker. activeSchedule map[model.ScheduleID]*model.Schedule // Map storing active schedules with schedule IDs as keys and pointers to model.Schedule objects as values. activeScheduleCancelFunc map[model.ScheduleID]context.CancelFunc // Map storing cancel functions for active schedules with schedule IDs as keys and CancelFunc as values. @@ -232,6 +235,10 @@ func (d *DealPusher) updateScheduleUnsafe(ctx context.Context, schedule model.Sc // Possible values: ScheduleCompleted, ScheduleError, or an empty string. // 2. An error if any step of the process encounters an issue, otherwise nil. func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule) (model.ScheduleState, error) { + if d.resolveScheduleDealType(schedule) == model.DealTypePDP { + return d.runPDPSchedule(ctx, schedule) + } + db := d.dbNoContext.WithContext(ctx) overReplicatedCIDs := db. Table("deals"). @@ -423,6 +430,20 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule) } } +func (d *DealPusher) resolveScheduleDealType(schedule *model.Schedule) model.DealType { + if d.scheduleDealTypeResolver == nil { + return model.DealTypeMarket + } + return d.scheduleDealTypeResolver(schedule) +} + +func (d *DealPusher) runPDPSchedule(_ context.Context, _ *model.Schedule) (model.ScheduleState, error) { + if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil { + return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured") + } + return model.ScheduleError, errors.New("pdp scheduling path is not implemented") +} + func NewDealPusher(db *gorm.DB, lotusURL string, lotusToken string, numAttempts uint, maxReplicas uint, ) (*DealPusher, error) { diff --git a/service/dealpusher/pdp_wiring_test.go b/service/dealpusher/pdp_wiring_test.go new file mode 100644 index 00000000..5a7dbaa5 --- /dev/null +++ b/service/dealpusher/pdp_wiring_test.go @@ -0,0 +1,60 @@ +package dealpusher + +import ( + "context" + "testing" + "time" + + "github.com/data-preservation-programs/singularity/model" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" +) + +type noopPDPProofSetManager struct{} + +func (noopPDPProofSetManager) EnsureProofSet(_ context.Context, _ model.Wallet, _ string) (uint64, error) { + return 1, nil +} + +func (noopPDPProofSetManager) QueueAddRoots(_ context.Context, _ uint64, _ []cid.Cid, _ PDPSchedulingConfig) (*PDPQueuedTx, error) { + return &PDPQueuedTx{Hash: "0x1"}, nil +} + +type noopPDPTransactionConfirmer struct{} + +func (noopPDPTransactionConfirmer) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) { + return &PDPTransactionReceipt{Hash: txHash}, nil +} + +func TestDealPusher_ResolveScheduleDealType_DefaultsToMarket(t *testing.T) { + d := &DealPusher{} + require.Equal(t, model.DealTypeMarket, d.resolveScheduleDealType(&model.Schedule{})) +} + +func TestDealPusher_RunSchedule_PDPWithoutDependenciesReturnsConfiguredError(t *testing.T) { + d := &DealPusher{ + scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { + return model.DealTypePDP + }, + } + + state, err := d.runSchedule(context.Background(), &model.Schedule{}) + require.Error(t, err) + require.Equal(t, model.ScheduleError, state) + require.Contains(t, err.Error(), "pdp scheduling dependencies are not configured") +} + +func TestDealPusher_RunSchedule_PDPWithDependenciesReturnsNotImplemented(t *testing.T) { + d := &DealPusher{ + pdpProofSetManager: noopPDPProofSetManager{}, + pdpTxConfirmer: noopPDPTransactionConfirmer{}, + scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType { + return model.DealTypePDP + }, + } + + state, err := d.runSchedule(context.Background(), &model.Schedule{}) + require.Error(t, err) + require.Equal(t, model.ScheduleError, state) + require.Contains(t, err.Error(), "pdp scheduling path is not implemented") +}