diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 2fccf82f..713bcecf 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -24,7 +24,9 @@ RUN apt-get update \ mariadb-server mariadb-client \ && rm -rf /var/lib/apt/lists/* -# Prepare user-owned data dirs for rootless startup +COPY --from=ghcr.io/foundry-rs/foundry:latest /usr/local/bin/anvil /usr/local/bin/anvil + +# prepare user-owned data dirs for rootless startup RUN mkdir -p /home/vscode/.local/share/pg/pgdata \ && mkdir -p /home/vscode/.local/share/mysql \ && chown -R vscode:vscode /home/vscode/.local/share diff --git a/.github/workflows/devcontainer-podman.yml b/.github/workflows/devcontainer-podman.yml index 5e9584b9..fdc29d3a 100644 --- a/.github/workflows/devcontainer-podman.yml +++ b/.github/workflows/devcontainer-podman.yml @@ -88,7 +88,7 @@ jobs: command: > cd /workspaces/singularity && mkdir -p artifacts && - SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/..." + SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/... ./service/pdptracker/..." container-runtime: podman - name: Report test results diff --git a/service/pdptracker/indexer.go b/service/pdptracker/indexer.go index bf49642c..2d95d235 100644 --- a/service/pdptracker/indexer.go +++ b/service/pdptracker/indexer.go @@ -3,6 +3,7 @@ package pdptracker import ( "context" "strings" + "time" "github.com/cockroachdb/errors" "github.com/ethereum/go-ethereum/common" @@ -82,34 +83,35 @@ const srcName = "fevm" func buildShovelConfig(pgURL, rpcURL string, chainID uint64, contract common.Address) config.Root { addrHex := strings.ToLower(contract.Hex()) src := config.Source{ - Name: srcName, - ChainID: chainID, - URLs: []string{rpcURL}, + Name: srcName, + ChainID: chainID, + URLs: []string{rpcURL}, + PollDuration: time.Second, } - af := func() dig.BlockData { - return dig.BlockData{ - Name: "log_addr", - Filter: dig.Filter{Op: "contains", Arg: []string{addrHex}}, - } + addrFilter := dig.BlockData{ + Name: "log_addr", + Column: "log_addr", + Filter: dig.Filter{Op: "contains", Arg: []string{addrHex}}, } + addrCol := wpg.Column{Name: "log_addr", Type: "bytea"} return config.Root{ PGURL: pgURL, Sources: []config.Source{src}, Integrations: []config.Integration{ - dataSetCreatedIG(src, af()), - piecesAddedIG(src, af()), - piecesRemovedIG(src, af()), - nextProvingPeriodIG(src, af()), - possessionProvenIG(src, af()), - dataSetDeletedIG(src, af()), - spChangedIG(src, af()), + dataSetCreatedIG(src, addrFilter, addrCol), + piecesAddedIG(src, addrFilter, addrCol), + piecesRemovedIG(src, addrFilter, addrCol), + nextProvingPeriodIG(src, addrFilter, addrCol), + possessionProvenIG(src, addrFilter, addrCol), + dataSetDeletedIG(src, addrFilter, addrCol), + spChangedIG(src, addrFilter, addrCol), }, } } -func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration { +func dataSetCreatedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_dataset_created", Enabled: true, @@ -117,6 +119,7 @@ func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_dataset_created", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "storage_provider", Type: "bytea"}, }, @@ -134,7 +137,7 @@ func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration { } // only set_id captured; array fields reconciled via getActivePieces RPC -func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration { +func piecesAddedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_pieces_added", Enabled: true, @@ -142,6 +145,7 @@ func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_pieces_added", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, }, }, @@ -161,7 +165,7 @@ func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration { } } -func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration { +func piecesRemovedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_pieces_removed", Enabled: true, @@ -169,6 +173,7 @@ func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_pieces_removed", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, }, }, @@ -184,7 +189,7 @@ func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration { } } -func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration { +func nextProvingPeriodIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_next_proving_period", Enabled: true, @@ -192,6 +197,7 @@ func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration Table: wpg.Table{ Name: "pdp_next_proving_period", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "challenge_epoch", Type: "numeric"}, {Name: "leaf_count", Type: "numeric"}, @@ -211,7 +217,7 @@ func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration } // only set_id captured; challenges tuple not needed for deal tracking -func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration { +func possessionProvenIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_possession_proven", Enabled: true, @@ -219,6 +225,7 @@ func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration Table: wpg.Table{ Name: "pdp_possession_proven", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, }, }, @@ -237,7 +244,7 @@ func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration } } -func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration { +func dataSetDeletedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_dataset_deleted", Enabled: true, @@ -245,6 +252,7 @@ func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_dataset_deleted", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "deleted_leaf_count", Type: "numeric"}, }, @@ -261,7 +269,7 @@ func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration { } } -func spChangedIG(src config.Source, af dig.BlockData) config.Integration { +func spChangedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration { return config.Integration{ Name: "pdp_sp_changed", Enabled: true, @@ -269,6 +277,7 @@ func spChangedIG(src config.Source, af dig.BlockData) config.Integration { Table: wpg.Table{ Name: "pdp_sp_changed", Columns: []wpg.Column{ + ac, {Name: "set_id", Type: "numeric"}, {Name: "old_sp", Type: "bytea"}, {Name: "new_sp", Type: "bytea"}, diff --git a/service/pdptracker/indexer_test.go b/service/pdptracker/indexer_test.go index f8e86b8b..0407e97e 100644 --- a/service/pdptracker/indexer_test.go +++ b/service/pdptracker/indexer_test.go @@ -4,12 +4,14 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" + "github.com/indexsupply/shovel/shovel/config" "github.com/stretchr/testify/require" ) func TestBuildShovelConfig(t *testing.T) { contract := common.HexToAddress("0xBADd0B92C1c71d02E7d520f64c0876538fa2557F") conf := buildShovelConfig("postgres://localhost/test", "https://rpc.example.com", 314, contract) + require.NoError(t, config.ValidateFix(&conf)) require.Len(t, conf.Sources, 1) require.Equal(t, "fevm", conf.Sources[0].Name) diff --git a/service/pdptracker/integration_test.go b/service/pdptracker/integration_test.go index 54c1d463..8a055e86 100644 --- a/service/pdptracker/integration_test.go +++ b/service/pdptracker/integration_test.go @@ -17,15 +17,19 @@ import ( const calibnetRPC = "https://api.calibration.node.glif.io/rpc/v1" -// TestIntegration_NetworkDetection verifies synapse.DetectNetwork works -// against calibnet and returns the expected chain ID and contract address. +func startCalibnetFork(t *testing.T) string { + t.Helper() + anvil := testutil.StartAnvil(t, calibnetRPC) + return anvil.RPCURL +} + func TestIntegration_NetworkDetection(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) + rpcURL := startCalibnetFork(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - ethClient, err := ethclient.DialContext(ctx, calibnetRPC) + ethClient, err := ethclient.DialContext(ctx, rpcURL) require.NoError(t, err) defer ethClient.Close() @@ -39,11 +43,7 @@ func TestIntegration_NetworkDetection(t *testing.T) { t.Logf("calibnet PDPVerifier: %s", contractAddr.Hex()) } -// TestIntegration_ShovelConfig validates that the Shovel config generated for -// calibnet passes ValidateFix without errors. func TestIntegration_ShovelConfig(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) - contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) conf := buildShovelConfig( "postgres://localhost/test", @@ -52,17 +52,13 @@ func TestIntegration_ShovelConfig(t *testing.T) { contractAddr, ) - // import config package to validate require.Len(t, conf.Integrations, 7) require.Len(t, conf.Sources, 1) require.Equal(t, uint64(314159), conf.Sources[0].ChainID) } -// TestIntegration_ShovelIndexer_Calibnet starts an embedded Shovel indexer -// against calibnet and verifies it processes blocks without errors. -// Requires: Postgres (PGPORT), calibnet RPC, SINGULARITY_TEST_EXTERNAL_API=true. -func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) +func TestIntegration_ShovelIndexer(t *testing.T) { + rpcURL := startCalibnetFork(t) testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) { if db.Dialector.Name() != "postgres" { @@ -70,16 +66,14 @@ func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { return } - // get the postgres connection string from env (set by testutil) connStr := os.Getenv("DATABASE_CONNECTION_STRING") require.NotEmpty(t, connStr) contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) - indexer, err := NewPDPIndexer(ctx, connStr, calibnetRPC, uint64(constants.ChainIDCalibration), contractAddr) + indexer, err := NewPDPIndexer(ctx, connStr, rpcURL, uint64(constants.ChainIDCalibration), contractAddr) require.NoError(t, err) - // start indexer with timeout context indexCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() @@ -87,16 +81,13 @@ func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { err = indexer.Start(indexCtx, exitErr) require.NoError(t, err) - // let it run for a few seconds to process some blocks time.Sleep(10 * time.Second) - // verify shovel internal tables exist var schemaExists bool err = db.Raw("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'shovel')").Scan(&schemaExists).Error require.NoError(t, err) require.True(t, schemaExists, "shovel schema should exist") - // verify integration tables exist for _, table := range []string{ "pdp_dataset_created", "pdp_pieces_added", @@ -114,39 +105,29 @@ func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) { require.True(t, exists, "table %s should exist", table) } - t.Log("Shovel indexer started and processed blocks against calibnet successfully") - cancel() - // wait for clean shutdown select { case err := <-exitErr: require.NoError(t, err) case <-time.After(5 * time.Second): - // fine, shutdown may be slow } }) } -// TestIntegration_RPCClient_Calibnet verifies the RPC client can make calls -// against the real calibnet PDPVerifier contract. -func TestIntegration_RPCClient_Calibnet(t *testing.T) { - testutil.SkipIfNotExternalAPI(t) +func TestIntegration_RPCClient(t *testing.T) { + rpcURL := startCalibnetFork(t) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration) - client, err := NewPDPClient(ctx, calibnetRPC, contractAddr) + client, err := NewPDPClient(ctx, rpcURL, contractAddr) require.NoError(t, err) defer client.Close() - // try to get listener for set 0 — may fail (doesn't exist) but shouldn't panic _, err = client.GetDataSetListener(ctx, 0) - // we don't assert NoError here because set 0 may not exist, - // but the call should complete without panic t.Logf("GetDataSetListener(0): err=%v", err) - // try to get active pieces for set 0 _, err = client.GetActivePieces(ctx, 0) t.Logf("GetActivePieces(0): err=%v", err) } diff --git a/util/testutil/anvil.go b/util/testutil/anvil.go new file mode 100644 index 00000000..93a32333 --- /dev/null +++ b/util/testutil/anvil.go @@ -0,0 +1,91 @@ +package testutil + +import ( + "context" + "fmt" + "net" + "os" + "os/exec" + "testing" + "time" + + "github.com/ethereum/go-ethereum/ethclient" +) + +type AnvilInstance struct { + RPCURL string + cmd *exec.Cmd +} + +func (a *AnvilInstance) Close() { + if a.cmd != nil && a.cmd.Process != nil { + a.cmd.Process.Kill() + a.cmd.Wait() + } +} + +// StartAnvil forks the given upstream RPC on a random free port. +// Returns when the fork is ready to accept connections. +func StartAnvil(t *testing.T, upstreamRPC string) *AnvilInstance { + t.Helper() + + if _, err := exec.LookPath("anvil"); err != nil { + t.Skip("anvil not found on PATH") + } + + port, err := freePort() + if err != nil { + t.Fatalf("finding free port: %v", err) + } + + cmd := exec.Command("anvil", + "--fork-url", upstreamRPC, + "--port", fmt.Sprintf("%d", port), + "--silent", + ) + cmd.Stdout = os.Stderr + cmd.Stderr = os.Stderr + if err := cmd.Start(); err != nil { + t.Fatalf("starting anvil: %v", err) + } + + rpcURL := fmt.Sprintf("http://127.0.0.1:%d", port) + inst := &AnvilInstance{RPCURL: rpcURL, cmd: cmd} + t.Cleanup(inst.Close) + + if err := waitForRPC(rpcURL, 30*time.Second); err != nil { + t.Fatalf("anvil not ready: %v", err) + } + + return inst +} + +func freePort() (int, error) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return 0, err + } + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return port, nil +} + +func waitForRPC(url string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + client, err := ethclient.DialContext(ctx, url) + if err == nil { + _, err = client.ChainID(ctx) + client.Close() + cancel() + if err == nil { + return nil + } + } else { + cancel() + } + time.Sleep(200 * time.Millisecond) + } + return fmt.Errorf("rpc at %s not ready after %s", url, timeout) +}