diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 2fccf82f..713bcecf 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -24,7 +24,9 @@ RUN apt-get update \ mariadb-server mariadb-client \ && rm -rf /var/lib/apt/lists/* -# Prepare user-owned data dirs for rootless startup +COPY --from=ghcr.io/foundry-rs/foundry:latest /usr/local/bin/anvil /usr/local/bin/anvil + +# prepare user-owned data dirs for rootless startup RUN mkdir -p /home/vscode/.local/share/pg/pgdata \ && mkdir -p /home/vscode/.local/share/mysql \ && chown -R vscode:vscode /home/vscode/.local/share diff --git a/.github/workflows/devcontainer-podman.yml b/.github/workflows/devcontainer-podman.yml index aed602c3..8e78fcef 100644 --- a/.github/workflows/devcontainer-podman.yml +++ b/.github/workflows/devcontainer-podman.yml @@ -113,7 +113,7 @@ jobs: command: > cd /workspaces/singularity && mkdir -p artifacts && - SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/..." + SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/... ./service/pdptracker/..." container-runtime: podman - name: Report test results diff --git a/cmd/run/pdptracker.go b/cmd/run/pdptracker.go index 5d8cfffb..662ad925 100644 --- a/cmd/run/pdptracker.go +++ b/cmd/run/pdptracker.go @@ -2,55 +2,85 @@ package run import ( "fmt" + "strings" "time" + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/go-synapse" + "github.com/data-preservation-programs/go-synapse/constants" "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/service" "github.com/data-preservation-programs/singularity/service/pdptracker" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" "github.com/urfave/cli/v2" ) var PDPTrackerCmd = &cli.Command{ Name: "pdp-tracker", - Usage: "Start a PDP deal tracker that tracks f41 PDP deals for all relevant wallets", - Description: `The PDP tracker monitors Proof of Data Possession (PDP) deals on the Filecoin network. -Unlike legacy f05 market deals, PDP deals use proof sets managed through the PDPVerifier contract -where data is verified through cryptographic challenges. - -This tracker: -- Monitors proof sets for tracked wallets -- Updates deal status based on on-chain proof set state -- Tracks challenge epochs and live status`, + Usage: "Track PDP deals via Shovel event indexing (requires PostgreSQL)", Flags: []cli.Flag{ &cli.StringFlag{ - Name: "eth-rpc", - Usage: "Ethereum RPC endpoint for FEVM (e.g., https://api.node.glif.io)", - EnvVars: []string{"ETH_RPC_URL"}, - Required: true, + Name: "eth-rpc", + Usage: "Ethereum RPC endpoint for FEVM", + Value: "https://api.node.glif.io/rpc/v1", + EnvVars: []string{"ETH_RPC_URL"}, }, &cli.DurationFlag{ Name: "pdp-poll-interval", - Usage: "Polling interval for PDP transaction status", + Usage: "How often to check for new events in Shovel tables", Value: 30 * time.Second, }, + &cli.BoolFlag{ + Name: "full-sync", + Usage: "Re-index all events from contract deployment (mainnet: block 5441432, calibnet: block 3140755). Requires an archival RPC node. Involves one RPC call per historical proof set.", + }, }, Action: func(c *cli.Context) error { rpcURL := c.String("eth-rpc") - if rpcURL == "" { - return fmt.Errorf("eth-rpc is required") + connStr := c.String("database-connection-string") + if !strings.HasPrefix(connStr, "postgres:") && !strings.HasPrefix(connStr, "postgresql:") { + return errors.New("PDP tracking requires PostgreSQL (Shovel is Postgres-only)") } db, closer, err := database.OpenFromCLI(c) if err != nil { - return err + return errors.WithStack(err) } defer closer.Close() - pdpClient, err := pdptracker.NewPDPClient(c.Context, rpcURL) + // detect network and contract address once, shared by indexer and rpc client + ethClient, err := ethclient.DialContext(c.Context, rpcURL) if err != nil { - return err + return errors.Wrap(err, "failed to connect to RPC") + } + network, chainID, err := synapse.DetectNetwork(c.Context, ethClient) + ethClient.Close() + if err != nil { + return errors.Wrap(err, "failed to detect network") + } + + contractAddr := constants.GetPDPVerifierAddress(network) + if contractAddr == (common.Address{}) { + return fmt.Errorf("no PDPVerifier contract for network %s", network) } - defer pdpClient.Close() + + pdptracker.Logger.Infow("detected PDP network", + "network", network, + "chainId", chainID, + "contract", contractAddr.Hex(), + ) + + indexer, err := pdptracker.NewPDPIndexer(c.Context, connStr, rpcURL, uint64(chainID), contractAddr, c.Bool("full-sync")) + if err != nil { + return errors.Wrap(err, "failed to create PDP indexer") + } + + rpcClient, err := pdptracker.NewPDPClient(c.Context, rpcURL, contractAddr) + if err != nil { + return errors.Wrap(err, "failed to create PDP RPC client") + } + defer rpcClient.Close() cfg := pdptracker.PDPConfig{ PollingInterval: c.Duration("pdp-poll-interval"), @@ -59,13 +89,8 @@ This tracker: return err } - tracker := pdptracker.NewPDPTracker( - db, - cfg, - pdpClient, - false, - ) + tracker := pdptracker.NewPDPTracker(db, cfg, rpcClient, false) - return service.StartServers(c.Context, pdptracker.Logger, &tracker) + return service.StartServers(c.Context, pdptracker.Logger, indexer, &tracker) }, } diff --git a/go.mod b/go.mod index 4551c5e0..63dfea19 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,10 @@ require ( github.com/bcicen/jstream v1.0.1 github.com/brianvoe/gofakeit/v6 v6.23.2 github.com/cockroachdb/errors v1.11.3 + github.com/data-preservation-programs/go-synapse v0.0.0-20260206105716-b6a5e7e6808e github.com/data-preservation-programs/table v0.0.3 github.com/dustin/go-humanize v1.0.1 + github.com/ethereum/go-ethereum v1.14.12 github.com/fatih/color v1.18.0 github.com/filecoin-project/go-address v1.2.0 github.com/filecoin-project/go-cbor-util v0.0.2 @@ -30,6 +32,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gotidy/ptr v1.4.0 github.com/hashicorp/golang-lru/v2 v2.0.7 + github.com/indexsupply/shovel v0.1.9-0.20260111041930-aea8d42c335c github.com/ipfs/boxo v0.35.0 github.com/ipfs/go-block-format v0.2.3 github.com/ipfs/go-cid v0.5.0 @@ -46,6 +49,7 @@ require ( github.com/ipld/go-ipld-prime v0.21.0 github.com/ipld/go-trustless-utils v0.4.1 github.com/ipni/go-libipni v0.6.14 + github.com/jackc/pgx/v5 v5.7.6 github.com/jellydator/ttlcache/v3 v3.0.1 github.com/joho/godotenv v1.5.1 github.com/jsign/go-filsigner v0.4.1 @@ -80,6 +84,7 @@ require ( ) require ( + blake.io/pqx v0.2.1 // indirect cloud.google.com/go/auth v0.17.0 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect @@ -145,7 +150,6 @@ require ( github.com/crackcomm/go-gitignore v0.0.0-20241020182519-7843d2ba8fdf // indirect github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect - github.com/data-preservation-programs/go-synapse v0.0.0-20260206105716-b6a5e7e6808e // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/dchest/blake2b v1.0.0 // indirect @@ -162,7 +166,6 @@ require ( github.com/emersion/go-message v0.18.2 // indirect github.com/emersion/go-vcard v0.0.0-20241024213814-c9703dde27ff // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect - github.com/ethereum/go-ethereum v1.14.12 // indirect github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/filecoin-project/filecoin-ffi v1.34.0 // indirect @@ -201,6 +204,7 @@ require ( github.com/go-openapi/spec v0.20.9 // indirect github.com/go-resty/resty/v2 v2.16.5 // indirect github.com/go-sql-driver/mysql v1.9.3 // indirect + github.com/goccy/go-json v0.10.5 // indirect github.com/gofrs/flock v0.13.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect @@ -239,7 +243,6 @@ require ( github.com/ipni/index-provider v0.15.4 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.7.6 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect @@ -369,6 +372,7 @@ require ( github.com/wlynxg/anet v0.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect + github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect github.com/yunify/qingstor-sdk-go/v3 v3.2.0 // indirect @@ -407,12 +411,14 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.5.2 // indirect + kr.dev/errorfmt v0.1.1 // indirect lukechampine.com/blake3 v1.4.1 // indirect modernc.org/libc v1.22.3 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect modernc.org/sqlite v1.21.1 // indirect moul.io/http2curl v1.0.0 // indirect + nhooyr.io/websocket v1.8.10 // indirect rsc.io/tmplfunc v0.0.3 // indirect storj.io/common v0.0.0-20251022143549-19bf6a9f274a // indirect storj.io/drpc v0.0.35-0.20250513201419-f7819ea69b55 // indirect diff --git a/go.sum b/go.sum index c0cafd48..045fe540 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +blake.io/pqx v0.2.1 h1:Qz3yyNmPIFCyRS9HLnxtQNIL809ZC13aWvpeiXU3oS8= +blake.io/pqx v0.2.1/go.mod h1:hcG2tklM4QIxdfL+laWGAmtIDVgPKkWtxGG/t7umOfA= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.31.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= @@ -70,6 +72,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.5.0 h1:XkkQbfMyuH2 github.com/AzureAD/microsoft-authentication-library-for-go v1.5.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= +github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/Files-com/files-sdk-go/v3 v3.2.258 h1:9wfRblRg0qc7SIMD8AaE+pMWp1KvR6eFwvMVTwDVIG4= github.com/Files-com/files-sdk-go/v3 v3.2.258/go.mod h1:wGqkOzRu/ClJibvDgcfuJNAqI2nLhe8g91tPlDKRCdE= github.com/KyleBanks/depth v1.2.1 h1:5h8fQADFrWtarTdtDudMmGsC7GPbOAu6RVB3ffsVFHc= @@ -99,6 +103,8 @@ github.com/PuerkitoBio/purell v1.1.1/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= +github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= +github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/aalpar/deheap v0.0.0-20210914013432-0cc84d79dec3 h1:hhdWprfSpFbN7lz3W1gM40vOgvSh1WCSMxYD6gGB4Hs= github.com/aalpar/deheap v0.0.0-20210914013432-0cc84d79dec3/go.mod h1:XaUnRxSCYgL3kkgX0QHIV0D+znljPIDImxlv2kbGv0Y= github.com/abbot/go-http-auth v0.4.0 h1:QjmvZ5gSC7jm3Zg54DqWE/T5m1t2AfDu6QlXJT0EVT0= @@ -183,6 +189,8 @@ github.com/calebcase/tmpfile v1.0.3/go.mod h1:UAUc01aHeC+pudPagY/lWvt2qS9ZO5Zzof github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk= +github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -202,10 +210,16 @@ github.com/cloudsoda/sddl v0.0.0-20250224235906-926454e91efc/go.mod h1:uvR42Hb/t github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= +github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce/go.mod h1:9/y3cnZ5GKakj/H4y9r9GTjCvAFta7KLgSHPJJYc52M= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs= +github.com/cockroachdb/pebble v1.1.2 h1:CUh2IPtR4swHlEj48Rhfzw6l/d0qA31fItcIszQVIsA= +github.com/cockroachdb/pebble v1.1.2/go.mod h1:4exszw1r40423ZsmkG/09AFEG83I0uDgfujJdbL6kYU= github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwPJ30= github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= +github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/colinmarc/hdfs/v2 v2.4.0 h1:v6R8oBx/Wu9fHpdPoJJjpGSUxo8NhHIwrwsfhFvU9W0= github.com/colinmarc/hdfs/v2 v2.4.0/go.mod h1:0NAO+/3knbMx6+5pCv+Hcbaz4xn/Zzbn9+WIib2rKVI= github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= @@ -501,6 +515,8 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ= github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0= github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gofrs/flock v0.13.0 h1:95JolYOvGMqeH31+FC7D2+uULf6mG61mEZ/A8dRYMzw= github.com/gofrs/flock v0.13.0/go.mod h1:jxeyy9R1auM5S6JYDBhDt+E2TCo7DkratH4Pgi8P+Z0= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -542,6 +558,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -556,6 +574,8 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -608,6 +628,8 @@ github.com/hannahhoward/go-pubsub v1.0.0/go.mod h1:3lHsAt5uM7YFHauT5whoifwfgIgVw github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE= +github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= @@ -627,12 +649,18 @@ github.com/henrybear327/Proton-API-Bridge v1.0.0 h1:gjKAaWfKu++77WsZTHg6FUyPC5W0 github.com/henrybear327/Proton-API-Bridge v1.0.0/go.mod h1:gunH16hf6U74W2b9CGDaWRadiLICsoJ6KRkSt53zLts= github.com/henrybear327/go-proton-api v1.0.0 h1:zYi/IbjLwFAW7ltCeqXneUGJey0TN//Xo851a/BgLXw= github.com/henrybear327/go-proton-api v1.0.0/go.mod h1:w63MZuzufKcIZ93pwRgiOtxMXYafI8H74D77AxytOBc= +github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4 h1:X4egAf/gcS1zATw6wn4Ej8vjuVGxeHdan+bRb2ebyv4= +github.com/holiman/billy v0.0.0-20240216141850-2abb0c79d3c4/go.mod h1:5GuXa7vkL8u9FkFuWdVvfR5ix8hRB7DbOAaYULamFpc= +github.com/holiman/bloomfilter/v2 v2.0.3 h1:73e0e/V0tCydx14a0SCYS/EWCxgwLZ18CZcZKVu0fao= +github.com/holiman/bloomfilter/v2 v2.0.3/go.mod h1:zpoh+gs7qcpqrHr3dB55AMiJwo0iURXE7ZOP9L9hSkA= github.com/holiman/uint256 v1.3.1 h1:JfTzmih28bittyHM8z360dCjIA9dbPIBlcTI6lmctQs= github.com/holiman/uint256 v1.3.1/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXeiRV4ng7E= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/indexsupply/shovel v0.1.9-0.20260111041930-aea8d42c335c h1:w/28kFk3BSpsdRbTyAWJAnCt224qASF+FK92nlq/OBY= +github.com/indexsupply/shovel v0.1.9-0.20260111041930-aea8d42c335c/go.mod h1:2fbvQP5CghUhrRn/XEQtwHXt51oaefJc0xqbAH+TTBE= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= github.com/ipfs/boxo v0.35.0 h1:3Mku5arSbAZz0dvb4goXRsQuZkFkPrGr5yYdu0YM1pY= @@ -865,6 +893,10 @@ github.com/labstack/echo/v4 v4.10.2 h1:n1jAhnq/elIFTHr1EYpiYtyKgx4RW9ccVgkqByZaN github.com/labstack/echo/v4 v4.10.2/go.mod h1:OEyqf2//K1DFdE57vw2DRgWY0M7s65IVQO2FzvI4J5k= github.com/labstack/gommon v0.4.0 h1:y7cvthEAEbU0yHOf4axH8ZG2NH8knB9iNSoTO8dyIk8= github.com/labstack/gommon v0.4.0/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3MFxTMTM= +github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= +github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-flow-metrics v0.3.0 h1:q31zcHUvHnwDO0SHaukewPYgwOBSxtt830uJtUx6784= @@ -908,6 +940,8 @@ github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stg github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-shellwords v1.0.12 h1:M2zGm7EW6UQJvDeQxo4T51eKPurbeFbe8WtebGE2xrk= github.com/mattn/go-shellwords v1.0.12/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lLtQsUlTZDWQ8Y= github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= @@ -936,6 +970,8 @@ github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/pointerstructure v1.2.0 h1:O+i9nHnXS3l/9Wu7r4NrEdwA2VFTicjUEN1uBnDo34A= +github.com/mitchellh/pointerstructure v1.2.0/go.mod h1:BRAsLI5zgXmw97Lf6s25bs8ohIXc3tViBH44KcwB2g4= github.com/mmcloughlin/addchain v0.4.0 h1:SobOdjm2xLj1KkXN5/n0xTIWyZA2+s99UCY1iPfkHRY= github.com/mmcloughlin/addchain v0.4.0/go.mod h1:A86O+tHqZLMNO4w6ZZ4FlVQEadcoqkyU72HC5wJ4RlU= github.com/mmcloughlin/profile v0.1.1/go.mod h1:IhHD7q1ooxgwTgjxQYkACGA77oFTDdFVejUS1/tS/qU= @@ -1002,6 +1038,8 @@ github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus= @@ -1081,6 +1119,7 @@ github.com/pion/webrtc/v4 v4.1.6 h1:srHH2HwvCGwPba25EYJgUzgLqCQoXl1VCUnrGQMSzUw= github.com/pion/webrtc/v4 v4.1.6/go.mod h1:wKecGRlkl3ox/As/MYghJL+b/cVXMEhoPMJWPuGQFhU= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e h1:aoZm08cpOy4WuID//EZDgcC4zIxODThtZNPirFr42+A= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -1130,6 +1169,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94 github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rfjakob/eme v1.1.2 h1:SxziR8msSOElPayZNFfQw4Tjx/Sbaeeh3eRvrHVMUs4= github.com/rfjakob/eme v1.1.2/go.mod h1:cVvpasglm/G3ngEfcfT/Wt0GwhkuO32pf/poW6Nyk1k= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rjNemo/underscore v0.5.0 h1:Pa58PfchgZWgCY1eBKjER/lm0repbGrTzq6RRxtnGmg= github.com/rjNemo/underscore v0.5.0/go.mod h1:y3LuKy2UP6zp7yZff5ZGRm1s/s9QvCoCoQZVqAkk3hM= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= @@ -1141,6 +1182,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= +github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= @@ -1242,6 +1285,8 @@ github.com/swaggo/files/v2 v2.0.0 h1:hmAt8Dkynw7Ssz46F6pn8ok6YmGZqHSVLZ+HQM7i0kw github.com/swaggo/files/v2 v2.0.0/go.mod h1:24kk2Y9NYEJ5lHuCra6iVwkMjIekMCaFq/0JQj66kyM= github.com/swaggo/swag v1.16.1 h1:fTNRhKstPKxcnoKsytm4sahr8FaYzUcT7i1/3nd/fBg= github.com/swaggo/swag v1.16.1/go.mod h1:9/LMvHycG3NFHfR6LwvikHv5iFvmPADQ359cKikGxto= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/t3rm1n4l/go-mega v0.0.0-20250926104142-ccb8d3498e6c h1:BLopNCyqewbE8+BtlIp/Juzu8AJGxz0gHdGADnsblVc= github.com/t3rm1n4l/go-mega v0.0.0-20250926104142-ccb8d3498e6c/go.mod h1:ykucQyiE9Q2qx1wLlEtZkkNn1IURib/2O+Mvd25i1Fo= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= @@ -1303,6 +1348,8 @@ github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+ github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo= +github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8/go.mod h1:HUYIGzjTL3rfEspMxjDjgmT5uz5wzYJKVo23qUhYTos= github.com/xlab/c-for-go v0.0.0-20200718154222-87b0065af829/go.mod h1:h/1PEBwj7Ym/8kOuMWvO2ujZ6Lt+TMbySEXNhjjR87I= github.com/xlab/pkgconfig v0.0.0-20170226114623-cea12a0fd245/go.mod h1:C+diUUz7pxhNY6KAoLgrTYARGWnt82zWTylZlxT92vk= github.com/xorcare/golden v0.6.0/go.mod h1:7T39/ZMvaSEZlBPoYfVFmsBLmUl3uz9IuzWj/U6FtvQ= @@ -1820,6 +1867,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/validator.v2 v2.0.1 h1:xF0KWyGWXm/LM2G1TrEjqOu4pa6coO9AlWSf3msVfDY= @@ -1858,6 +1907,10 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +kr.dev/diff v0.3.0 h1:o/T8/tkAq9IuRIuFqCupyKPC5iSY3WXpVZ2p6ZK3Emw= +kr.dev/diff v0.3.0/go.mod h1:XiTaLOg2/PD0cmXY7WQXUR8RAF3RwWpqIQEj910J2NY= +kr.dev/errorfmt v0.1.1 h1:0YA5N2yV0xKxJ4eD5cX2S9wEnJHDHOZzerKbrZqtRrQ= +kr.dev/errorfmt v0.1.1/go.mod h1:X5EQZa3qf6c/l1DMjhflAbKGAGvlP6/ByWnaOpfbJME= lukechampine.com/blake3 v1.4.1 h1:I3Smz7gso8w4/TunLKec6K2fn+kyKtDxr/xcQEN84Wg= lukechampine.com/blake3 v1.4.1/go.mod h1:QFosUxmjB8mnrWFSNwKmvxHpfY72bmD2tQ0kBMM3kwo= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= @@ -1877,6 +1930,8 @@ moul.io/http2curl v1.0.0 h1:6XwpyZOYsgZJrU8exnG87ncVkU1FVCcTRpwzOkTDUi8= moul.io/http2curl v1.0.0/go.mod h1:f6cULg+e4Md/oW1cYmwW4IWQOVl2lGbmCNGOHvzX2kE= moul.io/http2curl/v2 v2.3.0 h1:9r3JfDzWPcbIklMOs2TnIFzDYvfAZvjeavG6EzP7jYs= moul.io/http2curl/v2 v2.3.0/go.mod h1:RW4hyBjTWSYDOxapodpNEtX0g5Eb16sxklBqmd2RHcE= +nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q= +nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/model/migrate.go b/model/migrate.go index 85a5aed1..53cfa834 100644 --- a/model/migrate.go +++ b/model/migrate.go @@ -27,6 +27,7 @@ var Tables = []any{ &Deal{}, &Schedule{}, &Wallet{}, + &PDPProofSet{}, } var logger = logging.Logger("model") diff --git a/model/replication.go b/model/replication.go index 08150da6..6e35e312 100644 --- a/model/replication.go +++ b/model/replication.go @@ -172,3 +172,16 @@ type Wallet struct { Address string `gorm:"index" json:"address"` // Address is the Filecoin full address of the wallet PrivateKey string `json:"privateKey,omitempty" table:"-"` // PrivateKey is the private key of the wallet } + +// PDPProofSet tracks on-chain PDP proof set state derived from contract events. +// This is a materialized view built from Shovel-indexed events, replacing +// the per-cycle RPC scans of GetProofSets/GetProofSetsForClient. +type PDPProofSet struct { + SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"` + ClientAddress string `gorm:"not null;index" json:"clientAddress"` + Provider string `gorm:"not null" json:"provider"` + IsLive bool `gorm:"default:false" json:"isLive"` + ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"` + CreatedBlock int64 `gorm:"not null" json:"createdBlock"` + Deleted bool `gorm:"default:false" json:"deleted"` +} diff --git a/service/pdptracker/config.go b/service/pdptracker/config.go index 7d411514..ed458daf 100644 --- a/service/pdptracker/config.go +++ b/service/pdptracker/config.go @@ -6,12 +6,10 @@ import ( "github.com/cockroachdb/errors" ) -// PDPConfig configures the PDP tracker operations layer. type PDPConfig struct { PollingInterval time.Duration } -// Validate ensures the PDPConfig values are sane. func (c PDPConfig) Validate() error { if c.PollingInterval <= 0 { return errors.New("pdp polling interval must be greater than 0") diff --git a/service/pdptracker/eventprocessor.go b/service/pdptracker/eventprocessor.go new file mode 100644 index 00000000..38b766f8 --- /dev/null +++ b/service/pdptracker/eventprocessor.go @@ -0,0 +1,383 @@ +package pdptracker + +import ( + "context" + "math" + "time" + + "github.com/cockroachdb/errors" + "github.com/data-preservation-programs/singularity/database" + "github.com/data-preservation-programs/singularity/model" + "github.com/ethereum/go-ethereum/common" + "github.com/gotidy/ptr" + "github.com/ipfs/go-cid" + "gorm.io/gorm" +) + +// idempotent — safe to re-process after reorgs +func processNewEvents(ctx context.Context, db *gorm.DB, rpcClient *ChainPDPClient) error { + // process in dependency order + if err := processDataSetCreated(ctx, db, rpcClient); err != nil { + return errors.Wrap(err, "processing DataSetCreated") + } + if err := processSPChanged(ctx, db); err != nil { + return errors.Wrap(err, "processing StorageProviderChanged") + } + if err := processPiecesChanged(ctx, db, rpcClient); err != nil { + return errors.Wrap(err, "processing PiecesAdded/Removed") + } + if err := processNextProvingPeriod(ctx, db); err != nil { + return errors.Wrap(err, "processing NextProvingPeriod") + } + if err := processPossessionProven(ctx, db); err != nil { + return errors.Wrap(err, "processing PossessionProven") + } + if err := processDataSetDeleted(ctx, db); err != nil { + return errors.Wrap(err, "processing DataSetDeleted") + } + return nil +} + +type inboxRow interface { + setID() uint64 +} + +// failed rows are retained for retry on next cycle +func processInbox[R inboxRow](db *gorm.DB, query, table string, fn func(R) error) error { + var rows []R + if err := db.Raw(query).Scan(&rows).Error; err != nil { + return err + } + if len(rows) == 0 { + return nil + } + + var failed []uint64 + for _, r := range rows { + if err := fn(r); err != nil { + Logger.Errorw("inbox processing failed", "table", table, "setId", r.setID(), "error", err) + failed = append(failed, r.setID()) + } + } + + return deleteProcessedRows(db, table, "set_id", failed) +} + +// table and keyCol are interpolated into sql. pass literals only. +// this is internal to the package so if you're passing user input here +// you've already made worse decisions than we can protect against. +func deleteProcessedRows(db *gorm.DB, table, keyCol string, failed []uint64) error { + if len(failed) == 0 { + return db.Exec("DELETE FROM " + table).Error + } + return db.Exec("DELETE FROM "+table+" WHERE "+keyCol+" NOT IN (?)", failed).Error +} + +type dataSetCreatedRow struct { + SetID_ uint64 `gorm:"column:set_id"` + StorageProvider []byte `gorm:"column:storage_provider"` + BlockNum int64 `gorm:"column:block_num"` +} + +func (r dataSetCreatedRow) setID() uint64 { return r.SetID_ } + +func processDataSetCreated(ctx context.Context, db *gorm.DB, rpcClient *ChainPDPClient) error { + return processInbox(db, + "SELECT set_id, storage_provider, block_num FROM pdp_dataset_created", + "pdp_dataset_created", + func(r dataSetCreatedRow) error { + listener, err := rpcClient.GetDataSetListener(ctx, r.SetID_) + if err != nil { + return errors.Wrapf(err, "getDataSetListener for set %d", r.SetID_) + } + + clientAddr, err := commonToDelegatedAddress(listener) + if err != nil { + return errors.Wrap(err, "converting listener address") + } + + providerAddr, err := commonToDelegatedAddress(common.BytesToAddress(r.StorageProvider)) + if err != nil { + return errors.Wrap(err, "converting provider address") + } + + ps := model.PDPProofSet{ + SetID: r.SetID_, + ClientAddress: clientAddr.String(), + Provider: providerAddr.String(), + CreatedBlock: r.BlockNum, + } + + return database.DoRetry(ctx, func() error { + return db.Where("set_id = ?", r.SetID_).Attrs(ps).FirstOrCreate(&model.PDPProofSet{}).Error + }) + }, + ) +} + +func processPiecesChanged(ctx context.Context, db *gorm.DB, rpcClient *ChainPDPClient) error { + type row struct { + SetID uint64 `gorm:"column:set_id"` + } + + setIDs := make(map[uint64]struct{}) + for _, q := range []string{ + "SELECT DISTINCT set_id FROM pdp_pieces_added", + "SELECT DISTINCT set_id FROM pdp_pieces_removed", + } { + var rows []row + if err := db.Raw(q).Scan(&rows).Error; err != nil { + return err + } + for _, r := range rows { + setIDs[r.SetID] = struct{}{} + } + } + + if len(setIDs) == 0 { + return nil + } + + var failed []uint64 + for id := range setIDs { + if err := reconcileProofSetPieces(ctx, db, rpcClient, id); err != nil { + Logger.Errorw("failed to reconcile pieces", "setId", id, "error", err) + failed = append(failed, id) + } + } + + if err := deleteProcessedRows(db, "pdp_pieces_added", "set_id", failed); err != nil { + return err + } + return deleteProcessedRows(db, "pdp_pieces_removed", "set_id", failed) +} + +func reconcileProofSetPieces(ctx context.Context, db *gorm.DB, rpcClient *ChainPDPClient, setID uint64) error { + var ps model.PDPProofSet + if err := db.Where("set_id = ?", setID).First(&ps).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // proof set not yet materialized locally; DataSetCreated may + // still be pending retry — retain inbox rows for next cycle + return errors.Errorf("proof set %d not found, retaining piece events", setID) + } + return err + } + if ps.Deleted { + Logger.Debugw("ignoring piece events for deleted proof set", "setId", setID) + return nil + } + + var wallet model.Wallet + if err := db.Where("address = ?", ps.ClientAddress).First(&wallet).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + Logger.Debugw("pieces changed for untracked client", "setId", setID, "client", ps.ClientAddress) + return nil + } + return err + } + + pieces, err := rpcClient.GetActivePieces(ctx, setID) + if err != nil { + return errors.Wrapf(err, "getActivePieces for set %d", setID) + } + + activeCIDs := make(map[string]cid.Cid, len(pieces)) + for _, c := range pieces { + if c != cid.Undef { + activeCIDs[c.String()] = c + } + } + + now := time.Now() + initialState := model.DealPublished + if ps.IsLive { + initialState = model.DealActive + } + + var hadErrors bool + + for _, pieceCID := range activeCIDs { + modelCID := model.CID(pieceCID) + err = database.DoRetry(ctx, func() error { + var existing model.Deal + result := db.Where("proof_set_id = ? AND piece_cid = ? AND deal_type = ?", + setID, modelCID, model.DealTypePDP).First(&existing) + if result.Error == nil { + if existing.State == model.DealExpired { + return db.Model(&model.Deal{}).Where("id = ?", existing.ID). + Update("state", initialState).Error + } + return nil + } + if !errors.Is(result.Error, gorm.ErrRecordNotFound) { + return result.Error + } + return db.Create(&model.Deal{ + DealType: model.DealTypePDP, + State: initialState, + ClientID: wallet.ID, + Provider: ps.Provider, + PieceCID: modelCID, + ProofSetID: ptr.Of(setID), + ProofSetLive: ptr.Of(ps.IsLive), + LastVerifiedAt: ptr.Of(now), + }).Error + }) + if err != nil { + Logger.Errorw("failed to upsert deal", "setId", setID, "pieceCid", pieceCID, "error", err) + hadErrors = true + } + } + + var existingDeals []model.Deal + if err := db.Where("proof_set_id = ? AND deal_type = ? AND state != ?", + setID, model.DealTypePDP, model.DealExpired).Find(&existingDeals).Error; err != nil { + return err + } + + for _, deal := range existingDeals { + if _, ok := activeCIDs[deal.PieceCID.String()]; !ok { + err = database.DoRetry(ctx, func() error { + return db.Model(&model.Deal{}).Where("id = ?", deal.ID). + Update("state", model.DealExpired).Error + }) + if err != nil { + Logger.Errorw("failed to expire removed deal", "dealId", deal.ID, "error", err) + hadErrors = true + } + } + } + + if hadErrors { + return errors.Errorf("partial reconciliation failure for proof set %d", setID) + } + return nil +} + +type nextProvingPeriodRow struct { + SetID_ uint64 `gorm:"column:set_id"` + ChallengeEpoch int64 `gorm:"column:challenge_epoch"` +} + +func (r nextProvingPeriodRow) setID() uint64 { return r.SetID_ } + +func processNextProvingPeriod(ctx context.Context, db *gorm.DB) error { + return processInbox(db, + "SELECT set_id, challenge_epoch FROM pdp_next_proving_period", + "pdp_next_proving_period", + func(r nextProvingPeriodRow) error { + if r.ChallengeEpoch > math.MaxInt32 || r.ChallengeEpoch < math.MinInt32 { + return errors.Errorf("challenge epoch %d overflows int32", r.ChallengeEpoch) + } + epoch32 := int32(r.ChallengeEpoch) + + if err := database.DoRetry(ctx, func() error { + return db.Model(&model.PDPProofSet{}).Where("set_id = ?", r.SetID_). + Update("challenge_epoch", r.ChallengeEpoch).Error + }); err != nil { + return err + } + + return database.DoRetry(ctx, func() error { + return db.Model(&model.Deal{}).Where("proof_set_id = ? AND deal_type = ?", + r.SetID_, model.DealTypePDP). + Update("next_challenge_epoch", epoch32).Error + }) + }, + ) +} + +type possessionProvenRow struct { + SetID_ uint64 `gorm:"column:set_id"` +} + +func (r possessionProvenRow) setID() uint64 { return r.SetID_ } + +func processPossessionProven(ctx context.Context, db *gorm.DB) error { + now := time.Now() + return processInbox(db, + "SELECT DISTINCT set_id FROM pdp_possession_proven", + "pdp_possession_proven", + func(r possessionProvenRow) error { + if err := database.DoRetry(ctx, func() error { + return db.Model(&model.PDPProofSet{}).Where("set_id = ?", r.SetID_). + Update("is_live", true).Error + }); err != nil { + return err + } + + // only activate non-expired deals; expired deals (from piece removal + // or dataset deletion) must not be resurrected by a later proof + return database.DoRetry(ctx, func() error { + return db.Model(&model.Deal{}). + Where("proof_set_id = ? AND deal_type = ? AND state != ?", + r.SetID_, model.DealTypePDP, model.DealExpired). + Updates(map[string]any{ + "proof_set_live": true, + "state": model.DealActive, + "last_verified_at": now, + }).Error + }) + }, + ) +} + +type dataSetDeletedRow struct { + SetID_ uint64 `gorm:"column:set_id"` +} + +func (r dataSetDeletedRow) setID() uint64 { return r.SetID_ } + +func processDataSetDeleted(ctx context.Context, db *gorm.DB) error { + return processInbox(db, + "SELECT set_id FROM pdp_dataset_deleted", + "pdp_dataset_deleted", + func(r dataSetDeletedRow) error { + if err := database.DoRetry(ctx, func() error { + return db.Model(&model.PDPProofSet{}).Where("set_id = ?", r.SetID_). + Update("deleted", true).Error + }); err != nil { + return err + } + + return database.DoRetry(ctx, func() error { + return db.Model(&model.Deal{}).Where("proof_set_id = ? AND deal_type = ?", + r.SetID_, model.DealTypePDP). + Update("state", model.DealExpired).Error + }) + }, + ) +} + +type spChangedRow struct { + SetID_ uint64 `gorm:"column:set_id"` + NewSP []byte `gorm:"column:new_sp"` +} + +func (r spChangedRow) setID() uint64 { return r.SetID_ } + +func processSPChanged(ctx context.Context, db *gorm.DB) error { + return processInbox(db, + "SELECT set_id, new_sp FROM pdp_sp_changed", + "pdp_sp_changed", + func(r spChangedRow) error { + newAddr, err := commonToDelegatedAddress(common.BytesToAddress(r.NewSP)) + if err != nil { + return errors.Wrap(err, "converting SP address") + } + + if err := database.DoRetry(ctx, func() error { + return db.Model(&model.PDPProofSet{}).Where("set_id = ?", r.SetID_). + Update("provider", newAddr.String()).Error + }); err != nil { + return err + } + + return database.DoRetry(ctx, func() error { + return db.Model(&model.Deal{}).Where("proof_set_id = ? AND deal_type = ?", + r.SetID_, model.DealTypePDP). + Update("provider", newAddr.String()).Error + }) + }, + ) +} diff --git a/service/pdptracker/eventprocessor_test.go b/service/pdptracker/eventprocessor_test.go new file mode 100644 index 00000000..055bdafd --- /dev/null +++ b/service/pdptracker/eventprocessor_test.go @@ -0,0 +1,452 @@ +package pdptracker + +import ( + "context" + "testing" + + "github.com/data-preservation-programs/singularity/model" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/ethereum/go-ethereum/common" + "github.com/filecoin-project/go-address" + "github.com/indexsupply/shovel/shovel/config" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +func createShovelTables(t *testing.T, db *gorm.DB) { + t.Helper() + conf := buildShovelConfig("postgres://unused", "https://unused", 314, common.Address{}, 0) + require.NoError(t, config.ValidateFix(&conf)) + for _, ig := range conf.Integrations { + for _, stmt := range ig.Table.DDL() { + require.NoError(t, db.Exec(stmt).Error) + } + } +} + +var testPieceCID cid.Cid + +func init() { + c, err := cid.Decode("baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq") + if err != nil { + panic(err) + } + testPieceCID = c +} + +type pgTestEnv struct { + ctx context.Context + db *gorm.DB + client *ChainPDPClient + mock *mockContractCaller + listenerEth common.Address + providerEth common.Address + listenerFil address.Address + providerFil address.Address +} + +func pgTest(t *testing.T, fn func(t *testing.T, e pgTestEnv)) { + t.Helper() + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + if db.Dialector.Name() != "postgres" { + t.Skip("PDP event processing requires Postgres") + } + createShovelTables(t, db) + + orig := address.CurrentNetwork + t.Cleanup(func() { address.CurrentNetwork = orig }) + address.CurrentNetwork = address.Mainnet + + le := common.HexToAddress("0x1111111111111111111111111111111111111111") + pe := common.HexToAddress("0x2222222222222222222222222222222222222222") + lf, err := commonToDelegatedAddress(le) + require.NoError(t, err) + pf, err := commonToDelegatedAddress(pe) + require.NoError(t, err) + + m := &mockContractCaller{ + listeners: map[uint64]common.Address{1: le}, + pieces: map[uint64][]cid.Cid{1: {testPieceCID}}, + } + + fn(t, pgTestEnv{ + ctx: ctx, db: db, + client: &ChainPDPClient{contract: m, pageSize: 100}, + mock: m, + listenerEth: le, providerEth: pe, + listenerFil: lf, providerFil: pf, + }) + }) +} + +func (e pgTestEnv) setupFixtures(t *testing.T) { + t.Helper() + require.NoError(t, e.db.Create(&model.Wallet{ + ID: "f0100", Address: e.listenerFil.String(), + }).Error) + require.NoError(t, e.db.Create(&model.PDPProofSet{ + SetID: 1, ClientAddress: e.listenerFil.String(), + Provider: e.providerFil.String(), CreatedBlock: 100, + }).Error) +} + +func (e pgTestEnv) insertDeal(t *testing.T, state model.DealState, opts ...func(*model.Deal)) model.Deal { + t.Helper() + setID := uint64(1) + d := model.Deal{ + DealType: model.DealTypePDP, + State: state, + ClientID: "f0100", + ProofSetID: &setID, + } + for _, o := range opts { + o(&d) + } + require.NoError(t, e.db.Create(&d).Error) + return d +} + +func (e pgTestEnv) shovelCount(t *testing.T, table string) int64 { + t.Helper() + var n int64 + require.NoError(t, e.db.Raw("SELECT COUNT(*) FROM "+table).Scan(&n).Error) + return n +} + +func TestProcessDataSetCreated(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_dataset_created (set_id, storage_provider, block_num) VALUES (?, ?, ?)", + 1, e.providerEth.Bytes(), 100, + ).Error) + + require.NoError(t, processDataSetCreated(e.ctx, e.db, e.client)) + + var ps model.PDPProofSet + require.NoError(t, e.db.Where("set_id = ?", 1).First(&ps).Error) + require.Equal(t, e.listenerFil.String(), ps.ClientAddress) + require.Equal(t, e.providerFil.String(), ps.Provider) + require.EqualValues(t, 100, ps.CreatedBlock) + require.EqualValues(t, 0, e.shovelCount(t, "pdp_dataset_created")) + }) +} + +func TestProcessDataSetCreated_Idempotent(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + for range 2 { + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_dataset_created (set_id, storage_provider, block_num) VALUES (?, ?, ?)", + 1, e.providerEth.Bytes(), 100, + ).Error) + require.NoError(t, processDataSetCreated(e.ctx, e.db, e.client)) + } + + var count int64 + require.NoError(t, e.db.Model(&model.PDPProofSet{}).Count(&count).Error) + require.EqualValues(t, 1, count) + }) +} + +func TestProcessPiecesChanged_CreatesDeals(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + + require.NoError(t, e.db.Exec("INSERT INTO pdp_pieces_added (set_id) VALUES (?)", 1).Error) + require.NoError(t, processPiecesChanged(e.ctx, e.db, e.client)) + + var deals []model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).Find(&deals).Error) + require.Len(t, deals, 1) + require.Equal(t, model.DealPublished, deals[0].State) + require.Equal(t, testPieceCID.String(), deals[0].PieceCID.String()) + require.EqualValues(t, 1, *deals[0].ProofSetID) + require.Equal(t, "f0100", deals[0].ClientID) + require.EqualValues(t, 0, e.shovelCount(t, "pdp_pieces_added")) + }) +} + +func TestProcessPiecesChanged_LiveProofSetCreatesActiveDeals(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + require.NoError(t, e.db.Model(&model.PDPProofSet{}).Where("set_id = ?", 1). + Update("is_live", true).Error) + + require.NoError(t, e.db.Exec("INSERT INTO pdp_pieces_added (set_id) VALUES (?)", 1).Error) + require.NoError(t, processPiecesChanged(e.ctx, e.db, e.client)) + + var deals []model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).Find(&deals).Error) + require.Len(t, deals, 1) + require.Equal(t, model.DealActive, deals[0].State) + require.True(t, *deals[0].ProofSetLive) + }) +} + +func TestProcessPiecesChanged_ExpiresRemovedPieces(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + e.insertDeal(t, model.DealActive, func(d *model.Deal) { + d.Provider = e.providerFil.String() + d.PieceCID = model.CID(testPieceCID) + }) + + require.NoError(t, e.db.Exec("INSERT INTO pdp_pieces_removed (set_id) VALUES (?)", 1).Error) + + // contract returns empty active pieces + e.mock.pieces[1] = nil + require.NoError(t, processPiecesChanged(e.ctx, e.db, e.client)) + + var deal model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).First(&deal).Error) + require.Equal(t, model.DealExpired, deal.State) + }) +} + +func TestProcessNextProvingPeriod(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + e.insertDeal(t, model.DealPublished) + + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_next_proving_period (set_id, challenge_epoch, leaf_count) VALUES (?, ?, ?)", + 1, 500, 42, + ).Error) + + require.NoError(t, processNextProvingPeriod(e.ctx, e.db)) + + var ps model.PDPProofSet + require.NoError(t, e.db.Where("set_id = ?", 1).First(&ps).Error) + require.EqualValues(t, 500, *ps.ChallengeEpoch) + + var deal model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).First(&deal).Error) + require.EqualValues(t, 500, *deal.NextChallengeEpoch) + require.EqualValues(t, 0, e.shovelCount(t, "pdp_next_proving_period")) + }) +} + +func TestProcessPossessionProven(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + e.insertDeal(t, model.DealPublished) + + require.NoError(t, e.db.Exec("INSERT INTO pdp_possession_proven (set_id) VALUES (?)", 1).Error) + require.NoError(t, processPossessionProven(e.ctx, e.db)) + + var ps model.PDPProofSet + require.NoError(t, e.db.Where("set_id = ?", 1).First(&ps).Error) + require.True(t, ps.IsLive) + + var deal model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).First(&deal).Error) + require.Equal(t, model.DealActive, deal.State) + require.True(t, *deal.ProofSetLive) + require.NotNil(t, deal.LastVerifiedAt) + require.EqualValues(t, 0, e.shovelCount(t, "pdp_possession_proven")) + }) +} + +func TestProcessPossessionProven_DoesNotResurrectExpired(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + e.insertDeal(t, model.DealPublished) + e.insertDeal(t, model.DealExpired) + + require.NoError(t, e.db.Exec("INSERT INTO pdp_possession_proven (set_id) VALUES (?)", 1).Error) + require.NoError(t, processPossessionProven(e.ctx, e.db)) + + var deals []model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).Order("id").Find(&deals).Error) + require.Len(t, deals, 2) + require.Equal(t, model.DealActive, deals[0].State) + require.Equal(t, model.DealExpired, deals[1].State) + }) +} + +func TestProcessDataSetDeleted(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + e.insertDeal(t, model.DealActive) + + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_dataset_deleted (set_id, deleted_leaf_count) VALUES (?, ?)", 1, 10, + ).Error) + + require.NoError(t, processDataSetDeleted(e.ctx, e.db)) + + var ps model.PDPProofSet + require.NoError(t, e.db.Where("set_id = ?", 1).First(&ps).Error) + require.True(t, ps.Deleted) + + var deal model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).First(&deal).Error) + require.Equal(t, model.DealExpired, deal.State) + require.EqualValues(t, 0, e.shovelCount(t, "pdp_dataset_deleted")) + }) +} + +func TestProcessSPChanged(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + e.setupFixtures(t) + e.insertDeal(t, model.DealActive, func(d *model.Deal) { + d.Provider = e.providerFil.String() + }) + + newSP := common.HexToAddress("0x3333333333333333333333333333333333333333") + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_sp_changed (set_id, old_sp, new_sp) VALUES (?, ?, ?)", + 1, e.providerEth.Bytes(), newSP.Bytes(), + ).Error) + + require.NoError(t, processSPChanged(e.ctx, e.db)) + + expectedNewSP, _ := commonToDelegatedAddress(newSP) + + var ps model.PDPProofSet + require.NoError(t, e.db.Where("set_id = ?", 1).First(&ps).Error) + require.Equal(t, expectedNewSP.String(), ps.Provider) + + var deal model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).First(&deal).Error) + require.Equal(t, expectedNewSP.String(), deal.Provider) + require.EqualValues(t, 0, e.shovelCount(t, "pdp_sp_changed")) + }) +} + +func TestDeleteProcessedRows(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + insert := func(ids ...int) { + for _, id := range ids { + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_dataset_created (set_id, storage_provider, block_num) VALUES (?, ?, ?)", + id, []byte{0x01}, 100, + ).Error) + } + } + remaining := func() []uint64 { + type r struct { + SetID uint64 `gorm:"column:set_id"` + } + var rows []r + require.NoError(t, e.db.Raw("SELECT set_id FROM pdp_dataset_created ORDER BY set_id").Scan(&rows).Error) + out := make([]uint64, len(rows)) + for i, row := range rows { + out[i] = row.SetID + } + return out + } + clear := func() { require.NoError(t, e.db.Exec("DELETE FROM pdp_dataset_created").Error) } + + // no failures → delete all + insert(1, 2, 3) + require.NoError(t, deleteProcessedRows(e.db, "pdp_dataset_created", "set_id", nil)) + require.Empty(t, remaining()) + + // partial failures → retain only failed + insert(10, 20, 30) + require.NoError(t, deleteProcessedRows(e.db, "pdp_dataset_created", "set_id", []uint64{10, 30})) + require.Equal(t, []uint64{10, 30}, remaining()) + clear() + + // duplicate failed IDs → correct retention + insert(1, 2, 3) + require.NoError(t, deleteProcessedRows(e.db, "pdp_dataset_created", "set_id", []uint64{2, 2, 2})) + require.Equal(t, []uint64{2}, remaining()) + clear() + + // all failed → retain all + insert(5, 6, 7) + require.NoError(t, deleteProcessedRows(e.db, "pdp_dataset_created", "set_id", []uint64{5, 6, 7})) + require.Equal(t, []uint64{5, 6, 7}, remaining()) + }) +} + +func TestPiecesRetainedWhenProofSetMissing(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + // only create wallet, no proof set yet + require.NoError(t, e.db.Create(&model.Wallet{ + ID: "f0100", Address: e.listenerFil.String(), + }).Error) + + // PiecesAdded arrives before DataSetCreated is processed + require.NoError(t, e.db.Exec("INSERT INTO pdp_pieces_added (set_id) VALUES (?)", 1).Error) + + // process — proof set missing, rows must be retained + require.NoError(t, processPiecesChanged(e.ctx, e.db, e.client)) + require.EqualValues(t, 1, e.shovelCount(t, "pdp_pieces_added")) + + var dealCount int64 + require.NoError(t, e.db.Model(&model.Deal{}).Where("deal_type = ?", model.DealTypePDP).Count(&dealCount).Error) + require.EqualValues(t, 0, dealCount) + + // DataSetCreated succeeds + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_dataset_created (set_id, storage_provider, block_num) VALUES (?, ?, ?)", + 1, e.providerEth.Bytes(), 100, + ).Error) + require.NoError(t, processDataSetCreated(e.ctx, e.db, e.client)) + + // retry — proof set exists now, pieces processed + require.NoError(t, processPiecesChanged(e.ctx, e.db, e.client)) + require.EqualValues(t, 0, e.shovelCount(t, "pdp_pieces_added")) + + require.NoError(t, e.db.Model(&model.Deal{}).Where("deal_type = ?", model.DealTypePDP).Count(&dealCount).Error) + require.EqualValues(t, 1, dealCount) + }) +} + +func TestProcessNewEvents_EmptyTables(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + require.NoError(t, processNewEvents(e.ctx, e.db, e.client)) + }) +} + +func TestProcessNewEvents_FullLifecycle(t *testing.T) { + pgTest(t, func(t *testing.T, e pgTestEnv) { + require.NoError(t, e.db.Create(&model.Wallet{ + ID: "f0100", Address: e.listenerFil.String(), + }).Error) + + // step 1: DataSetCreated + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_dataset_created (set_id, storage_provider, block_num) VALUES (?, ?, ?)", + 1, e.providerEth.Bytes(), 100, + ).Error) + require.NoError(t, processNewEvents(e.ctx, e.db, e.client)) + + var psCount int64 + require.NoError(t, e.db.Model(&model.PDPProofSet{}).Count(&psCount).Error) + require.EqualValues(t, 1, psCount) + + // step 2: PiecesAdded + require.NoError(t, e.db.Exec("INSERT INTO pdp_pieces_added (set_id) VALUES (?)", 1).Error) + require.NoError(t, processNewEvents(e.ctx, e.db, e.client)) + + var dealCount int64 + require.NoError(t, e.db.Model(&model.Deal{}).Where("deal_type = ?", model.DealTypePDP).Count(&dealCount).Error) + require.EqualValues(t, 1, dealCount) + + // step 3: PossessionProven → active + require.NoError(t, e.db.Exec("INSERT INTO pdp_possession_proven (set_id) VALUES (?)", 1).Error) + require.NoError(t, processNewEvents(e.ctx, e.db, e.client)) + + var deal model.Deal + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).First(&deal).Error) + require.Equal(t, model.DealActive, deal.State) + require.True(t, *deal.ProofSetLive) + + // step 4: DataSetDeleted → expired + require.NoError(t, e.db.Exec( + "INSERT INTO pdp_dataset_deleted (set_id, deleted_leaf_count) VALUES (?, ?)", 1, 1, + ).Error) + require.NoError(t, processNewEvents(e.ctx, e.db, e.client)) + + require.NoError(t, e.db.Where("deal_type = ?", model.DealTypePDP).First(&deal).Error) + require.Equal(t, model.DealExpired, deal.State) + + var ps model.PDPProofSet + require.NoError(t, e.db.Where("set_id = ?", 1).First(&ps).Error) + require.True(t, ps.Deleted) + }) +} diff --git a/service/pdptracker/indexer.go b/service/pdptracker/indexer.go new file mode 100644 index 00000000..c417d32b --- /dev/null +++ b/service/pdptracker/indexer.go @@ -0,0 +1,328 @@ +package pdptracker + +import ( + "context" + "strings" + "time" + + "github.com/cockroachdb/errors" + "github.com/ethereum/go-ethereum/common" + "github.com/indexsupply/shovel/dig" + "github.com/indexsupply/shovel/shovel" + "github.com/indexsupply/shovel/shovel/config" + "github.com/indexsupply/shovel/wpg" + "github.com/jackc/pgx/v5/pgxpool" +) + +type PDPIndexer struct { + pgp *pgxpool.Pool + conf config.Root +} + +func NewPDPIndexer(ctx context.Context, pgURL string, rpcURL string, chainID uint64, contractAddr common.Address, fullResync bool) (*PDPIndexer, error) { + var startBlock uint64 + if fullResync { + startBlock = pdpVerifierDeployBlock[chainID] + } + conf := buildShovelConfig(pgURL, rpcURL, chainID, contractAddr, startBlock) + if err := config.ValidateFix(&conf); err != nil { + return nil, errors.Wrap(err, "invalid shovel config") + } + + pgp, err := wpg.NewPool(ctx, pgURL) + if err != nil { + return nil, errors.Wrap(err, "failed to create indexer pg pool") + } + + tx, err := pgp.Begin(ctx) + if err != nil { + pgp.Close() + return nil, errors.Wrap(err, "failed to begin migration tx") + } + if _, err := tx.Exec(ctx, shovel.Schema); err != nil { + //nolint:errcheck + tx.Rollback(ctx) + pgp.Close() + return nil, errors.Wrap(err, "failed to apply shovel schema") + } + if err := config.Migrate(ctx, tx, conf); err != nil { + //nolint:errcheck + tx.Rollback(ctx) + pgp.Close() + return nil, errors.Wrap(err, "failed to migrate integration tables") + } + if fullResync { + if _, err := tx.Exec(ctx, + "DELETE FROM shovel.task_updates WHERE src_name = $1", srcName); err != nil { + //nolint:errcheck + tx.Rollback(ctx) + pgp.Close() + return nil, errors.Wrap(err, "failed to reset indexer cursor") + } + for _, ig := range conf.Integrations { + if _, err := tx.Exec(ctx, + "TRUNCATE "+ig.Table.Name); err != nil { + //nolint:errcheck + tx.Rollback(ctx) + pgp.Close() + return nil, errors.Wrap(err, "failed to truncate "+ig.Table.Name) + } + } + Logger.Infow("full resync: cleared indexer state, restarting from deployment block", + "startBlock", conf.Sources[0].Start) + } + if err := tx.Commit(ctx); err != nil { + pgp.Close() + return nil, errors.Wrap(err, "failed to commit migration") + } + + return &PDPIndexer{pgp: pgp, conf: conf}, nil +} + +func (idx *PDPIndexer) Start(ctx context.Context, exitErr chan<- error) error { + mgr := shovel.NewManager(ctx, idx.pgp, idx.conf) + ec := make(chan error, 1) + go mgr.Run(ec) + if err := <-ec; err != nil { + return errors.Wrap(err, "shovel indexer startup failed") + } + Logger.Info("shovel indexer started") + + go func() { + <-ctx.Done() + idx.pgp.Close() + Logger.Info("shovel indexer stopped") + if exitErr != nil { + exitErr <- nil + } + }() + + return nil +} + +func (*PDPIndexer) Name() string { return "PDPIndexer" } + +const srcName = "fevm" + +// block at which the PDPVerifier contract was deployed per chain +var pdpVerifierDeployBlock = map[uint64]uint64{ + 314: 5441432, // mainnet + 314159: 3140755, // calibration +} + +func buildShovelConfig(pgURL, rpcURL string, chainID uint64, contract common.Address, startBlock uint64) config.Root { + addrHex := strings.ToLower(contract.Hex()) + src := config.Source{ + Name: srcName, + ChainID: chainID, + URLs: []string{rpcURL}, + Start: startBlock, + PollDuration: time.Second, + } + + addrFilter := dig.BlockData{ + Name: "log_addr", + Column: "log_addr", + Filter: dig.Filter{Op: "contains", Arg: []string{addrHex}}, + } + addrCol := wpg.Column{Name: "log_addr", Type: "bytea"} + + return config.Root{ + PGURL: pgURL, + Sources: []config.Source{src}, + Integrations: []config.Integration{ + dataSetCreatedIG(src, addrFilter, addrCol), + piecesAddedIG(src, addrFilter, addrCol), + piecesRemovedIG(src, addrFilter, addrCol), + nextProvingPeriodIG(src, addrFilter, addrCol), + possessionProvenIG(src, addrFilter, addrCol), + dataSetDeletedIG(src, addrFilter, addrCol), + spChangedIG(src, addrFilter, addrCol), + }, + } +} + +func dataSetCreatedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { + return config.Integration{ + Name: "pdp_dataset_created", + Enabled: true, + Sources: []config.Source{{Name: src.Name, Start: src.Start}}, + Table: wpg.Table{ + Name: "pdp_dataset_created", + Columns: []wpg.Column{ + ac, + {Name: "set_id", Type: "numeric"}, + {Name: "storage_provider", Type: "bytea"}, + }, + }, + Block: []dig.BlockData{af}, + Event: dig.Event{ + Name: "DataSetCreated", + Type: "event", + Inputs: []dig.Input{ + {Indexed: true, Name: "setId", Type: "uint256", Column: "set_id"}, + {Indexed: true, Name: "storageProvider", Type: "address", Column: "storage_provider"}, + }, + }, + } +} + +// only set_id captured; array fields reconciled via getActivePieces RPC +func piecesAddedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { + return config.Integration{ + Name: "pdp_pieces_added", + Enabled: true, + Sources: []config.Source{{Name: src.Name, Start: src.Start}}, + Table: wpg.Table{ + Name: "pdp_pieces_added", + Columns: []wpg.Column{ + ac, + {Name: "set_id", Type: "numeric"}, + }, + }, + Block: []dig.BlockData{af}, + Event: dig.Event{ + Name: "PiecesAdded", + Type: "event", + Inputs: []dig.Input{ + {Indexed: true, Name: "setId", Type: "uint256", Column: "set_id"}, + // non-indexed array fields listed for correct signature, not selected + {Name: "pieceIds", Type: "uint256[]"}, + {Name: "pieceCids", Type: "tuple[]", Components: []dig.Input{ + {Name: "data", Type: "bytes"}, + }}, + }, + }, + } +} + +func piecesRemovedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { + return config.Integration{ + Name: "pdp_pieces_removed", + Enabled: true, + Sources: []config.Source{{Name: src.Name, Start: src.Start}}, + Table: wpg.Table{ + Name: "pdp_pieces_removed", + Columns: []wpg.Column{ + ac, + {Name: "set_id", Type: "numeric"}, + }, + }, + Block: []dig.BlockData{af}, + Event: dig.Event{ + Name: "PiecesRemoved", + Type: "event", + Inputs: []dig.Input{ + {Indexed: true, Name: "setId", Type: "uint256", Column: "set_id"}, + {Name: "pieceIds", Type: "uint256[]"}, + }, + }, + } +} + +func nextProvingPeriodIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { + return config.Integration{ + Name: "pdp_next_proving_period", + Enabled: true, + Sources: []config.Source{{Name: src.Name, Start: src.Start}}, + Table: wpg.Table{ + Name: "pdp_next_proving_period", + Columns: []wpg.Column{ + ac, + {Name: "set_id", Type: "numeric"}, + {Name: "challenge_epoch", Type: "numeric"}, + {Name: "leaf_count", Type: "numeric"}, + }, + }, + Block: []dig.BlockData{af}, + Event: dig.Event{ + Name: "NextProvingPeriod", + Type: "event", + Inputs: []dig.Input{ + {Indexed: true, Name: "setId", Type: "uint256", Column: "set_id"}, + {Name: "challengeEpoch", Type: "uint256", Column: "challenge_epoch"}, + {Name: "leafCount", Type: "uint256", Column: "leaf_count"}, + }, + }, + } +} + +// only set_id captured; challenges tuple not needed for deal tracking +func possessionProvenIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { + return config.Integration{ + Name: "pdp_possession_proven", + Enabled: true, + Sources: []config.Source{{Name: src.Name, Start: src.Start}}, + Table: wpg.Table{ + Name: "pdp_possession_proven", + Columns: []wpg.Column{ + ac, + {Name: "set_id", Type: "numeric"}, + }, + }, + Block: []dig.BlockData{af}, + Event: dig.Event{ + Name: "PossessionProven", + Type: "event", + Inputs: []dig.Input{ + {Indexed: true, Name: "setId", Type: "uint256", Column: "set_id"}, + {Name: "challenges", Type: "tuple[]", Components: []dig.Input{ + {Name: "pieceId", Type: "uint256"}, + {Name: "offset", Type: "uint256"}, + }}, + }, + }, + } +} + +func dataSetDeletedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { + return config.Integration{ + Name: "pdp_dataset_deleted", + Enabled: true, + Sources: []config.Source{{Name: src.Name, Start: src.Start}}, + Table: wpg.Table{ + Name: "pdp_dataset_deleted", + Columns: []wpg.Column{ + ac, + {Name: "set_id", Type: "numeric"}, + {Name: "deleted_leaf_count", Type: "numeric"}, + }, + }, + Block: []dig.BlockData{af}, + Event: dig.Event{ + Name: "DataSetDeleted", + Type: "event", + Inputs: []dig.Input{ + {Indexed: true, Name: "setId", Type: "uint256", Column: "set_id"}, + {Name: "deletedLeafCount", Type: "uint256", Column: "deleted_leaf_count"}, + }, + }, + } +} + +func spChangedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { + return config.Integration{ + Name: "pdp_sp_changed", + Enabled: true, + Sources: []config.Source{{Name: src.Name, Start: src.Start}}, + Table: wpg.Table{ + Name: "pdp_sp_changed", + Columns: []wpg.Column{ + ac, + {Name: "set_id", Type: "numeric"}, + {Name: "old_sp", Type: "bytea"}, + {Name: "new_sp", Type: "bytea"}, + }, + }, + Block: []dig.BlockData{af}, + Event: dig.Event{ + Name: "StorageProviderChanged", + Type: "event", + Inputs: []dig.Input{ + {Indexed: true, Name: "setId", Type: "uint256", Column: "set_id"}, + {Indexed: true, Name: "oldStorageProvider", Type: "address", Column: "old_sp"}, + {Indexed: true, Name: "newStorageProvider", Type: "address", Column: "new_sp"}, + }, + }, + } +} diff --git a/service/pdptracker/indexer_test.go b/service/pdptracker/indexer_test.go new file mode 100644 index 00000000..43b0d7b2 --- /dev/null +++ b/service/pdptracker/indexer_test.go @@ -0,0 +1,96 @@ +package pdptracker + +import ( + "strings" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/indexsupply/shovel/shovel/config" + "github.com/stretchr/testify/require" +) + +func TestBuildShovelConfig(t *testing.T) { + contract := common.HexToAddress("0xBADd0B92C1c71d02E7d520f64c0876538fa2557F") + conf := buildShovelConfig("postgres://localhost/test", "https://rpc.example.com", 314, contract, 0) + require.NoError(t, config.ValidateFix(&conf)) + + require.Len(t, conf.Sources, 1) + require.Equal(t, "fevm", conf.Sources[0].Name) + require.Equal(t, uint64(314), conf.Sources[0].ChainID) + require.Len(t, conf.Sources[0].URLs, 1) + require.Equal(t, uint64(0), conf.Sources[0].Start) + + // 7 event integrations + require.Len(t, conf.Integrations, 7) + + names := make(map[string]bool) + for _, ig := range conf.Integrations { + names[ig.Name] = true + require.True(t, ig.Enabled) + require.Len(t, ig.Sources, 1) + require.Equal(t, "fevm", ig.Sources[0].Name) + require.Equal(t, conf.Sources[0].Start, ig.Sources[0].Start) + + // each integration must have a contract address filter + require.NotEmpty(t, ig.Block) + require.Equal(t, "log_addr", ig.Block[0].Name) + require.Equal(t, "contains", ig.Block[0].Filter.Op) + require.Contains(t, ig.Block[0].Filter.Arg[0], strings.ToLower(contract.Hex())) + } + + expectedNames := []string{ + "pdp_dataset_created", + "pdp_pieces_added", + "pdp_pieces_removed", + "pdp_next_proving_period", + "pdp_possession_proven", + "pdp_dataset_deleted", + "pdp_sp_changed", + } + for _, name := range expectedNames { + require.True(t, names[name], "missing integration: %s", name) + } +} + +func TestBuildShovelConfig_UnknownChain(t *testing.T) { + contract := common.HexToAddress("0xBADd0B92C1c71d02E7d520f64c0876538fa2557F") + conf := buildShovelConfig("postgres://localhost/test", "https://rpc.example.com", 999, contract, 0) + require.NoError(t, config.ValidateFix(&conf)) + require.Equal(t, uint64(0), conf.Sources[0].Start) +} + +func TestBuildShovelConfig_EventInputs(t *testing.T) { + contract := common.HexToAddress("0x85e366Cf9DD2c0aE37E963d9556F5f4718d6417C") + conf := buildShovelConfig("postgres://localhost/test", "https://rpc.example.com", 314159, contract, 0) + + // find DataSetCreated and verify inputs + for _, ig := range conf.Integrations { + if ig.Name == "pdp_dataset_created" { + require.Equal(t, "DataSetCreated", ig.Event.Name) + require.Len(t, ig.Event.Inputs, 2) + require.True(t, ig.Event.Inputs[0].Indexed) + require.Equal(t, "uint256", ig.Event.Inputs[0].Type) + require.Equal(t, "set_id", ig.Event.Inputs[0].Column) + require.True(t, ig.Event.Inputs[1].Indexed) + require.Equal(t, "address", ig.Event.Inputs[1].Type) + require.Equal(t, "storage_provider", ig.Event.Inputs[1].Column) + } + + if ig.Name == "pdp_pieces_added" { + require.Equal(t, "PiecesAdded", ig.Event.Name) + require.Len(t, ig.Event.Inputs, 3) + // only first input selected + require.NotEmpty(t, ig.Event.Inputs[0].Column) + require.Empty(t, ig.Event.Inputs[1].Column) + require.Empty(t, ig.Event.Inputs[2].Column) + } + + if ig.Name == "pdp_next_proving_period" { + require.Equal(t, "NextProvingPeriod", ig.Event.Name) + require.Len(t, ig.Event.Inputs, 3) + require.Equal(t, "set_id", ig.Event.Inputs[0].Column) + require.Equal(t, "challenge_epoch", ig.Event.Inputs[1].Column) + require.Equal(t, "leaf_count", ig.Event.Inputs[2].Column) + } + } +} diff --git a/service/pdptracker/integration_test.go b/service/pdptracker/integration_test.go new file mode 100644 index 00000000..294ea797 --- /dev/null +++ b/service/pdptracker/integration_test.go @@ -0,0 +1,199 @@ +package pdptracker + +import ( + "context" + "os" + "testing" + "time" + + "github.com/data-preservation-programs/go-synapse" + "github.com/data-preservation-programs/go-synapse/constants" + "github.com/data-preservation-programs/singularity/util/testutil" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/stretchr/testify/require" + "gorm.io/gorm" +) + +const calibnetRPC = "https://api.calibration.node.glif.io/rpc/v1" + +func startCalibnetFork(t *testing.T) string { + t.Helper() + anvil := testutil.StartAnvil(t, calibnetRPC) + return anvil.RPCURL +} + +func TestIntegration_NetworkDetection(t *testing.T) { + rpcURL := startCalibnetFork(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ethClient, err := ethclient.DialContext(ctx, rpcURL) + require.NoError(t, err) + defer ethClient.Close() + + network, chainID, err := synapse.DetectNetwork(ctx, ethClient) + require.NoError(t, err) + require.Equal(t, constants.NetworkCalibration, network) + require.EqualValues(t, 314159, chainID) + + contractAddr := constants.GetPDPVerifierAddress(network) + require.NotEqual(t, common.Address{}, contractAddr) + t.Logf("calibnet PDPVerifier: %s", contractAddr.Hex()) +} + +func TestIntegration_ShovelConfig(t *testing.T) { + contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) + conf := buildShovelConfig( + "postgres://localhost/test", + calibnetRPC, + uint64(constants.ChainIDCalibration), + contractAddr, + 0, + ) + + require.Len(t, conf.Integrations, 7) + require.Len(t, conf.Sources, 1) + require.Equal(t, uint64(314159), conf.Sources[0].ChainID) +} + +func TestIntegration_ShovelIndexer(t *testing.T) { + rpcURL := startCalibnetFork(t) + + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + if db.Dialector.Name() != "postgres" { + t.Skip("Shovel requires Postgres") + return + } + + connStr := os.Getenv("DATABASE_CONNECTION_STRING") + require.NotEmpty(t, connStr) + + contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) + + indexer, err := NewPDPIndexer(ctx, connStr, rpcURL, uint64(constants.ChainIDCalibration), contractAddr, false) + require.NoError(t, err) + + indexCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + defer cancel() + + exitErr := make(chan error, 1) + err = indexer.Start(indexCtx, exitErr) + require.NoError(t, err) + + time.Sleep(10 * time.Second) + + var schemaExists bool + err = db.Raw("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'shovel')").Scan(&schemaExists).Error + require.NoError(t, err) + require.True(t, schemaExists, "shovel schema should exist") + + for _, table := range []string{ + "pdp_dataset_created", + "pdp_pieces_added", + "pdp_pieces_removed", + "pdp_next_proving_period", + "pdp_possession_proven", + "pdp_dataset_deleted", + "pdp_sp_changed", + } { + var exists bool + err = db.Raw( + "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = ?)", table, + ).Scan(&exists).Error + require.NoError(t, err) + require.True(t, exists, "table %s should exist", table) + } + + cancel() + select { + case err := <-exitErr: + require.NoError(t, err) + case <-time.After(5 * time.Second): + } + }) +} + +func TestIntegration_FullResync(t *testing.T) { + rpcURL := startCalibnetFork(t) + + testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { + if db.Dialector.Name() != "postgres" { + t.Skip("Shovel requires Postgres") + return + } + + connStr := os.Getenv("DATABASE_CONNECTION_STRING") + require.NotEmpty(t, connStr) + + contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) + chainID := uint64(constants.ChainIDCalibration) + + // first run: let shovel index at least one block + indexer, err := NewPDPIndexer(ctx, connStr, rpcURL, chainID, contractAddr, false) + require.NoError(t, err) + + indexCtx, cancel := context.WithTimeout(ctx, 60*time.Second) + exitErr := make(chan error, 1) + require.NoError(t, indexer.Start(indexCtx, exitErr)) + time.Sleep(10 * time.Second) + cancel() + select { + case <-exitErr: + case <-time.After(5 * time.Second): + } + + // verify cursor and data rows exist + var cursorCount int64 + err = db.Raw( + "SELECT count(*) FROM shovel.task_updates WHERE src_name = ?", srcName, + ).Scan(&cursorCount).Error + require.NoError(t, err) + require.Greater(t, cursorCount, int64(0), "cursor rows should exist after first run") + + var dataCount int64 + err = db.Raw("SELECT count(*) FROM pdp_dataset_created").Scan(&dataCount).Error + require.NoError(t, err) + t.Logf("data rows before resync: task_updates=%d, dataset_created=%d", cursorCount, dataCount) + + // full resync: cursor and data tables should be cleared + _, err = NewPDPIndexer(ctx, connStr, rpcURL, chainID, contractAddr, true) + require.NoError(t, err) + + err = db.Raw( + "SELECT count(*) FROM shovel.task_updates WHERE src_name = ?", srcName, + ).Scan(&cursorCount).Error + require.NoError(t, err) + require.Equal(t, int64(0), cursorCount, "cursor rows should be gone after full resync") + + for _, table := range []string{ + "pdp_dataset_created", "pdp_pieces_added", "pdp_pieces_removed", + "pdp_next_proving_period", "pdp_possession_proven", + "pdp_dataset_deleted", "pdp_sp_changed", + } { + var count int64 + err = db.Raw("SELECT count(*) FROM " + table).Scan(&count).Error + require.NoError(t, err) + require.Equal(t, int64(0), count, "table %s should be empty after full resync", table) + } + }) +} + +func TestIntegration_RPCClient(t *testing.T) { + rpcURL := startCalibnetFork(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) + client, err := NewPDPClient(ctx, rpcURL, contractAddr) + require.NoError(t, err) + defer client.Close() + + _, err = client.GetDataSetListener(ctx, 0) + t.Logf("GetDataSetListener(0): err=%v", err) + + _, err = client.GetActivePieces(ctx, 0) + t.Logf("GetActivePieces(0): err=%v", err) +} diff --git a/service/pdptracker/pdpclient.go b/service/pdptracker/pdpclient.go index f95dffb2..b21485c8 100644 --- a/service/pdptracker/pdpclient.go +++ b/service/pdptracker/pdpclient.go @@ -3,12 +3,9 @@ package pdptracker import ( "context" "fmt" - "math" "math/big" "github.com/cockroachdb/errors" - "github.com/data-preservation-programs/go-synapse" - "github.com/data-preservation-programs/go-synapse/constants" "github.com/data-preservation-programs/go-synapse/contracts" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -25,59 +22,34 @@ type activePiecesResult struct { HasMore bool } -type pdpVerifierAPI interface { - GetNextDataSetId(opts *bind.CallOpts) (uint64, error) +type pdpContractCaller interface { GetDataSetListener(opts *bind.CallOpts, setId *big.Int) (common.Address, error) - GetDataSetStorageProvider(opts *bind.CallOpts, setId *big.Int) (common.Address, common.Address, error) - DataSetLive(opts *bind.CallOpts, setId *big.Int) (bool, error) - GetNextChallengeEpoch(opts *bind.CallOpts, setId *big.Int) (*big.Int, error) GetActivePieces(opts *bind.CallOpts, setId *big.Int, offset *big.Int, limit *big.Int) (activePiecesResult, error) } -type pdpVerifierContract struct { +type pdpVerifierCaller struct { contract *contracts.PDPVerifier } -func (c pdpVerifierContract) GetNextDataSetId(opts *bind.CallOpts) (uint64, error) { - return c.contract.GetNextDataSetId(opts) -} - -func (c pdpVerifierContract) GetDataSetListener(opts *bind.CallOpts, setId *big.Int) (common.Address, error) { +func (c pdpVerifierCaller) GetDataSetListener(opts *bind.CallOpts, setId *big.Int) (common.Address, error) { return c.contract.GetDataSetListener(opts, setId) } -func (c pdpVerifierContract) GetDataSetStorageProvider(opts *bind.CallOpts, setId *big.Int) (common.Address, common.Address, error) { - return c.contract.GetDataSetStorageProvider(opts, setId) -} - -func (c pdpVerifierContract) DataSetLive(opts *bind.CallOpts, setId *big.Int) (bool, error) { - return c.contract.DataSetLive(opts, setId) -} - -func (c pdpVerifierContract) GetNextChallengeEpoch(opts *bind.CallOpts, setId *big.Int) (*big.Int, error) { - return c.contract.GetNextChallengeEpoch(opts, setId) -} - -func (c pdpVerifierContract) GetActivePieces(opts *bind.CallOpts, setId *big.Int, offset *big.Int, limit *big.Int) (activePiecesResult, error) { +func (c pdpVerifierCaller) GetActivePieces(opts *bind.CallOpts, setId *big.Int, offset *big.Int, limit *big.Int) (activePiecesResult, error) { result, err := c.contract.GetActivePieces(opts, setId, offset, limit) if err != nil { return activePiecesResult{}, err } - return activePiecesResult{ - Pieces: result.Pieces, - HasMore: result.HasMore, - }, nil + return activePiecesResult{Pieces: result.Pieces, HasMore: result.HasMore}, nil } -// ChainPDPClient implements PDPClient using the PDPVerifier contract on-chain. type ChainPDPClient struct { ethClient *ethclient.Client - contract pdpVerifierAPI + contract pdpContractCaller pageSize uint64 } -// NewPDPClient creates a new PDP client backed by the PDPVerifier contract. -func NewPDPClient(ctx context.Context, rpcURL string) (*ChainPDPClient, error) { +func NewPDPClient(ctx context.Context, rpcURL string, contractAddr common.Address) (*ChainPDPClient, error) { if rpcURL == "" { return nil, errors.New("rpc URL is required") } @@ -87,169 +59,37 @@ func NewPDPClient(ctx context.Context, rpcURL string) (*ChainPDPClient, error) { return nil, errors.Wrap(err, "failed to connect to PDP RPC") } - network, _, err := synapse.DetectNetwork(ctx, ethClient) - if err != nil { - ethClient.Close() - return nil, errors.Wrap(err, "failed to detect PDP network") - } - - contractAddr := constants.GetPDPVerifierAddress(network) - if contractAddr == (common.Address{}) { - ethClient.Close() - return nil, errors.New("unsupported PDP network: missing contract address") - } - verifier, err := contracts.NewPDPVerifier(contractAddr, ethClient) if err != nil { ethClient.Close() - return nil, errors.Wrap(err, "failed to initialize PDP verifier contract") + return nil, errors.Wrap(err, "failed to init PDP verifier contract") } return &ChainPDPClient{ ethClient: ethClient, - contract: pdpVerifierContract{contract: verifier}, + contract: pdpVerifierCaller{contract: verifier}, pageSize: pdpDefaultPageSize, }, nil } -// Close releases the underlying RPC client. func (c *ChainPDPClient) Close() error { - if c.ethClient == nil { - return nil + if c.ethClient != nil { + c.ethClient.Close() } - c.ethClient.Close() return nil } -// GetProofSetsForClient returns all proof sets associated with a client address. -func (c *ChainPDPClient) GetProofSetsForClient(ctx context.Context, clientAddress address.Address) ([]ProofSetInfo, error) { - listenerAddr, err := delegatedAddressToCommon(clientAddress) - if err != nil { - return nil, err - } - - allSets, err := c.GetProofSets(ctx) - if err != nil { - return nil, err - } - - proofSets := make([]ProofSetInfo, 0, len(allSets)) - for _, ps := range allSets { - clientCommon, err := delegatedAddressToCommon(ps.ClientAddress) - if err != nil { - Logger.Debugw("failed to decode proof set client address", "proofSetID", ps.ProofSetID, "error", err) - continue - } - if clientCommon == listenerAddr { - proofSets = append(proofSets, ps) - } - } - return proofSets, nil -} - -// GetProofSets returns all proof sets visible in the contract. -func (c *ChainPDPClient) GetProofSets(ctx context.Context) ([]ProofSetInfo, error) { - nextID, err := c.contract.GetNextDataSetId(&bind.CallOpts{Context: ctx}) - if err != nil { - return nil, errors.Wrap(err, "failed to get next data set ID") - } - - var proofSets []ProofSetInfo - for setID := uint64(0); setID < nextID; setID++ { - setIDBig := new(big.Int).SetUint64(setID) - - listener, err := c.contract.GetDataSetListener(&bind.CallOpts{Context: ctx}, setIDBig) - if err != nil { - Logger.Debugw("failed to get PDP data set listener", "setID", setID, "error", err) - continue - } - info, err := c.buildProofSetInfo(ctx, setID, listener) - if err != nil { - Logger.Warnw("failed to build PDP proof set info", "setID", setID, "error", err) - continue - } - proofSets = append(proofSets, *info) - } - - return proofSets, nil -} - -// GetProofSetInfo returns detailed information about a specific proof set. -func (c *ChainPDPClient) GetProofSetInfo(ctx context.Context, proofSetID uint64) (*ProofSetInfo, error) { - listener, err := c.contract.GetDataSetListener(&bind.CallOpts{Context: ctx}, new(big.Int).SetUint64(proofSetID)) - if err != nil { - return nil, errors.Wrap(err, "failed to get PDP data set listener") - } - return c.buildProofSetInfo(ctx, proofSetID, listener) -} - -// IsProofSetLive checks if a proof set is actively being challenged. -func (c *ChainPDPClient) IsProofSetLive(ctx context.Context, proofSetID uint64) (bool, error) { - live, err := c.contract.DataSetLive(&bind.CallOpts{Context: ctx}, new(big.Int).SetUint64(proofSetID)) - if err != nil { - return false, errors.Wrap(err, "failed to check PDP data set live status") - } - return live, nil -} - -// GetNextChallengeEpoch returns the next challenge epoch for a proof set. -func (c *ChainPDPClient) GetNextChallengeEpoch(ctx context.Context, proofSetID uint64) (int32, error) { - epoch, err := c.contract.GetNextChallengeEpoch(&bind.CallOpts{Context: ctx}, new(big.Int).SetUint64(proofSetID)) +func (c *ChainPDPClient) GetDataSetListener(ctx context.Context, setID uint64) (common.Address, error) { + addr, err := c.contract.GetDataSetListener(&bind.CallOpts{Context: ctx}, new(big.Int).SetUint64(setID)) if err != nil { - return 0, errors.Wrap(err, "failed to get PDP next challenge epoch") + return common.Address{}, errors.Wrap(err, "failed to get dataset listener") } - if !epoch.IsInt64() || epoch.Int64() > math.MaxInt32 { - return 0, fmt.Errorf("PDP next challenge epoch out of range: %s", epoch.String()) - } - return int32(epoch.Int64()), nil + return addr, nil } -func (c *ChainPDPClient) buildProofSetInfo(ctx context.Context, setID uint64, listener common.Address) (*ProofSetInfo, error) { +// paginates internally +func (c *ChainPDPClient) GetActivePieces(ctx context.Context, setID uint64) ([]cid.Cid, error) { setIDBig := new(big.Int).SetUint64(setID) - - storageProvider, _, err := c.contract.GetDataSetStorageProvider(&bind.CallOpts{Context: ctx}, setIDBig) - if err != nil { - return nil, errors.Wrap(err, "failed to get PDP data set storage provider") - } - - isLive, err := c.contract.DataSetLive(&bind.CallOpts{Context: ctx}, setIDBig) - if err != nil { - return nil, errors.Wrap(err, "failed to check PDP data set live status") - } - - nextChallenge, err := c.contract.GetNextChallengeEpoch(&bind.CallOpts{Context: ctx}, setIDBig) - if err != nil { - return nil, errors.Wrap(err, "failed to get PDP next challenge epoch") - } - if !nextChallenge.IsInt64() || nextChallenge.Int64() > math.MaxInt32 { - return nil, fmt.Errorf("PDP next challenge epoch out of range: %s", nextChallenge.String()) - } - - pieces, err := c.getPieceCIDs(ctx, setIDBig) - if err != nil { - return nil, errors.Wrap(err, "failed to get PDP active pieces") - } - - clientAddr, err := commonToDelegatedAddress(listener) - if err != nil { - return nil, err - } - providerAddr, err := commonToDelegatedAddress(storageProvider) - if err != nil { - return nil, err - } - - return &ProofSetInfo{ - ProofSetID: setID, - ClientAddress: clientAddr, - ProviderAddress: providerAddr, - IsLive: isLive, - NextChallengeEpoch: int32(nextChallenge.Int64()), - PieceCIDs: pieces, - }, nil -} - -func (c *ChainPDPClient) getPieceCIDs(ctx context.Context, setID *big.Int) ([]cid.Cid, error) { var ( offset uint64 result []cid.Cid @@ -258,7 +98,7 @@ func (c *ChainPDPClient) getPieceCIDs(ctx context.Context, setID *big.Int) ([]ci for { pieces, err := c.contract.GetActivePieces( &bind.CallOpts{Context: ctx}, - setID, + setIDBig, new(big.Int).SetUint64(offset), new(big.Int).SetUint64(c.pageSize), ) diff --git a/service/pdptracker/pdpclient_test.go b/service/pdptracker/pdpclient_test.go index b1ddb1fc..ba970931 100644 --- a/service/pdptracker/pdpclient_test.go +++ b/service/pdptracker/pdpclient_test.go @@ -14,153 +14,102 @@ import ( "github.com/stretchr/testify/require" ) -type mockDataSet struct { - listener common.Address - provider common.Address - live bool - nextChallenge uint64 - pieces []cid.Cid +type mockContractCaller struct { + listeners map[uint64]common.Address + pieces map[uint64][]cid.Cid } -type mockPDPVerifier struct { - dataSets map[uint64]*mockDataSet -} - -func (m *mockPDPVerifier) GetNextDataSetId(_ *bind.CallOpts) (uint64, error) { - var max uint64 - for id := range m.dataSets { - if id > max { - max = id - } - } - return max + 1, nil -} - -func (m *mockPDPVerifier) GetDataSetListener(_ *bind.CallOpts, setId *big.Int) (common.Address, error) { - data, ok := m.dataSets[setId.Uint64()] +func (m *mockContractCaller) GetDataSetListener(_ *bind.CallOpts, setId *big.Int) (common.Address, error) { + addr, ok := m.listeners[setId.Uint64()] if !ok { return common.Address{}, errors.New("not found") } - return data.listener, nil -} - -func (m *mockPDPVerifier) GetDataSetStorageProvider(_ *bind.CallOpts, setId *big.Int) (common.Address, common.Address, error) { - data, ok := m.dataSets[setId.Uint64()] - if !ok { - return common.Address{}, common.Address{}, errors.New("not found") - } - return data.provider, common.Address{}, nil + return addr, nil } -func (m *mockPDPVerifier) DataSetLive(_ *bind.CallOpts, setId *big.Int) (bool, error) { - data, ok := m.dataSets[setId.Uint64()] - if !ok { - return false, errors.New("not found") - } - return data.live, nil -} - -func (m *mockPDPVerifier) GetNextChallengeEpoch(_ *bind.CallOpts, setId *big.Int) (*big.Int, error) { - data, ok := m.dataSets[setId.Uint64()] - if !ok { - return nil, errors.New("not found") - } - return new(big.Int).SetUint64(data.nextChallenge), nil -} - -func (m *mockPDPVerifier) GetActivePieces(_ *bind.CallOpts, setId *big.Int, offset *big.Int, limit *big.Int) (activePiecesResult, error) { - data, ok := m.dataSets[setId.Uint64()] +func (m *mockContractCaller) GetActivePieces(_ *bind.CallOpts, setId *big.Int, offset *big.Int, limit *big.Int) (activePiecesResult, error) { + all, ok := m.pieces[setId.Uint64()] if !ok { return activePiecesResult{}, errors.New("not found") } start := int(offset.Uint64()) - if start >= len(data.pieces) { + if start >= len(all) { return activePiecesResult{Pieces: nil, HasMore: false}, nil } end := start + int(limit.Uint64()) - if end > len(data.pieces) { - end = len(data.pieces) + if end > len(all) { + end = len(all) } out := make([]contracts.CidsCid, 0, end-start) - for _, piece := range data.pieces[start:end] { + for _, piece := range all[start:end] { out = append(out, contracts.CidsCid{Data: piece.Bytes()}) } return activePiecesResult{ Pieces: out, - HasMore: end < len(data.pieces), + HasMore: end < len(all), }, nil } -func TestChainPDPClient_GetProofSetsForClient(t *testing.T) { - originalNetwork := address.CurrentNetwork - t.Cleanup(func() { - address.CurrentNetwork = originalNetwork - }) - address.CurrentNetwork = address.Mainnet - +func TestChainPDPClient_GetDataSetListener(t *testing.T) { listener := common.HexToAddress("0x1111111111111111111111111111111111111111") - provider := common.HexToAddress("0x2222222222222222222222222222222222222222") + mock := &mockContractCaller{ + listeners: map[uint64]common.Address{ + 1: listener, + }, + } + client := &ChainPDPClient{contract: mock, pageSize: 100} - listenerAddr, err := address.NewDelegatedAddress(10, listener.Bytes()) - require.NoError(t, err) - providerAddr, err := address.NewDelegatedAddress(10, provider.Bytes()) + addr, err := client.GetDataSetListener(context.Background(), 1) require.NoError(t, err) + require.Equal(t, listener, addr) + _, err = client.GetDataSetListener(context.Background(), 99) + require.Error(t, err) +} + +func TestChainPDPClient_GetActivePieces_Pagination(t *testing.T) { piece1, err := cid.Decode("baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq") require.NoError(t, err) piece2, err := cid.Decode("baga6ea4seaqgwm2a6rfh53y5a4qbm5zhqyixwut3wst6dfrlghm2f5l6t4o2mry") require.NoError(t, err) - mock := &mockPDPVerifier{ - dataSets: map[uint64]*mockDataSet{ - 1: { - listener: listener, - provider: provider, - live: true, - nextChallenge: 42, - pieces: []cid.Cid{piece1, piece2}, - }, - 2: { - listener: common.HexToAddress("0x3333333333333333333333333333333333333333"), - provider: provider, - live: false, - pieces: []cid.Cid{piece1}, - }, + mock := &mockContractCaller{ + pieces: map[uint64][]cid.Cid{ + 1: {piece1, piece2}, }, } + // page size 1 to force multiple pages + client := &ChainPDPClient{contract: mock, pageSize: 1} - client := &ChainPDPClient{ - contract: mock, - pageSize: 1, - } - - proofSets, err := client.GetProofSetsForClient(context.Background(), listenerAddr) + result, err := client.GetActivePieces(context.Background(), 1) require.NoError(t, err) - require.Len(t, proofSets, 1) - - proofSet := proofSets[0] - require.EqualValues(t, 1, proofSet.ProofSetID) - require.Equal(t, listenerAddr, proofSet.ClientAddress) - require.Equal(t, providerAddr, proofSet.ProviderAddress) - require.True(t, proofSet.IsLive) - require.EqualValues(t, 42, proofSet.NextChallengeEpoch) - require.Len(t, proofSet.PieceCIDs, 2) - require.True(t, piece1.Equals(proofSet.PieceCIDs[0])) - require.True(t, piece2.Equals(proofSet.PieceCIDs[1])) + require.Len(t, result, 2) + require.True(t, piece1.Equals(result[0])) + require.True(t, piece2.Equals(result[1])) } -func TestChainPDPClient_GetProofSetsForClient_InvalidAddress(t *testing.T) { - client := &ChainPDPClient{ - contract: &mockPDPVerifier{dataSets: map[uint64]*mockDataSet{}}, - pageSize: 1, - } +func TestDelegatedAddressRoundtrip(t *testing.T) { + originalNetwork := address.CurrentNetwork + t.Cleanup(func() { address.CurrentNetwork = originalNetwork }) + address.CurrentNetwork = address.Mainnet + + ethAddr := common.HexToAddress("0x1111111111111111111111111111111111111111") + filAddr, err := commonToDelegatedAddress(ethAddr) + require.NoError(t, err) + require.Equal(t, address.Delegated, filAddr.Protocol()) + + roundtrip, err := delegatedAddressToCommon(filAddr) + require.NoError(t, err) + require.Equal(t, ethAddr, roundtrip) +} +func TestDelegatedAddressToCommon_InvalidProtocol(t *testing.T) { addr, err := address.NewFromString("f0100") require.NoError(t, err) - _, err = client.GetProofSetsForClient(context.Background(), addr) + _, err = delegatedAddressToCommon(addr) require.Error(t, err) } diff --git a/service/pdptracker/pdptracker.go b/service/pdptracker/pdptracker.go index 8065966c..edba8265 100644 --- a/service/pdptracker/pdptracker.go +++ b/service/pdptracker/pdptracker.go @@ -1,8 +1,3 @@ -// Package pdptracker provides a service for tracking PDP (Proof of Data Possession) deals -// using the f41 actor on Filecoin. This is distinct from legacy f05 market deals. -// -// PDP deals use proof sets managed through the PDPVerifier contract, where data is verified -// through cryptographic challenges rather than the traditional sector sealing process. package pdptracker import ( @@ -13,10 +8,7 @@ import ( "github.com/data-preservation-programs/singularity/database" "github.com/data-preservation-programs/singularity/model" "github.com/data-preservation-programs/singularity/service/healthcheck" - "github.com/filecoin-project/go-address" "github.com/google/uuid" - "github.com/gotidy/ptr" - "github.com/ipfs/go-cid" "github.com/ipfs/go-log/v2" "gorm.io/gorm" ) @@ -30,61 +22,25 @@ const ( var Logger = log.Logger("pdptracker") -// ProofSetInfo contains information about a PDP proof set retrieved from on-chain state -type ProofSetInfo struct { - ProofSetID uint64 - ClientAddress address.Address // f4 address of the client - ProviderAddress address.Address // Provider/record keeper address - IsLive bool // Whether the proof set is actively being challenged - NextChallengeEpoch int32 // Next epoch when a challenge is due - PieceCIDs []cid.Cid -} - -// PDPClient is the interface for interacting with PDP on-chain state. -type PDPClient interface { - // GetProofSetsForClient returns all proof sets associated with a client address - GetProofSetsForClient(ctx context.Context, clientAddress address.Address) ([]ProofSetInfo, error) - // GetProofSetInfo returns detailed information about a specific proof set - GetProofSetInfo(ctx context.Context, proofSetID uint64) (*ProofSetInfo, error) - // IsProofSetLive checks if a proof set is actively being challenged - IsProofSetLive(ctx context.Context, proofSetID uint64) (bool, error) - // GetNextChallengeEpoch returns the next challenge epoch for a proof set - GetNextChallengeEpoch(ctx context.Context, proofSetID uint64) (int32, error) -} - -// PDPBulkClient is an optional optimization interface for fetching all proof sets in one call. -type PDPBulkClient interface { - GetProofSets(ctx context.Context) ([]ProofSetInfo, error) -} - -// PDPTracker tracks PDP deals (f41 actor) on the Filecoin network. -// It monitors proof sets and updates deal status based on on-chain state. type PDPTracker struct { workerID uuid.UUID dbNoContext *gorm.DB config PDPConfig - pdpClient PDPClient + rpcClient *ChainPDPClient once bool } -// NewPDPTracker creates a new PDP deal tracker. -// -// Parameters: -// - db: Database connection for storing deal information -// - config: Tracker runtime configuration -// - pdpClient: Client for interacting with PDP contracts -// - once: If true, run only once instead of continuously func NewPDPTracker( db *gorm.DB, config PDPConfig, - pdpClient PDPClient, + rpcClient *ChainPDPClient, once bool, ) PDPTracker { return PDPTracker{ workerID: uuid.New(), dbNoContext: db, config: config, - pdpClient: pdpClient, + rpcClient: rpcClient, once: once, } } @@ -93,11 +49,9 @@ func (*PDPTracker) Name() string { return "PDPTracker" } -// Start begins the PDP tracker service. func (p *PDPTracker) Start(ctx context.Context, exitErr chan<- error) error { - Logger.Infow("PDP tracker started", - "pollInterval", p.config.PollingInterval, - ) + Logger.Infow("PDP tracker starting", "pollInterval", p.config.PollingInterval) + var regTimer *time.Timer for { alreadyRunning, err := healthcheck.Register(ctx, p.dbNoContext, p.workerID, model.PDPTracker, false) @@ -203,145 +157,7 @@ func (p *PDPTracker) cleanup(ctx context.Context) error { }) } -// runOnce performs a single cycle of PDP deal tracking. -// It queries wallets, fetches their PDP proof sets, and updates deal status. func (p *PDPTracker) runOnce(ctx context.Context) error { db := p.dbNoContext.WithContext(ctx) - - // Get all wallets to track - var wallets []model.Wallet - err := db.Find(&wallets).Error - if err != nil { - return errors.Wrap(err, "failed to get wallets from database") - } - - now := time.Now() - var updated, inserted int64 - trackedProofSets := make(map[uint64]struct{}) - - processProofSet := func(wallet model.Wallet, ps ProofSetInfo) { - for _, pieceCID := range ps.PieceCIDs { - if pieceCID == cid.Undef { - Logger.Warnw("invalid piece CID from PDP proof set", "pieceCID", pieceCID.String(), "proofSetID", ps.ProofSetID) - continue - } - modelPieceCID := model.CID(pieceCID) - - // Check if we already have this deal tracked. - var existingDeal model.Deal - err := db.Where("proof_set_id = ? AND piece_cid = ? AND deal_type = ?", - ps.ProofSetID, modelPieceCID, model.DealTypePDP).First(&existingDeal).Error - - if err == nil { - // Overwrite tracked state idempotently each cycle instead of diffing fields. - updates := map[string]any{ - "proof_set_live": ps.IsLive, - "next_challenge_epoch": ps.NextChallengeEpoch, - "state": p.getPDPDealState(ps), - "last_verified_at": now, - } - if existingDeal.ProofSetLive == nil || *existingDeal.ProofSetLive != ps.IsLive { - Logger.Infow("PDP proof set status changed", - "proofSetID", ps.ProofSetID, - "previousLive", existingDeal.ProofSetLive, - "currentLive", ps.IsLive, - ) - } - err = database.DoRetry(ctx, func() error { - return db.Model(&model.Deal{}).Where("id = ?", existingDeal.ID).Updates(updates).Error - }) - if err != nil { - Logger.Errorw("failed to update PDP deal", "dealID", existingDeal.ID, "error", err) - continue - } - Logger.Infow("PDP deal updated", "dealID", existingDeal.ID, "proofSetID", ps.ProofSetID) - updated++ - } else if errors.Is(err, gorm.ErrRecordNotFound) { - // New PDP deal, insert it. - newState := p.getPDPDealState(ps) - newDeal := model.Deal{ - DealType: model.DealTypePDP, - State: newState, - ClientID: wallet.ID, - Provider: ps.ProviderAddress.String(), - PieceCID: modelPieceCID, - ProofSetID: ptr.Of(ps.ProofSetID), - ProofSetLive: ptr.Of(ps.IsLive), - NextChallengeEpoch: ptr.Of(ps.NextChallengeEpoch), - LastVerifiedAt: ptr.Of(now), - } - - err = database.DoRetry(ctx, func() error { - return db.Create(&newDeal).Error - }) - if err != nil { - Logger.Errorw("failed to insert PDP deal", "proofSetID", ps.ProofSetID, "error", err) - continue - } - Logger.Infow("PDP deal inserted", "proofSetID", ps.ProofSetID, "state", newState) - inserted++ - } else { - Logger.Errorw("failed to query existing PDP deal", "error", err) - } - } - } - - if bulkClient, ok := p.pdpClient.(PDPBulkClient); ok { - walletsByAddress := make(map[string][]model.Wallet, len(wallets)) - for _, wallet := range wallets { - walletAddr, err := address.NewFromString(wallet.Address) - if err != nil { - Logger.Warnw("invalid wallet address for PDP tracking", "walletID", wallet.ID, "address", wallet.Address, "error", err) - continue - } - walletsByAddress[walletAddr.String()] = append(walletsByAddress[walletAddr.String()], wallet) - } - - // Fetch once and fan out by client address to avoid full on-chain scans per wallet. - proofSets, err := bulkClient.GetProofSets(ctx) - if err != nil { - return errors.Wrap(err, "failed to get PDP proof sets") - } - for _, ps := range proofSets { - trackedProofSets[ps.ProofSetID] = struct{}{} - for _, wallet := range walletsByAddress[ps.ClientAddress.String()] { - processProofSet(wallet, ps) - } - } - } else { - for _, wallet := range wallets { - Logger.Infof("tracking PDP deals for wallet %s", wallet.ID) - - walletAddr, err := address.NewFromString(wallet.Address) - if err != nil { - Logger.Warnw("invalid wallet address for PDP tracking", "walletID", wallet.ID, "address", wallet.Address, "error", err) - continue - } - - proofSets, err := p.pdpClient.GetProofSetsForClient(ctx, walletAddr) - if err != nil { - Logger.Warnw("failed to get proof sets for wallet", "wallet", wallet.ID, "error", err) - continue - } - - for _, ps := range proofSets { - trackedProofSets[ps.ProofSetID] = struct{}{} - processProofSet(wallet, ps) - } - } - } - - Logger.Infof("PDP tracker: updated %d deals, inserted %d deals", updated, inserted) - Logger.Infof("PDP tracker: tracked %d proof sets", len(trackedProofSets)) - return nil -} - -// getPDPDealState determines the deal state based on proof set status -func (p *PDPTracker) getPDPDealState(ps ProofSetInfo) model.DealState { - if ps.IsLive { - return model.DealActive - } - // If not live, it might be proposed (waiting for first challenge) or expired - // This logic may need refinement based on actual PDP contract semantics - return model.DealPublished + return processNewEvents(ctx, db, p.rpcClient) } diff --git a/service/pdptracker/pdptracker_test.go b/service/pdptracker/pdptracker_test.go index 31e7b221..14422b41 100644 --- a/service/pdptracker/pdptracker_test.go +++ b/service/pdptracker/pdptracker_test.go @@ -1,201 +1,13 @@ package pdptracker import ( - "context" "testing" "time" - "github.com/data-preservation-programs/singularity/model" - "github.com/data-preservation-programs/singularity/util/testutil" - "github.com/filecoin-project/go-address" - "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" - "gorm.io/gorm" ) -type mockPDPClient struct { - proofSets map[address.Address][]ProofSetInfo - bulkCalls int -} - -func (m *mockPDPClient) GetProofSetsForClient(_ context.Context, clientAddress address.Address) ([]ProofSetInfo, error) { - return m.proofSets[clientAddress], nil -} - -func (m *mockPDPClient) GetProofSets(_ context.Context) ([]ProofSetInfo, error) { - m.bulkCalls++ - var all []ProofSetInfo - for _, sets := range m.proofSets { - all = append(all, sets...) - } - return all, nil -} - -func (m *mockPDPClient) GetProofSetInfo(_ context.Context, _ uint64) (*ProofSetInfo, error) { - return nil, nil -} - -func (m *mockPDPClient) IsProofSetLive(_ context.Context, _ uint64) (bool, error) { - return false, nil -} - -func (m *mockPDPClient) GetNextChallengeEpoch(_ context.Context, _ uint64) (int32, error) { - return 0, nil -} - func TestPDPTracker_Name(t *testing.T) { tracker := NewPDPTracker(nil, PDPConfig{PollingInterval: time.Minute}, nil, true) require.Equal(t, "PDPTracker", tracker.Name()) } - -func TestPDPTracker_RunOnce_UpsertByParsedPieceCID(t *testing.T) { - testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { - walletSubaddr := make([]byte, 20) - walletSubaddr[19] = 1 - walletAddr, err := address.NewDelegatedAddress(10, walletSubaddr) - require.NoError(t, err) - - providerSubaddr := make([]byte, 20) - providerSubaddr[19] = 2 - providerAddr, err := address.NewDelegatedAddress(10, providerSubaddr) - require.NoError(t, err) - - err = db.Create(&model.Wallet{ - ID: "f0100", - Address: walletAddr.String(), - }).Error - require.NoError(t, err) - - const pieceCID = "baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq" - parsedPieceCID, err := cid.Decode(pieceCID) - require.NoError(t, err) - client := &mockPDPClient{ - proofSets: map[address.Address][]ProofSetInfo{ - walletAddr: { - { - ProofSetID: 7, - ClientAddress: walletAddr, - ProviderAddress: providerAddr, - IsLive: true, - NextChallengeEpoch: 10, - PieceCIDs: []cid.Cid{parsedPieceCID}, - }, - }, - }, - } - - tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) - require.NoError(t, tracker.runOnce(ctx)) - - var first model.Deal - err = db.Where("deal_type = ?", model.DealTypePDP).First(&first).Error - require.NoError(t, err) - require.Equal(t, model.DealTypePDP, first.DealType) - require.Equal(t, pieceCID, first.PieceCID.String()) - require.NotNil(t, first.ProofSetID) - require.EqualValues(t, 7, *first.ProofSetID) - require.NotNil(t, first.ProofSetLive) - require.True(t, *first.ProofSetLive) - require.Equal(t, model.DealActive, first.State) - require.NotNil(t, first.LastVerifiedAt) - - client.proofSets[walletAddr][0].IsLive = false - client.proofSets[walletAddr][0].NextChallengeEpoch = 11 - require.NoError(t, tracker.runOnce(ctx)) - - var deals []model.Deal - err = db.Where("deal_type = ?", model.DealTypePDP).Find(&deals).Error - require.NoError(t, err) - require.Len(t, deals, 1) - require.NotNil(t, deals[0].ProofSetLive) - require.False(t, *deals[0].ProofSetLive) - require.NotNil(t, deals[0].NextChallengeEpoch) - require.EqualValues(t, 11, *deals[0].NextChallengeEpoch) - require.Equal(t, model.DealPublished, deals[0].State) - require.NotNil(t, deals[0].LastVerifiedAt) - }) -} - -func TestPDPTracker_RunOnce_SkipsInvalidPieceCID(t *testing.T) { - testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { - walletSubaddr := make([]byte, 20) - walletSubaddr[19] = 3 - walletAddr, err := address.NewDelegatedAddress(10, walletSubaddr) - require.NoError(t, err) - - providerSubaddr := make([]byte, 20) - providerSubaddr[19] = 4 - providerAddr, err := address.NewDelegatedAddress(10, providerSubaddr) - require.NoError(t, err) - - err = db.Create(&model.Wallet{ - ID: "f0100", - Address: walletAddr.String(), - }).Error - require.NoError(t, err) - - client := &mockPDPClient{ - proofSets: map[address.Address][]ProofSetInfo{ - walletAddr: { - { - ProofSetID: 7, - ClientAddress: walletAddr, - ProviderAddress: providerAddr, - IsLive: true, - NextChallengeEpoch: 10, - PieceCIDs: []cid.Cid{cid.Undef}, - }, - }, - }, - } - tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) - require.NoError(t, tracker.runOnce(ctx)) - - var count int64 - err = db.Model(&model.Deal{}).Where("deal_type = ?", model.DealTypePDP).Count(&count).Error - require.NoError(t, err) - require.EqualValues(t, 0, count) - }) -} - -func TestPDPTracker_RunOnce_UsesBulkFetchWhenAvailable(t *testing.T) { - testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { - walletSubaddr := make([]byte, 20) - walletSubaddr[19] = 5 - walletAddr, err := address.NewDelegatedAddress(10, walletSubaddr) - require.NoError(t, err) - - providerSubaddr := make([]byte, 20) - providerSubaddr[19] = 6 - providerAddr, err := address.NewDelegatedAddress(10, providerSubaddr) - require.NoError(t, err) - - err = db.Create(&model.Wallet{ - ID: "f0101", - Address: walletAddr.String(), - }).Error - require.NoError(t, err) - - pieceCID, err := cid.Decode("baga6ea4seaqao7s73y24kcutaosvacpdjgfe5pw76ooefnyqw4ynr3d2y6x2mpq") - require.NoError(t, err) - - client := &mockPDPClient{ - proofSets: map[address.Address][]ProofSetInfo{ - walletAddr: { - { - ProofSetID: 8, - ClientAddress: walletAddr, - ProviderAddress: providerAddr, - IsLive: true, - NextChallengeEpoch: 12, - PieceCIDs: []cid.Cid{pieceCID}, - }, - }, - }, - } - - tracker := NewPDPTracker(db, PDPConfig{PollingInterval: time.Minute}, client, true) - require.NoError(t, tracker.runOnce(ctx)) - require.Equal(t, 1, client.bulkCalls) - }) -} diff --git a/util/testutil/anvil.go b/util/testutil/anvil.go new file mode 100644 index 00000000..93a32333 --- /dev/null +++ b/util/testutil/anvil.go @@ -0,0 +1,91 @@ +package testutil + +import ( + "context" + "fmt" + "net" + "os" + "os/exec" + "testing" + "time" + + "github.com/ethereum/go-ethereum/ethclient" +) + +type AnvilInstance struct { + RPCURL string + cmd *exec.Cmd +} + +func (a *AnvilInstance) Close() { + if a.cmd != nil && a.cmd.Process != nil { + a.cmd.Process.Kill() + a.cmd.Wait() + } +} + +// StartAnvil forks the given upstream RPC on a random free port. +// Returns when the fork is ready to accept connections. +func StartAnvil(t *testing.T, upstreamRPC string) *AnvilInstance { + t.Helper() + + if _, err := exec.LookPath("anvil"); err != nil { + t.Skip("anvil not found on PATH") + } + + port, err := freePort() + if err != nil { + t.Fatalf("finding free port: %v", err) + } + + cmd := exec.Command("anvil", + "--fork-url", upstreamRPC, + "--port", fmt.Sprintf("%d", port), + "--silent", + ) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + t.Fatalf("starting anvil: %v", err) + } + + rpcURL := fmt.Sprintf("http://127.0.0.1:%d", port) + inst := &AnvilInstance{RPCURL: rpcURL, cmd: cmd} + t.Cleanup(inst.Close) + + if err := waitForRPC(rpcURL, 30*time.Second); err != nil { + t.Fatalf("anvil not ready: %v", err) + } + + return inst +} + +func freePort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port, nil +} + +func waitForRPC(url string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + client, err := ethclient.DialContext(ctx, url) + if err == nil { + _, err = client.ChainID(ctx) + client.Close() + cancel() + if err == nil { + return nil + } + } else { + cancel() + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("rpc at %s not ready after %s", url, timeout) +}