From 240145166d87dfd3320f22b069c3bcf3b240f163 Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Tue, 12 Mar 2024 09:51:15 +0100 Subject: [PATCH] Migrate off of removed StreamChainHead gRPC endpoint --- cmd/root.go | 4 +-- go.mod | 1 + go.sum | 2 ++ pkg/monitoring.go | 75 +++++++++++++++--------------------------- prysmgrpc/chainhead.go | 11 ------- 5 files changed, 32 insertions(+), 61 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 73d9816..c72cd8f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -68,7 +68,7 @@ var ( var wg sync.WaitGroup wg.Add(2) - go pkg.SubscribeToEpochs(ctx, s, &wg) + go pkg.SubscribeToEpochs(ctx, s, beacon, &wg) go pkg.MonitorAttestationsAndProposals(ctx, s, beacon, plainPubkeys, &wg) //Create Prometheus Metrics Client @@ -96,7 +96,7 @@ var ( var wg sync.WaitGroup wg.Add(2) - go pkg.SubscribeToEpochs(ctx, s, &wg) + go pkg.SubscribeToEpochs(ctx, s, beacon, &wg) go pkg.MonitorSlashings(ctx, beacon, &wg) defer wg.Wait() }, diff --git a/go.mod b/go.mod index 71f6c9e..12c9a47 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/prometheus/procfs v0.10.1 // indirect github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44 // indirect github.com/prysmaticlabs/gohashtree v0.0.4-beta // indirect + github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc // indirect github.com/r3labs/sse/v2 v2.10.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/supranational/blst v0.3.11 // indirect diff --git a/go.sum b/go.sum index 36f80d5..afc7d5f 100644 --- a/go.sum +++ b/go.sum @@ -382,6 +382,8 @@ github.com/prysmaticlabs/prysm/v4 v4.1.1 h1:sbBkgfPzo/SGTJ5IimtsZSGECoRlhbowR1rE github.com/prysmaticlabs/prysm/v4 v4.1.1/go.mod h1:+o907dc4mwEE0wJkQ8RrzCroC+q2WCzdCLtikwonw8c= github.com/prysmaticlabs/prysm/v5 v5.0.1 h1:GCFxthIfOOLq0y6UVklNfntZypGWLbQqCCrVvhmH8T0= github.com/prysmaticlabs/prysm/v5 v5.0.1/go.mod h1:RKq4hdSHNLPC6s56KCTDnZEKsxj6/lwsm1xW8SmCbj4= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8= github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= diff --git a/pkg/monitoring.go b/pkg/monitoring.go index c72b1c9..5f0898b 100644 --- a/pkg/monitoring.go +++ b/pkg/monitoring.go @@ -5,8 +5,8 @@ import ( "context" "encoding/hex" "fmt" - "io" "os" + "strconv" "strings" "sync" "time" @@ -18,13 +18,14 @@ import ( eth2client "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" + v1 "github.com/attestantio/go-eth2-client/api/v1" eth2spec "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/bellatrix" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethereum/go-ethereum/core/types" "github.com/pkg/errors" bitfield "github.com/prysmaticlabs/go-bitfield" - primitives "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + primitives "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/rs/zerolog/log" "golang.org/x/exp/maps" @@ -241,17 +242,15 @@ func ListBlocks(ctx context.Context, beacon *beaconchain.BeaconChain, epoch spec result := make(map[spec.Slot][]*ChainBlock) lastSlot := (epoch + 1) * spec.SLOTS_PER_EPOCH for slot := epoch * spec.SLOTS_PER_EPOCH; slot < lastSlot; slot++ { - opCtx, cancel := context.WithTimeout(ctx, beacon.Timeout()) - resp, err := blockHeadersProvider.BeaconBlockHeader(opCtx, &api.BeaconBlockHeaderOpts{ + resp, err := blockHeadersProvider.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{ Common: api.CommonOpts{}, - Block: fmt.Sprint(slot), + Block: fmt.Sprintf("%v", slot), }) - cancel() if resp == nil { continue } - log.Trace().Msgf("Block at slot=%v %v", slot, strings.TrimSuffix(resp.Data.Header.Message.String(), "\n")) + log.Trace().Msgf("Block at slot %v root=%v", slot, hex.EncodeToString(resp.Data.Root[:])) if err != nil { log.Error().Err(err).Msg("BeaconBlockHeader") continue @@ -259,14 +258,12 @@ func ListBlocks(ctx context.Context, beacon *beaconchain.BeaconChain, epoch spec proposerIndex := spec.ValidatorIndex(resp.Data.Header.Message.ProposerIndex) - opCtx, cancel = context.WithTimeout(ctx, beacon.Timeout()) - signedBeaconBlock, err := blockProvider.SignedBeaconBlock(opCtx, &api.SignedBeaconBlockOpts{ - Common: api.CommonOpts{}, - Block: resp.Data.Root.String(), + signedBeaconBlock, err := blockProvider.SignedBeaconBlock(ctx, &api.SignedBeaconBlockOpts{ + Block: strconv.FormatUint(uint64(resp.Data.Header.Message.Slot), 10), }) - cancel() if err != nil { - return nil, err + log.Error().Err(err).Msg("SignedBeaconBlock") + continue } blockAttestations, err := signedBeaconBlock.Data.Attestations() @@ -334,16 +331,14 @@ func ListBlocks(ctx context.Context, beacon *beaconchain.BeaconChain, epoch spec // SubscribeToEpochs subscribes to changings of the beacon chain head. // Note, if --replay-epoch or --since-epoch options passed, SubscribeToEpochs will not // listen to real-time changes. -func SubscribeToEpochs(ctx context.Context, s *prysmgrpc.Service, wg *sync.WaitGroup) { +func SubscribeToEpochs(ctx context.Context, s *prysmgrpc.Service, beacon *beaconchain.BeaconChain, wg *sync.WaitGroup) { defer wg.Done() - getEpoch := func(chainHead *ethpb.ChainHead) spec.Epoch { - return spec.Epoch(chainHead.JustifiedEpoch) - } - - lastChainHead, err := s.GetChainHead() + chainHead, err := s.GetChainHead() Must(err) + lastEpoch := uint64(chainHead.JustifiedEpoch) + if len(opts.Monitor.ReplayEpoch) > 0 { for _, epoch := range opts.Monitor.ReplayEpoch { epochsChan <- spec.Epoch(epoch) @@ -352,43 +347,27 @@ func SubscribeToEpochs(ctx context.Context, s *prysmgrpc.Service, wg *sync.WaitG return } if opts.Monitor.SinceEpoch != ^uint64(0) { - for epoch := opts.Monitor.SinceEpoch; epoch < getEpoch(lastChainHead); epoch++ { + for epoch := opts.Monitor.SinceEpoch; epoch < lastEpoch; epoch++ { epochsChan <- epoch } close(epochsChan) return } - stream, err := s.StreamChainHead() - if err != nil { - log.Error().Err(err).Msg("StreamChainHead failed") - return + eventsHandlerFunc := func(event *v1.Event) { + headEvent := event.Data.(*v1.HeadEvent) + log.Trace().Msgf("New head slot %v block %v", headEvent.Slot, headEvent.Block.String()) + thisEpoch := uint64(headEvent.Slot / spec.SLOTS_PER_EPOCH) + if thisEpoch > lastEpoch { + log.Trace().Msgf("New epoch %v at slot %v", thisEpoch, headEvent.Slot) + lastEpoch = thisEpoch + epochsChan <- thisEpoch + } } - defer stream.CloseSend() - - waitc := make(chan struct{}) - go func() { - epochsChan <- getEpoch(lastChainHead) - for { - chainHead, err := stream.Recv() - if err == io.EOF { - waitc <- struct{}{} - return - } - if err != nil { - close(epochsChan) - Must(err) - } - - if getEpoch(chainHead) > getEpoch(lastChainHead) { - lastChainHead = chainHead - - epochsChan <- getEpoch(lastChainHead) - } - } - }() - <-waitc + eventsProvider := beacon.Service().(eth2client.EventsProvider) + err = eventsProvider.Events(ctx, []string{"head"}, eventsHandlerFunc) + Must(err) } type AttestationLoggingStatus struct { diff --git a/prysmgrpc/chainhead.go b/prysmgrpc/chainhead.go index 1751c56..2c0b42b 100644 --- a/prysmgrpc/chainhead.go +++ b/prysmgrpc/chainhead.go @@ -21,17 +21,6 @@ func (s *Service) GetChainHead() (*ethpb.ChainHead, error) { return resp, nil } -func (s *Service) StreamChainHead() (ethpb.BeaconChain_StreamChainHeadClient, error) { - conn := ethpb.NewBeaconChainClient(s.conn) - - stream, err := conn.StreamChainHead(s.ctx, &empty.Empty{}) - if err != nil { - return nil, err - } - - return stream, nil -} - func (s *Service) GetGenesis() (*ethpb.Genesis, error) { conn := ethpb.NewNodeClient(s.Connection())