Skip to content

Commit

Permalink
Migrate off of removed StreamChainHead gRPC endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
adaszko committed Mar 12, 2024
1 parent 7f5974c commit 2401451
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 61 deletions.
4 changes: 2 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var (

var wg sync.WaitGroup
wg.Add(2)
go pkg.SubscribeToEpochs(ctx, s, &wg)
go pkg.SubscribeToEpochs(ctx, s, beacon, &wg)
go pkg.MonitorAttestationsAndProposals(ctx, s, beacon, plainPubkeys, &wg)

//Create Prometheus Metrics Client
Expand Down Expand Up @@ -96,7 +96,7 @@ var (

var wg sync.WaitGroup
wg.Add(2)
go pkg.SubscribeToEpochs(ctx, s, &wg)
go pkg.SubscribeToEpochs(ctx, s, beacon, &wg)
go pkg.MonitorSlashings(ctx, beacon, &wg)
defer wg.Wait()
},
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prysmaticlabs/fastssz v0.0.0-20221107182844-78142813af44 // indirect
github.com/prysmaticlabs/gohashtree v0.0.4-beta // indirect
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc // indirect
github.com/r3labs/sse/v2 v2.10.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/supranational/blst v0.3.11 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ github.com/prysmaticlabs/prysm/v4 v4.1.1 h1:sbBkgfPzo/SGTJ5IimtsZSGECoRlhbowR1rE
github.com/prysmaticlabs/prysm/v4 v4.1.1/go.mod h1:+o907dc4mwEE0wJkQ8RrzCroC+q2WCzdCLtikwonw8c=
github.com/prysmaticlabs/prysm/v5 v5.0.1 h1:GCFxthIfOOLq0y6UVklNfntZypGWLbQqCCrVvhmH8T0=
github.com/prysmaticlabs/prysm/v5 v5.0.1/go.mod h1:RKq4hdSHNLPC6s56KCTDnZEKsxj6/lwsm1xW8SmCbj4=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
Expand Down
75 changes: 27 additions & 48 deletions pkg/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"context"
"encoding/hex"
"fmt"
"io"
"os"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -18,13 +18,14 @@ import (

eth2client "github.com/attestantio/go-eth2-client"
"github.com/attestantio/go-eth2-client/api"
v1 "github.com/attestantio/go-eth2-client/api/v1"
eth2spec "github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/bellatrix"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethereum/go-ethereum/core/types"
"github.com/pkg/errors"
bitfield "github.com/prysmaticlabs/go-bitfield"
primitives "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
primitives "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/rs/zerolog/log"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -241,32 +242,28 @@ func ListBlocks(ctx context.Context, beacon *beaconchain.BeaconChain, epoch spec
result := make(map[spec.Slot][]*ChainBlock)
lastSlot := (epoch + 1) * spec.SLOTS_PER_EPOCH
for slot := epoch * spec.SLOTS_PER_EPOCH; slot < lastSlot; slot++ {
opCtx, cancel := context.WithTimeout(ctx, beacon.Timeout())
resp, err := blockHeadersProvider.BeaconBlockHeader(opCtx, &api.BeaconBlockHeaderOpts{
resp, err := blockHeadersProvider.BeaconBlockHeader(ctx, &api.BeaconBlockHeaderOpts{
Common: api.CommonOpts{},
Block: fmt.Sprint(slot),
Block: fmt.Sprintf("%v", slot),
})
cancel()

if resp == nil {
continue
}
log.Trace().Msgf("Block at slot=%v %v", slot, strings.TrimSuffix(resp.Data.Header.Message.String(), "\n"))
log.Trace().Msgf("Block at slot %v root=%v", slot, hex.EncodeToString(resp.Data.Root[:]))
if err != nil {
log.Error().Err(err).Msg("BeaconBlockHeader")
continue
}

proposerIndex := spec.ValidatorIndex(resp.Data.Header.Message.ProposerIndex)

opCtx, cancel = context.WithTimeout(ctx, beacon.Timeout())
signedBeaconBlock, err := blockProvider.SignedBeaconBlock(opCtx, &api.SignedBeaconBlockOpts{
Common: api.CommonOpts{},
Block: resp.Data.Root.String(),
signedBeaconBlock, err := blockProvider.SignedBeaconBlock(ctx, &api.SignedBeaconBlockOpts{
Block: strconv.FormatUint(uint64(resp.Data.Header.Message.Slot), 10),
})
cancel()
if err != nil {
return nil, err
log.Error().Err(err).Msg("SignedBeaconBlock")
continue
}

blockAttestations, err := signedBeaconBlock.Data.Attestations()
Expand Down Expand Up @@ -334,16 +331,14 @@ func ListBlocks(ctx context.Context, beacon *beaconchain.BeaconChain, epoch spec
// SubscribeToEpochs subscribes to changings of the beacon chain head.
// Note, if --replay-epoch or --since-epoch options passed, SubscribeToEpochs will not
// listen to real-time changes.
func SubscribeToEpochs(ctx context.Context, s *prysmgrpc.Service, wg *sync.WaitGroup) {
func SubscribeToEpochs(ctx context.Context, s *prysmgrpc.Service, beacon *beaconchain.BeaconChain, wg *sync.WaitGroup) {
defer wg.Done()

getEpoch := func(chainHead *ethpb.ChainHead) spec.Epoch {
return spec.Epoch(chainHead.JustifiedEpoch)
}

lastChainHead, err := s.GetChainHead()
chainHead, err := s.GetChainHead()
Must(err)

lastEpoch := uint64(chainHead.JustifiedEpoch)

if len(opts.Monitor.ReplayEpoch) > 0 {
for _, epoch := range opts.Monitor.ReplayEpoch {
epochsChan <- spec.Epoch(epoch)
Expand All @@ -352,43 +347,27 @@ func SubscribeToEpochs(ctx context.Context, s *prysmgrpc.Service, wg *sync.WaitG
return
}
if opts.Monitor.SinceEpoch != ^uint64(0) {
for epoch := opts.Monitor.SinceEpoch; epoch < getEpoch(lastChainHead); epoch++ {
for epoch := opts.Monitor.SinceEpoch; epoch < lastEpoch; epoch++ {
epochsChan <- epoch
}
close(epochsChan)
return
}

stream, err := s.StreamChainHead()
if err != nil {
log.Error().Err(err).Msg("StreamChainHead failed")
return
eventsHandlerFunc := func(event *v1.Event) {
headEvent := event.Data.(*v1.HeadEvent)
log.Trace().Msgf("New head slot %v block %v", headEvent.Slot, headEvent.Block.String())
thisEpoch := uint64(headEvent.Slot / spec.SLOTS_PER_EPOCH)
if thisEpoch > lastEpoch {
log.Trace().Msgf("New epoch %v at slot %v", thisEpoch, headEvent.Slot)
lastEpoch = thisEpoch
epochsChan <- thisEpoch
}
}
defer stream.CloseSend()

waitc := make(chan struct{})
go func() {
epochsChan <- getEpoch(lastChainHead)

for {
chainHead, err := stream.Recv()
if err == io.EOF {
waitc <- struct{}{}
return
}
if err != nil {
close(epochsChan)
Must(err)
}

if getEpoch(chainHead) > getEpoch(lastChainHead) {
lastChainHead = chainHead

epochsChan <- getEpoch(lastChainHead)
}
}
}()
<-waitc
eventsProvider := beacon.Service().(eth2client.EventsProvider)
err = eventsProvider.Events(ctx, []string{"head"}, eventsHandlerFunc)
Must(err)
}

type AttestationLoggingStatus struct {
Expand Down
11 changes: 0 additions & 11 deletions prysmgrpc/chainhead.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,6 @@ func (s *Service) GetChainHead() (*ethpb.ChainHead, error) {
return resp, nil
}

func (s *Service) StreamChainHead() (ethpb.BeaconChain_StreamChainHeadClient, error) {
conn := ethpb.NewBeaconChainClient(s.conn)

stream, err := conn.StreamChainHead(s.ctx, &empty.Empty{})
if err != nil {
return nil, err
}

return stream, nil
}

func (s *Service) GetGenesis() (*ethpb.Genesis, error) {
conn := ethpb.NewNodeClient(s.Connection())

Expand Down

0 comments on commit 2401451

Please sign in to comment.