Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ RUN apt-get update \
mariadb-server mariadb-client \
&& rm -rf /var/lib/apt/lists/*

# Prepare user-owned data dirs for rootless startup
COPY --from=ghcr.io/foundry-rs/foundry:latest /usr/local/bin/anvil /usr/local/bin/anvil

# prepare user-owned data dirs for rootless startup
RUN mkdir -p /home/vscode/.local/share/pg/pgdata \
&& mkdir -p /home/vscode/.local/share/mysql \
&& chown -R vscode:vscode /home/vscode/.local/share
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/devcontainer-podman.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
command: >
cd /workspaces/singularity &&
mkdir -p artifacts &&
SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/..."
SINGULARITY_TEST_INTEGRATION=true make test GOTESTSUM_ARGS="--junitfile artifacts/integration-tests.xml -- -timeout 20m -run Integration ./cmd/... ./service/pdptracker/..."
container-runtime: podman

- name: Report test results
Expand Down
53 changes: 31 additions & 22 deletions service/pdptracker/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pdptracker
import (
"context"
"strings"
"time"

"github.com/cockroachdb/errors"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -82,41 +83,43 @@ const srcName = "fevm"
func buildShovelConfig(pgURL, rpcURL string, chainID uint64, contract common.Address) config.Root {
addrHex := strings.ToLower(contract.Hex())
src := config.Source{
Name: srcName,
ChainID: chainID,
URLs: []string{rpcURL},
Name: srcName,
ChainID: chainID,
URLs: []string{rpcURL},
PollDuration: time.Second,
}

af := func() dig.BlockData {
return dig.BlockData{
Name: "log_addr",
Filter: dig.Filter{Op: "contains", Arg: []string{addrHex}},
}
addrFilter := dig.BlockData{
Name: "log_addr",
Column: "log_addr",
Filter: dig.Filter{Op: "contains", Arg: []string{addrHex}},
}
addrCol := wpg.Column{Name: "log_addr", Type: "bytea"}

return config.Root{
PGURL: pgURL,
Sources: []config.Source{src},
Integrations: []config.Integration{
dataSetCreatedIG(src, af()),
piecesAddedIG(src, af()),
piecesRemovedIG(src, af()),
nextProvingPeriodIG(src, af()),
possessionProvenIG(src, af()),
dataSetDeletedIG(src, af()),
spChangedIG(src, af()),
dataSetCreatedIG(src, addrFilter, addrCol),
piecesAddedIG(src, addrFilter, addrCol),
piecesRemovedIG(src, addrFilter, addrCol),
nextProvingPeriodIG(src, addrFilter, addrCol),
possessionProvenIG(src, addrFilter, addrCol),
dataSetDeletedIG(src, addrFilter, addrCol),
spChangedIG(src, addrFilter, addrCol),
},
}
}

func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration {
func dataSetCreatedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration {
return config.Integration{
Name: "pdp_dataset_created",
Enabled: true,
Sources: []config.Source{{Name: src.Name}},
Table: wpg.Table{
Name: "pdp_dataset_created",
Columns: []wpg.Column{
ac,
{Name: "set_id", Type: "numeric"},
{Name: "storage_provider", Type: "bytea"},
},
Expand All @@ -134,14 +137,15 @@ func dataSetCreatedIG(src config.Source, af dig.BlockData) config.Integration {
}

// only set_id captured; array fields reconciled via getActivePieces RPC
func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration {
func piecesAddedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration {
return config.Integration{
Name: "pdp_pieces_added",
Enabled: true,
Sources: []config.Source{{Name: src.Name}},
Table: wpg.Table{
Name: "pdp_pieces_added",
Columns: []wpg.Column{
ac,
{Name: "set_id", Type: "numeric"},
},
},
Expand All @@ -161,14 +165,15 @@ func piecesAddedIG(src config.Source, af dig.BlockData) config.Integration {
}
}

func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration {
func piecesRemovedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration {
return config.Integration{
Name: "pdp_pieces_removed",
Enabled: true,
Sources: []config.Source{{Name: src.Name}},
Table: wpg.Table{
Name: "pdp_pieces_removed",
Columns: []wpg.Column{
ac,
{Name: "set_id", Type: "numeric"},
},
},
Expand All @@ -184,14 +189,15 @@ func piecesRemovedIG(src config.Source, af dig.BlockData) config.Integration {
}
}

func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration {
func nextProvingPeriodIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration {
return config.Integration{
Name: "pdp_next_proving_period",
Enabled: true,
Sources: []config.Source{{Name: src.Name}},
Table: wpg.Table{
Name: "pdp_next_proving_period",
Columns: []wpg.Column{
ac,
{Name: "set_id", Type: "numeric"},
{Name: "challenge_epoch", Type: "numeric"},
{Name: "leaf_count", Type: "numeric"},
Expand All @@ -211,14 +217,15 @@ func nextProvingPeriodIG(src config.Source, af dig.BlockData) config.Integration
}

// only set_id captured; challenges tuple not needed for deal tracking
func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration {
func possessionProvenIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration {
return config.Integration{
Name: "pdp_possession_proven",
Enabled: true,
Sources: []config.Source{{Name: src.Name}},
Table: wpg.Table{
Name: "pdp_possession_proven",
Columns: []wpg.Column{
ac,
{Name: "set_id", Type: "numeric"},
},
},
Expand All @@ -237,14 +244,15 @@ func possessionProvenIG(src config.Source, af dig.BlockData) config.Integration
}
}

func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration {
func dataSetDeletedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration {
return config.Integration{
Name: "pdp_dataset_deleted",
Enabled: true,
Sources: []config.Source{{Name: src.Name}},
Table: wpg.Table{
Name: "pdp_dataset_deleted",
Columns: []wpg.Column{
ac,
{Name: "set_id", Type: "numeric"},
{Name: "deleted_leaf_count", Type: "numeric"},
},
Expand All @@ -261,14 +269,15 @@ func dataSetDeletedIG(src config.Source, af dig.BlockData) config.Integration {
}
}

func spChangedIG(src config.Source, af dig.BlockData) config.Integration {
func spChangedIG(src config.Source, af dig.BlockData, ac wpg.Column) config.Integration {
return config.Integration{
Name: "pdp_sp_changed",
Enabled: true,
Sources: []config.Source{{Name: src.Name}},
Table: wpg.Table{
Name: "pdp_sp_changed",
Columns: []wpg.Column{
ac,
{Name: "set_id", Type: "numeric"},
{Name: "old_sp", Type: "bytea"},
{Name: "new_sp", Type: "bytea"},
Expand Down
2 changes: 2 additions & 0 deletions service/pdptracker/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/indexsupply/shovel/shovel/config"
"github.com/stretchr/testify/require"
)

func TestBuildShovelConfig(t *testing.T) {
contract := common.HexToAddress("0xBADd0B92C1c71d02E7d520f64c0876538fa2557F")
conf := buildShovelConfig("postgres://localhost/test", "https://rpc.example.com", 314, contract)
require.NoError(t, config.ValidateFix(&conf))

require.Len(t, conf.Sources, 1)
require.Equal(t, "fevm", conf.Sources[0].Name)
Expand Down
47 changes: 14 additions & 33 deletions service/pdptracker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ import (

const calibnetRPC = "https://api.calibration.node.glif.io/rpc/v1"

// TestIntegration_NetworkDetection verifies synapse.DetectNetwork works
// against calibnet and returns the expected chain ID and contract address.
// startCalibnetFork launches a local anvil fork of the Filecoin calibration
// network and returns its RPC endpoint URL. Tests hit the fork instead of the
// public calibnet RPC directly. Lifecycle of the anvil process is owned by
// testutil.StartAnvil — presumably cleaned up when the test ends; confirm in
// testutil if shutdown ordering ever matters.
func startCalibnetFork(t *testing.T) string {
	t.Helper()
	anvil := testutil.StartAnvil(t, calibnetRPC)
	return anvil.RPCURL
}

func TestIntegration_NetworkDetection(t *testing.T) {
testutil.SkipIfNotExternalAPI(t)
rpcURL := startCalibnetFork(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ethClient, err := ethclient.DialContext(ctx, calibnetRPC)
ethClient, err := ethclient.DialContext(ctx, rpcURL)
require.NoError(t, err)
defer ethClient.Close()

Expand All @@ -39,11 +43,7 @@ func TestIntegration_NetworkDetection(t *testing.T) {
t.Logf("calibnet PDPVerifier: %s", contractAddr.Hex())
}

// TestIntegration_ShovelConfig validates that the Shovel config generated for
// calibnet passes ValidateFix without errors.
func TestIntegration_ShovelConfig(t *testing.T) {
testutil.SkipIfNotExternalAPI(t)

contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration)
conf := buildShovelConfig(
"postgres://localhost/test",
Expand All @@ -52,51 +52,42 @@ func TestIntegration_ShovelConfig(t *testing.T) {
contractAddr,
)

// import config package to validate
require.Len(t, conf.Integrations, 7)
require.Len(t, conf.Sources, 1)
require.Equal(t, uint64(314159), conf.Sources[0].ChainID)
}

// TestIntegration_ShovelIndexer_Calibnet starts an embedded Shovel indexer
// against calibnet and verifies it processes blocks without errors.
// Requires: Postgres (PGPORT), calibnet RPC, SINGULARITY_TEST_EXTERNAL_API=true.
func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) {
testutil.SkipIfNotExternalAPI(t)
func TestIntegration_ShovelIndexer(t *testing.T) {
rpcURL := startCalibnetFork(t)

testutil.All(t, func(ctx context.Context, t *testing.T, db *gorm.DB) {
if db.Dialector.Name() != "postgres" {
t.Skip("Shovel requires Postgres")
return
}

// get the postgres connection string from env (set by testutil)
connStr := os.Getenv("DATABASE_CONNECTION_STRING")
require.NotEmpty(t, connStr)

contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration)

indexer, err := NewPDPIndexer(ctx, connStr, calibnetRPC, uint64(constants.ChainIDCalibration), contractAddr)
indexer, err := NewPDPIndexer(ctx, connStr, rpcURL, uint64(constants.ChainIDCalibration), contractAddr)
require.NoError(t, err)

// start indexer with timeout context
indexCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

exitErr := make(chan error, 1)
err = indexer.Start(indexCtx, exitErr)
require.NoError(t, err)

// let it run for a few seconds to process some blocks
time.Sleep(10 * time.Second)

// verify shovel internal tables exist
var schemaExists bool
err = db.Raw("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = 'shovel')").Scan(&schemaExists).Error
require.NoError(t, err)
require.True(t, schemaExists, "shovel schema should exist")

// verify integration tables exist
for _, table := range []string{
"pdp_dataset_created",
"pdp_pieces_added",
Expand All @@ -114,39 +105,29 @@ func TestIntegration_ShovelIndexer_Calibnet(t *testing.T) {
require.True(t, exists, "table %s should exist", table)
}

t.Log("Shovel indexer started and processed blocks against calibnet successfully")

cancel()
// wait for clean shutdown
select {
case err := <-exitErr:
require.NoError(t, err)
case <-time.After(5 * time.Second):
// fine, shutdown may be slow
}
})
}

// TestIntegration_RPCClient_Calibnet verifies the RPC client can make calls
// against the real calibnet PDPVerifier contract.
func TestIntegration_RPCClient_Calibnet(t *testing.T) {
testutil.SkipIfNotExternalAPI(t)
func TestIntegration_RPCClient(t *testing.T) {
rpcURL := startCalibnetFork(t)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

contractAddr := constants.GetPDPVerifierAddress(constants.NetworkCalibration)
client, err := NewPDPClient(ctx, calibnetRPC, contractAddr)
client, err := NewPDPClient(ctx, rpcURL, contractAddr)
require.NoError(t, err)
defer client.Close()

// try to get listener for set 0 — may fail (doesn't exist) but shouldn't panic
_, err = client.GetDataSetListener(ctx, 0)
// we don't assert NoError here because set 0 may not exist,
// but the call should complete without panic
t.Logf("GetDataSetListener(0): err=%v", err)

// try to get active pieces for set 0
_, err = client.GetActivePieces(ctx, 0)
t.Logf("GetActivePieces(0): err=%v", err)
}
Loading