From ef619176b78b7b378d86245e091fbd3ff676b638 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 29 Apr 2024 12:39:31 +1000 Subject: [PATCH 01/12] Feat: Enable Attestations --- eth/node_config.go | 2 +- eth/pubsub.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/eth/node_config.go b/eth/node_config.go index fdfbb79..f14a38e 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -376,7 +376,7 @@ func desiredPubSubBaseTopics() []string { return []string{ p2p.GossipBlockMessage, p2p.GossipAggregateAndProofMessage, - // p2p.GossipAttestationMessage, + p2p.GossipAttestationMessage, p2p.GossipExitMessage, p2p.GossipAttesterSlashingMessage, p2p.GossipProposerSlashingMessage, diff --git a/eth/pubsub.go b/eth/pubsub.go index 2ed6786..cd750f5 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -106,6 +106,9 @@ func (p *PubSub) Serve(ctx context.Context) error { func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { switch { + // Ensure hotter topics are at the top of the switch statement. + case strings.Contains(topic, p2p.GossipAttestationMessage): + return p.handleBeaconAttestation case strings.Contains(topic, p2p.GossipBlockMessage): return p.handleBeaconBlock default: @@ -127,9 +130,41 @@ func (n *Node) FilterIncomingSubscriptions(id peer.ID, subs []*pubsubpb.RPC_SubO if len(subs) > n.cfg.PubSubSubscriptionRequestLimit { return nil, pubsub.ErrTooManySubscriptions } + return pubsub.FilterSubscriptions(subs, n.CanSubscribe), nil } +func (p *PubSub) handleBeaconAttestation(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + attestation := ðtypes.Attestation{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, attestation); err != nil { + return fmt.Errorf("error decoding phase0 attestation gossip message: %w", err) + } + + evt := &host.TraceEvent{ + Type: "HANDLE_MESSAGE", + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Attestation": attestation, + "Timestamp": now, + }, + } + + if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + } + + return nil + +} + func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) error { now := time.Now() From f8170ec5da4d42900b9cced7be1855accdc49228 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 1 May 2024 11:35:10 +1000 Subject: [PATCH 02/12] implement attestation peer scoring --- eth/node_config.go | 16 ++++++++-------- eth/topic_score_params.go | 30 +++++++++++++++++++++++++++--- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/eth/node_config.go b/eth/node_config.go index f14a38e..693164c 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -440,12 +440,12 @@ func hasSubnets(topic string) (subnets uint64, hasSubnets bool) { } } -func (n *NodeConfig) composeEthTopic(base string, encoder encoder.NetworkEncoding, subnet uint64) string { - if subnet > 1 { // as far as I know, there aren't subnets with index 0 - return fmt.Sprintf(base, n.ForkDigest, subnet) + encoder.ProtocolSuffix() - } else { - return fmt.Sprintf(base, n.ForkDigest) + encoder.ProtocolSuffix() - } +func (n *NodeConfig) composeEthTopic(base string, encoder encoder.NetworkEncoding) string { + return fmt.Sprintf(base, n.ForkDigest) + encoder.ProtocolSuffix() +} + +func (n *NodeConfig) composeSubnettedEthTopic(base string, encoder encoder.NetworkEncoding, subnet uint64) string { + return fmt.Sprintf(base, n.ForkDigest, subnet) + encoder.ProtocolSuffix() } func (n *NodeConfig) getDesiredFullTopics(encoder encoder.NetworkEncoding) []string { @@ -461,10 +461,10 @@ func (n *NodeConfig) getDesiredFullTopics(encoder encoder.NetworkEncoding) []str subnets, withSubnets := hasSubnets(topicBase) if withSubnets { for subnet := uint64(1); subnet <= subnets; subnet++ { - fullTopics = append(fullTopics, n.composeEthTopic(topicFormat, encoder, subnet)) + fullTopics = append(fullTopics, n.composeSubnettedEthTopic(topicFormat, encoder, subnet)) } } else { - fullTopics = append(fullTopics, n.composeEthTopic(topicFormat, encoder, 0)) + fullTopics = append(fullTopics, n.composeEthTopic(topicFormat, encoder)) } } diff --git a/eth/topic_score_params.go b/eth/topic_score_params.go index d99547f..ce8ede3 100644 --- a/eth/topic_score_params.go +++ b/eth/topic_score_params.go @@ -25,6 +25,7 @@ const ( proposerSlashingWeight = 0.05 voluntaryExitWeight = 0.05 blsToExecutionChangeWeight = 0.05 + attestationWeight = 0.05 // mesh-related params maxInMeshScore = 10 @@ -42,11 +43,12 @@ func topicToScoreParamsMapper(topic string, activeValidators uint64) *pubsub.Top switch { case strings.Contains(topic, p2p.GossipBlockMessage): return defaultBlockTopicParams() - + case strings.Contains(topic, p2p.GossipAttestationMessage): + return defaultAttestationTopicParams() case strings.Contains(topic, p2p.GossipAggregateAndProofMessage): return defaultAggregateTopicParams(activeValidators) - case strings.Contains(topic, p2p.GossipAttestationMessage): + case strings.Contains(topic, p2p.GossipAggregateAndProofMessage): return defaultAggregateSubnetTopicParams(activeValidators) case strings.Contains(topic, p2p.GossipExitMessage): @@ -188,7 +190,7 @@ func defaultAggregateSubnetTopicParams(activeValidators uint64) *pubsub.TopicSco return nil } // Determine the amount of validators expected in a subnet in a single slot. - numPerSlot := time.Duration(subnetWeight / uint64(currentBeaconConfig.SlotsPerEpoch)) + numPerSlot := subnetWeight / uint64(currentBeaconConfig.SlotsPerEpoch) if numPerSlot == 0 { slog.Warn("numPerSlot is 0, skipping initializing topic scoring") return nil @@ -388,6 +390,28 @@ func defaultBlsToExecutionChangeTopicParams() *pubsub.TopicScoreParams { } } +func defaultAttestationTopicParams() *pubsub.TopicScoreParams { + return &pubsub.TopicScoreParams{ + TopicWeight: attestationWeight, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), + FirstMessageDeliveriesWeight: 2, + FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs), + FirstMessageDeliveriesCap: 5, + MeshMessageDeliveriesWeight: 0, + MeshMessageDeliveriesDecay: 0, + MeshMessageDeliveriesCap: 0, + MeshMessageDeliveriesThreshold: 0, + MeshMessageDeliveriesWindow: 0, + MeshMessageDeliveriesActivation: 0, + MeshFailurePenaltyWeight: 0, + MeshFailurePenaltyDecay: 0, + InvalidMessageDeliveriesWeight: -2000, + InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod), + } +} + // utility functions func oneSlotDuration() time.Duration { From 0c42cafd65a5e5f6a2be657e68537241deb6821e Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Wed, 8 May 2024 13:48:27 +1000 Subject: [PATCH 03/12] feat: peerdas --- cmd/hermes/cmd_eth_chains.go | 1 - eth/genesis.go | 6 ------ eth/node_config.go | 6 ++++++ eth/pubsub.go | 33 +++++++++++++++++++++++++++++++++ eth/topic_score_params.go | 28 ++++++++++++++++++++++++++++ go.mod | 4 ++-- go.sum | 8 ++++---- 7 files changed, 73 insertions(+), 13 deletions(-) diff --git a/cmd/hermes/cmd_eth_chains.go b/cmd/hermes/cmd_eth_chains.go index 73e2da2..35a6c31 100644 --- a/cmd/hermes/cmd_eth_chains.go +++ b/cmd/hermes/cmd_eth_chains.go @@ -22,7 +22,6 @@ func cmdEthChainsAction(c *cli.Context) error { chains := []string{ params.MainnetName, params.SepoliaName, - params.PraterName, params.HoleskyName, } diff --git a/eth/genesis.go b/eth/genesis.go index 13d1c85..8df71c8 100644 --- a/eth/genesis.go +++ b/eth/genesis.go @@ -52,8 +52,6 @@ func GetConfigsByNetworkName(net string) (*GenesisConfig, *params.NetworkConfig, return GenesisConfigs[net], params.BeaconNetworkConfig(), params.MainnetConfig(), nil case params.SepoliaName: return GenesisConfigs[net], params.BeaconNetworkConfig(), params.SepoliaConfig(), nil - case params.PraterName: - return GenesisConfigs[net], params.BeaconNetworkConfig(), params.PraterConfig(), nil case params.HoleskyName: return GenesisConfigs[net], params.BeaconNetworkConfig(), params.HoleskyConfig(), nil default: @@ -70,10 +68,6 @@ var GenesisConfigs = map[string]*GenesisConfig{ GenesisValidatorRoot: hexToBytes("d8ea171f3c94aea21ebc42a1ed61052acf3f9209c00e4efbaaddac09ed9b8078"), GenesisTime: time.Unix(1655733600, 0), }, - params.PraterName: { - GenesisValidatorRoot: hexToBytes("043db0d9a83813551ee2f33450d23797757d430911a9320530ad8a0eabc43efb"), - GenesisTime: time.Unix(1616508000, 0), // https://github.com/eth-clients/goerli - }, params.HoleskyName: { GenesisValidatorRoot: hexToBytes("9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1"), GenesisTime: time.Unix(1695902400, 0), diff --git a/eth/node_config.go b/eth/node_config.go index 693164c..d8bab72 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -419,6 +419,9 @@ func topicFormatFromBase(topicBase string) (string, error) { case p2p.GossipBlobSidecarMessage: return p2p.BlobSubnetTopicFormat, nil + case p2p.GossipDataColumnSidecarMessage: + return p2p.DataColumnSubnetTopicFormat, nil + default: return "", fmt.Errorf("unrecognized gossip topic base: %s", topicBase) } @@ -435,6 +438,9 @@ func hasSubnets(topic string) (subnets uint64, hasSubnets bool) { case p2p.GossipBlobSidecarMessage: return currentBeaconConfig.BlobsidecarSubnetCount, true + case p2p.GossipDataColumnSidecarMessage: + return currentBeaconConfig.DataColumnSidecarSubnetCount, true + default: return uint64(0), false } diff --git a/eth/pubsub.go b/eth/pubsub.go index cd750f5..d65c1a7 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -109,6 +109,8 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { // Ensure hotter topics are at the top of the switch statement. case strings.Contains(topic, p2p.GossipAttestationMessage): return p.handleBeaconAttestation + case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): + return p.handleBeaconDataColumnSidecar case strings.Contains(topic, p2p.GossipBlockMessage): return p.handleBeaconBlock default: @@ -165,6 +167,37 @@ func (p *PubSub) handleBeaconAttestation(ctx context.Context, msg *pubsub.Messag } +func (p *PubSub) handleBeaconDataColumnSidecar(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + sidecar := ðtypes.DataColumnSidecar{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, sidecar); err != nil { + return fmt.Errorf("error decoding electra data column sidecar gossip message: %w", err) + } + + evt := &host.TraceEvent{ + Type: "HANDLE_MESSAGE", + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Sidecar": sidecar, + "Timestamp": now, + }, + } + + if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + } + + return nil + +} + func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) error { now := time.Now() diff --git a/eth/topic_score_params.go b/eth/topic_score_params.go index ce8ede3..cbb62ba 100644 --- a/eth/topic_score_params.go +++ b/eth/topic_score_params.go @@ -26,6 +26,7 @@ const ( voluntaryExitWeight = 0.05 blsToExecutionChangeWeight = 0.05 attestationWeight = 0.05 + dataColumnSidecarWeight = 0.05 // mesh-related params maxInMeshScore = 10 @@ -43,8 +44,10 @@ func topicToScoreParamsMapper(topic string, activeValidators uint64) *pubsub.Top switch { case strings.Contains(topic, p2p.GossipBlockMessage): return defaultBlockTopicParams() + case strings.Contains(topic, p2p.GossipAttestationMessage): return defaultAttestationTopicParams() + case strings.Contains(topic, p2p.GossipAggregateAndProofMessage): return defaultAggregateTopicParams(activeValidators) @@ -72,6 +75,9 @@ func topicToScoreParamsMapper(topic string, activeValidators uint64) *pubsub.Top case strings.Contains(topic, p2p.GossipBlobSidecarMessage): return defaultBlockTopicParams() + case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): + return defaultDataColumnSidecarTopicParams() + default: slog.Warn("unrecognized gossip-topic to apply peerscores", slog.Attr{Key: "topic", Value: slog.StringValue(topic)}) // return empty parameters @@ -412,6 +418,28 @@ func defaultAttestationTopicParams() *pubsub.TopicScoreParams { } } +func defaultDataColumnSidecarTopicParams() *pubsub.TopicScoreParams { + return &pubsub.TopicScoreParams{ + TopicWeight: dataColumnSidecarWeight, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), + FirstMessageDeliveriesWeight: 2, + FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs), + FirstMessageDeliveriesCap: 5, + MeshMessageDeliveriesWeight: 0, + MeshMessageDeliveriesDecay: 0, + MeshMessageDeliveriesCap: 0, + MeshMessageDeliveriesThreshold: 0, + MeshMessageDeliveriesWindow: 0, + MeshMessageDeliveriesActivation: 0, + MeshFailurePenaltyWeight: 0, + MeshFailurePenaltyDecay: 0, + InvalidMessageDeliveriesWeight: -2000, + InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod), + } +} + // utility functions func oneSlotDuration() time.Duration { diff --git a/go.mod b/go.mod index f90ffaa..3e46406 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/multiformats/go-multiaddr v0.12.2 github.com/prometheus/client_golang v1.19.0 github.com/prysmaticlabs/go-bitfield v0.0.0-20210809151128-385d8c5e3fb7 - github.com/prysmaticlabs/prysm/v5 v5.0.2 + github.com/prysmaticlabs/prysm/v5 v5.0.4-0.20240507051732-39060d20cdd9 github.com/stretchr/testify v1.9.0 github.com/thejerf/suture/v4 v4.0.5 github.com/urfave/cli/v2 v2.27.1 @@ -73,7 +73,7 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect - github.com/ethereum/c-kzg-4844 v0.4.0 // indirect + github.com/ethereum/c-kzg-4844 v1.0.1-0.20240422190800-13be436f5927 // indirect github.com/flynn/noise v1.1.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect diff --git a/go.sum b/go.sum index e1dc2c7..1abfd14 100644 --- a/go.sum +++ b/go.sum @@ -247,8 +247,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/ethereum/c-kzg-4844 v0.4.0 h1:3MS1s4JtA868KpJxroZoepdV0ZKBp3u/O5HcZ7R3nlY= -github.com/ethereum/c-kzg-4844 v0.4.0/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= +github.com/ethereum/c-kzg-4844 v1.0.1-0.20240422190800-13be436f5927 h1:ffWmm0RUR2+VqJsCkf94HqgEwZi2fgbm2iq+O/GdJNI= +github.com/ethereum/c-kzg-4844 v1.0.1-0.20240422190800-13be436f5927/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0HwTQtm6CQ= github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -788,8 +788,8 @@ github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c h1:9PHRCuO github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c/go.mod h1:ZRws458tYHS/Zs936OQ6oCrL+Ict5O4Xpwve1UQ6C9M= github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294 h1:q9wE0ZZRdTUAAeyFP/w0SwBEnCqlVy2+on6X2/e+eAU= github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294/go.mod h1:ZVEbRdnMkGhp/pu35zq4SXxtvUwWK0J1MATtekZpH2Y= -github.com/prysmaticlabs/prysm/v5 v5.0.2 h1:xcSUvrCVfOGslKYUb5Hpyz98N9I8fC2p7DMAZfiqEIA= -github.com/prysmaticlabs/prysm/v5 v5.0.2/go.mod h1:XG4nOU925zemOimoexcrFP4oA57f+RTQbp7V/TH9UOM= +github.com/prysmaticlabs/prysm/v5 v5.0.4-0.20240507051732-39060d20cdd9 h1:z6b+3uqU4t2lJAAYIB75Yp8sBaCeo4p1/+uHxZxXkmw= +github.com/prysmaticlabs/prysm/v5 v5.0.4-0.20240507051732-39060d20cdd9/go.mod h1:5Pt2wu6NOUTSzLWcoEco/SraJ4kEEx5HubrHBTsmGmE= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= From eafb396a70278827b45ed42d305b3e1faa82278d Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 13 May 2024 11:03:48 +0300 Subject: [PATCH 04/12] WIP: Disable same network check --- eth/prysm.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eth/prysm.go b/eth/prysm.go index 7d0ce96..202f720 100644 --- a/eth/prysm.go +++ b/eth/prysm.go @@ -312,6 +312,8 @@ func (p *PrysmClient) getActiveValidatorCount(ctx context.Context) (activeVals u } func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte) (onNetwork bool, err error) { + return true, nil + ctx, span := p.tracer.Start(ctx, "prysm_client.check_on_same_fork_digest") defer func() { if err != nil { From 967d7180f2b39acf580c79eee4d55010e4cc9708 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 13 May 2024 15:16:17 +0300 Subject: [PATCH 05/12] Enable columns --- eth/node_config.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/node_config.go b/eth/node_config.go index d8bab72..185211f 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -383,6 +383,7 @@ func desiredPubSubBaseTopics() []string { p2p.GossipContributionAndProofMessage, // p2p.GossipSyncCommitteeMessage, p2p.GossipBlsToExecutionChangeMessage, + p2p.GossipDataColumnSidecarMessage, // p2p.GossipBlobSidecarMessage, } } From 3f79c2d9c28743a6f649aa29efe97cbe241772da Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 27 May 2024 13:57:46 +1000 Subject: [PATCH 06/12] refactor: Remove redundant return statement --- eth/prysm.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/eth/prysm.go b/eth/prysm.go index 202f720..7d0ce96 100644 --- a/eth/prysm.go +++ b/eth/prysm.go @@ -312,8 +312,6 @@ func (p *PrysmClient) getActiveValidatorCount(ctx context.Context) (activeVals u } func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte) (onNetwork bool, err error) { - return true, nil - ctx, span := p.tracer.Start(ctx, "prysm_client.check_on_same_fork_digest") defer func() { if err != nil { From 094e08a3232dd6875a63790d0edd667d8083f7c0 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 25 Jul 2024 12:21:53 +1000 Subject: [PATCH 07/12] feat: PeerDAS --- README.md | 17 +- cmd/hermes/cmd_eth.go | 85 +++++++-- cmd/hermes/cmd_eth_chains.go | 19 +- eth/fetch.go | 121 ++++++++++++ eth/genesis.go | 15 -- eth/network_config.go | 120 ++++++++++++ eth/node.go | 2 +- eth/node_config.go | 15 +- eth/prysm.go | 19 +- eth/prysm_test.go | 4 +- eth/pubsub.go | 350 ++++++++++++++++++++++++++++++++++- eth/reqresp.go | 8 +- go.mod | 4 +- go.sum | 4 +- host/callback.go | 4 +- host/flush_tracer.go | 28 +-- host/host.go | 7 +- host/kinesis.go | 11 +- host/producer.go | 2 +- 19 files changed, 737 insertions(+), 98 deletions(-) create mode 100644 eth/fetch.go create mode 100644 eth/network_config.go diff --git a/README.md b/README.md index b00a573..9997a90 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![GoDoc](https://pkg.go.dev/badge/github.com/probe-lab/hermes)](https://pkg.go.dev/github.com/probe-lab/hermes) Hermes is a GossipSub listener and tracer. It subscribes to all relevant pubsub topics -and traces all protocol interactions. As of `2024-03-27`, Hermes supports the Ethereum +and traces all protocol interactions. As of `2024-05-21`, Hermes supports the Ethereum network. ## Table of Contents @@ -21,6 +21,7 @@ network. - [Telemetry](#telemetry) - [Metrics](#metrics) - [Tracing](#tracing) + - [Differences with other tools](#differences-with-other-tools) - [Maintainers](#maintainers) - [Contributing](#contributing) - [License](#license) @@ -199,6 +200,20 @@ You can find the UI at [`http://localhost:16686`](http://localhost:16686). Port Run Hermes with the `--tracing` flag. To change the address of the trace collector, you can also specify `--tracing.addr` and `--tracing.port`. +## Differences with other tools +Hermes jumps to the web3/blockchain/libp2p ecosystem despite a large variety of tools around it, such as the many existing network crawlers or light clients for most mature networks. Although at first sight it might look like a competitor to those, there was still a large incentive to develop it. Here, we describe the gap that Hermes fills as well as the use-cases that it is suitable for. +Hermes was designed to behave as a light node in each supported network, where, in addition to being an honest participant in the network and supporting all the protocols and RPC endpoints, it also allows streaming of custom internal events (mostly libp2p-related). + +Hermes avoids being based on a custom fork of existing full/light clients, which would come with non-negligible maintenance baggage and would complicate having control of events. + +Currently available similar tools: + +### Armiarma Crawler from MigaLabs vs Hermes from ProbeLab +Although both Hermes and Armiarma seem to be focusing on the same goals at first sight, they have significant differences in their use cases and their targets in data collection and metrics. + +[Armiarma](https://github.com/migalabs/armiarma) is a network crawler that relies on running discv5 peer discovery service and a libp2p host 24/7 to establish connections. However, significant modifications have been made to connect to as many peers as possible (custom peering module). This way, it tries to identify as many peers as possible in the network periodically. Thus, its focus is mainly on opening and identifying as many peers as possible, rather than maintaining stable connections to other peers in the network. + +On the other hand, although Hermes also relies on a continuously running discv5 and libp2p host, it uses the libp2p connection manager for a different purpose to Armiarma (which is to connect to as many peers as possible). In the case of Hermes, the connection manager is used to decide who it connects to (i.e., simulating the behaviour of a standard node). Furthermore, it backs up some of the RPC, which requires keeping the chain db calls on a trusted node. This way, it behaves like a light node to the network, which is honest and beneficial for the rest of the network, allowing us to track all desired networking events from stable connections, while at the same time having a highly customizable tracing system. ## Maintainers diff --git a/cmd/hermes/cmd_eth.go b/cmd/hermes/cmd_eth.go index 6609144..e3b11cf 100644 --- a/cmd/hermes/cmd_eth.go +++ b/cmd/hermes/cmd_eth.go @@ -33,6 +33,10 @@ var ethConfig = &struct { DialConcurrency int DialTimeout time.Duration MaxPeers int + GenesisSSZURL string + ConfigURL string + BootnodesURL string + DepositContractBlockURL string }{ PrivateKeyStr: "", // unset means it'll be generated Chain: params.MainnetName, @@ -48,6 +52,10 @@ var ethConfig = &struct { DialConcurrency: 16, DialTimeout: 5 * time.Second, MaxPeers: 30, // arbitrary + GenesisSSZURL: "", + ConfigURL: "", + BootnodesURL: "", + DepositContractBlockURL: "", } var cmdEth = &cli.Command{ @@ -167,6 +175,34 @@ var cmdEthFlags = []cli.Flag{ Value: ethConfig.MaxPeers, Destination: ðConfig.MaxPeers, }, + &cli.StringFlag{ + Name: "genesis.ssz.url", + EnvVars: []string{"HERMES_ETH_GENESIS_SSZ_URL"}, + Usage: "The .ssz URL from which to fetch the genesis data, requires 'chain=devnet'", + Value: ethConfig.GenesisSSZURL, + Destination: ðConfig.GenesisSSZURL, + }, + &cli.StringFlag{ + Name: "config.yaml.url", + EnvVars: []string{"HERMES_ETH_CONFIG_URL"}, + Usage: "The .yaml URL from which to fetch the beacon chain config, requires 'chain=devnet'", + Value: ethConfig.ConfigURL, + Destination: ðConfig.ConfigURL, + }, + &cli.StringFlag{ + Name: "bootnodes.yaml.url", + EnvVars: []string{"HERMES_ETH_BOOTNODES_URL"}, + Usage: "The .yaml URL from which to fetch the bootnode ENRs, requires 'chain=devnet'", + Value: ethConfig.BootnodesURL, + Destination: ðConfig.BootnodesURL, + }, + &cli.StringFlag{ + Name: "deposit-contract-block.txt.url", + EnvVars: []string{"HERMES_ETH_DEPOSIT_CONTRACT_BLOCK_URL"}, + Usage: "The .txt URL from which to fetch the deposit contract block. Requires 'chain=devnet'", + Value: ethConfig.DepositContractBlockURL, + Destination: ðConfig.DepositContractBlockURL, + }, } func cmdEthAction(c *cli.Context) error { @@ -176,20 +212,45 @@ func cmdEthAction(c *cli.Context) error { // Print hermes configuration for debugging purposes printEthConfig() - // Extract chain configuration parameters based on the given chain name - genConfig, netConfig, beaConfig, err := eth.GetConfigsByNetworkName(ethConfig.Chain) - if err != nil { - return fmt.Errorf("get config for %s: %w", ethConfig.Chain, err) + var config *eth.NetworkConfig + // Derive network configuration + if ethConfig.Chain != params.DevnetName { + slog.Info("Deriving known network config:", "chain", ethConfig.Chain) + + c, err := eth.DeriveKnownNetworkConfig(c.Context, ethConfig.Chain) + if err != nil { + return fmt.Errorf("derive network config: %w", err) + } + + config = c + } else { + slog.Info("Deriving devnet network config") + + c, err := eth.DeriveDevnetConfig(c.Context, eth.DevnetOptions{ + ConfigURL: ethConfig.ConfigURL, + BootnodesURL: ethConfig.BootnodesURL, + DepositContractBlockURL: ethConfig.DepositContractBlockURL, + GenesisSSZURL: ethConfig.GenesisSSZURL, + }) + if err != nil { + return fmt.Errorf("failed to derive devnet network config: %w", err) + } + config = c } - genesisRoot := genConfig.GenesisValidatorRoot - genesisTime := genConfig.GenesisTime + // Overriding configuration so that functions like ComputForkDigest take the + // correct input data from the global configuration. + params.OverrideBeaconConfig(config.Beacon) + params.OverrideBeaconNetworkConfig(config.Network) + + genesisRoot := config.Genesis.GenesisValidatorRoot + genesisTime := config.Genesis.GenesisTime // compute fork version and fork digest currentSlot := slots.Since(genesisTime) currentEpoch := slots.ToEpoch(currentSlot) - currentForkVersion, err := eth.GetCurrentForkVersion(currentEpoch, beaConfig) + currentForkVersion, err := eth.GetCurrentForkVersion(currentEpoch, config.Beacon) if err != nil { return fmt.Errorf("compute fork version for epoch %d: %w", currentEpoch, err) } @@ -201,13 +262,13 @@ func cmdEthAction(c *cli.Context) error { // Overriding configuration so that functions like ComputForkDigest take the // correct input data from the global configuration. - params.OverrideBeaconConfig(beaConfig) - params.OverrideBeaconNetworkConfig(netConfig) + params.OverrideBeaconConfig(config.Beacon) + params.OverrideBeaconNetworkConfig(config.Network) cfg := ð.NodeConfig{ - GenesisConfig: genConfig, - NetworkConfig: netConfig, - BeaconConfig: beaConfig, + GenesisConfig: config.Genesis, + NetworkConfig: config.Network, + BeaconConfig: config.Beacon, ForkDigest: forkDigest, ForkVersion: currentForkVersion, PrivateKeyStr: ethConfig.PrivateKeyStr, diff --git a/cmd/hermes/cmd_eth_chains.go b/cmd/hermes/cmd_eth_chains.go index 35a6c31..dda4b56 100644 --- a/cmd/hermes/cmd_eth_chains.go +++ b/cmd/hermes/cmd_eth_chains.go @@ -27,28 +27,27 @@ func cmdEthChainsAction(c *cli.Context) error { slog.Info("Supported chains:") for _, chain := range chains { - - genConfig, _, beaConfig, err := eth.GetConfigsByNetworkName(chain) + config, err := eth.DeriveKnownNetworkConfig(c.Context, chain) if err != nil { return fmt.Errorf("get config for %s: %w", chain, err) } slog.Info(chain) forkVersions := [][]byte{ - beaConfig.GenesisForkVersion, - beaConfig.AltairForkVersion, - beaConfig.BellatrixForkVersion, - beaConfig.CapellaForkVersion, - beaConfig.DenebForkVersion, + config.Beacon.GenesisForkVersion, + config.Beacon.AltairForkVersion, + config.Beacon.BellatrixForkVersion, + config.Beacon.CapellaForkVersion, + config.Beacon.DenebForkVersion, } for _, forkVersion := range forkVersions { - epoch, found := beaConfig.ForkVersionSchedule[[4]byte(forkVersion)] + epoch, found := config.Beacon.ForkVersionSchedule[[4]byte(forkVersion)] if !found { return fmt.Errorf("fork version schedule not found for %x", forkVersion) } - forkName, found := beaConfig.ForkVersionNames[[4]byte(forkVersion)] + forkName, found := config.Beacon.ForkVersionNames[[4]byte(forkVersion)] if !found { return fmt.Errorf("fork version name not found for %x", forkVersion) } @@ -57,7 +56,7 @@ func cmdEthChainsAction(c *cli.Context) error { continue } - digest, err := signing.ComputeForkDigest(forkVersion, genConfig.GenesisValidatorRoot) + digest, err := signing.ComputeForkDigest(forkVersion, config.Genesis.GenesisValidatorRoot) if err != nil { return err } diff --git a/eth/fetch.go b/eth/fetch.go new file mode 100644 index 0000000..70971bc --- /dev/null +++ b/eth/fetch.go @@ -0,0 +1,121 @@ +package eth + +import ( + "context" + "encoding/binary" + "io" + "net/http" + + "github.com/prysmaticlabs/prysm/v5/config/params" + "gopkg.in/yaml.v3" +) + +// FetchConfigFromURL fetches the beacon chain config from a given URL. +func FetchConfigFromURL(ctx context.Context, url string) (*params.BeaconChainConfig, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + config := params.MainnetConfig().Copy() + + out, err := params.UnmarshalConfig(data, config) + if err != nil { + return nil, err + } + + return out, nil +} + +// FetchBootnodeENRsFromURL fetches the bootnode ENRs from a given URL. +func FetchBootnodeENRsFromURL(ctx context.Context, url string) ([]string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + var enrs []string + err = yaml.Unmarshal(data, &enrs) + if err != nil { + return nil, err + } + + return enrs, nil +} + +// FetchDepositContractBlockFromURL fetches the deposit contract block from a given URL. +func FetchDepositContractBlockFromURL(ctx context.Context, url string) (uint64, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return 0, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return 0, err + } + defer response.Body.Close() + + data, err := io.ReadAll(response.Body) + if err != nil { + return 0, err + } + + var block uint64 + + err = yaml.Unmarshal(data, &block) + if err != nil { + return 0, err + } + + return block, nil +} + +// FetchGenesisDetailsFromURL fetches the genesis time and validators root from a given URL. +func FetchGenesisDetailsFromURL(ctx context.Context, url string) (uint64, [32]byte, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return 0, [32]byte{}, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return 0, [32]byte{}, err + } + defer response.Body.Close() + + // Read only the first 40 bytes (8 bytes for GenesisTime + 32 bytes for GenesisValidatorsRoot) + data := make([]byte, 40) + _, err = io.ReadFull(response.Body, data) + if err != nil { + return 0, [32]byte{}, err + } + + genesisTime := binary.LittleEndian.Uint64(data[:8]) + var genesisValidatorsRoot [32]byte + copy(genesisValidatorsRoot[:], data[8:]) + + return genesisTime, genesisValidatorsRoot, nil +} diff --git a/eth/genesis.go b/eth/genesis.go index 8df71c8..86220c3 100644 --- a/eth/genesis.go +++ b/eth/genesis.go @@ -44,21 +44,6 @@ type GenesisConfig struct { GenesisTime time.Time // Time at Genesis } -// GetConfigsByNetworkName returns the GenesisConfig, NetworkConfig, -// BeaconChainConfig and any error based on the input network name -func GetConfigsByNetworkName(net string) (*GenesisConfig, *params.NetworkConfig, *params.BeaconChainConfig, error) { - switch net { - case params.MainnetName: - return GenesisConfigs[net], params.BeaconNetworkConfig(), params.MainnetConfig(), nil - case params.SepoliaName: - return GenesisConfigs[net], params.BeaconNetworkConfig(), params.SepoliaConfig(), nil - case params.HoleskyName: - return GenesisConfigs[net], params.BeaconNetworkConfig(), params.HoleskyConfig(), nil - default: - return nil, nil, nil, fmt.Errorf("network %s not found", net) - } -} - var GenesisConfigs = map[string]*GenesisConfig{ params.MainnetName: { GenesisValidatorRoot: hexToBytes("4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"), diff --git a/eth/network_config.go b/eth/network_config.go new file mode 100644 index 0000000..154289a --- /dev/null +++ b/eth/network_config.go @@ -0,0 +1,120 @@ +package eth + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/prysmaticlabs/prysm/v5/config/params" +) + +type NetworkConfig struct { + Genesis *GenesisConfig + Network *params.NetworkConfig + Beacon *params.BeaconChainConfig +} + +func DeriveKnownNetworkConfig(ctx context.Context, network string) (*NetworkConfig, error) { + if network == params.DevnetName { + return nil, errors.New("network devnet not supported - use DeriveDevnetConfig instead") + } + + defaultBeaconNetworkConfig := params.BeaconNetworkConfig() + + switch network { + case params.MainnetName: + return &NetworkConfig{ + Genesis: GenesisConfigs[network], + Beacon: params.MainnetConfig(), + Network: defaultBeaconNetworkConfig, + }, nil + case params.SepoliaName: + return &NetworkConfig{ + Genesis: GenesisConfigs[network], + Beacon: params.SepoliaConfig(), + Network: defaultBeaconNetworkConfig, + }, nil + case params.HoleskyName: + return &NetworkConfig{ + Genesis: GenesisConfigs[network], + Beacon: params.HoleskyConfig(), + Network: defaultBeaconNetworkConfig, + }, nil + case params.DevnetName: + return nil, errors.New("network devnet not supported") + default: + return nil, fmt.Errorf("network %s not found", network) + } +} + +type DevnetOptions struct { + ConfigURL string + BootnodesURL string + DepositContractBlockURL string + GenesisSSZURL string +} + +func (o *DevnetOptions) Validate() error { + if o.ConfigURL == "" { + return errors.New("config URL is required") + } + + if o.BootnodesURL == "" { + return errors.New("bootnodes URL is required") + } + + if o.DepositContractBlockURL == "" { + return errors.New("deposit contract block URL is required") + } + + if o.GenesisSSZURL == "" { + return errors.New("genesis SSZ URL is required") + } + + return nil +} + +func DeriveDevnetConfig(ctx context.Context, options DevnetOptions) (*NetworkConfig, error) { + if err := options.Validate(); err != nil { + return nil, fmt.Errorf("invalid options: %w", err) + } + + // Fetch the beacon chain config from the provided URL + beaconConfig, err := FetchConfigFromURL(ctx, options.ConfigURL) + if err != nil { + return nil, fmt.Errorf("fetch beacon config: %w", err) + } + + // Fetch bootnode ENRs from the provided URL + bootnodeENRs, err := FetchBootnodeENRsFromURL(ctx, options.BootnodesURL) + if err != nil { + return nil, fmt.Errorf("fetch bootnode ENRs: %w", err) + } + + // Fetch deposit contract block from the provided URL + depositContractBlock, err := FetchDepositContractBlockFromURL(ctx, options.DepositContractBlockURL) + if err != nil { + return nil, fmt.Errorf("fetch deposit contract block: %w", err) + } + + // Fetch genesis details from the provided URL + genesisTime, genesisValidatorsRoot, err := FetchGenesisDetailsFromURL(ctx, options.GenesisSSZURL) + if err != nil { + return nil, fmt.Errorf("fetch genesis details: %w", err) + } + + network := params.BeaconNetworkConfig() + + network.BootstrapNodes = bootnodeENRs + network.ContractDeploymentBlock = depositContractBlock + + return &NetworkConfig{ + Genesis: &GenesisConfig{ + GenesisTime: time.Unix(int64(genesisTime), 0), + GenesisValidatorRoot: genesisValidatorsRoot[:], + }, + Network: network, + Beacon: beaconConfig, + }, nil +} diff --git a/eth/node.go b/eth/node.go index dfa45f0..12ebf2c 100644 --- a/eth/node.go +++ b/eth/node.go @@ -185,7 +185,7 @@ func NewNode(cfg *NodeConfig) (*Node, error) { } // initialize the custom Prysm client to communicate with its API - pryClient, err := NewPrysmClient(cfg.PrysmHost, cfg.PrysmPortHTTP, cfg.PrysmPortGRPC, cfg.DialTimeout) + pryClient, err := NewPrysmClient(cfg.PrysmHost, cfg.PrysmPortHTTP, cfg.PrysmPortGRPC, cfg.DialTimeout, cfg.GenesisConfig) if err != nil { return nil, fmt.Errorf("new prysm client") } diff --git a/eth/node_config.go b/eth/node_config.go index 185211f..f582de5 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -381,10 +381,10 @@ func desiredPubSubBaseTopics() []string { p2p.GossipAttesterSlashingMessage, p2p.GossipProposerSlashingMessage, p2p.GossipContributionAndProofMessage, - // p2p.GossipSyncCommitteeMessage, + p2p.GossipSyncCommitteeMessage, p2p.GossipBlsToExecutionChangeMessage, + p2p.GossipBlobSidecarMessage, p2p.GossipDataColumnSidecarMessage, - // p2p.GossipBlobSidecarMessage, } } @@ -451,7 +451,7 @@ func (n *NodeConfig) composeEthTopic(base string, encoder encoder.NetworkEncodin return fmt.Sprintf(base, n.ForkDigest) + encoder.ProtocolSuffix() } -func (n *NodeConfig) composeSubnettedEthTopic(base string, encoder encoder.NetworkEncoding, subnet uint64) string { +func (n *NodeConfig) composeEthTopicWithSubnet(base string, encoder encoder.NetworkEncoding, subnet uint64) string { return fmt.Sprintf(base, n.ForkDigest, subnet) + encoder.ProtocolSuffix() } @@ -467,8 +467,8 @@ func (n *NodeConfig) getDesiredFullTopics(encoder encoder.NetworkEncoding) []str } subnets, withSubnets := hasSubnets(topicBase) if withSubnets { - for subnet := uint64(1); subnet <= subnets; subnet++ { - fullTopics = append(fullTopics, n.composeSubnettedEthTopic(topicFormat, encoder, subnet)) + for subnet := uint64(0); subnet < subnets; subnet++ { + fullTopics = append(fullTopics, n.composeEthTopicWithSubnet(topicFormat, encoder, subnet)) } } else { fullTopics = append(fullTopics, n.composeEthTopic(topicFormat, encoder)) @@ -482,8 +482,9 @@ func (n *NodeConfig) getDefaultTopicScoreParams(encoder encoder.NetworkEncoding, desiredTopics := n.getDesiredFullTopics(encoder) topicScores := make(map[string]*pubsub.TopicScoreParams, len(desiredTopics)) for _, topic := range desiredTopics { - params := topicToScoreParamsMapper(topic, activeValidators) - topicScores[topic] = params + if params := topicToScoreParamsMapper(topic, activeValidators); params != nil { + topicScores[topic] = params + } } return topicScores } diff --git a/eth/prysm.go b/eth/prysm.go index 7d0ce96..6920494 100644 --- a/eth/prysm.go +++ b/eth/prysm.go @@ -38,9 +38,10 @@ type PrysmClient struct { tracer trace.Tracer beaconClient eth.BeaconChainClient beaconApiClient *apiCli.Client + genesis *GenesisConfig } -func NewPrysmClient(host string, portHTTP int, portGRPC int, timeout time.Duration) (*PrysmClient, error) { +func NewPrysmClient(host string, portHTTP int, portGRPC int, timeout time.Duration, genesis *GenesisConfig) (*PrysmClient, error) { tracer := otel.GetTracerProvider().Tracer("prysm_client") conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, portGRPC), @@ -63,6 +64,7 @@ func NewPrysmClient(host string, portHTTP int, portGRPC int, timeout time.Durati beaconApiClient: apiCli, timeout: timeout, tracer: tracer, + genesis: genesis, }, nil } @@ -320,26 +322,17 @@ func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte) } span.End() }() + // this checks whether the local fork_digest at hermes matches the one that the remote node keeps // request the genesis - nodeCnf, err := p.beaconApiClient.GetConfigSpec(ctx) - if err != nil { - return false, fmt.Errorf("request prysm node config to compose forkdigest: %w", err) - } - cnf := nodeCnf.Data.(map[string]interface{}) - genesisConf, _, _, err := GetConfigsByNetworkName(cnf["CONFIG_NAME"].(string)) - if err != nil { - return false, fmt.Errorf("not identified network from trusted node (%s): %w", cnf["CONFIG_NAME"].(string), err) - } - nodeFork, err := p.beaconApiClient.GetFork(ctx, apiCli.StateOrBlockId("head")) if err != nil { return false, fmt.Errorf("request beacon fork to compose forkdigest: %w", err) } - forkDigest, err := signing.ComputeForkDigest(nodeFork.CurrentVersion, genesisConf.GenesisValidatorRoot) + forkDigest, err := signing.ComputeForkDigest(nodeFork.CurrentVersion, p.genesis.GenesisValidatorRoot) if err != nil { - return false, fmt.Errorf("create fork digest (%s, %x): %w", hex.EncodeToString(nodeFork.CurrentVersion), genesisConf.GenesisValidatorRoot, err) + return false, fmt.Errorf("create fork digest (%s, %x): %w", hex.EncodeToString(nodeFork.CurrentVersion), p.genesis.GenesisValidatorRoot, err) } // check if our version is within the versions of the node if forkDigest == hermesForkDigest { diff --git a/eth/prysm_test.go b/eth/prysm_test.go index 6f14862..7d0753c 100644 --- a/eth/prysm_test.go +++ b/eth/prysm_test.go @@ -86,7 +86,7 @@ func TestPrysmClient_AddTrustedPeer(t *testing.T) { port, err := strconv.Atoi(serverURL.Port()) require.NoError(t, err) - p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second) + p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second, nil) require.NoError(t, err) err = p.AddTrustedPeer(context.Background(), pid, maddr) @@ -150,7 +150,7 @@ func TestPrysmClient_RemoveTrustedPeer(t *testing.T) { port, err := strconv.Atoi(serverURL.Port()) require.NoError(t, err) - p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second) + p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second, nil) require.NoError(t, err) err = p.RemoveTrustedPeer(context.Background(), pid) diff --git a/eth/pubsub.go b/eth/pubsub.go index d65c1a7..265c963 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/ethereum/go-ethereum/common/hexutil" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" @@ -21,6 +22,8 @@ import ( "github.com/probe-lab/hermes/tele" ) +const eventTypeHandleMessage = "HANDLE_MESSAGE" + type PubSubConfig struct { Topics []string ForkVersion ForkVersion @@ -113,6 +116,24 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { return p.handleBeaconDataColumnSidecar case strings.Contains(topic, p2p.GossipBlockMessage): return p.handleBeaconBlock + case strings.Contains(topic, p2p.GossipAggregateAndProofMessage): + return p.handleAggregateAndProof + case strings.Contains(topic, p2p.GossipAttestationMessage): + return p.handleAttestation + case strings.Contains(topic, p2p.GossipExitMessage): + return p.handleExitMessage + case strings.Contains(topic, p2p.GossipAttesterSlashingMessage): + return p.handleAttesterSlashingMessage + case strings.Contains(topic, p2p.GossipProposerSlashingMessage): + return p.handleProposerSlashingMessage + case strings.Contains(topic, p2p.GossipContributionAndProofMessage): + return p.handleContributtionAndProofMessage + case strings.Contains(topic, p2p.GossipSyncCommitteeMessage): + return p.handleSyncCommitteeMessage + case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage): + return p.handleBlsToExecutionChangeMessage + case strings.Contains(topic, p2p.GossipBlobSidecarMessage): + return p.handleBlobSidecar default: return p.host.TracedTopicHandler(host.NoopHandler) } @@ -159,7 +180,7 @@ func (p *PubSub) handleBeaconAttestation(ctx context.Context, msg *pubsub.Messag }, } - if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } @@ -190,7 +211,7 @@ func (p *PubSub) handleBeaconDataColumnSidecar(ctx context.Context, msg *pubsub. }, } - if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } @@ -212,7 +233,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) evt := &host.TraceEvent{ - Type: "HANDLE_MESSAGE", + Type: eventTypeHandleMessage, PeerID: p.host.ID(), Timestamp: now, Payload: map[string]any{ @@ -225,17 +246,336 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err "Slot": slot, "Root": root, "TimeInSlot": now.Sub(slotStart).Seconds(), - "Timestamp": now, }, } - if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("nil message or topic") + } + + attestation := ethtypes.Attestation{} + err := p.cfg.Encoder.DecodeGossip(msg.Data, &attestation) + if err != nil { + return fmt.Errorf("decode attestation gossip message: %w", err) + } + + payload := map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "CommIdx": attestation.GetData().GetCommitteeIndex(), + "Slot": attestation.GetData().GetSlot(), + "BeaconBlockRoot": attestation.GetData().GetBeaconBlockRoot(), + "Source": attestation.GetData().GetSource(), + "Target": attestation.GetData().GetTarget(), + } + + // If the attestation only has one aggregation bit set, we can add an additional field to the payload + // that denotes _which_ aggregation bit is set. This is required to determine which validator created the attestation. + // In the pursuit of reducing the amount of data stored in the data stream we omit this field if the attestation is + // aggregated. + if attestation.GetAggregationBits().Count() == 1 { + payload["AggregatePos"] = attestation.AggregationBits.BitIndices()[0] + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: payload, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } return nil } +func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + ap := ðtypes.SignedAggregateAttestationAndProof{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil { + return fmt.Errorf("decode aggregate and proof message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Sig": hexutil.Encode(ap.GetSignature()), + "AggIdx": ap.GetMessage().GetAggregatorIndex(), + "SelectionProof": hexutil.Encode(ap.GetMessage().GetSelectionProof()), + // There are other details in the SignedAggregateAttestationAndProof message, add them when needed. + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn( + "failed putting topic handler event", + "topic", msg.GetTopic(), + "err", tele.LogAttrError(err), + ) + } + + return nil +} + +func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + ve := ðtypes.VoluntaryExit{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil { + return fmt.Errorf("decode voluntary exit message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Epoch": ve.GetEpoch(), + "ValIdx": ve.GetValidatorIndex(), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting voluntary exit event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + as := ðtypes.AttesterSlashing{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil { + return fmt.Errorf("decode attester slashing message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Att1_indices": as.GetAttestation_1().GetAttestingIndices(), + "Att2_indices": as.GetAttestation_2().GetAttestingIndices(), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting attester slashing event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + ps := ðtypes.ProposerSlashing{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil { + return fmt.Errorf("decode proposer slashing message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Header1_Slot": ps.GetHeader_1().GetHeader().GetSlot(), + "Header1_ProposerIndex": ps.GetHeader_1().GetHeader().GetProposerIndex(), + "Header1_StateRoot": hexutil.Encode(ps.GetHeader_1().GetHeader().GetStateRoot()), + "Header2_Slot": ps.GetHeader_2().GetHeader().GetSlot(), + "Header2_ProposerIndex": ps.GetHeader_2().GetHeader().GetProposerIndex(), + "Header2_StateRoot": hexutil.Encode(ps.GetHeader_2().GetHeader().GetStateRoot()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting proposer slashing event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + cp := ðtypes.SignedContributionAndProof{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil { + return fmt.Errorf("decode contribution and proof message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Sig": hexutil.Encode(cp.GetSignature()), + "AggIdx": cp.GetMessage().GetAggregatorIndex(), + "Contrib_Slot": cp.GetMessage().GetContribution().GetSlot(), + "Contrib_SubCommitteeIdx": cp.GetMessage().GetContribution().GetSubcommitteeIndex(), + "Contrib_BlockRoot": cp.GetMessage().GetContribution().GetBlockRoot(), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting contribution and proof event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + sc := ðtypes.SyncCommitteeMessage{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil { + return fmt.Errorf("decode sync committee message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Slot": sc.GetSlot(), + "ValIdx": sc.GetValidatorIndex(), + "BlockRoot": hexutil.Encode(sc.GetBlockRoot()), + "Signature": hexutil.Encode(sc.GetSignature()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting sync committee event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + pb := ðtypes.BLSToExecutionChange{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil { + return fmt.Errorf("decode bls to execution change message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "ValIdx": pb.GetValidatorIndex(), + "FromBlsPubkey": hexutil.Encode(pb.GetFromBlsPubkey()), + "ToExecutionAddress": hexutil.Encode(pb.GetToExecutionAddress()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting bls to execution change event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + switch p.cfg.ForkVersion { + case DenebForkVersion: + var blob ethtypes.BlobSidecar + err := p.cfg.Encoder.DecodeGossip(msg.Data, &blob) + if err != nil { + slog.Error("decode blob sidecar gossip message", tele.LogAttrError(err)) + return err + } + + slot := blob.GetSignedBlockHeader().GetHeader().GetSlot() + slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) + proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex() + + evt := &host.TraceEvent{ + Type: "HANDLE_MESSAGE", + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Slot": slot, + "ValIdx": proposerIndex, + "index": blob.GetIndex(), + "TimeInSlot": now.Sub(slotStart).Seconds(), + "StateRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetStateRoot()), + "BodyRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetBodyRoot()), + "ParentRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetParentRoot()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + } + default: + return fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) + } + + return nil +} + type GenericSignedBeaconBlock interface { GetBlock() GenericBeaconBlock } diff --git a/eth/reqresp.go b/eth/reqresp.go index 0505188..04b0519 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -278,7 +278,7 @@ func (r *ReqResp) wrapStreamHandler(ctx context.Context, name string, handler Co Payload: commonData, } - if err := r.cfg.DataStream.PutEvent(ctx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(ctx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -575,7 +575,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -631,7 +631,7 @@ func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { }, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -693,7 +693,7 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV Payload: reqData, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } diff --git a/go.mod b/go.mod index 3e46406..4fdd9e8 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.3 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 github.com/dennis-tra/go-kinesis v0.0.0-20240326083914-7acf5f8dc24e - github.com/ethereum/go-ethereum v1.13.14 + github.com/ethereum/go-ethereum v1.13.15 github.com/ferranbt/fastssz v0.1.3 github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 @@ -34,6 +34,7 @@ require ( golang.org/x/time v0.5.0 google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -202,7 +203,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apimachinery v0.20.0 // indirect k8s.io/client-go v0.20.0 // indirect k8s.io/klog/v2 v2.80.0 // indirect diff --git a/go.sum b/go.sum index 1abfd14..4121e8f 100644 --- a/go.sum +++ b/go.sum @@ -249,8 +249,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ethereum/c-kzg-4844 v1.0.1-0.20240422190800-13be436f5927 h1:ffWmm0RUR2+VqJsCkf94HqgEwZi2fgbm2iq+O/GdJNI= github.com/ethereum/c-kzg-4844 v1.0.1-0.20240422190800-13be436f5927/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= -github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0HwTQtm6CQ= -github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU= +github.com/ethereum/go-ethereum v1.13.15 h1:U7sSGYGo4SPjP6iNIifNoyIAiNjrmQkz6EwQG+/EZWo= +github.com/ethereum/go-ethereum v1.13.15/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/ferranbt/fastssz v0.1.3 h1:ZI+z3JH05h4kgmFXdHuR1aWYsgrg7o+Fw7/NCzM16Mo= diff --git a/host/callback.go b/host/callback.go index 9e1cdd4..c2df4db 100644 --- a/host/callback.go +++ b/host/callback.go @@ -47,8 +47,8 @@ func (c *CallbackDataStream) Stop(ctx context.Context) error { return ctx.Err() } -// PutEvent sends an event to the callback if the stream has not been stopped. -func (c *CallbackDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { +// PutRecord sends an event to the callback if the stream has not been stopped. +func (c *CallbackDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { if c.stopped { return ctx.Err() } diff --git a/host/flush_tracer.go b/host/flush_tracer.go index 4a346b2..de87c2b 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -63,7 +63,7 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("Failed to put trace event payload", tele.LogAttrError(err)) return } @@ -72,39 +72,39 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl } func (h *Host) AddPeer(p peer.ID, proto protocol.ID) { - h.FlushTrace("ADD_PEER", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_ADD_PEER.String(), map[string]any{ "PeerID": p, "Protocol": proto, }) } func (h *Host) RemovePeer(p peer.ID) { - h.FlushTrace("REMOVE_PEER", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_REMOVE_PEER.String(), map[string]any{ "PeerID": p, }) } func (h *Host) Join(topic string) { - h.FlushTrace("JOIN", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_JOIN.String(), map[string]any{ "Topic": topic, }) } func (h *Host) Leave(topic string) { - h.FlushTrace("LEAVE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_LEAVE.String(), map[string]any{ "Topic": topic, }) } func (h *Host) Graft(p peer.ID, topic string) { - h.FlushTrace("GRAFT", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_GRAFT.String(), map[string]any{ "PeerID": p, "Topic": topic, }) } func (h *Host) Prune(p peer.ID, topic string) { - h.FlushTrace("PRUNE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_PRUNE.String(), map[string]any{ "PeerID": p, "Topic": topic, }) @@ -122,7 +122,7 @@ func (h *Host) ValidateMessage(msg *pubsub.Message) { } func (h *Host) DeliverMessage(msg *pubsub.Message) { - h.FlushTrace("DELIVER_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_DELIVER_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -133,7 +133,7 @@ func (h *Host) DeliverMessage(msg *pubsub.Message) { } func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { - h.FlushTrace("REJECT_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_REJECT_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -145,7 +145,7 @@ func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { } func (h *Host) DuplicateMessage(msg *pubsub.Message) { - h.FlushTrace("DUPLICATE_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_DUPLICATE_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -186,18 +186,18 @@ func (h *Host) Trace(evt *pubsubpb.TraceEvent) { ts := time.Unix(0, evt.GetTimestamp()) switch evt.GetType() { case pubsubpb.TraceEvent_PUBLISH_MESSAGE: - h.FlushTraceWithTimestamp("PUBLISH_MESSAGE", ts, map[string]any{ + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_PUBLISH_MESSAGE.String(), ts, map[string]any{ "MsgID": evt.GetPublishMessage().GetMessageID(), "Topic": evt.GetPublishMessage().GetTopic(), }) case pubsubpb.TraceEvent_RECV_RPC: payload := newRPCMeta(evt.GetRecvRPC().GetReceivedFrom(), evt.GetRecvRPC().GetMeta()) - h.FlushTraceWithTimestamp("RECV_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_RECV_RPC.String(), ts, payload) case pubsubpb.TraceEvent_SEND_RPC: payload := newRPCMeta(evt.GetSendRPC().GetSendTo(), evt.GetSendRPC().GetMeta()) - h.FlushTraceWithTimestamp("SEND_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_SEND_RPC.String(), ts, payload) case pubsubpb.TraceEvent_DROP_RPC: payload := newRPCMeta(evt.GetDropRPC().GetSendTo(), evt.GetDropRPC().GetMeta()) - h.FlushTraceWithTimestamp("DROP_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_DROP_RPC.String(), ts, payload) } } diff --git a/host/host.go b/host/host.go index 2a0d3e1..e09dc24 100644 --- a/host/host.go +++ b/host/host.go @@ -97,7 +97,7 @@ func (h *Host) Serve(ctx context.Context) error { }, } - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("Failed to put event payload", tele.LogAttrError(err)) return } @@ -270,6 +270,7 @@ func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error) { func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { return func(ctx context.Context, msg *pubsub.Message) error { + slog.Debug("Handling gossip message", "topic", msg.GetTopic()) evt := &TraceEvent{ Type: "HANDLE_MESSAGE", PeerID: h.ID(), @@ -283,7 +284,7 @@ func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { }, } - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } @@ -328,6 +329,6 @@ func (h *Host) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) { traceCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - h.cfg.DataStream.PutEvent(traceCtx, trace) + h.cfg.DataStream.PutRecord(traceCtx, trace) } } diff --git a/host/kinesis.go b/host/kinesis.go index a9ce570..dad62f6 100644 --- a/host/kinesis.go +++ b/host/kinesis.go @@ -36,9 +36,12 @@ func (k *KinesisDataStream) Start(ctx context.Context) error { dsCtx, dsCancel := context.WithCancel(ctx) k.ctx = dsCtx - k.cancelFn = dsCancel + if err := k.producer.Start(ctx); err != nil { + return err + } + <-dsCtx.Done() return dsCtx.Err() @@ -51,7 +54,7 @@ func (k *KinesisDataStream) Stop(ctx context.Context) error { if err := k.producer.WaitIdle(timeoutCtx); err != nil { slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err)) } - + timeoutCncl() // stop the producer k.cancelFn() @@ -67,8 +70,8 @@ func (k *KinesisDataStream) Stop(ctx context.Context) error { return k.producer.WaitIdle(ctx) } -// PutEvent sends an event to the Kinesis data stream. -func (k *KinesisDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { +// PutRecord sends an event to the Kinesis data stream. +func (k *KinesisDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { if event != nil { kRecord := gk.Record(event) diff --git a/host/producer.go b/host/producer.go index 874c1eb..8e6f755 100644 --- a/host/producer.go +++ b/host/producer.go @@ -7,7 +7,7 @@ import ( type DataStream interface { Start(ctx context.Context) error Stop(ctx context.Context) error - PutEvent(ctx context.Context, event *TraceEvent) error + PutRecord(ctx context.Context, event *TraceEvent) error Type() DataStreamType } From 0b7f7b2c33acada722045021301411c0ebcf5aa3 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 25 Jul 2024 12:29:09 +1000 Subject: [PATCH 08/12] Tidying --- eth/node_config.go | 1 - eth/pubsub.go | 33 --------------------------------- 2 files changed, 34 deletions(-) diff --git a/eth/node_config.go b/eth/node_config.go index 4f23900..1a8fcc9 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -381,7 +381,6 @@ func desiredPubSubBaseTopics() []string { // we unfortunatelly can't validate the messages (yet) // thus, better not to forward invalid messages // p2p.GossipExitMessage, - p2p.GossipExitMessage, p2p.GossipAttesterSlashingMessage, p2p.GossipProposerSlashingMessage, p2p.GossipContributionAndProofMessage, diff --git a/eth/pubsub.go b/eth/pubsub.go index 265c963..0760b7f 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -110,8 +110,6 @@ func (p *PubSub) Serve(ctx context.Context) error { func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { switch { // Ensure hotter topics are at the top of the switch statement. - case strings.Contains(topic, p2p.GossipAttestationMessage): - return p.handleBeaconAttestation case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): return p.handleBeaconDataColumnSidecar case strings.Contains(topic, p2p.GossipBlockMessage): @@ -157,37 +155,6 @@ func (n *Node) FilterIncomingSubscriptions(id peer.ID, subs []*pubsubpb.RPC_SubO return pubsub.FilterSubscriptions(subs, n.CanSubscribe), nil } -func (p *PubSub) handleBeaconAttestation(ctx context.Context, msg *pubsub.Message) error { - now := time.Now() - - attestation := ðtypes.Attestation{} - if err := p.cfg.Encoder.DecodeGossip(msg.Data, attestation); err != nil { - return fmt.Errorf("error decoding phase0 attestation gossip message: %w", err) - } - - evt := &host.TraceEvent{ - Type: "HANDLE_MESSAGE", - PeerID: p.host.ID(), - Timestamp: now, - Payload: map[string]any{ - "PeerID": msg.ReceivedFrom.String(), - "MsgID": hex.EncodeToString([]byte(msg.ID)), - "MsgSize": len(msg.Data), - "Topic": msg.GetTopic(), - "Seq": msg.GetSeqno(), - "Attestation": attestation, - "Timestamp": now, - }, - } - - if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { - slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) - } - - return nil - -} - func (p *PubSub) handleBeaconDataColumnSidecar(ctx context.Context, msg *pubsub.Message) error { now := time.Now() From 113a74f8b7210af3c3ae0e90acff1cecbf807112 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 25 Jul 2024 12:31:04 +1000 Subject: [PATCH 09/12] Tidying --- eth/pubsub.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/eth/pubsub.go b/eth/pubsub.go index 0760b7f..cec686f 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -110,14 +110,16 @@ func (p *PubSub) Serve(ctx context.Context) error { func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { switch { // Ensure hotter topics are at the top of the switch statement. + case strings.Contains(topic, p2p.GossipAttestationMessage): + return p.handleAttestation case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): return p.handleBeaconDataColumnSidecar - case strings.Contains(topic, p2p.GossipBlockMessage): - return p.handleBeaconBlock case strings.Contains(topic, p2p.GossipAggregateAndProofMessage): return p.handleAggregateAndProof - case strings.Contains(topic, p2p.GossipAttestationMessage): - return p.handleAttestation + case strings.Contains(topic, p2p.GossipBlockMessage): + return p.handleBeaconBlock + case strings.Contains(topic, p2p.GossipBlobSidecarMessage): + return p.handleBlobSidecar case strings.Contains(topic, p2p.GossipExitMessage): return p.handleExitMessage case strings.Contains(topic, p2p.GossipAttesterSlashingMessage): @@ -130,8 +132,6 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { return p.handleSyncCommitteeMessage case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage): return p.handleBlsToExecutionChangeMessage - case strings.Contains(topic, p2p.GossipBlobSidecarMessage): - return p.handleBlobSidecar default: return p.host.TracedTopicHandler(host.NoopHandler) } From 8f99d238af41360fe3dc1ac010dbb46812e7a728 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 9 Jun 2025 10:31:11 +0200 Subject: [PATCH 10/12] yeet --- eth/pubsub.go | 1 + go.mod | 18 +++++++++--------- go.sum | 40 ++++++++++++++++++++-------------------- 3 files changed, 30 insertions(+), 29 deletions(-) diff --git a/eth/pubsub.go b/eth/pubsub.go index 6a0e2d7..753ed6e 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -2,6 +2,7 @@ package eth import ( "context" + "encoding/hex" "fmt" "log/slog" "strings" diff --git a/go.mod b/go.mod index 1beab8a..5941251 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24.0 toolchain go1.24.2 require ( - github.com/OffchainLabs/prysm/v6 v6.0.1 + github.com/OffchainLabs/prysm/v6 v6.0.4 github.com/aws/aws-sdk-go-v2 v1.32.7 github.com/aws/aws-sdk-go-v2/config v1.28.7 github.com/aws/aws-sdk-go-v2/credentials v1.17.48 @@ -30,16 +30,16 @@ require ( github.com/stretchr/testify v1.10.0 github.com/thejerf/suture/v4 v4.0.6 github.com/urfave/cli/v2 v2.27.5 - go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 go.opentelemetry.io/otel/exporters/prometheus v0.55.0 - go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/metric v1.35.0 go.opentelemetry.io/otel/sdk v1.34.0 - go.opentelemetry.io/otel/sdk/metric v1.33.0 - go.opentelemetry.io/otel/trace v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 + go.opentelemetry.io/otel/trace v1.35.0 golang.org/x/time v0.9.0 - google.golang.org/grpc v1.69.4 - google.golang.org/protobuf v1.36.3 + google.golang.org/grpc v1.71.0 + google.golang.org/protobuf v1.36.5 gopkg.in/yaml.v3 v3.0.1 ) @@ -78,7 +78,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set/v2 v2.7.0 // indirect - github.com/dgraph-io/ristretto v0.2.0 // indirect + github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/gosigar v0.14.3 // indirect @@ -100,7 +100,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect - github.com/google/go-cmp v0.6.0 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect diff --git a/go.sum b/go.sum index 4bd8f11..df6794b 100644 --- a/go.sum +++ b/go.sum @@ -15,8 +15,8 @@ github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwS github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= -github.com/OffchainLabs/prysm/v6 v6.0.1 h1:NwFsJTyPVQqhmEz4//GqAs+M6AjcWYqH9x+HvDGxgwk= -github.com/OffchainLabs/prysm/v6 v6.0.1/go.mod h1:7QuGy5ol0x0hx1Qmz0XcXJ3CnjWWKHpMiz++bUR0IIg= +github.com/OffchainLabs/prysm/v6 v6.0.4 h1:aqWCb2U3LfeahzjORvxXYsL1ebKWT1AUu3Ya3y7LApE= +github.com/OffchainLabs/prysm/v6 v6.0.4/go.mod h1:lMkHT3gWiCOqo4rbuhLTU4FoQ/THni9v6z4w9P6FRyU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= @@ -168,11 +168,11 @@ github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbz github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= github.com/dennis-tra/go-kinesis v0.0.0-20240326083914-7acf5f8dc24e h1:y6QyYh8YZyRQDdfnQqUgC5tRBmEwUFAjavnybKboCm4= github.com/dennis-tra/go-kinesis v0.0.0-20240326083914-7acf5f8dc24e/go.mod h1:5Hm3EOeNP1/lYm9qcFwWgYgjixQilwcZA+hZ05bUz54= -github.com/dgraph-io/ristretto v0.2.0 h1:XAfl+7cmoUDWW/2Lx8TGZQjjxIQ2Ley9DSf52dru4WE= -github.com/dgraph-io/ristretto v0.2.0/go.mod h1:8uBHCU/PBV4Ag0CJrP47b9Ofby5dqWNh4FicAdoqFNU= +github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= +github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -298,8 +298,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -875,8 +875,8 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= -go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= -go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glBlOBp5+WlYsOElzTNmiPW/x60= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0 h1:5pojmb1U1AogINhN3SurB+zm/nIcusopeBNp42f45QM= @@ -885,14 +885,14 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 h1:BEj3S go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0/go.mod h1:9cKLGBDzI/F3NoHLQGm4ZrYdIHsvGt6ej6hUowxY0J4= go.opentelemetry.io/otel/exporters/prometheus v0.55.0 h1:sSPw658Lk2NWAv74lkD3B/RSDb+xRFx46GjkrL3VUZo= go.opentelemetry.io/otel/exporters/prometheus v0.55.0/go.mod h1:nC00vyCmQixoeaxF6KNyP42II/RHa9UdruK02qBmHvI= -go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= -go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= -go.opentelemetry.io/otel/sdk/metric v1.33.0 h1:Gs5VK9/WUJhNXZgn8MR6ITatvAmKeIuCtNbsP3JkNqU= -go.opentelemetry.io/otel/sdk/metric v1.33.0/go.mod h1:dL5ykHZmm1B1nVRk9dDjChwDmt81MjVp3gLkQRwKf/Q= -go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= -go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -1149,16 +1149,16 @@ google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.69.4 h1:MF5TftSMkd8GLw/m0KM6V8CMOCY6NZ1NQDPGFgbTt4A= -google.golang.org/grpc v1.69.4/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU= -google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 698d84939d36d6dc8ede241949e98ad41623071f Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 11 Jun 2025 11:26:45 +0200 Subject: [PATCH 11/12] yeet --- FUNDING.json | 7 ------- funding.json | 5 ----- 2 files changed, 12 deletions(-) delete mode 100644 FUNDING.json delete mode 100644 funding.json diff --git a/FUNDING.json b/FUNDING.json deleted file mode 100644 index c6cbfb1..0000000 --- a/FUNDING.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "drips": { - "filecoin": { - "ownedBy": "0xFb90943018928cBF18e6355629A250928Ca6Be02" - } - } -} diff --git a/funding.json b/funding.json deleted file mode 100644 index 5d0fede..0000000 --- a/funding.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "opRetro": { - "projectId": "0x7504e494cb8d227193182e083128912173c14eaeecec9b90fa453de28377b269" - } -} From f67a746c4227c15e18eb8a613d540271c205f44f Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 11 Jun 2025 11:58:01 +0200 Subject: [PATCH 12/12] feat(eth): query prysm for fork version on devnets to handle dynamic forks --- cmd/hermes/cmd_eth.go | 59 +++++++++++++++++++++++++++++++++++-------- eth/prysm.go | 24 ++++++++++++++++++ 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/cmd/hermes/cmd_eth.go b/cmd/hermes/cmd_eth.go index 4f84329..31b308d 100644 --- a/cmd/hermes/cmd_eth.go +++ b/cmd/hermes/cmd_eth.go @@ -1,6 +1,7 @@ package main import ( + "context" "encoding/hex" "encoding/json" "fmt" @@ -423,18 +424,56 @@ func cmdEthAction(c *cli.Context) error { genesisRoot := config.Genesis.GenesisValidatorRoot genesisTime := config.Genesis.GenesisTime - // compute fork version and fork digest - currentSlot := slots.Since(genesisTime) - currentEpoch := slots.ToEpoch(currentSlot) + // For devnets, we need to query Prysm for the actual fork version + // instead of calculating it from the config + var currentForkVersion [4]byte + var forkDigest [4]byte + + if ethConfig.Chain == params.DevnetName { + // Create a temporary Prysm client to query the fork version + tempPryClient, err := eth.NewPrysmClientWithTLS(ethConfig.PrysmHost, ethConfig.PrysmPortHTTP, ethConfig.PrysmPortGRPC, ethConfig.PrysmUseTLS, ethConfig.DialTimeout, config.Genesis) + if err != nil { + return fmt.Errorf("create temporary prysm client: %w", err) + } + + ctx, cancel := context.WithTimeout(c.Context, 10*time.Second) + defer cancel() + + nodeFork, err := tempPryClient.GetFork(ctx) + if err != nil { + return fmt.Errorf("get fork from prysm: %w", err) + } + + copy(currentForkVersion[:], nodeFork.CurrentVersion) + + forkDigest, err = signing.ComputeForkDigest(currentForkVersion[:], genesisRoot) + if err != nil { + return fmt.Errorf("create fork digest: %w", err) + } + + slog.Info("Using fork version from Prysm for devnet", + "fork_version", hex.EncodeToString(currentForkVersion[:]), + "fork_digest", hex.EncodeToString(forkDigest[:])) + } else { + // For known networks, calculate from config + currentSlot := slots.Since(genesisTime) + currentEpoch := slots.ToEpoch(currentSlot) - currentForkVersion, err := eth.GetCurrentForkVersion(currentEpoch, config.Beacon) - if err != nil { - return fmt.Errorf("compute fork version for epoch %d: %w", currentEpoch, err) - } + var err error + currentForkVersion, err = eth.GetCurrentForkVersion(currentEpoch, config.Beacon) + if err != nil { + return fmt.Errorf("compute fork version for epoch %d: %w", currentEpoch, err) + } - forkDigest, err := signing.ComputeForkDigest(currentForkVersion[:], genesisRoot) - if err != nil { - return fmt.Errorf("create fork digest (%s, %x): %w", genesisTime, genesisRoot, err) + slog.Debug("Computing fork digest", + "current_epoch", currentEpoch, + "fork_version", hex.EncodeToString(currentForkVersion[:]), + "genesis_root", hex.EncodeToString(genesisRoot)) + + forkDigest, err = signing.ComputeForkDigest(currentForkVersion[:], genesisRoot) + if err != nil { + return fmt.Errorf("create fork digest (%s, %x): %w", genesisTime, genesisRoot, err) + } } // Overriding configuration so that functions like ComputForkDigest take the diff --git a/eth/prysm.go b/eth/prysm.go index 82c50c5..9239460 100644 --- a/eth/prysm.go +++ b/eth/prysm.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "log/slog" "net/http" "net/url" "strconv" @@ -297,6 +298,22 @@ func (p *PrysmClient) ChainHead(ctx context.Context) (chainHead *eth.ChainHead, return p.beaconClient.GetChainHead(ctx, &emptypb.Empty{}) //lint:ignore SA1019 I don't see an alternative } +func (p *PrysmClient) GetFork(ctx context.Context) (fork *eth.Fork, err error) { + ctx, span := p.tracer.Start(ctx, "prysm_client.get_fork") + defer func() { + if err != nil { + span.SetStatus(codes.Error, err.Error()) + span.RecordError(err) + } + span.End() + }() + + ctx, cancel := context.WithTimeout(ctx, p.timeout) + defer cancel() + + return p.beaconApiClient.GetFork(ctx, apiCli.StateOrBlockId("head")) +} + func (p *PrysmClient) Identity(ctx context.Context) (addrInfo *peer.AddrInfo, err error) { ctx, span := p.tracer.Start(ctx, "prysm_client.identity") defer func() { @@ -383,6 +400,13 @@ func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte) if forkDigest == hermesForkDigest { return true, nil } + + // Log the mismatch for debugging + slog.Debug("Fork digest mismatch", + "hermes_fork_digest", hex.EncodeToString(hermesForkDigest[:]), + "prysm_fork_digest", hex.EncodeToString(forkDigest[:]), + "prysm_fork_version", hex.EncodeToString(nodeFork.CurrentVersion)) + return false, nil }