Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions service/dealpusher/dealpusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ var waitPendingInterval = time.Minute

// DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process.
type DealPusher struct {
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication.
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
walletChooser replication.WalletChooser // Object responsible for choosing a wallet for replication.
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer.
scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType
workerID uuid.UUID // UUID identifying the associated worker.
activeSchedule map[model.ScheduleID]*model.Schedule // Map storing active schedules with schedule IDs as keys and pointers to model.Schedule objects as values.
activeScheduleCancelFunc map[model.ScheduleID]context.CancelFunc // Map storing cancel functions for active schedules with schedule IDs as keys and CancelFunc as values.
Expand Down Expand Up @@ -232,6 +235,10 @@ func (d *DealPusher) updateScheduleUnsafe(ctx context.Context, schedule model.Sc
// Possible values: ScheduleCompleted, ScheduleError, or an empty string.
// 2. An error if any step of the process encounters an issue, otherwise nil.
func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule) (model.ScheduleState, error) {
if d.resolveScheduleDealType(schedule) == model.DealTypePDP {
return d.runPDPSchedule(ctx, schedule)
}

db := d.dbNoContext.WithContext(ctx)
overReplicatedCIDs := db.
Table("deals").
Expand Down Expand Up @@ -423,6 +430,20 @@ func (d *DealPusher) runSchedule(ctx context.Context, schedule *model.Schedule)
}
}

func (d *DealPusher) resolveScheduleDealType(schedule *model.Schedule) model.DealType {
if d.scheduleDealTypeResolver == nil {
return model.DealTypeMarket
}
return d.scheduleDealTypeResolver(schedule)
}

func (d *DealPusher) runPDPSchedule(_ context.Context, _ *model.Schedule) (model.ScheduleState, error) {
if d.pdpProofSetManager == nil || d.pdpTxConfirmer == nil {
return model.ScheduleError, errors.New("pdp scheduling dependencies are not configured")
}
return model.ScheduleError, errors.New("pdp scheduling path is not implemented")
}

func NewDealPusher(db *gorm.DB, lotusURL string,
lotusToken string, numAttempts uint, maxReplicas uint,
) (*DealPusher, error) {
Expand Down
60 changes: 60 additions & 0 deletions service/dealpusher/pdp_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package dealpusher

import (
"context"
"errors"
"math/big"
"time"

"github.com/data-preservation-programs/singularity/model"
"github.com/ipfs/go-cid"
)

// PDPSchedulingConfig holds PDP-specific scheduling knobs for on-chain operations.
type PDPSchedulingConfig struct {
BatchSize int
GasLimit uint64
ConfirmationDepth uint64
PollingInterval time.Duration
}

// Validate validates PDP scheduling configuration.
func (c PDPSchedulingConfig) Validate() error {
if c.BatchSize <= 0 {
return errors.New("pdp batch size must be greater than 0")
}
if c.ConfirmationDepth == 0 {
return errors.New("pdp confirmation depth must be greater than 0")
}
if c.PollingInterval <= 0 {
return errors.New("pdp polling interval must be greater than 0")
}
return nil
}

// PDPProofSetManager defines proof set lifecycle operations needed by scheduling.
type PDPProofSetManager interface {
// EnsureProofSet returns an existing proof set ID or creates one for this client/provider pair.
EnsureProofSet(ctx context.Context, wallet model.Wallet, provider string) (uint64, error)
// QueueAddRoots submits root additions for a proof set and returns the queued tx reference.
QueueAddRoots(ctx context.Context, proofSetID uint64, pieceCIDs []cid.Cid, cfg PDPSchedulingConfig) (*PDPQueuedTx, error)
}

// PDPTransactionConfirmer defines confirmation checks for queued on-chain transactions.
type PDPTransactionConfirmer interface {
WaitForConfirmations(ctx context.Context, txHash string, depth uint64, pollInterval time.Duration) (*PDPTransactionReceipt, error)
}

// PDPQueuedTx represents an on-chain transaction submitted by PDP scheduling.
type PDPQueuedTx struct {
Hash string
}

// PDPTransactionReceipt represents a confirmed on-chain transaction result.
type PDPTransactionReceipt struct {
Hash string
BlockNumber uint64
GasUsed uint64
Status uint64
CostAttoFIL *big.Int
}
25 changes: 25 additions & 0 deletions service/dealpusher/pdp_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package dealpusher

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPDPSchedulingConfigValidate(t *testing.T) {
t.Run("valid", func(t *testing.T) {
cfg := PDPSchedulingConfig{
BatchSize: 100,
GasLimit: 5000000,
ConfirmationDepth: 5,
PollingInterval: 30 * time.Second,
}
require.NoError(t, cfg.Validate())
})

t.Run("invalid", func(t *testing.T) {
cfg := PDPSchedulingConfig{}
require.Error(t, cfg.Validate())
})
}
60 changes: 60 additions & 0 deletions service/dealpusher/pdp_wiring_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package dealpusher

import (
"context"
"testing"
"time"

"github.com/data-preservation-programs/singularity/model"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
)

type noopPDPProofSetManager struct{}

func (noopPDPProofSetManager) EnsureProofSet(_ context.Context, _ model.Wallet, _ string) (uint64, error) {
return 1, nil
}

func (noopPDPProofSetManager) QueueAddRoots(_ context.Context, _ uint64, _ []cid.Cid, _ PDPSchedulingConfig) (*PDPQueuedTx, error) {
return &PDPQueuedTx{Hash: "0x1"}, nil
}

type noopPDPTransactionConfirmer struct{}

func (noopPDPTransactionConfirmer) WaitForConfirmations(_ context.Context, txHash string, _ uint64, _ time.Duration) (*PDPTransactionReceipt, error) {
return &PDPTransactionReceipt{Hash: txHash}, nil
}

func TestDealPusher_ResolveScheduleDealType_DefaultsToMarket(t *testing.T) {
d := &DealPusher{}
require.Equal(t, model.DealTypeMarket, d.resolveScheduleDealType(&model.Schedule{}))
}

func TestDealPusher_RunSchedule_PDPWithoutDependenciesReturnsConfiguredError(t *testing.T) {
d := &DealPusher{
scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType {
return model.DealTypePDP
},
}

state, err := d.runSchedule(context.Background(), &model.Schedule{})
require.Error(t, err)
require.Equal(t, model.ScheduleError, state)
require.Contains(t, err.Error(), "pdp scheduling dependencies are not configured")
}

func TestDealPusher_RunSchedule_PDPWithDependenciesReturnsNotImplemented(t *testing.T) {
d := &DealPusher{
pdpProofSetManager: noopPDPProofSetManager{},
pdpTxConfirmer: noopPDPTransactionConfirmer{},
scheduleDealTypeResolver: func(_ *model.Schedule) model.DealType {
return model.DealTypePDP
},
}

state, err := d.runSchedule(context.Background(), &model.Schedule{})
require.Error(t, err)
require.Equal(t, model.ScheduleError, state)
require.Contains(t, err.Error(), "pdp scheduling path is not implemented")
}