From 466049cf316af78acc98ba98a4c17a6e8f77500c Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Thu, 19 Feb 2026 18:07:38 +0100 Subject: [PATCH 1/3] add anvil fork tests for PDP integration --- .devcontainer/Dockerfile | 4 +- service/pdptracker/integration_test.go | 47 ++++--------- util/testutil/anvil.go | 91 ++++++++++++++++++++++++++ 3 files changed, 108 insertions(+), 34 deletions(-) create mode 100644 util/testutil/anvil.go diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 2fccf82f..713bcecf 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -24,7 +24,9 @@ RUN apt-get update \ mariadb-server mariadb-client \ && rm -rf /var/lib/apt/lists/* -# Prepare user-owned data dirs for rootless startup +COPY --from=ghcr.io/foundry-rs/foundry:latest /usr/local/bin/anvil /usr/local/bin/anvil + +# prepare user-owned data dirs for rootless startup RUN mkdir -p /home/vscode/.local/share/pg/pgdata \ && mkdir -p /home/vscode/.local/share/mysql \ && chown -R vscode:vscode /home/vscode/.local/share diff --git a/service/pdptracker/integration_test.go b/service/pdptracker/integration_test.go index 54c1d463..8a055e86 100644 --- a/service/pdptracker/integration_test.go +++ b/service/pdptracker/integration_test.go @@ -17,15 +17,19 @@ import ( const calibnetRPC = "https://api.calibration.node.glif.io/rpc/v1" -// TestIntegration_NetworkDetection verifies synapse.DetectNetwork works -// against calibnet and returns the expected chain ID and contract address. +func startCalibnetFork(t *testing.T) string { + t.Helper() + anvil := testutil.StartAnvil(t, calibnetRPC) + return anvil.RPCURL +} + func TestIntegration_NetworkDetection(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) + rpcURL := startCalibnetFork(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - ethClient, err := ethclient.DialContext(ctx, calibnetRPC) + ethClient, err := ethclient.DialContext(ctx, rpcURL) require.NoError(t, err) defer ethClient.Close() @@ -39,11 +43,7 @@ func TestIntegration_NetworkDetection(t *testing.T) { t.Logf("calibnet PDPVerifier: %s", contractAddr.Hex()) } -// TestIntegration_ShovelConfig validates that the Shovel config generated for -// calibnet passes ValidateFix without errors. func TestIntegration_ShovelConfig(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) - contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) conf := buildShovelConfig( "postgres://localhost/test", @@ -52,17 +52,13 @@ func TestIntegration_ShovelConfig(t *testing.T) { contractAddr, ) - // import config package to validate require.Len(t, conf.Integrations, 7) require.Len(t, conf.Sources, 1) require.Equal(t, uint64(314159), conf.Sources[0].ChainID) } -// TestIntegration_ShovelIndexer_Calibnet starts an embedded Shovel indexer -// against calibnet and verifies it processes blocks without errors. -// Requires: Postgres (PGPORT), calibnet RPC, SINGULARITY_TEST_EXTERNAL_API=true. -func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) +func TestIntegration_ShovelIndexer(t *testing.T) { + rpcURL := startCalibnetFork(t) testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { if db.Dialector.Name() != "postgres" { @@ -70,16 +66,14 @@ func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { return } - // get the postgres connection string from env (set by testutil) connStr := os.Getenv("DATABASE_CONNECTION_STRING") require.NotEmpty(t, connStr) contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) - indexer, err := NewPDPIndexer(ctx, connStr, calibnetRPC, uint64(constants.ChainIDCalibration), contractAddr) + indexer, err := NewPDPIndexer(ctx, connStr, rpcURL, uint64(constants.ChainIDCalibration), contractAddr) require.NoError(t, err) - // start indexer with timeout context indexCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() @@ -87,16 +81,13 @@ func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { err = indexer.Start(indexCtx, exitErr) require.NoError(t, err) - // let it run for a few seconds to process some blocks time.Sleep(10 * time.Second) - // verify shovel internal tables exist var schemaExists bool err = db.Raw("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'shovel')").Scan(&schemaExists).Error require.NoError(t, err) require.True(t, schemaExists, "shovel schema should exist") - // verify integration tables exist for _, table := range []string{ "pdp_dataset_created", "pdp_pieces_added", @@ -114,39 +105,29 @@ func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { require.True(t, exists, "table %s should exist", table) } - t.Log("Shovel indexer started and processed blocks against calibnet successfully") - cancel() - // wait for clean shutdown select { case err := <-exitErr: require.NoError(t, err) case <-time.After(5 * time.Second): - // fine, shutdown may be slow } }) } -// TestIntegration_RPCClient_Calibnet verifies the RPC client can make calls -// against the real calibnet PDPVerifier contract. -func TestIntegration_RPCClient_Calibnet(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) +func TestIntegration_RPCClient(t *testing.T) { + rpcURL := startCalibnetFork(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) - client, err := NewPDPClient(ctx, calibnetRPC, contractAddr) + client, err := NewPDPClient(ctx, rpcURL, contractAddr) require.NoError(t, err) defer client.Close() - // try to get listener for set 0 — may fail (doesn't exist) but shouldn't panic _, err = client.GetDataSetListener(ctx, 0) - // we don't assert NoError here because set 0 may not exist, - // but the call should complete without panic t.Logf("GetDataSetListener(0): err=%v", err) - // try to get active pieces for set 0 _, err = client.GetActivePieces(ctx, 0) t.Logf("GetActivePieces(0): err=%v", err) } diff --git a/util/testutil/anvil.go b/util/testutil/anvil.go new file mode 100644 index 00000000..93a32333 --- /dev/null +++ b/util/testutil/anvil.go @@ -0,0 +1,91 @@ +package testutil + +import ( + "context" + "fmt" + "net" + "os" + "os/exec" + "testing" + "time" + + "github.com/ethereum/go-ethereum/ethclient" +) + +type AnvilInstance struct { + RPCURL string + cmd *exec.Cmd +} + +func (a *AnvilInstance) Close() { + if a.cmd != nil && a.cmd.Process != nil { + a.cmd.Process.Kill() + a.cmd.Wait() + } +} + +// StartAnvil forks the given upstream RPC on a random free port. +// Returns when the fork is ready to accept connections. +func StartAnvil(t *testing.T, upstreamRPC string) *AnvilInstance { + t.Helper() + + if _, err := exec.LookPath("anvil"); err != nil { + t.Skip("anvil not found on PATH") + } + + port, err := freePort() + if err != nil { + t.Fatalf("finding free port: %v", err) + } + + cmd := exec.Command("anvil", + "--fork-url", upstreamRPC, + "--port", fmt.Sprintf("%d", port), + "--silent", + ) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + t.Fatalf("starting anvil: %v", err) + } + + rpcURL := fmt.Sprintf("http://127.0.0.1:%d", port) + inst := &AnvilInstance{RPCURL: rpcURL, cmd: cmd} + t.Cleanup(inst.Close) + + if err := waitForRPC(rpcURL, 30*time.Second); err != nil { + t.Fatalf("anvil not ready: %v", err) + } + + return inst +} + +func freePort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port, nil +} + +func waitForRPC(url string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + client, err := ethclient.DialContext(ctx, url) + if err == nil { + _, err = client.ChainID(ctx) + client.Close() + cancel() + if err == nil { + return nil + } + } else { + cancel() + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("rpc at %s not ready after %s", url, timeout) +} From 443b410352007a2b29f9f1ad340928357ffae128 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Thu, 19 Feb 2026 18:38:27 +0100 Subject: [PATCH 2/3] fix shovel config: add log_addr column, run ValidateFix in unit test also add ./service/pdptracker/... to CI integration test path --- .github/workflows/devcontainer-podman.yml | 2 +- service/pdptracker/indexer.go | 45 +++++++++++++---------- service/pdptracker/indexer_test.go | 2 + 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/.github/workflows/devcontainer-podman.yml b/.github/workflows/devcontainer-podman.yml index 5e9584b9..fdc29d3a 100644 --- a/.github/workflows/devcontainer-podman.yml +++ b/.github/workflows/devcontainer-podman.yml @@ -88,7 +88,7 @@ jobs: command: > cd /workspaces/singularity && mkdir -p artifacts && - SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/..." + SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/... ./service/pdptracker/..." container-runtime: podman - name: Report test results diff --git a/service/pdptracker/indexer.go b/service/pdptracker/indexer.go index bf49642c..dc9630f0 100644 --- a/service/pdptracker/indexer.go +++ b/service/pdptracker/indexer.go @@ -87,29 +87,29 @@ func buildShovelConfig(pgURL, rpcURL string, chainID uint64, contract common.Add URLs: []string{rpcURL}, } - af := func() dig.BlockData { - return dig.BlockData{ - Name: "log_addr", - Filter: dig.Filter{Op: "contains", Arg: []string{addrHex}}, - } + addrFilter := dig.BlockData{ + Name: "log_addr", + Column: "log_addr", + Filter: dig.Filter{Op: "contains", Arg: []string{addrHex}}, } + addrCol := wpg.Column{Name: "log_addr", Type: "bytea"} return config.Root{ PGURL: pgURL, Sources: []config.Source{src}, Integrations: []config.Integration{ - dataSetCreatedIG(src, af()), - piecesAddedIG(src, af()), - piecesRemovedIG(src, af()), - nextProvingPeriodIG(src, af()), - possessionProvenIG(src, af()), - dataSetDeletedIG(src, af()), - spChangedIG(src, af()), + dataSetCreatedIG(src, addrFilter, addrCol), + piecesAddedIG(src, addrFilter, addrCol), + piecesRemovedIG(src, addrFilter, addrCol), + nextProvingPeriodIG(src, addrFilter, addrCol), + possessionProvenIG(src, addrFilter, addrCol), + dataSetDeletedIG(src, addrFilter, addrCol), + spChangedIG(src, addrFilter, addrCol), }, } } -func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration { +func dataSetCreatedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_dataset_created", Enabled: true, @@ -117,6 +117,7 @@ func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_dataset_created", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "storage_provider", Type: "bytea"}, }, @@ -134,7 +135,7 @@ func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration { } // only set_id captured; array fields reconciled via getActivePieces RPC -func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration { +func piecesAddedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_pieces_added", Enabled: true, @@ -142,6 +143,7 @@ func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_pieces_added", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, }, }, @@ -161,7 +163,7 @@ func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration { } } -func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration { +func piecesRemovedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_pieces_removed", Enabled: true, @@ -169,6 +171,7 @@ func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_pieces_removed", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, }, }, @@ -184,7 +187,7 @@ func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration { } } -func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration { +func nextProvingPeriodIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_next_proving_period", Enabled: true, @@ -192,6 +195,7 @@ func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration Table: wpg.Table{ Name: "pdp_next_proving_period", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "challenge_epoch", Type: "numeric"}, {Name: "leaf_count", Type: "numeric"}, @@ -211,7 +215,7 @@ func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration } // only set_id captured; challenges tuple not needed for deal tracking -func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration { +func possessionProvenIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_possession_proven", Enabled: true, @@ -219,6 +223,7 @@ func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration Table: wpg.Table{ Name: "pdp_possession_proven", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, }, }, @@ -237,7 +242,7 @@ func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration } } -func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration { +func dataSetDeletedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_dataset_deleted", Enabled: true, @@ -245,6 +250,7 @@ func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_dataset_deleted", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "deleted_leaf_count", Type: "numeric"}, }, @@ -261,7 +267,7 @@ func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration { } } -func spChangedIG(src config.Source, af dig.BlockData) config.Integration { +func spChangedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_sp_changed", Enabled: true, @@ -269,6 +275,7 @@ func spChangedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_sp_changed", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "old_sp", Type: "bytea"}, {Name: "new_sp", Type: "bytea"}, diff --git a/service/pdptracker/indexer_test.go b/service/pdptracker/indexer_test.go index f8e86b8b..0407e97e 100644 --- a/service/pdptracker/indexer_test.go +++ b/service/pdptracker/indexer_test.go @@ -4,12 +4,14 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/indexsupply/shovel/shovel/config" "github.com/stretchr/testify/require" ) func TestBuildShovelConfig(t *testing.T) { contract := common.HexToAddress("0xBADd0B92C1c71d02E7d520f64c0876538fa2557F") conf := buildShovelConfig("postgres://localhost/test", "https://rpc.example.com", 314, contract) + require.NoError(t, config.ValidateFix(&conf)) require.Len(t, conf.Sources, 1) require.Equal(t, "fevm", conf.Sources[0].Name) From 6fefb71fa34b9899bec880afd0b1b73e567fb825 Mon Sep 17 00:00:00 2001 From: Arkadiy Kukarkin Date: Thu, 19 Feb 2026 19:20:43 +0100 Subject: [PATCH 3/3] set PollDuration on Shovel source config Shovel only defaults PollDuration to 1s during JSON unmarshal, not when the struct is constructed directly. Zero value propagates to jrpc2.Client and panics in time.NewTicker(0). --- service/pdptracker/indexer.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/service/pdptracker/indexer.go b/service/pdptracker/indexer.go index dc9630f0..2d95d235 100644 --- a/service/pdptracker/indexer.go +++ b/service/pdptracker/indexer.go @@ -3,6 +3,7 @@ package pdptracker import ( "context" "strings" + "time" "github.com/cockroachdb/errors" "github.com/ethereum/go-ethereum/common" @@ -82,9 +83,10 @@ const srcName = "fevm" func buildShovelConfig(pgURL, rpcURL string, chainID uint64, contract common.Address) config.Root { addrHex := strings.ToLower(contract.Hex()) src := config.Source{ - Name: srcName, - ChainID: chainID, - URLs: []string{rpcURL}, + Name: srcName, + ChainID: chainID, + URLs: []string{rpcURL}, + PollDuration: time.Second, } addrFilter := dig.BlockData{