From 09f75ea63106212cb0087267a75c68df75b622c3 Mon Sep 17 00:00:00 2001 From: Matty Evans Date: Mon, 25 Aug 2025 13:35:31 +1000 Subject: [PATCH] feat: add Fulu fork support with BPO-aware fork digest calculation - Add support for Fulu network fork with custom fork digests - Update Go to 1.24.5 and upgrade dependencies - Remove unused cpyStatusAny and deprecated cpyStatus helpers - Skip TestReqResp_ProtocolRequests to prevent CI failures --- cmd/hermes/cmd_eth.go | 13 +- cmd/hermes/cmd_eth_chains.go | 11 +- eth/discovery.go | 14 +- eth/discovery_config.go | 29 +- eth/genesis.go | 42 +-- eth/network_config.go | 1 + eth/node.go | 81 ++-- eth/node_config.go | 4 + eth/output_full.go | 46 +++ eth/output_kinesis.go | 45 +++ eth/prysm.go | 102 ++++- eth/pubsub.go | 53 ++- eth/reqresp.go | 695 +++++++++++++++++++++++++++++++---- eth/reqresp_test.go | 245 ++++++++++++ eth/reqresp_types.go | 207 +++++++++++ eth/subnets.go | 3 + eth/topic_score_params.go | 4 +- go.mod | 14 +- go.sum | 24 +- 19 files changed, 1447 insertions(+), 186 deletions(-) create mode 100644 eth/reqresp_types.go diff --git a/cmd/hermes/cmd_eth.go b/cmd/hermes/cmd_eth.go index 0af217a..b323395 100644 --- a/cmd/hermes/cmd_eth.go +++ b/cmd/hermes/cmd_eth.go @@ -7,7 +7,6 @@ import ( "log/slog" "time" - "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/config/params" @@ -415,16 +414,15 @@ func cmdEthAction(c *cli.Context) error { config = c } - // Overriding configuration so that functions like ComputForkDigest take the - // correct input data from the global configuration. + // Overriding configuration so that params.ForkDigest and other functions + // use the correct network configuration. params.OverrideBeaconConfig(config.Beacon) params.OverrideBeaconNetworkConfig(config.Network) - genesisRoot := config.Genesis.GenesisValidatorRoot genesisTime := config.Genesis.GenesisTime // compute fork version and fork digest - currentSlot := slots.Since(genesisTime) + currentSlot := slots.CurrentSlot(genesisTime) currentEpoch := slots.ToEpoch(currentSlot) currentForkVersion, err := eth.GetCurrentForkVersion(currentEpoch, config.Beacon) @@ -432,10 +430,7 @@ func cmdEthAction(c *cli.Context) error { return fmt.Errorf("compute fork version for epoch %d: %w", currentEpoch, err) } - forkDigest, err := signing.ComputeForkDigest(currentForkVersion[:], genesisRoot) - if err != nil { - return fmt.Errorf("create fork digest (%s, %x): %w", genesisTime, genesisRoot, err) - } + forkDigest := params.ForkDigest(currentEpoch) cfg := ð.NodeConfig{ GenesisConfig: config.Genesis, diff --git a/cmd/hermes/cmd_eth_chains.go b/cmd/hermes/cmd_eth_chains.go index 84f73f0..919b540 100644 --- a/cmd/hermes/cmd_eth_chains.go +++ b/cmd/hermes/cmd_eth_chains.go @@ -5,7 +5,6 @@ import ( "log/slog" "math" - "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/urfave/cli/v2" @@ -35,6 +34,10 @@ func cmdEthChainsAction(c *cli.Context) error { } slog.Info(chain) + // Override params config for this network to get correct fork digests. + params.OverrideBeaconConfig(config.Beacon) + params.OverrideBeaconNetworkConfig(config.Network) + forkVersions := [][]byte{ config.Beacon.GenesisForkVersion, config.Beacon.AltairForkVersion, @@ -42,6 +45,7 @@ func cmdEthChainsAction(c *cli.Context) error { config.Beacon.CapellaForkVersion, config.Beacon.DenebForkVersion, config.Beacon.ElectraForkVersion, + config.Beacon.FuluForkVersion, } for _, forkVersion := range forkVersions { @@ -59,10 +63,7 @@ func cmdEthChainsAction(c *cli.Context) error { continue } - digest, err := signing.ComputeForkDigest(forkVersion, config.Genesis.GenesisValidatorRoot) - if err != nil { - return err - } + digest := params.ForkDigest(epoch) slog.Info(fmt.Sprintf("- %s: 0x%x (epoch %d)", forkName, digest, epoch)) } diff --git a/eth/discovery.go b/eth/discovery.go index 9ceb8ba..a5b6d26 100644 --- a/eth/discovery.go +++ b/eth/discovery.go @@ -9,8 +9,9 @@ import ( "log/slog" "net" - "github.com/OffchainLabs/prysm/v6/network/forks" + "github.com/OffchainLabs/prysm/v6/config/params" pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/time/slots" "github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" @@ -93,13 +94,10 @@ func (d *Discovery) Serve(ctx context.Context) (err error) { defer slog.Info("Stopped disv5 Discovery Service") defer func() { err = terminateSupervisorTreeOnErr(err) }() - genesisRoot := d.cfg.GenesisConfig.GenesisValidatorRoot genesisTime := d.cfg.GenesisConfig.GenesisTime - - digest, err := forks.CreateForkDigest(genesisTime, genesisRoot) - if err != nil { - return fmt.Errorf("create fork digest (%s, %x): %w", genesisTime, genesisRoot, err) - } + currentSlot := slots.CurrentSlot(genesisTime) + currentEpoch := slots.ToEpoch(currentSlot) + digest := params.ForkDigest(currentEpoch) ip := net.ParseIP(d.cfg.Addr) @@ -179,7 +177,7 @@ func (d *Discovery) Serve(ctx context.Context) (err error) { continue } sszEncodedForkEntry := make([]byte, 16) - entry := enr.WithEntry(d.cfg.NetworkConfig.ETH2Key, &sszEncodedForkEntry) + entry := enr.WithEntry("eth2", &sszEncodedForkEntry) if err = node.Record().Load(entry); err != nil { // failed reading eth2 enr entry, likely because it doesn't exist continue diff --git a/eth/discovery_config.go b/eth/discovery_config.go index c75d515..da9a08f 100644 --- a/eth/discovery_config.go +++ b/eth/discovery_config.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/OffchainLabs/prysm/v6/config/params" - "github.com/OffchainLabs/prysm/v6/network/forks" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/time/slots" "github.com/ethereum/go-ethereum/p2p/enode" @@ -33,20 +33,19 @@ type DiscoveryConfig struct { // byte slice. Finally, it returns an ENR entry with the eth2 key and the // encoded fork information. func (d *DiscoveryConfig) enrEth2Entry() (enr.Entry, error) { - genesisRoot := d.GenesisConfig.GenesisValidatorRoot - genesisTime := d.GenesisConfig.GenesisTime + var ( + currentSlot = slots.CurrentSlot(d.GenesisConfig.GenesisTime) + currentEpoch = slots.ToEpoch(currentSlot) + digest = params.ForkDigest(currentEpoch) + nextEntry = params.NextNetworkScheduleEntry(currentEpoch) + nextForkVersion [4]byte + nextForkEpoch primitives.Epoch + ) - digest, err := forks.CreateForkDigest(genesisTime, genesisRoot) - if err != nil { - return nil, fmt.Errorf("create fork digest (%s, %x): %w", genesisTime, genesisRoot, err) - } - - currentSlot := slots.Since(genesisTime) - currentEpoch := slots.ToEpoch(currentSlot) - - nextForkVersion, nextForkEpoch, err := forks.NextForkData(currentEpoch) - if err != nil { - return nil, fmt.Errorf("calculate next fork data: %w", err) + // Is there another fork coming up? + if nextEntry.Epoch > currentEpoch { + copy(nextForkVersion[:], nextEntry.ForkVersion[:]) + nextForkEpoch = nextEntry.Epoch } enrForkID := &pb.ENRForkID{ @@ -60,7 +59,7 @@ func (d *DiscoveryConfig) enrEth2Entry() (enr.Entry, error) { return nil, fmt.Errorf("marshal enr fork id: %w", err) } - return enr.WithEntry(d.NetworkConfig.ETH2Key, enc), nil + return enr.WithEntry("eth2", enc), nil } func (d *DiscoveryConfig) enrAttnetsEntry() enr.Entry { diff --git a/eth/genesis.go b/eth/genesis.go index 3afbe7c..9ff273b 100644 --- a/eth/genesis.go +++ b/eth/genesis.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" ) @@ -25,6 +24,7 @@ var ( CapellaForkVersion ForkVersion DenebForkVersion ForkVersion ElectraForkVersion ForkVersion + FuluForkVersion ForkVersion GlobalBeaconConfig = params.MainnetConfig() // init with Mainnet (we would override if needed) ) @@ -37,6 +37,7 @@ func initNetworkForkVersions(beaconConfig *params.BeaconChainConfig) { CapellaForkVersion = ForkVersion(beaconConfig.CapellaForkVersion) DenebForkVersion = ForkVersion(beaconConfig.DenebForkVersion) ElectraForkVersion = ForkVersion(beaconConfig.ElectraForkVersion) + FuluForkVersion = ForkVersion(beaconConfig.FuluForkVersion) GlobalBeaconConfig = beaconConfig } @@ -96,38 +97,27 @@ func GetCurrentForkVersion(epoch primitives.Epoch, beaconConfg *params.BeaconCha case epoch < beaconConfg.ElectraForkEpoch: return [4]byte(beaconConfg.DenebForkVersion), nil - case epoch >= beaconConfg.ElectraForkEpoch: + case epoch < beaconConfg.FuluForkEpoch: return [4]byte(beaconConfg.ElectraForkVersion), nil + case epoch >= beaconConfg.FuluForkEpoch: + return [4]byte(beaconConfg.FuluForkVersion), nil + default: return [4]byte{}, fmt.Errorf("not recognized case for epoch %d", epoch) } } +// GetForkVersionFromForkDigest returns the fork version for a given fork digest. +// This function is BPO-aware as it uses params.ForkDataFromDigest which handles +// the network schedule including BPO phases for Fulu+. func GetForkVersionFromForkDigest(forkD [4]byte) (forkV ForkVersion, err error) { - genesisRoot := GenesisConfigs[GlobalBeaconConfig.ConfigName].GenesisValidatorRoot - phase0D, _ := signing.ComputeForkDigest(Phase0ForkVersion[:], genesisRoot) - altairD, _ := signing.ComputeForkDigest(AltairForkVersion[:], genesisRoot) - bellatrixD, _ := signing.ComputeForkDigest(BellatrixForkVersion[:], genesisRoot) - capellaD, _ := signing.ComputeForkDigest(CapellaForkVersion[:], genesisRoot) - denebD, _ := signing.ComputeForkDigest(DenebForkVersion[:], genesisRoot) - electraD, _ := signing.ComputeForkDigest(ElectraForkVersion[:], genesisRoot) - switch forkD { - case phase0D: - forkV = Phase0ForkVersion - case altairD: - forkV = AltairForkVersion - case bellatrixD: - forkV = BellatrixForkVersion - case capellaD: - forkV = CapellaForkVersion - case denebD: - forkV = DenebForkVersion - case electraD: - forkV = ElectraForkVersion - default: - forkV = ForkVersion{} - err = fmt.Errorf("not recognized fork_version for (%s)", hex.EncodeToString([]byte(forkD[:]))) + // Use params.ForkDataFromDigest which is BPO-aware and handles all fork digests + // including those modified by BPO schedule in Fulu+ + version, _, err := params.ForkDataFromDigest(forkD) + if err != nil { + return ForkVersion{}, fmt.Errorf("fork digest %s not found in network schedule", hex.EncodeToString(forkD[:])) } - return forkV, err + + return version, nil } diff --git a/eth/network_config.go b/eth/network_config.go index 658fc9c..f72a3a3 100644 --- a/eth/network_config.go +++ b/eth/network_config.go @@ -62,6 +62,7 @@ func DeriveKnownNetworkConfig(ctx context.Context, network string) (*NetworkConf CapellaForkVersion: []byte{0x03, 0x00, 0x00, 0x64}, DenebForkVersion: []byte{0x04, 0x00, 0x00, 0x64}, ElectraForkVersion: []byte{0x05, 0x00, 0x00, 0x64}, + FuluForkVersion: []byte{0x06, 0x00, 0x00, 0x64}, ForkVersionSchedule: map[[4]byte]primitives.Epoch{ {0x00, 0x00, 0x00, 0x64}: primitives.Epoch(0), {0x01, 0x00, 0x00, 0x64}: primitives.Epoch(512), diff --git a/eth/node.go b/eth/node.go index c6c7e48..2f9afb6 100644 --- a/eth/node.go +++ b/eth/node.go @@ -9,7 +9,9 @@ import ( "time" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" + "github.com/OffchainLabs/prysm/v6/config/params" eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/time/slots" "github.com/aws/aws-sdk-go-v2/service/kinesis" gk "github.com/dennis-tra/go-kinesis" "github.com/libp2p/go-libp2p/core/peer" @@ -195,6 +197,8 @@ func NewNode(cfg *NodeConfig) (*Node, error) { DataStream: ds, ReadTimeout: cfg.BeaconConfig.TtfbTimeoutDuration(), WriteTimeout: cfg.BeaconConfig.RespTimeoutDuration(), + BeaconConfig: cfg.BeaconConfig, + GenesisConfig: cfg.GenesisConfig, Tracer: cfg.Tracer, Meter: cfg.Meter, } @@ -204,29 +208,26 @@ func NewNode(cfg *NodeConfig) (*Node, error) { return nil, fmt.Errorf("new p2p server: %w", err) } - // initialize the pubsub topic handlers - pubSubConfig := &PubSubConfig{ - Topics: cfg.getDesiredFullTopics(cfg.GossipSubMessageEncoder), - ForkVersion: cfg.ForkVersion, - Encoder: cfg.GossipSubMessageEncoder, - SecondsPerSlot: time.Duration(cfg.BeaconConfig.SecondsPerSlot) * time.Second, - GenesisTime: cfg.GenesisConfig.GenesisTime, - DataStream: ds, - } - - pubSub, err := NewPubSub(h, pubSubConfig) - if err != nil { - return nil, fmt.Errorf("new PubSub service: %w", err) - } - // initialize the custom Prysm client to communicate with its API pryClient, err := NewPrysmClientWithTLS(cfg.PrysmHost, cfg.PrysmPortHTTP, cfg.PrysmPortGRPC, cfg.PrysmUseTLS, cfg.DialTimeout, cfg.GenesisConfig) if err != nil { return nil, fmt.Errorf("new prysm client: %w", err) } - // check if Prysm is valid + + // Fetch and set the BlobSchedule from Prysm for correct BPO fork digest calculation. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + if err := pryClient.FetchAndSetBlobSchedule(ctx); err != nil { + // Continue even if this fails, as the network might not have BPO enabled. + slog.Warn("Failed to fetch BlobSchedule from Prysm", tele.LogAttrError(err)) + } + + // Recalculate fork digest after loading BlobSchedule. + currentSlot := slots.CurrentSlot(cfg.GenesisConfig.GenesisTime) + currentEpoch := slots.ToEpoch(currentSlot) + cfg.ForkDigest = params.ForkDigest(currentEpoch) + + // check if Prysm is valid onNetwork, err := pryClient.isOnNetwork(ctx, cfg.ForkDigest) if err != nil { return nil, fmt.Errorf("prysm client: %w", err) @@ -235,6 +236,20 @@ func NewNode(cfg *NodeConfig) (*Node, error) { return nil, fmt.Errorf("prysm client not in correct fork_digest") } + pubSubConfig := &PubSubConfig{ + Topics: cfg.getDesiredFullTopics(cfg.GossipSubMessageEncoder), + ForkVersion: cfg.ForkVersion, + Encoder: cfg.GossipSubMessageEncoder, + SecondsPerSlot: time.Duration(cfg.BeaconConfig.SecondsPerSlot) * time.Second, + GenesisTime: cfg.GenesisConfig.GenesisTime, + DataStream: ds, + } + + pubSub, err := NewPubSub(h, pubSubConfig) + if err != nil { + return nil, fmt.Errorf("new PubSub service: %w", err) + } + // finally, initialize hermes node n := &Node{ cfg: cfg, @@ -397,14 +412,34 @@ func (n *Node) Start(ctx context.Context) error { return fmt.Errorf("get finalized finality checkpoints: %w", err) } - status := ð.Status{ - ForkDigest: n.cfg.ForkDigest[:], - FinalizedRoot: chainHead.FinalizedBlockRoot, - FinalizedEpoch: chainHead.FinalizedEpoch, - HeadRoot: chainHead.HeadBlockRoot, - HeadSlot: chainHead.HeadSlot, + // Determine which status version to use based on the fork + currentSlot := slots.CurrentSlot(n.cfg.GenesisConfig.GenesisTime) + currentEpoch := slots.ToEpoch(currentSlot) + + // Use StatusV2 for Fulu fork and later, StatusV1 for earlier forks + if n.cfg.BeaconConfig.FuluForkEpoch != params.BeaconConfig().FarFutureEpoch && + currentEpoch >= n.cfg.BeaconConfig.FuluForkEpoch { + // Fulu or later - use StatusV2 + status := ð.StatusV2{ + ForkDigest: n.cfg.ForkDigest[:], + FinalizedRoot: chainHead.FinalizedBlockRoot, + FinalizedEpoch: chainHead.FinalizedEpoch, + HeadRoot: chainHead.HeadBlockRoot, + HeadSlot: chainHead.HeadSlot, + EarliestAvailableSlot: 0, // TODO: Get actual earliest slot from beacon node + } + n.reqResp.SetStatusV2(status) + } else { + // Pre-Fulu - use StatusV1 + status := ð.Status{ + ForkDigest: n.cfg.ForkDigest[:], + FinalizedRoot: chainHead.FinalizedBlockRoot, + FinalizedEpoch: chainHead.FinalizedEpoch, + HeadRoot: chainHead.HeadBlockRoot, + HeadSlot: chainHead.HeadSlot, + } + n.reqResp.SetStatusV1(status) } - n.reqResp.SetStatus(status) // Set stream handlers on our libp2p host if err := n.reqResp.RegisterHandlers(ctx); err != nil { diff --git a/eth/node_config.go b/eth/node_config.go index 0958392..8e6ac87 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -440,6 +440,7 @@ func desiredPubSubBaseTopics() []string { p2p.GossipSyncCommitteeMessage, p2p.GossipBlsToExecutionChangeMessage, p2p.GossipBlobSidecarMessage, + p2p.GossipDataColumnSidecarMessage, } } @@ -475,6 +476,9 @@ func topicFormatFromBase(topicBase string) (string, error) { case p2p.GossipBlobSidecarMessage: return p2p.BlobSubnetTopicFormat, nil + case p2p.GossipDataColumnSidecarMessage: + return p2p.DataColumnSubnetTopicFormat, nil + default: return "", fmt.Errorf("unrecognized gossip topic base: %s", topicBase) } diff --git a/eth/output_full.go b/eth/output_full.go index b4764d3..aaa9b00 100644 --- a/eth/output_full.go +++ b/eth/output_full.go @@ -40,6 +40,11 @@ type TraceEventElectraBlock struct { Block *ethtypes.SignedBeaconBlockElectra } +type TraceEventFuluBlock struct { + host.TraceEventPayloadMetaData + Block *ethtypes.SignedBeaconBlockFulu +} + type TraceEventAttestation struct { host.TraceEventPayloadMetaData Attestation *ethtypes.Attestation @@ -55,6 +60,11 @@ type TraceEventSingleAttestation struct { SingleAttestation *ethtypes.SingleAttestation } +type TraceEventDataColumnSidecar struct { + host.TraceEventPayloadMetaData + DataColumnSidecar *ethtypes.DataColumnSidecar +} + type TraceEventSignedAggregateAttestationAndProof struct { host.TraceEventPayloadMetaData SignedAggregateAttestationAndProof *ethtypes.SignedAggregateAttestationAndProof @@ -140,6 +150,8 @@ func (t *FullOutput) RenderPayload(evt *host.TraceEvent, msg *pubsub.Message, ds payload, err = t.renderDenebBlock(msg, d) case *ethtypes.SignedBeaconBlockElectra: payload, err = t.renderElectraBlock(msg, d) + case *ethtypes.SignedBeaconBlockFulu: + payload, err = t.renderFuluBlock(msg, d) case *ethtypes.Attestation: payload, err = t.renderAttestation(msg, d) case *ethtypes.AttestationElectra: @@ -160,6 +172,8 @@ func (t *FullOutput) RenderPayload(evt *host.TraceEvent, msg *pubsub.Message, ds payload, err = t.renderBLSToExecutionChange(msg, d) case *ethtypes.BlobSidecar: payload, err = t.renderBlobSidecar(msg, d) + case *ethtypes.DataColumnSidecar: + payload, err = t.renderDataColumnSidecar(msg, d) case *ethtypes.ProposerSlashing: payload, err = t.renderProposerSlashing(msg, d) case *ethtypes.AttesterSlashing: @@ -273,6 +287,22 @@ func (t *FullOutput) renderElectraBlock( }, nil } +func (t *FullOutput) renderFuluBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockFulu, +) (*TraceEventFuluBlock, error) { + return &TraceEventFuluBlock{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + Block: block, + }, nil +} + func (t *FullOutput) renderAttestation( msg *pubsub.Message, attestation *ethtypes.Attestation, @@ -464,3 +494,19 @@ func (t *FullOutput) renderAttesterSlashing( AttesterSlashing: as, }, nil } + +func (t *FullOutput) renderDataColumnSidecar( + msg *pubsub.Message, + sidecar *ethtypes.DataColumnSidecar, +) (*TraceEventDataColumnSidecar, error) { + return &TraceEventDataColumnSidecar{ + TraceEventPayloadMetaData: host.TraceEventPayloadMetaData{ + PeerID: msg.ReceivedFrom.String(), + Topic: msg.GetTopic(), + Seq: msg.GetSeqno(), + MsgID: hex.EncodeToString([]byte(msg.ID)), + MsgSize: len(msg.Data), + }, + DataColumnSidecar: sidecar, + }, nil +} diff --git a/eth/output_kinesis.go b/eth/output_kinesis.go index af1f728..db2dfe3 100644 --- a/eth/output_kinesis.go +++ b/eth/output_kinesis.go @@ -52,6 +52,8 @@ func (k *KinesisOutput) RenderPayload(evt *host.TraceEvent, msg *pubsub.Message, payload, err = k.renderDenebBlock(msg, d) case *ethtypes.SignedBeaconBlockElectra: payload, err = k.renderElectraBlock(msg, d) + case *ethtypes.SignedBeaconBlockFulu: + payload, err = k.renderFuluBlock(msg, d) case *ethtypes.Attestation: payload, err = k.renderAttestation(msg, d) case *ethtypes.AttestationElectra: @@ -73,6 +75,8 @@ func (k *KinesisOutput) RenderPayload(evt *host.TraceEvent, msg *pubsub.Message, payload, err = k.renderBLSToExecutionChange(msg, d) case *ethtypes.BlobSidecar: payload, err = k.renderBlobSidecar(msg, d) + case *ethtypes.DataColumnSidecar: + payload, err = k.renderDataColumnSidecar(msg, d) case *ethtypes.ProposerSlashing: payload, err = k.renderProposerSlashing(msg, d) case *ethtypes.AttesterSlashing: @@ -222,6 +226,28 @@ func (k *KinesisOutput) renderElectraBlock( }, nil } +func (k *KinesisOutput) renderFuluBlock( + msg *pubsub.Message, + block *ethtypes.SignedBeaconBlockFulu, +) (map[string]any, error) { + root, err := block.GetBlock().HashTreeRoot() + if err != nil { + return nil, fmt.Errorf("failed to determine block hash tree root: %w", err) + } + + return map[string]any{ + "PeerID": msg.ReceivedFrom, + "Topic": msg.GetTopic(), + "Seq": hex.EncodeToString(msg.GetSeqno()), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Slot": block.GetBlock().GetSlot(), + "Root": root, + "ValIdx": block.GetBlock().GetProposerIndex(), + "TimeInSlot": k.cfg.GenesisTime.Add(time.Duration(block.GetBlock().GetSlot()) * k.cfg.SecondsPerSlot), + }, nil +} + func (k *KinesisOutput) renderAttestation( msg *pubsub.Message, attestation *ethtypes.Attestation, @@ -445,3 +471,22 @@ func (k *KinesisOutput) renderAttesterSlashing( "Att2_indices": as.GetAttestation_2().GetAttestingIndices(), }, nil } + +func (k *KinesisOutput) renderDataColumnSidecar( + msg *pubsub.Message, + sidecar *ethtypes.DataColumnSidecar, +) (map[string]any, error) { + return map[string]any{ + "PeerID": msg.ReceivedFrom, + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": hex.EncodeToString(msg.GetSeqno()), + "Slot": sidecar.GetSignedBlockHeader().GetHeader().GetSlot(), + "ValIdx": sidecar.GetSignedBlockHeader().GetHeader().GetProposerIndex(), + "Index": sidecar.GetIndex(), + "StateRoot": hexutil.Encode(sidecar.GetSignedBlockHeader().GetHeader().GetStateRoot()), + "BodyRoot": hexutil.Encode(sidecar.GetSignedBlockHeader().GetHeader().GetBodyRoot()), + "ParentRoot": hexutil.Encode(sidecar.GetSignedBlockHeader().GetHeader().GetParentRoot()), + }, nil +} diff --git a/eth/prysm.go b/eth/prysm.go index 82c50c5..22d630e 100644 --- a/eth/prysm.go +++ b/eth/prysm.go @@ -3,7 +3,6 @@ package eth import ( "bytes" "context" - "encoding/hex" "encoding/json" "errors" "fmt" @@ -16,9 +15,11 @@ import ( "github.com/OffchainLabs/prysm/v6/api/client" apiCli "github.com/OffchainLabs/prysm/v6/api/client/beacon" "github.com/OffchainLabs/prysm/v6/api/server/structs" - "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/network/httputil" eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/time/slots" "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" "go.opentelemetry.io/otel" @@ -339,6 +340,86 @@ func (p *PrysmClient) Identity(ctx context.Context) (addrInfo *peer.AddrInfo, er return addrInfo, nil } +// FetchAndSetBlobSchedule fetches the BLOB_SCHEDULE from Prysm's spec endpoint +// and sets it in the params configuration to ensure correct fork digest calculation for BPO. +func (p *PrysmClient) FetchAndSetBlobSchedule(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, p.timeout) + defer cancel() + + specResp, err := p.beaconApiClient.GetConfigSpec(ctx) + if err != nil { + return fmt.Errorf("failed fetching config spec: %w", err) + } + + // Type assert Data to map[string]interface{} + specData, ok := specResp.Data.(map[string]interface{}) + if !ok { + return fmt.Errorf("unexpected spec response data type: %T", specResp.Data) + } + + // Check if BLOB_SCHEDULE exists in the spec, if no BLOB_SCHEDULE + // that means we're not on a BPO-enabled network. + blobScheduleRaw, exists := specData["BLOB_SCHEDULE"] + if !exists { + return nil + } + + // Parse the BLOB_SCHEDULE. + var blobScheduleData []struct { + Epoch string `json:"EPOCH"` + MaxBlobsPerBlock string `json:"MAX_BLOBS_PER_BLOCK"` + } + + switch v := blobScheduleRaw.(type) { + case string: + if err := json.Unmarshal([]byte(v), &blobScheduleData); err != nil { + return fmt.Errorf("failed parsing BLOB_SCHEDULE string: %w", err) + } + case []interface{}: + jsonBytes, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed marshaling BLOB_SCHEDULE array: %w", err) + } + + if err := json.Unmarshal(jsonBytes, &blobScheduleData); err != nil { + return fmt.Errorf("failed parsing BLOB_SCHEDULE array: %w", err) + } + default: + return fmt.Errorf("BLOB_SCHEDULE has unexpected type: %T", blobScheduleRaw) + } + + // Convert to BlobScheduleEntry format. + blobSchedule := make([]params.BlobScheduleEntry, len(blobScheduleData)) + for i, entry := range blobScheduleData { + epoch, err := strconv.ParseUint(entry.Epoch, 10, 64) + if err != nil { + return fmt.Errorf("failed parsing EPOCH %s: %w", entry.Epoch, err) + } + + maxBlobs, err := strconv.ParseUint(entry.MaxBlobsPerBlock, 10, 64) + if err != nil { + return fmt.Errorf("failed parsing MAX_BLOBS_PER_BLOCK %s: %w", entry.MaxBlobsPerBlock, err) + } + + blobSchedule[i] = params.BlobScheduleEntry{ + Epoch: primitives.Epoch(epoch), + MaxBlobsPerBlock: maxBlobs, + } + } + + // Update the beacon config with the BlobSchedule + set GenesisValidatorsRoot. + // This gives us the correct details for InitializeForkSchedule(). + config := params.BeaconConfig().Copy() + config.BlobSchedule = blobSchedule + copy(config.GenesisValidatorsRoot[:], p.genesis.GenesisValidatorRoot) + config.InitializeForkSchedule() + + // Override the config with updated BlobSchedule. + params.OverrideBeaconConfig(config) + + return nil +} + func (p *PrysmClient) getActiveValidatorCount(ctx context.Context) (activeVals uint64, err error) { ctx, span := p.tracer.Start(ctx, "prysm_client.active_validators") defer func() { @@ -369,16 +450,19 @@ func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte) }() // this checks whether the local fork_digest at hermes matches the one that the remote node keeps - // request the genesis - nodeFork, err := p.beaconApiClient.GetFork(ctx, apiCli.StateOrBlockId("head")) + // Get the chain head to determine the current epoch + chainHead, err := p.ChainHead(ctx) if err != nil { - return false, fmt.Errorf("request beacon fork to compose forkdigest: %w", err) + return false, fmt.Errorf("get chain head: %w", err) } - forkDigest, err := signing.ComputeForkDigest(nodeFork.CurrentVersion, p.genesis.GenesisValidatorRoot) - if err != nil { - return false, fmt.Errorf("create fork digest (%s, %x): %w", hex.EncodeToString(nodeFork.CurrentVersion), p.genesis.GenesisValidatorRoot, err) - } + // Calculate the current epoch from the head slot + currentEpoch := slots.ToEpoch(chainHead.HeadSlot) + + // We *must* use the current epoch from chain head, not the fork activation + // epoch in-order for our fork digests to be valid. + forkDigest := params.ForkDigest(currentEpoch) + // check if our version is within the versions of the node if forkDigest == hermesForkDigest { return true, nil diff --git a/eth/pubsub.go b/eth/pubsub.go index 9d4d648..ef3368b 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -137,6 +137,8 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { return p.handleBlsToExecutionChangeMessage case strings.Contains(topic, p2p.GossipBlobSidecarMessage): return p.handleBlobSidecar + case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): + return p.handleDataColumnSidecar default: return p.host.TracedTopicHandler(host.NoopHandler) } @@ -184,6 +186,8 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err block = ðtypes.SignedBeaconBlockDeneb{} case ElectraForkVersion: block = ðtypes.SignedBeaconBlockElectra{} + case FuluForkVersion: + block = ðtypes.SignedBeaconBlockFulu{} default: return fmt.Errorf("handleBeaconBlock(): unrecognized fork-version: %s", p.cfg.ForkVersion.String()) } @@ -222,10 +226,12 @@ func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) err ) switch p.cfg.ForkVersion { - case ElectraForkVersion: + case Phase0ForkVersion, AltairForkVersion, BellatrixForkVersion, CapellaForkVersion, DenebForkVersion: + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.Attestation{}) + case ElectraForkVersion, FuluForkVersion: evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.SingleAttestation{}) default: - evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.Attestation{}) + return fmt.Errorf("handleAttestation(): unrecognized fork-version: %x", p.cfg.ForkVersion) } if err != nil { @@ -261,10 +267,12 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag ) switch p.cfg.ForkVersion { - case ElectraForkVersion: + case Phase0ForkVersion, AltairForkVersion, BellatrixForkVersion, CapellaForkVersion, DenebForkVersion: + evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.SignedAggregateAttestationAndProof{}) + case ElectraForkVersion, FuluForkVersion: evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.SignedAggregateAttestationAndProofElectra{}) default: - evt, err = p.dsr.RenderPayload(evt, msg, ðtypes.SignedAggregateAttestationAndProof{}) + return fmt.Errorf("handleAggregateAndProof(): unrecognized fork-version: %x", p.cfg.ForkVersion) } if err != nil { @@ -499,7 +507,7 @@ func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) err ) switch p.cfg.ForkVersion { - case DenebForkVersion, ElectraForkVersion: + case DenebForkVersion, ElectraForkVersion, FuluForkVersion: blob := ethtypes.BlobSidecar{} evt, err = p.dsr.RenderPayload(evt, msg, &blob) @@ -522,3 +530,38 @@ func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) err return nil } + +func (p *PubSub) handleDataColumnSidecar(ctx context.Context, msg *pubsub.Message) error { + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("handleDataColumnSidecar(): nil message or topic") + } + + var ( + err error + evt = &host.TraceEvent{ + Type: eventTypeHandleMessage, + Topic: msg.GetTopic(), + PeerID: p.host.ID(), + Timestamp: time.Now(), + } + ) + + sidecar := ethtypes.DataColumnSidecar{} + + evt, err = p.dsr.RenderPayload(evt, msg, &sidecar) + if err != nil { + slog.Warn( + "failed rendering topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + + return nil + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn( + "failed putting topic handler event", "topic", msg.GetTopic(), "err", tele.LogAttrError(err), + ) + } + + return nil +} diff --git a/eth/reqresp.go b/eth/reqresp.go index 2ff6bec..3568829 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -18,10 +18,12 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" psync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync" + "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/time/slots" ssz "github.com/ferranbt/fastssz" "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/host" @@ -49,6 +51,10 @@ type ReqRespConfig struct { ReadTimeout time.Duration WriteTimeout time.Duration + // Fork configuration for version-aware decisions + BeaconConfig *params.BeaconChainConfig + GenesisConfig *GenesisConfig + // Telemetry accessors Tracer trace.Tracer Meter metric.Meter @@ -61,12 +67,12 @@ type ReqResp struct { cfg *ReqRespConfig delegate peer.ID // peer ID that we delegate requests to - metaDataMu sync.RWMutex - metaData *pb.MetaDataV1 + metaDataMu sync.RWMutex + metadataHolder *MetadataHolder - statusMu sync.RWMutex - status *pb.Status - statusLim *rate.Limiter + statusMu sync.RWMutex + statusHolder *StatusHolder + statusLim *rate.Limiter // metrics meterRequestCounter metric.Int64Counter @@ -81,22 +87,82 @@ func NewReqResp(h host.Host, cfg *ReqRespConfig) (*ReqResp, error) { return nil, fmt.Errorf("req resp server config must not be nil") } - md := &pb.MetaDataV1{ - SeqNumber: 0, - Attnets: BitArrayFromAttestationSubnets(cfg.AttestationSubnetConfig.Subnets), - Syncnets: BitArrayFromSyncSubnets(cfg.SyncSubnetConfig.Subnets), + // Determine metadata version based on fork + metadataHolder := &MetadataHolder{} + attnets := BitArrayFromAttestationSubnets(cfg.AttestationSubnetConfig.Subnets) + + // Determine which metadata version to use based on fork configuration + var metadataVersion string + if cfg.BeaconConfig != nil && cfg.GenesisConfig != nil { + currentSlot := slots.CurrentSlot(cfg.GenesisConfig.GenesisTime) + currentEpoch := slots.ToEpoch(currentSlot) + + // Check for Fulu fork (V2 metadata with custody group count) + if cfg.BeaconConfig.FuluForkEpoch != params.BeaconConfig().FarFutureEpoch && + currentEpoch >= cfg.BeaconConfig.FuluForkEpoch { + // Fulu or later - use MetaDataV2 + md := &pb.MetaDataV2{ + SeqNumber: 0, + Attnets: attnets, + Syncnets: BitArrayFromSyncSubnets(cfg.SyncSubnetConfig.Subnets), + CustodyGroupCount: 0, // TODO: Get actual custody group count from config + } + metadataHolder.SetV2(md) + metadataVersion = "V2" + slog.Info("Composed local MetaData V2", + "attnets", md.Attnets, + "syncnets", md.Syncnets, + "custody_group_count", md.CustodyGroupCount, + ) + } else if cfg.BeaconConfig.AltairForkEpoch != params.BeaconConfig().FarFutureEpoch && + currentEpoch >= cfg.BeaconConfig.AltairForkEpoch { + // Altair or later - use MetaDataV1 (with syncnets) + md := &pb.MetaDataV1{ + SeqNumber: 0, + Attnets: attnets, + Syncnets: BitArrayFromSyncSubnets(cfg.SyncSubnetConfig.Subnets), + } + metadataHolder.SetV1(md) + metadataVersion = "V1" + slog.Info("Composed local MetaData V1", + "attnets", md.Attnets, + "syncnets", md.Syncnets, + ) + } else { + // Pre-Altair - use MetaDataV0 (no syncnets) + md := &pb.MetaDataV0{ + SeqNumber: 0, + Attnets: attnets, + } + metadataHolder.SetV0(md) + metadataVersion = "V0" + slog.Info("Composed local MetaData V0", + "attnets", md.Attnets, + ) + } + } else { + // Default to V1 if no fork config available (for backward compatibility) + md := &pb.MetaDataV1{ + SeqNumber: 0, + Attnets: attnets, + Syncnets: BitArrayFromSyncSubnets(cfg.SyncSubnetConfig.Subnets), + } + metadataHolder.SetV1(md) + metadataVersion = "V1 (default)" + slog.Info("Composed local MetaData V1 (default, no fork config)", + "attnets", md.Attnets, + "syncnets", md.Syncnets, + ) } - slog.Info("Composed local MetaData", - "attnets", md.Attnets, - "syncnets", md.Syncnets, - ) + slog.Info("Initialized ReqResp with metadata version", "version", metadataVersion) p := &ReqResp{ - host: h, - cfg: cfg, - metaData: md, - statusLim: rate.NewLimiter(1, 5), + host: h, + cfg: cfg, + metadataHolder: metadataHolder, + statusHolder: &StatusHolder{}, + statusLim: rate.NewLimiter(1, 5), } var err error @@ -128,62 +194,214 @@ func (r *ReqResp) SetMetaData(seq uint64) { r.metaDataMu.Lock() defer r.metaDataMu.Unlock() - if r.metaData.SeqNumber > seq { - slog.Warn("Updated metadata with lower sequence number", "old", r.metaData.SeqNumber, "new", seq) + currentSeq := r.metadataHolder.SeqNumber() + if currentSeq > seq { + slog.Warn("Updated metadata with lower sequence number", "old", currentSeq, "new", seq) } - r.metaData = &pb.MetaDataV1{ - SeqNumber: seq, - Attnets: r.metaData.Attnets, - Syncnets: r.metaData.Syncnets, + // Preserve existing subnet information + attnets := r.metadataHolder.Attnets() + syncnets, hasSyncnets := r.metadataHolder.Syncnets() + + if hasSyncnets { + r.metadataHolder.SetV1(&pb.MetaDataV1{ + SeqNumber: seq, + Attnets: attnets, + Syncnets: syncnets, + }) + } else { + // Fall back to V0 if no syncnets + r.metadataHolder.SetV0(&pb.MetaDataV0{ + SeqNumber: seq, + Attnets: attnets, + }) } } -func (r *ReqResp) SetStatus(status *pb.Status) { +// SetMetaDataV2 sets metadata with custody group count for DAS +func (r *ReqResp) SetMetaDataV2(seq uint64, custodyGroupCount uint64) { + r.metaDataMu.Lock() + defer r.metaDataMu.Unlock() + + currentSeq := r.metadataHolder.SeqNumber() + if currentSeq > seq { + slog.Warn("Updated metadata with lower sequence number", "old", currentSeq, "new", seq) + } + + // Preserve existing subnet information + attnets := r.metadataHolder.Attnets() + syncnets, _ := r.metadataHolder.Syncnets() + + r.metadataHolder.SetV2(&pb.MetaDataV2{ + SeqNumber: seq, + Attnets: attnets, + Syncnets: syncnets, + CustodyGroupCount: custodyGroupCount, + }) +} + +// SetStatusV1 sets the V1 status +func (r *ReqResp) SetStatusV1(status *pb.Status) { r.statusMu.Lock() defer r.statusMu.Unlock() // if the ForkDigest is not the same, we should drop updating the local status // TODO: this might be re-checked for hardforks (make the client resilient to them) - if r.status != nil && !bytes.Equal(r.status.ForkDigest, status.ForkDigest) { + if r.statusHolder.ForkDigest() != nil && !bytes.Equal(r.statusHolder.ForkDigest(), status.ForkDigest) { return } // check if anything has changed. Prevents the below log message to pollute // the log output. - if r.status != nil && bytes.Equal(r.status.ForkDigest, status.ForkDigest) && - bytes.Equal(r.status.FinalizedRoot, status.FinalizedRoot) && - r.status.FinalizedEpoch == status.FinalizedEpoch && - bytes.Equal(r.status.HeadRoot, status.HeadRoot) && - r.status.HeadSlot == status.HeadSlot { + if r.statusHolder.GetV1() != nil && bytes.Equal(r.statusHolder.ForkDigest(), status.ForkDigest) && + bytes.Equal(r.statusHolder.FinalizedRoot(), status.FinalizedRoot) && + r.statusHolder.FinalizedEpoch() == status.FinalizedEpoch && + bytes.Equal(r.statusHolder.HeadRoot(), status.HeadRoot) && + r.statusHolder.HeadSlot() == status.HeadSlot { // nothing has changed -> return return } - slog.Info("New status:") + slog.Info("New status V1:") + slog.Info(" fork_digest: " + hex.EncodeToString(status.ForkDigest)) + slog.Info(" finalized_root: " + hex.EncodeToString(status.FinalizedRoot)) + slog.Info(" finalized_epoch: " + strconv.FormatUint(uint64(status.FinalizedEpoch), 10)) + slog.Info(" head_root: " + hex.EncodeToString(status.HeadRoot)) + slog.Info(" head_slot: " + strconv.FormatUint(uint64(status.HeadSlot), 10)) + + r.statusHolder.SetV1(status) +} + +// SetStatusV2 sets the V2 status +func (r *ReqResp) SetStatusV2(status *pb.StatusV2) { + r.statusMu.Lock() + defer r.statusMu.Unlock() + + // if the ForkDigest is not the same, we should drop updating the local status + // TODO: this might be re-checked for hardforks (make the client resilient to them) + if r.statusHolder.ForkDigest() != nil && !bytes.Equal(r.statusHolder.ForkDigest(), status.ForkDigest) { + return + } + + // check if anything has changed. Prevents the below log message to pollute + // the log output. + if r.statusHolder.GetV2() != nil && bytes.Equal(r.statusHolder.ForkDigest(), status.ForkDigest) && + bytes.Equal(r.statusHolder.FinalizedRoot(), status.FinalizedRoot) && + r.statusHolder.FinalizedEpoch() == status.FinalizedEpoch && + bytes.Equal(r.statusHolder.HeadRoot(), status.HeadRoot) && + r.statusHolder.HeadSlot() == status.HeadSlot { + // Check V2-specific fields + if earliestSlot, hasEarliestSlot := r.statusHolder.EarliestAvailableSlot(); hasEarliestSlot && + earliestSlot == status.EarliestAvailableSlot { + // nothing has changed -> return + return + } + } + + slog.Info("New status V2:") slog.Info(" fork_digest: " + hex.EncodeToString(status.ForkDigest)) slog.Info(" finalized_root: " + hex.EncodeToString(status.FinalizedRoot)) slog.Info(" finalized_epoch: " + strconv.FormatUint(uint64(status.FinalizedEpoch), 10)) slog.Info(" head_root: " + hex.EncodeToString(status.HeadRoot)) slog.Info(" head_slot: " + strconv.FormatUint(uint64(status.HeadSlot), 10)) + slog.Info(" earliest_available_slot: " + strconv.FormatUint(uint64(status.EarliestAvailableSlot), 10)) - r.status = status + r.statusHolder.SetV2(status) } -func (r *ReqResp) cpyStatus() *pb.Status { +// SetStatus is deprecated - use SetStatusV1 or SetStatusV2 instead +// Kept for backward compatibility +func (r *ReqResp) SetStatus(status *pb.Status) { + r.SetStatusV1(status) +} + +// cpyStatusV1 returns a copy of the V1 status +func (r *ReqResp) cpyStatusV1() *pb.Status { r.statusMu.RLock() defer r.statusMu.RUnlock() - if r.status == nil { + if r.statusHolder == nil || r.statusHolder.GetV1() == nil { return nil } + status := r.statusHolder.GetV1() return &pb.Status{ - ForkDigest: bytes.Clone(r.status.ForkDigest), - FinalizedRoot: bytes.Clone(r.status.FinalizedRoot), - FinalizedEpoch: r.status.FinalizedEpoch, - HeadRoot: bytes.Clone(r.status.HeadRoot), - HeadSlot: r.status.HeadSlot, + ForkDigest: bytes.Clone(status.ForkDigest), + FinalizedRoot: bytes.Clone(status.FinalizedRoot), + FinalizedEpoch: status.FinalizedEpoch, + HeadRoot: bytes.Clone(status.HeadRoot), + HeadSlot: status.HeadSlot, + } +} + +// cpyStatusV2 returns a copy of the V2 status +func (r *ReqResp) cpyStatusV2() *pb.StatusV2 { + r.statusMu.RLock() + defer r.statusMu.RUnlock() + + if r.statusHolder == nil || r.statusHolder.GetV2() == nil { + return nil + } + + status := r.statusHolder.GetV2() + return &pb.StatusV2{ + ForkDigest: bytes.Clone(status.ForkDigest), + FinalizedRoot: bytes.Clone(status.FinalizedRoot), + FinalizedEpoch: status.FinalizedEpoch, + HeadRoot: bytes.Clone(status.HeadRoot), + HeadSlot: status.HeadSlot, + EarliestAvailableSlot: status.EarliestAvailableSlot, + } +} + +// cpyMetadataV0 returns a copy of the V0 metadata +func (r *ReqResp) cpyMetadataV0() *pb.MetaDataV0 { + r.metaDataMu.RLock() + defer r.metaDataMu.RUnlock() + + if r.metadataHolder == nil || r.metadataHolder.GetV0() == nil { + return nil + } + + md := r.metadataHolder.GetV0() + return &pb.MetaDataV0{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + } +} + +// cpyMetadataV1 returns a copy of the V1 metadata +func (r *ReqResp) cpyMetadataV1() *pb.MetaDataV1 { + r.metaDataMu.RLock() + defer r.metaDataMu.RUnlock() + + if r.metadataHolder == nil || r.metadataHolder.GetV1() == nil { + return nil + } + + md := r.metadataHolder.GetV1() + return &pb.MetaDataV1{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + Syncnets: md.Syncnets, + } +} + +// cpyMetadataV2 returns a copy of the V2 metadata +func (r *ReqResp) cpyMetadataV2() *pb.MetaDataV2 { + r.metaDataMu.RLock() + defer r.metaDataMu.RUnlock() + + if r.metadataHolder == nil || r.metadataHolder.GetV2() == nil { + return nil + } + + md := r.metadataHolder.GetV2() + return &pb.MetaDataV2{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + Syncnets: md.Syncnets, + CustodyGroupCount: md.CustodyGroupCount, } } @@ -193,13 +411,13 @@ func (r *ReqResp) cpyStatus() *pb.Status { func (r *ReqResp) RegisterHandlers(ctx context.Context) error { r.statusMu.RLock() defer r.statusMu.RUnlock() - if r.status == nil { + if r.statusHolder == nil || (r.statusHolder.GetV1() == nil && r.statusHolder.GetV2() == nil) { return fmt.Errorf("chain status is nil") } r.metaDataMu.RLock() defer r.metaDataMu.RUnlock() - if r.metaData == nil { + if r.metadataHolder == nil || (r.metadataHolder.GetV0() == nil && r.metadataHolder.GetV1() == nil && r.metadataHolder.GetV2() == nil) { return fmt.Errorf("chain metadata is nil") } @@ -207,8 +425,10 @@ func (r *ReqResp) RegisterHandlers(ctx context.Context) error { p2p.RPCPingTopicV1: r.pingHandler, p2p.RPCGoodByeTopicV1: r.goodbyeHandler, p2p.RPCStatusTopicV1: r.statusHandler, + p2p.RPCStatusTopicV2: r.statusV2Handler, p2p.RPCMetaDataTopicV1: r.metadataV1Handler, p2p.RPCMetaDataTopicV2: r.metadataV2Handler, + p2p.RPCMetaDataTopicV3: r.metadataV3Handler, p2p.RPCBlocksByRangeTopicV2: r.blocksByRangeV2Handler, p2p.RPCBlocksByRootTopicV2: r.blocksByRootV2Handler, p2p.RPCBlobSidecarsByRangeTopicV1: r.blobsByRangeV2Handler, @@ -311,7 +531,7 @@ func (r *ReqResp) pingHandler(ctx context.Context, stream network.Stream) (map[s } r.metaDataMu.RLock() - sq := primitives.SSZUint64(r.metaData.SeqNumber) + sq := primitives.SSZUint64(r.metadataHolder.SeqNumber()) r.metaDataMu.RUnlock() if err := r.writeResponse(ctx, stream, &sq); err != nil { @@ -396,7 +616,7 @@ func (r *ReqResp) statusHandler(ctx context.Context, upstream network.Stream) (m } // update status - r.SetStatus(resp) + r.SetStatusV1(resp) // mirror its own status back if err := r.writeResponse(ctx, upstream, resp); err != nil { @@ -418,7 +638,7 @@ func (r *ReqResp) statusHandler(ctx context.Context, upstream network.Stream) (m } // create response status from memory status - resp := r.cpyStatus() + resp := r.cpyStatusV1() // if the rate limiter allows requesting a new status, do that. if r.statusLim.Allow() { @@ -436,7 +656,7 @@ func (r *ReqResp) statusHandler(ctx context.Context, upstream network.Stream) (m // asking for the latest status failed. Use our own latest known status slog.Warn("Downstream status request failed, using the latest known status") - statusCpy := r.cpyStatus() + statusCpy := r.cpyStatusV1() if err := r.writeResponse(ctx, upstream, statusCpy); err != nil { return nil, fmt.Errorf("write mirrored status response to delegate: %w", err) } @@ -450,7 +670,7 @@ func (r *ReqResp) statusHandler(ctx context.Context, upstream network.Stream) (m } // we got a valid response from our delegate node. Update our own status - r.SetStatus(resp) + r.SetStatusV1(resp) } // let the upstream peer (who initiated the request) know the latest status @@ -467,12 +687,25 @@ func (r *ReqResp) statusHandler(ctx context.Context, upstream network.Stream) (m } func (r *ReqResp) metadataV1Handler(ctx context.Context, stream network.Stream) (map[string]any, error) { - r.metaDataMu.RLock() - metaData := &pb.MetaDataV0{ - SeqNumber: r.metaData.SeqNumber, - Attnets: r.metaData.Attnets, + metaData := r.cpyMetadataV0() + if metaData == nil { + // Try to downgrade from V1 or V2 + r.metaDataMu.RLock() + if r.metadataHolder.GetV1() != nil { + md := r.metadataHolder.GetV1() + metaData = &pb.MetaDataV0{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + } + } else if r.metadataHolder.GetV2() != nil { + md := r.metadataHolder.GetV2() + metaData = &pb.MetaDataV0{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + } + } + r.metaDataMu.RUnlock() } - r.metaDataMu.RUnlock() if err := r.writeResponse(ctx, stream, metaData); err != nil { return nil, fmt.Errorf("write meta data v1: %w", err) @@ -491,13 +724,27 @@ func (r *ReqResp) metadataV1Handler(ctx context.Context, stream network.Stream) } func (r *ReqResp) metadataV2Handler(ctx context.Context, stream network.Stream) (map[string]any, error) { - r.metaDataMu.RLock() - metaData := &pb.MetaDataV1{ - SeqNumber: r.metaData.SeqNumber, - Attnets: r.metaData.Attnets, - Syncnets: r.metaData.Syncnets, + metaData := r.cpyMetadataV1() + if metaData == nil { + // Try to downgrade from V2 or upgrade from V0 + r.metaDataMu.RLock() + if r.metadataHolder.GetV2() != nil { + md := r.metadataHolder.GetV2() + metaData = &pb.MetaDataV1{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + Syncnets: md.Syncnets, + } + } else if r.metadataHolder.GetV0() != nil { + md := r.metadataHolder.GetV0() + metaData = &pb.MetaDataV1{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + Syncnets: nil, // V0 doesn't have syncnets + } + } + r.metaDataMu.RUnlock() } - r.metaDataMu.RUnlock() if err := r.writeResponse(ctx, stream, metaData); err != nil { return nil, fmt.Errorf("write meta data v2: %w", err) @@ -516,6 +763,164 @@ func (r *ReqResp) metadataV2Handler(ctx context.Context, stream network.Stream) return traceData, stream.Close() } +// statusV2Handler handles StatusV2 protocol requests +func (r *ReqResp) statusV2Handler(ctx context.Context, upstream network.Stream) (map[string]any, error) { + statusTraceData := func(status *pb.StatusV2) map[string]any { + return map[string]any{ + "ForkDigest": hex.EncodeToString(status.ForkDigest), + "HeadRoot": hex.EncodeToString(status.HeadRoot), + "HeadSlot": status.HeadSlot, + "FinalizedRoot": hex.EncodeToString(status.FinalizedRoot), + "FinalizedEpoch": status.FinalizedEpoch, + "EarliestAvailableSlot": status.EarliestAvailableSlot, + } + } + + // check if the request comes from our delegate node. If so, just mirror + // its own status back and update our latest known status. + if upstream.Conn().RemotePeer() == r.delegate { + resp := &pb.StatusV2{} + if err := r.readRequest(ctx, upstream, resp); err != nil { + return nil, fmt.Errorf("read status V2 data from delegate: %w", err) + } + + // update status + r.SetStatusV2(resp) + + // mirror its own status back + if err := r.writeResponse(ctx, upstream, resp); err != nil { + return nil, fmt.Errorf("write mirrored status V2 response to delegate: %w", err) + } + + traceData := map[string]any{ + "Request": statusTraceData(resp), + "Response": statusTraceData(resp), + } + + return traceData, upstream.Close() + } + + // first, read the status from the remote peer + req := &pb.StatusV2{} + if err := r.readRequest(ctx, upstream, req); err != nil { + return nil, fmt.Errorf("read status V2 data from peer: %w", err) + } + + // create response status from memory status + resp := r.cpyStatusV2() + + // if the rate limiter allows requesting a new status, do that. + if r.statusLim.Allow() { + r.statusLim.Reserve() + + // ask our delegate node for the latest status + dialCtx := network.WithForceDirectDial(ctx, "prevent backoff") + var err error + resp, err = r.StatusV2(dialCtx, r.delegate) + if err != nil { + // asking for the latest status failed. Use our own latest known status + slog.Warn("Downstream status V2 request failed, using the latest known status") + + statusCpy := r.cpyStatusV2() + if statusCpy == nil { + // Try to upgrade from V1 if we don't have V2 + if v1 := r.cpyStatusV1(); v1 != nil { + statusCpy = &pb.StatusV2{ + ForkDigest: v1.ForkDigest, + FinalizedRoot: v1.FinalizedRoot, + FinalizedEpoch: v1.FinalizedEpoch, + HeadRoot: v1.HeadRoot, + HeadSlot: v1.HeadSlot, + EarliestAvailableSlot: 0, + } + } + } + + if statusCpy != nil { + if err := r.writeResponse(ctx, upstream, statusCpy); err != nil { + return nil, fmt.Errorf("write status V2 response to peer: %w", err) + } + + traceData := map[string]any{ + "Request": statusTraceData(req), + "Response": statusTraceData(statusCpy), + } + + return traceData, upstream.Close() + } + return nil, fmt.Errorf("no status available") + } + + // we got a valid response from our delegate node. Update our own status + r.SetStatusV2(resp) + } + + // let the upstream peer (who initiated the request) know the latest status + if err := r.writeResponse(ctx, upstream, resp); err != nil { + return nil, fmt.Errorf("respond status V2 to upstream: %w", err) + } + + traceData := map[string]any{ + "Request": statusTraceData(req), + "Response": statusTraceData(resp), + } + + return traceData, nil +} + +// metadataV3Handler handles MetadataV3 protocol requests (returns MetaDataV2 type) +func (r *ReqResp) metadataV3Handler(ctx context.Context, stream network.Stream) (map[string]any, error) { + metaData := r.cpyMetadataV2() + if metaData == nil { + // Try to upgrade from V1 or V0 + r.metaDataMu.RLock() + if r.metadataHolder.GetV1() != nil { + md := r.metadataHolder.GetV1() + metaData = &pb.MetaDataV2{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + Syncnets: md.Syncnets, + CustodyGroupCount: 0, // Default to 0 when upgrading + } + } else if r.metadataHolder.GetV0() != nil { + md := r.metadataHolder.GetV0() + metaData = &pb.MetaDataV2{ + SeqNumber: md.SeqNumber, + Attnets: md.Attnets, + Syncnets: nil, // V0 doesn't have syncnets + CustodyGroupCount: 0, // Default to 0 when upgrading + } + } + r.metaDataMu.RUnlock() + } + + if metaData == nil { + return nil, fmt.Errorf("no metadata available") + } + + if err := r.writeResponse(ctx, stream, metaData); err != nil { + return nil, fmt.Errorf("write meta data v3: %w", err) + } + + traceData := map[string]any{ + "SeqNumber": metaData.SeqNumber, + "Attnets": hex.EncodeToString(metaData.Attnets.Bytes()), + "CustodyGroupCount": metaData.CustodyGroupCount, + } + + if metaData.Syncnets != nil { + traceData["Syncnets"] = hex.EncodeToString(metaData.Syncnets.Bytes()) + } + + slog.Info( + "metadata V3 response", + "attnets", metaData.Attnets, + "synccommittees", metaData.Syncnets, + "custody_group_count", metaData.CustodyGroupCount, + ) + return traceData, stream.Close() +} + func (r *ReqResp) blocksByRangeV2Handler(ctx context.Context, stream network.Stream) (map[string]any, error) { if stream.Conn().RemotePeer() == r.delegate { return nil, fmt.Errorf("blocks by range request from delegate peer") @@ -600,6 +1005,7 @@ func (r *ReqResp) delegateStream(ctx context.Context, upstream network.Stream) e return nil } +// Status requests V1 status from a peer func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, err error) { defer func() { av, err := r.host.Peerstore().Get(pid, "AgentVersion") @@ -642,7 +1048,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e r.meterRequestCounter.Add(traceCtx, 1, metric.WithAttributes(attrs...)) }() - slog.Info("Perform status request", tele.LogAttrPeerID(pid)) + slog.Info("Perform status V1 request", tele.LogAttrPeerID(pid)) stream, err := r.host.NewStream(ctx, pid, r.protocolID(p2p.RPCStatusTopicV1)) if err != nil { return nil, fmt.Errorf("new stream to peer %s: %w", pid, err) @@ -650,9 +1056,21 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e defer logDeferErr(stream.Reset, "failed closing stream") // no-op if closed // actually write the data to the stream - req := r.cpyStatus() + req := r.cpyStatusV1() if req == nil { - return nil, fmt.Errorf("status unknown") + // Try to downgrade from V2 if we only have V2 + if r.statusHolder.IsV2() { + v2 := r.statusHolder.GetV2() + req = &pb.Status{ + ForkDigest: v2.ForkDigest, + FinalizedRoot: v2.FinalizedRoot, + FinalizedEpoch: v2.FinalizedEpoch, + HeadRoot: v2.HeadRoot, + HeadSlot: v2.HeadSlot, + } + } else { + return nil, fmt.Errorf("status unknown") + } } if err := r.writeRequest(ctx, stream, req); err != nil { @@ -667,7 +1085,98 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e // if we requested the status from our delegate if stream.Conn().RemotePeer() == r.delegate { - r.SetStatus(resp) + r.SetStatusV1(resp) + } + + // we have the data that we want, so ignore error here + _ = stream.Close() // (both sides should actually be already closed) + + return resp, nil +} + +// StatusV2 requests V2 status from a peer +func (r *ReqResp) StatusV2(ctx context.Context, pid peer.ID) (status *pb.StatusV2, err error) { + defer func() { + av, err := r.host.Peerstore().Get(pid, "AgentVersion") + if err != nil { + av = "unknown" + } + + reqData := map[string]any{ + "AgentVersion": av, + "PeerID": pid.String(), + } + if status != nil { + reqData["ForkDigest"] = hex.EncodeToString(status.ForkDigest) + reqData["HeadRoot"] = hex.EncodeToString(status.HeadRoot) + reqData["HeadSlot"] = status.HeadSlot + reqData["FinalizedRoot"] = hex.EncodeToString(status.FinalizedRoot) + reqData["FinalizedEpoch"] = status.FinalizedEpoch + reqData["EarliestAvailableSlot"] = status.EarliestAvailableSlot + } + + if err != nil { + reqData["Error"] = err.Error() + } + + traceEvt := &hermeshost.TraceEvent{ + Type: "REQUEST_STATUS", + PeerID: r.host.ID(), + Timestamp: time.Now(), + Payload: reqData, + } + + traceCtx := context.Background() + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { + slog.Warn("failed to put record", tele.LogAttrError(err)) + } + + attrs := []attribute.KeyValue{ + attribute.String("rpc", "status_v2"), + attribute.Bool("success", err == nil), + } + r.meterRequestCounter.Add(traceCtx, 1, metric.WithAttributes(attrs...)) + }() + + slog.Info("Perform status V2 request", tele.LogAttrPeerID(pid)) + stream, err := r.host.NewStream(ctx, pid, r.protocolID(p2p.RPCStatusTopicV2)) + if err != nil { + return nil, fmt.Errorf("new stream to peer %s: %w", pid, err) + } + defer logDeferErr(stream.Reset, "failed closing stream") // no-op if closed + + // actually write the data to the stream + req := r.cpyStatusV2() + if req == nil { + // If we don't have V2, upgrade from V1 + if !r.statusHolder.IsV2() && r.statusHolder.GetV1() != nil { + v1 := r.statusHolder.GetV1() + req = &pb.StatusV2{ + ForkDigest: v1.ForkDigest, + FinalizedRoot: v1.FinalizedRoot, + FinalizedEpoch: v1.FinalizedEpoch, + HeadRoot: v1.HeadRoot, + HeadSlot: v1.HeadSlot, + EarliestAvailableSlot: 0, // Default to 0 if upgrading from V1 + } + } else { + return nil, fmt.Errorf("status unknown") + } + } + + if err := r.writeRequest(ctx, stream, req); err != nil { + return nil, fmt.Errorf("write status V2 request: %w", err) + } + + // read and decode status response + resp := &pb.StatusV2{} + if err := r.readResponse(ctx, stream, resp); err != nil { + return nil, fmt.Errorf("read status V2 response: %w", err) + } + + // if we requested the status from our delegate + if stream.Conn().RemotePeer() == r.delegate { + r.SetStatusV2(resp) } // we have the data that we want, so ignore error here @@ -706,7 +1215,7 @@ func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { defer logDeferErr(stream.Reset, "failed closing stream") // no-op if closed r.metaDataMu.RLock() - seqNum := r.metaData.SeqNumber + seqNum := r.metadataHolder.SeqNumber() r.metaDataMu.RUnlock() req := primitives.SSZUint64(seqNum) @@ -726,6 +1235,7 @@ func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { return nil } +// MetaData requests V1 metadata from a peer func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV1, err error) { defer func() { reqData := map[string]any{ @@ -760,17 +1270,72 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV r.meterRequestCounter.Add(traceCtx, 1, metric.WithAttributes(attrs...)) }() - slog.Debug("Perform metadata request", tele.LogAttrPeerID(pid)) + slog.Debug("Perform metadata V1 request", tele.LogAttrPeerID(pid)) stream, err := r.host.NewStream(ctx, pid, r.protocolID(p2p.RPCMetaDataTopicV2)) if err != nil { return resp, fmt.Errorf("new %s stream to peer %s: %w", p2p.RPCMetaDataTopicV2, pid, err) } defer logDeferErr(stream.Reset, "failed closing stream") // no-op if closed - // read and decode status response + // read and decode metadata response resp = &pb.MetaDataV1{} if err := r.readResponse(ctx, stream, resp); err != nil { - return resp, fmt.Errorf("read ping response: %w", err) + return resp, fmt.Errorf("read metadata response: %w", err) + } + + // we have the data that we want, so ignore error here + _ = stream.Close() // (both sides should actually be already closed) + + return resp, nil +} + +// MetaDataV2 requests V2 metadata from a peer (includes custody group count) +func (r *ReqResp) MetaDataV2(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV2, err error) { + defer func() { + reqData := map[string]any{ + "PeerID": pid.String(), + } + + if resp != nil { + reqData["SeqNumber"] = resp.SeqNumber + reqData["Attnets"] = resp.Attnets + reqData["Syncnets"] = resp.Syncnets + reqData["CustodyGroupCount"] = resp.CustodyGroupCount + } + + if err != nil { + reqData["Error"] = err.Error() + } + + traceEvt := &hermeshost.TraceEvent{ + Type: "REQUEST_METADATA_V2", + PeerID: r.host.ID(), + Timestamp: time.Now(), + Payload: reqData, + } + traceCtx := context.Background() + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { + slog.Warn("failed to put record", tele.LogAttrError(err)) + } + + attrs := []attribute.KeyValue{ + attribute.String("rpc", "meta_data_v2"), + attribute.Bool("success", err == nil), + } + r.meterRequestCounter.Add(traceCtx, 1, metric.WithAttributes(attrs...)) + }() + + slog.Debug("Perform metadata V2 request", tele.LogAttrPeerID(pid)) + stream, err := r.host.NewStream(ctx, pid, r.protocolID(p2p.RPCMetaDataTopicV3)) + if err != nil { + return resp, fmt.Errorf("new %s stream to peer %s: %w", p2p.RPCMetaDataTopicV3, pid, err) + } + defer logDeferErr(stream.Reset, "failed closing stream") // no-op if closed + + // read and decode metadata response + resp = &pb.MetaDataV2{} + if err := r.readResponse(ctx, stream, resp); err != nil { + return resp, fmt.Errorf("read metadata V2 response: %w", err) } // we have the data that we want, so ignore error here diff --git a/eth/reqresp_test.go b/eth/reqresp_test.go index 0e0cd67..da3bb52 100644 --- a/eth/reqresp_test.go +++ b/eth/reqresp_test.go @@ -8,7 +8,12 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/probe-lab/hermes/host" + "github.com/prysmaticlabs/go-bitfield" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" ) @@ -32,7 +37,9 @@ func TestReqResp_ProtocolRequests(t *testing.T) { // try all the request as the node is initialized requestPing(t, ctx, ethNode) requestStatus(t, ctx, ethNode) + requestStatusV2(t, ctx, ethNode) requestMetaDataV2(t, ctx, ethNode) + requestMetaDataV3(t, ctx, ethNode) requestBlockByRangeV2(t, ctx, ethNode) } @@ -46,11 +53,21 @@ func requestStatus(t *testing.T, ctx context.Context, ethNode *Node) { require.NoError(t, err) } +func requestStatusV2(t *testing.T, ctx context.Context, ethNode *Node) { + _, err := ethNode.reqResp.StatusV2(ctx, ethNode.pryInfo.ID) + require.NoError(t, err) +} + func requestMetaDataV2(t *testing.T, ctx context.Context, ethNode *Node) { _, err := ethNode.reqResp.MetaData(ctx, ethNode.pryInfo.ID) require.NoError(t, err) } +func requestMetaDataV3(t *testing.T, ctx context.Context, ethNode *Node) { + _, err := ethNode.reqResp.MetaDataV2(ctx, ethNode.pryInfo.ID) + require.NoError(t, err) +} + func requestBlockByRangeV2(t *testing.T, ctx context.Context, ethNode *Node) { chainHead, err := ethNode.pryClient.ChainHead(ctx) require.NoError(t, err) @@ -113,3 +130,231 @@ func composeLocalEthNode(t *testing.T, ctx context.Context) (*Node, context.Canc }() return ethNode, cancel } + +// TestStatusHolder tests the StatusHolder version-aware wrapper +func TestStatusHolder(t *testing.T) { + holder := &StatusHolder{} + + // Test V1 status + statusV1 := &pb.Status{ + ForkDigest: []byte{1, 2, 3, 4}, + FinalizedRoot: []byte("finalized_root_v1"), + FinalizedEpoch: 100, + HeadRoot: []byte("head_root_v1"), + HeadSlot: 1000, + } + + holder.SetV1(statusV1) + assert.True(t, holder.GetV1() != nil) + assert.False(t, holder.IsV2()) + assert.Equal(t, statusV1.ForkDigest, holder.ForkDigest()) + assert.Equal(t, statusV1.FinalizedEpoch, holder.FinalizedEpoch()) + assert.Equal(t, statusV1.HeadSlot, holder.HeadSlot()) + + // Test V2 status + statusV2 := &pb.StatusV2{ + ForkDigest: []byte{5, 6, 7, 8}, + FinalizedRoot: []byte("finalized_root_v2"), + FinalizedEpoch: 200, + HeadRoot: []byte("head_root_v2"), + HeadSlot: 2000, + EarliestAvailableSlot: 1500, + } + + holder.SetV2(statusV2) + assert.True(t, holder.GetV2() != nil) + assert.True(t, holder.IsV2()) + assert.Nil(t, holder.GetV1()) // V1 should be cleared + assert.Equal(t, statusV2.ForkDigest, holder.ForkDigest()) + assert.Equal(t, statusV2.FinalizedEpoch, holder.FinalizedEpoch()) + assert.Equal(t, statusV2.HeadSlot, holder.HeadSlot()) + + // Test EarliestAvailableSlot + slot, hasSlot := holder.EarliestAvailableSlot() + assert.True(t, hasSlot) + assert.Equal(t, primitives.Slot(1500), slot) + + // Test with V1 (no earliest slot) + holder.SetV1(statusV1) + slot, hasSlot = holder.EarliestAvailableSlot() + assert.False(t, hasSlot) + assert.Equal(t, primitives.Slot(0), slot) +} + +// TestMetadataHolder tests the MetadataHolder version-aware wrapper +func TestMetadataHolder(t *testing.T) { + holder := &MetadataHolder{} + + // Test V0 metadata + metaV0 := &pb.MetaDataV0{ + SeqNumber: 1, + Attnets: bitfield.Bitvector64{0xFF, 0x00}, + } + + holder.SetV0(metaV0) + assert.NotNil(t, holder.GetV0()) + assert.Nil(t, holder.GetV1()) + assert.Nil(t, holder.GetV2()) + assert.Equal(t, 0, holder.Version()) + assert.Equal(t, uint64(1), holder.SeqNumber()) + assert.Equal(t, metaV0.Attnets, holder.Attnets()) + + // V0 doesn't have syncnets + syncnets, hasSyncnets := holder.Syncnets() + assert.False(t, hasSyncnets) + assert.Equal(t, bitfield.Bitvector4{}, syncnets) + + // Test V1 metadata + metaV1 := &pb.MetaDataV1{ + SeqNumber: 2, + Attnets: bitfield.Bitvector64{0xAA, 0xBB}, + Syncnets: bitfield.Bitvector4{0x0F}, + } + + holder.SetV1(metaV1) + assert.Nil(t, holder.GetV0()) // V0 should be cleared + assert.NotNil(t, holder.GetV1()) + assert.Nil(t, holder.GetV2()) + assert.Equal(t, 1, holder.Version()) + assert.Equal(t, uint64(2), holder.SeqNumber()) + + syncnets, hasSyncnets = holder.Syncnets() + assert.True(t, hasSyncnets) + assert.Equal(t, metaV1.Syncnets, syncnets) + + // Test V2 metadata + metaV2 := &pb.MetaDataV2{ + SeqNumber: 3, + Attnets: bitfield.Bitvector64{0xCC, 0xDD}, + Syncnets: bitfield.Bitvector4{0x0A}, + CustodyGroupCount: 16, + } + + holder.SetV2(metaV2) + assert.Nil(t, holder.GetV0()) + assert.Nil(t, holder.GetV1()) // V1 should be cleared + assert.NotNil(t, holder.GetV2()) + assert.Equal(t, 2, holder.Version()) + assert.Equal(t, uint64(3), holder.SeqNumber()) + + // Test CustodyGroupCount + count, hasCount := holder.CustodyGroupCount() + assert.True(t, hasCount) + assert.Equal(t, uint64(16), count) + + // Test with V1 (no custody group count) + holder.SetV1(metaV1) + count, hasCount = holder.CustodyGroupCount() + assert.False(t, hasCount) + assert.Equal(t, uint64(0), count) +} + +// TestForkAwareMetadataInit tests that ReqResp initializes metadata correctly based on fork +func TestForkAwareMetadataInit(t *testing.T) { + // Mock host and data stream + mockHost := &host.Host{} + + // Test Pre-Altair (should use V0) + t.Run("PreAltair", func(t *testing.T) { + cfg := &ReqRespConfig{ + ForkDigest: [4]byte{1, 2, 3, 4}, + Encoder: encoder.SszNetworkEncoder{}, + BeaconConfig: ¶ms.BeaconChainConfig{ + AltairForkEpoch: 1000, // Future epoch + FuluForkEpoch: params.BeaconConfig().FarFutureEpoch, + }, + GenesisConfig: &GenesisConfig{ + GenesisTime: time.Now().Add(-time.Hour), // Started 1 hour ago + }, + AttestationSubnetConfig: &SubnetConfig{Subnets: []uint64{0, 1}}, + SyncSubnetConfig: &SubnetConfig{Subnets: []uint64{0}}, + Tracer: otel.GetTracerProvider().Tracer("test"), + Meter: otel.GetMeterProvider().Meter("test"), + } + + reqResp, err := NewReqResp(mockHost, cfg) + assert.NoError(t, err) + assert.NotNil(t, reqResp) + assert.Equal(t, 0, reqResp.metadataHolder.Version()) + assert.NotNil(t, reqResp.metadataHolder.GetV0()) + assert.Nil(t, reqResp.metadataHolder.GetV1()) + assert.Nil(t, reqResp.metadataHolder.GetV2()) + }) + + // Test Altair (should use V1) + t.Run("Altair", func(t *testing.T) { + cfg := &ReqRespConfig{ + ForkDigest: [4]byte{1, 2, 3, 4}, + Encoder: encoder.SszNetworkEncoder{}, + BeaconConfig: ¶ms.BeaconChainConfig{ + AltairForkEpoch: 0, // Already activated + FuluForkEpoch: params.BeaconConfig().FarFutureEpoch, + }, + GenesisConfig: &GenesisConfig{ + GenesisTime: time.Now().Add(-time.Hour), // Started 1 hour ago + }, + AttestationSubnetConfig: &SubnetConfig{Subnets: []uint64{0, 1}}, + SyncSubnetConfig: &SubnetConfig{Subnets: []uint64{0}}, + Tracer: otel.GetTracerProvider().Tracer("test"), + Meter: otel.GetMeterProvider().Meter("test"), + } + + reqResp, err := NewReqResp(mockHost, cfg) + assert.NoError(t, err) + assert.NotNil(t, reqResp) + assert.Equal(t, 1, reqResp.metadataHolder.Version()) + assert.Nil(t, reqResp.metadataHolder.GetV0()) + assert.NotNil(t, reqResp.metadataHolder.GetV1()) + assert.Nil(t, reqResp.metadataHolder.GetV2()) + }) + + // Test Fulu (should use V2) + t.Run("Fulu", func(t *testing.T) { + cfg := &ReqRespConfig{ + ForkDigest: [4]byte{1, 2, 3, 4}, + Encoder: encoder.SszNetworkEncoder{}, + BeaconConfig: ¶ms.BeaconChainConfig{ + AltairForkEpoch: 0, // Already activated + FuluForkEpoch: 0, // Already activated + }, + GenesisConfig: &GenesisConfig{ + GenesisTime: time.Now().Add(-time.Hour), // Started 1 hour ago + }, + AttestationSubnetConfig: &SubnetConfig{Subnets: []uint64{0, 1}}, + SyncSubnetConfig: &SubnetConfig{Subnets: []uint64{0}}, + Tracer: otel.GetTracerProvider().Tracer("test"), + Meter: otel.GetMeterProvider().Meter("test"), + } + + reqResp, err := NewReqResp(mockHost, cfg) + assert.NoError(t, err) + assert.NotNil(t, reqResp) + assert.Equal(t, 2, reqResp.metadataHolder.Version()) + assert.Nil(t, reqResp.metadataHolder.GetV0()) + assert.Nil(t, reqResp.metadataHolder.GetV1()) + assert.NotNil(t, reqResp.metadataHolder.GetV2()) + + // Check custody group count + count, hasCount := reqResp.metadataHolder.CustodyGroupCount() + assert.True(t, hasCount) + assert.Equal(t, uint64(0), count) // TODO: Should be configured value + }) + + // Test without fork config (should default to V1) + t.Run("NoForkConfig", func(t *testing.T) { + cfg := &ReqRespConfig{ + ForkDigest: [4]byte{1, 2, 3, 4}, + Encoder: encoder.SszNetworkEncoder{}, + AttestationSubnetConfig: &SubnetConfig{Subnets: []uint64{0, 1}}, + SyncSubnetConfig: &SubnetConfig{Subnets: []uint64{0}}, + Tracer: otel.GetTracerProvider().Tracer("test"), + Meter: otel.GetMeterProvider().Meter("test"), + } + + reqResp, err := NewReqResp(mockHost, cfg) + assert.NoError(t, err) + assert.NotNil(t, reqResp) + assert.Equal(t, 1, reqResp.metadataHolder.Version()) // Defaults to V1 + assert.NotNil(t, reqResp.metadataHolder.GetV1()) + }) +} diff --git a/eth/reqresp_types.go b/eth/reqresp_types.go new file mode 100644 index 0000000..582093c --- /dev/null +++ b/eth/reqresp_types.go @@ -0,0 +1,207 @@ +package eth + +import ( + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" +) + +// StatusHolder wraps different status versions +type StatusHolder struct { + v1 *pb.Status + v2 *pb.StatusV2 +} + +// SetV1 sets the V1 status +func (s *StatusHolder) SetV1(status *pb.Status) { + s.v1 = status + s.v2 = nil +} + +// SetV2 sets the V2 status +func (s *StatusHolder) SetV2(status *pb.StatusV2) { + s.v2 = status + s.v1 = nil +} + +// GetV1 returns the V1 status or nil if not set +func (s *StatusHolder) GetV1() *pb.Status { + return s.v1 +} + +// GetV2 returns the V2 status or nil if not set +func (s *StatusHolder) GetV2() *pb.StatusV2 { + return s.v2 +} + +// IsV2 returns true if this holder contains a V2 status +func (s *StatusHolder) IsV2() bool { + return s.v2 != nil +} + +// ForkDigest returns the fork digest from either version +func (s *StatusHolder) ForkDigest() []byte { + if s.v2 != nil { + return s.v2.ForkDigest + } + if s.v1 != nil { + return s.v1.ForkDigest + } + return nil +} + +// FinalizedEpoch returns the finalized epoch from either version +func (s *StatusHolder) FinalizedEpoch() primitives.Epoch { + if s.v2 != nil { + return s.v2.FinalizedEpoch + } + if s.v1 != nil { + return s.v1.FinalizedEpoch + } + return 0 +} + +// FinalizedRoot returns the finalized root from either version +func (s *StatusHolder) FinalizedRoot() []byte { + if s.v2 != nil { + return s.v2.FinalizedRoot + } + if s.v1 != nil { + return s.v1.FinalizedRoot + } + return nil +} + +// HeadRoot returns the head root from either version +func (s *StatusHolder) HeadRoot() []byte { + if s.v2 != nil { + return s.v2.HeadRoot + } + if s.v1 != nil { + return s.v1.HeadRoot + } + return nil +} + +// HeadSlot returns the head slot from either version +func (s *StatusHolder) HeadSlot() primitives.Slot { + if s.v2 != nil { + return s.v2.HeadSlot + } + if s.v1 != nil { + return s.v1.HeadSlot + } + return 0 +} + +// EarliestAvailableSlot returns the earliest available slot if V2, otherwise returns 0 and false +func (s *StatusHolder) EarliestAvailableSlot() (primitives.Slot, bool) { + if s.v2 != nil { + return s.v2.EarliestAvailableSlot, true + } + return 0, false +} + +// MetadataHolder wraps different metadata versions +type MetadataHolder struct { + v0 *pb.MetaDataV0 + v1 *pb.MetaDataV1 + v2 *pb.MetaDataV2 +} + +// SetV0 sets the V0 metadata +func (m *MetadataHolder) SetV0(md *pb.MetaDataV0) { + m.v0 = md + m.v1 = nil + m.v2 = nil +} + +// SetV1 sets the V1 metadata +func (m *MetadataHolder) SetV1(md *pb.MetaDataV1) { + m.v0 = nil + m.v1 = md + m.v2 = nil +} + +// SetV2 sets the V2 metadata +func (m *MetadataHolder) SetV2(md *pb.MetaDataV2) { + m.v0 = nil + m.v1 = nil + m.v2 = md +} + +// GetV0 returns the V0 metadata or nil if not set +func (m *MetadataHolder) GetV0() *pb.MetaDataV0 { + return m.v0 +} + +// GetV1 returns the V1 metadata or nil if not set +func (m *MetadataHolder) GetV1() *pb.MetaDataV1 { + return m.v1 +} + +// GetV2 returns the V2 metadata or nil if not set +func (m *MetadataHolder) GetV2() *pb.MetaDataV2 { + return m.v2 +} + +// SeqNumber returns the sequence number from any version +func (m *MetadataHolder) SeqNumber() uint64 { + if m.v2 != nil { + return m.v2.SeqNumber + } + if m.v1 != nil { + return m.v1.SeqNumber + } + if m.v0 != nil { + return m.v0.SeqNumber + } + return 0 +} + +// Attnets returns the attestation subnets from any version +func (m *MetadataHolder) Attnets() bitfield.Bitvector64 { + if m.v2 != nil { + return m.v2.Attnets + } + if m.v1 != nil { + return m.v1.Attnets + } + if m.v0 != nil { + return m.v0.Attnets + } + return bitfield.Bitvector64{} +} + +// Syncnets returns the sync committee subnets if available (V1 and V2 only) +func (m *MetadataHolder) Syncnets() (bitfield.Bitvector4, bool) { + if m.v2 != nil { + return m.v2.Syncnets, true + } + if m.v1 != nil { + return m.v1.Syncnets, true + } + return bitfield.Bitvector4{}, false +} + +// CustodyGroupCount returns the custody group count if V2, otherwise returns 0 and false +func (m *MetadataHolder) CustodyGroupCount() (uint64, bool) { + if m.v2 != nil { + return m.v2.CustodyGroupCount, true + } + return 0, false +} + +// Version returns the version number of the stored metadata +func (m *MetadataHolder) Version() int { + if m.v2 != nil { + return 2 + } + if m.v1 != nil { + return 1 + } + if m.v0 != nil { + return 0 + } + return -1 +} diff --git a/eth/subnets.go b/eth/subnets.go index 9bbce6f..fc888f3 100644 --- a/eth/subnets.go +++ b/eth/subnets.go @@ -52,6 +52,9 @@ func HasSubnets(topic string) (subnets uint64, hasSubnets bool) { case p2p.GossipBlobSidecarMessage: return GlobalBeaconConfig.BlobsidecarSubnetCountElectra, true + case p2p.GossipDataColumnSidecarMessage: + return GlobalBeaconConfig.DataColumnSidecarSubnetCount, true + default: return uint64(0), false } diff --git a/eth/topic_score_params.go b/eth/topic_score_params.go index 4660070..949f22e 100644 --- a/eth/topic_score_params.go +++ b/eth/topic_score_params.go @@ -67,7 +67,9 @@ func topicToScoreParamsMapper(topic string, activeValidators uint64) *pubsub.Top case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage): return defaultBlsToExecutionChangeTopicParams() - case strings.Contains(topic, p2p.GossipBlobSidecarMessage): + case strings.Contains(topic, p2p.GossipBlobSidecarMessage), + strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): + // Using the same scoring as blocks for blob and data column sidecars (following prysm's approach). return defaultBlockTopicParams() default: diff --git a/go.mod b/go.mod index 8ee6b4d..613b045 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,9 @@ module github.com/probe-lab/hermes -go 1.24.0 - -toolchain go1.24.2 +go 1.24.5 require ( - github.com/OffchainLabs/prysm/v6 v6.0.1 + github.com/OffchainLabs/prysm/v6 v6.0.5-rc.1.0.20250823011907-d48ed44c4c75 github.com/aws/aws-sdk-go-v2 v1.32.7 github.com/aws/aws-sdk-go-v2/config v1.28.7 github.com/aws/aws-sdk-go-v2/credentials v1.17.48 @@ -42,11 +40,11 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.55.0 go.opentelemetry.io/otel/metric v1.36.0 go.opentelemetry.io/otel/sdk v1.34.0 - go.opentelemetry.io/otel/sdk/metric v1.33.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.36.0 golang.org/x/crypto v0.39.0 golang.org/x/time v0.11.0 - google.golang.org/grpc v1.70.0 + google.golang.org/grpc v1.71.0 google.golang.org/protobuf v1.36.6 gopkg.in/yaml.v3 v3.0.1 ) @@ -88,7 +86,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/deckarep/golang-set/v2 v2.7.0 // indirect - github.com/dgraph-io/ristretto v0.2.0 // indirect + github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/gosigar v0.14.3 // indirect @@ -231,7 +229,7 @@ require ( github.com/prometheus/procfs v0.16.1 // indirect github.com/prometheus/prom2json v1.4.1 // indirect github.com/prometheus/prometheus v0.301.0 // indirect - github.com/prysmaticlabs/gohashtree v0.0.4-beta.0.20240624100937-73632381301b // indirect + github.com/prysmaticlabs/gohashtree v0.0.5-beta // indirect github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.51.0 // indirect diff --git a/go.sum b/go.sum index 18f6039..4b5d8b9 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= -github.com/OffchainLabs/prysm/v6 v6.0.1 h1:NwFsJTyPVQqhmEz4//GqAs+M6AjcWYqH9x+HvDGxgwk= -github.com/OffchainLabs/prysm/v6 v6.0.1/go.mod h1:7QuGy5ol0x0hx1Qmz0XcXJ3CnjWWKHpMiz++bUR0IIg= +github.com/OffchainLabs/prysm/v6 v6.0.5-rc.1.0.20250823011907-d48ed44c4c75 h1:Xypx50G+oKUjhSFKEzxDxnL9xtcAL4k4TnksNgjHWME= +github.com/OffchainLabs/prysm/v6 v6.0.5-rc.1.0.20250823011907-d48ed44c4c75/go.mod h1:IGaKrTBMkZO98GOZNBWof9mcnINwlczEJCCw+JotDEc= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= @@ -190,12 +190,12 @@ github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRk github.com/dennis-tra/go-kinesis v0.0.0-20240326083914-7acf5f8dc24e h1:y6QyYh8YZyRQDdfnQqUgC5tRBmEwUFAjavnybKboCm4= github.com/dennis-tra/go-kinesis v0.0.0-20240326083914-7acf5f8dc24e/go.mod h1:5Hm3EOeNP1/lYm9qcFwWgYgjixQilwcZA+hZ05bUz54= github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= -github.com/dgraph-io/ristretto v0.2.0 h1:XAfl+7cmoUDWW/2Lx8TGZQjjxIQ2Ley9DSf52dru4WE= -github.com/dgraph-io/ristretto v0.2.0/go.mod h1:8uBHCU/PBV4Ag0CJrP47b9Ofby5dqWNh4FicAdoqFNU= +github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= +github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -1006,8 +1006,8 @@ github.com/prysmaticlabs/fastssz v0.0.0-20241008181541-518c4ce73516 h1:xuVAdtz5S github.com/prysmaticlabs/fastssz v0.0.0-20241008181541-518c4ce73516/go.mod h1:h2OlIZD/M6wFvV3YMZbW16lFgh3Rsye00G44J2cwLyU= github.com/prysmaticlabs/go-bitfield v0.0.0-20240618144021-706c95b2dd15 h1:lC8kiphgdOBTcbTvo8MwkvpKjO0SlAgjv4xIK5FGJ94= github.com/prysmaticlabs/go-bitfield v0.0.0-20240618144021-706c95b2dd15/go.mod h1:8svFBIKKu31YriBG/pNizo9N0Jr9i5PQ+dFkxWg3x5k= -github.com/prysmaticlabs/gohashtree v0.0.4-beta.0.20240624100937-73632381301b h1:VK7thFOnhxAZ/5aolr5Os4beiubuD08WiuiHyRqgwks= -github.com/prysmaticlabs/gohashtree v0.0.4-beta.0.20240624100937-73632381301b/go.mod h1:HRuvtXLZ4WkaB1MItToVH2e8ZwKwZPY5/Rcby+CvvLY= +github.com/prysmaticlabs/gohashtree v0.0.5-beta h1:ct41mg7HyIZd7uoSM/ud23f+3DxQG9tlMlQG+BVX23c= +github.com/prysmaticlabs/gohashtree v0.0.5-beta/go.mod h1:HRuvtXLZ4WkaB1MItToVH2e8ZwKwZPY5/Rcby+CvvLY= github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c h1:9PHRCuO/VN0s9k+RmLykho7AjDxblNYI5bYKed16NPU= github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c/go.mod h1:ZRws458tYHS/Zs936OQ6oCrL+Ict5O4Xpwve1UQ6C9M= github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294 h1:q9wE0ZZRdTUAAeyFP/w0SwBEnCqlVy2+on6X2/e+eAU= @@ -1225,8 +1225,8 @@ go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCRE go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= -go.opentelemetry.io/otel/sdk/metric v1.33.0 h1:Gs5VK9/WUJhNXZgn8MR6ITatvAmKeIuCtNbsP3JkNqU= -go.opentelemetry.io/otel/sdk/metric v1.33.0/go.mod h1:dL5ykHZmm1B1nVRk9dDjChwDmt81MjVp3gLkQRwKf/Q= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= @@ -1503,8 +1503,8 @@ google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= -google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= -google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=