Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions FUNDING.json

This file was deleted.

59 changes: 49 additions & 10 deletions cmd/hermes/cmd_eth.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -423,18 +424,56 @@ func cmdEthAction(c *cli.Context) error {
genesisRoot := config.Genesis.GenesisValidatorRoot
genesisTime := config.Genesis.GenesisTime

// compute fork version and fork digest
currentSlot := slots.Since(genesisTime)
currentEpoch := slots.ToEpoch(currentSlot)
// For devnets, we need to query Prysm for the actual fork version
// instead of calculating it from the config
var currentForkVersion [4]byte
var forkDigest [4]byte

if ethConfig.Chain == params.DevnetName {
// Create a temporary Prysm client to query the fork version
tempPryClient, err := eth.NewPrysmClientWithTLS(ethConfig.PrysmHost, ethConfig.PrysmPortHTTP, ethConfig.PrysmPortGRPC, ethConfig.PrysmUseTLS, ethConfig.DialTimeout, config.Genesis)
if err != nil {
return fmt.Errorf("create temporary prysm client: %w", err)
}

ctx, cancel := context.WithTimeout(c.Context, 10*time.Second)
defer cancel()

nodeFork, err := tempPryClient.GetFork(ctx)
if err != nil {
return fmt.Errorf("get fork from prysm: %w", err)
}

copy(currentForkVersion[:], nodeFork.CurrentVersion)

forkDigest, err = signing.ComputeForkDigest(currentForkVersion[:], genesisRoot)
if err != nil {
return fmt.Errorf("create fork digest: %w", err)
}

slog.Info("Using fork version from Prysm for devnet",
"fork_version", hex.EncodeToString(currentForkVersion[:]),
"fork_digest", hex.EncodeToString(forkDigest[:]))
} else {
// For known networks, calculate from config
currentSlot := slots.Since(genesisTime)
currentEpoch := slots.ToEpoch(currentSlot)

currentForkVersion, err := eth.GetCurrentForkVersion(currentEpoch, config.Beacon)
if err != nil {
return fmt.Errorf("compute fork version for epoch %d: %w", currentEpoch, err)
}
var err error
currentForkVersion, err = eth.GetCurrentForkVersion(currentEpoch, config.Beacon)
if err != nil {
return fmt.Errorf("compute fork version for epoch %d: %w", currentEpoch, err)
}

forkDigest, err := signing.ComputeForkDigest(currentForkVersion[:], genesisRoot)
if err != nil {
return fmt.Errorf("create fork digest (%s, %x): %w", genesisTime, genesisRoot, err)
slog.Debug("Computing fork digest",
"current_epoch", currentEpoch,
"fork_version", hex.EncodeToString(currentForkVersion[:]),
"genesis_root", hex.EncodeToString(genesisRoot))

forkDigest, err = signing.ComputeForkDigest(currentForkVersion[:], genesisRoot)
if err != nil {
return fmt.Errorf("create fork digest (%s, %x): %w", genesisTime, genesisRoot, err)
}
}

// Overriding configuration so that functions like ComputForkDigest take the
Expand Down
4 changes: 4 additions & 0 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,7 @@ func desiredPubSubBaseTopics() []string {
p2p.GossipSyncCommitteeMessage,
p2p.GossipBlsToExecutionChangeMessage,
p2p.GossipBlobSidecarMessage,
p2p.GossipDataColumnSidecarMessage,
}
}

Expand Down Expand Up @@ -475,6 +476,9 @@ func topicFormatFromBase(topicBase string) (string, error) {
case p2p.GossipBlobSidecarMessage:
return p2p.BlobSubnetTopicFormat, nil

case p2p.GossipDataColumnSidecarMessage:
return p2p.DataColumnSubnetTopicFormat, nil

default:
return "", fmt.Errorf("unrecognized gossip topic base: %s", topicBase)
}
Expand Down
24 changes: 24 additions & 0 deletions eth/prysm.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -297,6 +298,22 @@ func (p *PrysmClient) ChainHead(ctx context.Context) (chainHead *eth.ChainHead,
return p.beaconClient.GetChainHead(ctx, &emptypb.Empty{}) //lint:ignore SA1019 I don't see an alternative
}

func (p *PrysmClient) GetFork(ctx context.Context) (fork *eth.Fork, err error) {
ctx, span := p.tracer.Start(ctx, "prysm_client.get_fork")
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
span.RecordError(err)
}
span.End()
}()

ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()

return p.beaconApiClient.GetFork(ctx, apiCli.StateOrBlockId("head"))
}

func (p *PrysmClient) Identity(ctx context.Context) (addrInfo *peer.AddrInfo, err error) {
ctx, span := p.tracer.Start(ctx, "prysm_client.identity")
defer func() {
Expand Down Expand Up @@ -383,6 +400,13 @@ func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte)
if forkDigest == hermesForkDigest {
return true, nil
}

// Log the mismatch for debugging
slog.Debug("Fork digest mismatch",
"hermes_fork_digest", hex.EncodeToString(hermesForkDigest[:]),
"prysm_fork_digest", hex.EncodeToString(forkDigest[:]),
"prysm_fork_version", hex.EncodeToString(nodeFork.CurrentVersion))

return false, nil
}

Expand Down
48 changes: 42 additions & 6 deletions eth/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package eth

import (
"context"
"encoding/hex"
"fmt"
"log/slog"
"strings"
Expand Down Expand Up @@ -117,12 +118,17 @@ func (p *PubSub) Serve(ctx context.Context) error {

func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler {
switch {
case strings.Contains(topic, p2p.GossipBlockMessage):
return p.handleBeaconBlock
case strings.Contains(topic, p2p.GossipAggregateAndProofMessage):
return p.handleAggregateAndProof
// Ensure hotter topics are at the top of the switch statement.
case strings.Contains(topic, p2p.GossipAttestationMessage):
return p.handleAttestation
case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage):
return p.handleBeaconDataColumnSidecar
case strings.Contains(topic, p2p.GossipAggregateAndProofMessage):
return p.handleAggregateAndProof
case strings.Contains(topic, p2p.GossipBlockMessage):
return p.handleBeaconBlock
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
return p.handleBlobSidecar
case strings.Contains(topic, p2p.GossipExitMessage):
return p.handleExitMessage
case strings.Contains(topic, p2p.GossipAttesterSlashingMessage):
Expand All @@ -135,8 +141,6 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler {
return p.handleSyncCommitteeMessage
case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage):
return p.handleBlsToExecutionChangeMessage
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
return p.handleBlobSidecar
default:
return p.host.TracedTopicHandler(host.NoopHandler)
}
Expand All @@ -156,9 +160,41 @@ func (n *Node) FilterIncomingSubscriptions(id peer.ID, subs []*pubsubpb.RPC_SubO
if len(subs) > n.cfg.PubSubSubscriptionRequestLimit {
return nil, pubsub.ErrTooManySubscriptions
}

return pubsub.FilterSubscriptions(subs, n.CanSubscribe), nil
}

func (p *PubSub) handleBeaconDataColumnSidecar(ctx context.Context, msg *pubsub.Message) error {
now := time.Now()

sidecar := &ethtypes.DataColumnSidecar{}
if err := p.cfg.Encoder.DecodeGossip(msg.Data, sidecar); err != nil {
return fmt.Errorf("error decoding electra data column sidecar gossip message: %w", err)
}

evt := &host.TraceEvent{
Type: "HANDLE_MESSAGE",
PeerID: p.host.ID(),
Timestamp: now,
Payload: map[string]any{
"PeerID": msg.ReceivedFrom.String(),
"MsgID": hex.EncodeToString([]byte(msg.ID)),
"MsgSize": len(msg.Data),
"Topic": msg.GetTopic(),
"Seq": msg.GetSeqno(),
"Sidecar": sidecar,
"Timestamp": now,
},
}

if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil {
slog.Warn("failed putting topic handler event", tele.LogAttrError(err))
}

return nil

}

func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) error {
var (
err error
Expand Down
56 changes: 54 additions & 2 deletions eth/topic_score_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
proposerSlashingWeight = 0.05
voluntaryExitWeight = 0.05
blsToExecutionChangeWeight = 0.05
attestationWeight = 0.05
dataColumnSidecarWeight = 0.05

// mesh-related params
maxInMeshScore = 10
Expand All @@ -43,10 +45,13 @@ func topicToScoreParamsMapper(topic string, activeValidators uint64) *pubsub.Top
case strings.Contains(topic, p2p.GossipBlockMessage):
return defaultBlockTopicParams()

case strings.Contains(topic, p2p.GossipAttestationMessage):
return defaultAttestationTopicParams()

case strings.Contains(topic, p2p.GossipAggregateAndProofMessage):
return defaultAggregateTopicParams(activeValidators)

case strings.Contains(topic, p2p.GossipAttestationMessage):
case strings.Contains(topic, p2p.GossipAggregateAndProofMessage):
return defaultAggregateSubnetTopicParams(activeValidators)

case strings.Contains(topic, p2p.GossipExitMessage):
Expand All @@ -70,6 +75,9 @@ func topicToScoreParamsMapper(topic string, activeValidators uint64) *pubsub.Top
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
return defaultBlockTopicParams()

case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage):
return defaultDataColumnSidecarTopicParams()

default:
slog.Warn("unrecognized gossip-topic to apply peerscores", slog.Attr{Key: "topic", Value: slog.StringValue(topic)})
// return empty parameters
Expand Down Expand Up @@ -188,7 +196,7 @@ func defaultAggregateSubnetTopicParams(activeValidators uint64) *pubsub.TopicSco
return nil
}
// Determine the amount of validators expected in a subnet in a single slot.
numPerSlot := time.Duration(subnetWeight / uint64(globalBeaconConfig.SlotsPerEpoch))
numPerSlot := subnetWeight / uint64(globalBeaconConfig.SlotsPerEpoch)
if numPerSlot == 0 {
slog.Warn("numPerSlot is 0, skipping initializing topic scoring")
return nil
Expand Down Expand Up @@ -388,6 +396,50 @@ func defaultBlsToExecutionChangeTopicParams() *pubsub.TopicScoreParams {
}
}

func defaultAttestationTopicParams() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: attestationWeight,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 2,
FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs),
FirstMessageDeliveriesCap: 5,
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
MeshMessageDeliveriesCap: 0,
MeshMessageDeliveriesThreshold: 0,
MeshMessageDeliveriesWindow: 0,
MeshMessageDeliveriesActivation: 0,
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
}

func defaultDataColumnSidecarTopicParams() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: dataColumnSidecarWeight,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 2,
FirstMessageDeliveriesDecay: scoreDecay(oneHundredEpochs),
FirstMessageDeliveriesCap: 5,
MeshMessageDeliveriesWeight: 0,
MeshMessageDeliveriesDecay: 0,
MeshMessageDeliveriesCap: 0,
MeshMessageDeliveriesThreshold: 0,
MeshMessageDeliveriesWindow: 0,
MeshMessageDeliveriesActivation: 0,
MeshFailurePenaltyWeight: 0,
MeshFailurePenaltyDecay: 0,
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesDecay: scoreDecay(invalidDecayPeriod),
}
}

// utility functions

func oneSlotDuration() time.Duration {
Expand Down
5 changes: 0 additions & 5 deletions funding.json

This file was deleted.

18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.24.0
toolchain go1.24.2

require (
github.com/OffchainLabs/prysm/v6 v6.0.1
github.com/OffchainLabs/prysm/v6 v6.0.4
github.com/aws/aws-sdk-go-v2 v1.32.7
github.com/aws/aws-sdk-go-v2/config v1.28.7
github.com/aws/aws-sdk-go-v2/credentials v1.17.48
Expand All @@ -30,16 +30,16 @@ require (
github.com/stretchr/testify v1.10.0
github.com/thejerf/suture/v4 v4.0.6
github.com/urfave/cli/v2 v2.27.5
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel v1.35.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
go.opentelemetry.io/otel/exporters/prometheus v0.55.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/metric v1.35.0
go.opentelemetry.io/otel/sdk v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.33.0
go.opentelemetry.io/otel/trace v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.35.0
golang.org/x/time v0.9.0
google.golang.org/grpc v1.69.4
google.golang.org/protobuf v1.36.3
google.golang.org/grpc v1.71.0
google.golang.org/protobuf v1.36.5
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -78,7 +78,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/deckarep/golang-set/v2 v2.7.0 // indirect
github.com/dgraph-io/ristretto v0.2.0 // indirect
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/elastic/gosigar v0.14.3 // indirect
Expand All @@ -100,7 +100,7 @@ require (
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
Expand Down
Loading
Loading