Skip to content

Commit

Permalink
feat: Serve blob_sidecar (#155)
Browse files Browse the repository at this point in the history
* fix: Add additional serving bundle checks

* feat: Subscribe to node events

* feat: Subscribe to node events

* fix: Panic on unhealthy node without an initialized wallclock

* Fix: Reduce checkpointz status edge caching time

* feat: Serve block_sidecars route

* Merge master

* fix: typo

* fix: Remove debug logs

* chore: linting

* feat: Handle no DENEB fork definition

* fix: Fix assignment in error handling condition

* fix: duplicate metrics
  • Loading branch information
samcm authored Feb 20, 2024
1 parent afd7dbe commit c7cf5df
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 27 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.1
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.4
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/ferranbt/fastssz v0.1.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -46,6 +48,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
Expand All @@ -68,4 +71,5 @@ require (
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
32 changes: 32 additions & 0 deletions pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (h *Handler) Register(ctx context.Context, router *httprouter.Router) error
router.GET("/eth/v1/beacon/blocks/:block_id/root", h.wrappedHandler(h.handleEthV1BeaconBlocksRoot))
router.GET("/eth/v1/beacon/states/:state_id/finality_checkpoints", h.wrappedHandler(h.handleEthV1BeaconStatesFinalityCheckpoints))
router.GET("/eth/v1/beacon/deposit_snapshot", h.wrappedHandler(h.handleEthV1BeaconDepositSnapshot))
router.GET("/eth/v1/beacon/blob_sidecars/:block_id", h.wrappedHandler(h.handleEthV1BeaconBlobSidecars))

router.GET("/eth/v1/config/spec", h.wrappedHandler(h.handleEthV1ConfigSpec))
router.GET("/eth/v1/config/deposit_contract", h.wrappedHandler(h.handleEthV1ConfigDepositContract))
Expand Down Expand Up @@ -587,3 +588,34 @@ func (h *Handler) handleEthV1BeaconDepositSnapshot(ctx context.Context, r *http.
},
}), nil
}

func (h *Handler) handleEthV1BeaconBlobSidecars(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) {
if err := ValidateContentType(contentType, []ContentType{ContentTypeJSON}); err != nil {
return NewUnsupportedMediaTypeResponse(nil), err
}

id, err := eth.NewBlockIdentifier(p.ByName("block_id"))
if err != nil {
return NewBadRequestResponse(nil), err
}

sidecars, err := h.eth.BlobSidecars(ctx, id)
if err != nil {
return NewInternalServerErrorResponse(nil), err
}

rsp := NewSuccessResponse(ContentTypeResolvers{
ContentTypeJSON: func() ([]byte, error) {
return json.Marshal(sidecars)
},
})

switch id.Type() {
case eth.BlockIDFinalized, eth.BlockIDRoot:
rsp.SetCacheControl("public, s-max-age=6000")
default:
rsp.SetCacheControl("public, s-max-age=15")
}

return rsp, nil
}
6 changes: 4 additions & 2 deletions pkg/beacon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type Config struct {
// Cache configuration holds configuration for the caches.
type CacheConfig struct {
// Blocks holds the block cache configuration.
Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 200}"`
Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 30}"`
// States holds the state cache configuration.
States store.Config `yaml:"states" default:"{\"MaxItems\": 5}"`
// DepositSnapshots holds the deposit snapshot cache configuration.
DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 50}"`
DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 30}"`
// BlobSidecars holds the blob sidecar cache configuration.
BlobSidecars store.Config `yaml:"blob_sidecars" default:"{\"MaxItems\": 30}"`
}

type FrontendConfig struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/chuckpreslar/emission"
"github.com/ethpandaops/beacon/pkg/beacon"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Default struct {
blocks *store.Block
states *store.BeaconState
depositSnapshots *store.DepositSnapshot
blobSidecars *store.BlobSidecar

spec *state.Spec
genesis *v1.Genesis
Expand Down Expand Up @@ -79,6 +81,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C
blocks: store.NewBlock(log, config.Caches.Blocks, namespace),
states: store.NewBeaconState(log, config.Caches.States, namespace),
depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace),
blobSidecars: store.NewBlobSidecar(log, config.Caches.BlobSidecars, namespace),

servingMutex: sync.Mutex{},
historicalMutex: sync.Mutex{},
Expand Down Expand Up @@ -560,6 +563,10 @@ func (d *Default) GetBlockByStateRoot(ctx context.Context, stateRoot phase0.Root
return block, nil
}

func (d *Default) GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error) {
return d.blobSidecars.GetBySlot(slot)
}

func (d *Default) GetBeaconStateBySlot(ctx context.Context, slot phase0.Slot) (*[]byte, error) {
block, err := d.GetBlockBySlot(ctx, slot)
if err != nil {
Expand Down
106 changes: 81 additions & 25 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,47 +311,69 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N
}

if d.shouldDownloadStates() {
// If the state already exists, don't bother downloading it again.
existingState, err := d.states.GetByStateRoot(stateRoot)
if err == nil && existingState != nil {
d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name)

return block, nil
}

beaconState, err := upstream.Beacon.FetchRawBeaconState(ctx, eth.SlotAsString(slot), "application/octet-stream")
if err != nil {
return nil, fmt.Errorf("failed to fetch beacon state: %w", err)
}

if beaconState == nil {
return nil, errors.New("beacon state is nil")
}

expiresAt := time.Now().Add(FinalityHaltedServingPeriod)
if slot == phase0.Slot(0) {
expiresAt = time.Now().Add(999999 * time.Hour)
}

if err := d.states.Add(stateRoot, &beaconState, expiresAt, slot); err != nil {
return nil, fmt.Errorf("failed to store beacon state: %w", err)
// Download and store beacon state
if err = d.downloadAndStoreBeaconState(ctx, stateRoot, slot, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store beacon state: %w", err)
}
}

if slot != phase0.Slot(0) {
epoch := phase0.Epoch(slot / d.spec.SlotsPerEpoch)

// Download and store deposit snapshots
if err := d.downloadAndStoreDepositSnapshot(ctx, epoch, upstream); err != nil {
if err = d.downloadAndStoreDepositSnapshot(ctx, epoch, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store deposit snapshot: %w", err)
}
}

sp, err := upstream.Beacon.Spec()
if err != nil {
return nil, fmt.Errorf("failed to fetch spec from upstream node: %w", err)
}

denebFork, err := sp.ForkEpochs.GetByName("DENEB")
if err == nil && denebFork != nil {
if denebFork.Active(slot, sp.SlotsPerEpoch) {
// Download and store blob sidecars
if err := d.downloadAndStoreBlobSidecars(ctx, slot, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store blob sidecars: %w", err)
}
}
}

d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name)

return block, nil
}

func (d *Default) downloadAndStoreBeaconState(ctx context.Context, stateRoot phase0.Root, slot phase0.Slot, node *Node) error {
// If the state already exists, don't bother downloading it again.
existingState, err := d.states.GetByStateRoot(stateRoot)
if err == nil && existingState != nil {
return nil
}

beaconState, err := node.Beacon.FetchRawBeaconState(ctx, eth.SlotAsString(slot), "application/octet-stream")
if err != nil {
return fmt.Errorf("failed to fetch beacon state: %w", err)
}

if beaconState == nil {
return errors.New("beacon state is nil")
}

expiresAt := time.Now().Add(FinalityHaltedServingPeriod)
if slot == phase0.Slot(0) {
expiresAt = time.Now().Add(999999 * time.Hour)
}

if err := d.states.Add(stateRoot, &beaconState, expiresAt, slot); err != nil {
return fmt.Errorf("failed to store beacon state: %w", err)
}

return nil
}

func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch phase0.Epoch, node *Node) error {
// Check if we already have the deposit snapshot.
if _, err := d.depositSnapshots.GetByEpoch(epoch); err == nil {
Expand Down Expand Up @@ -386,3 +408,37 @@ func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch pha

return nil
}

func (d *Default) downloadAndStoreBlobSidecars(ctx context.Context, slot phase0.Slot, node *Node) error {
// Check if we already have the blob sidecars.
if _, err := d.blobSidecars.GetBySlot(slot); err == nil {
return nil
}

// Download the blob sidecars from our upstream.
blobSidecars, err := node.Beacon.FetchBeaconBlockBlobs(ctx, eth.SlotAsString(slot))
if err != nil {
return err
}

if blobSidecars == nil {
return errors.New("invalid blob sidecars")
}

// Store for the FinalityHaltedServingPeriod to ensure we have them in case of non-finality.
// We'll let the store handle purging old items.
expiresAt := time.Now().Add(FinalityHaltedServingPeriod)

if err := d.blobSidecars.Add(slot, blobSidecars, expiresAt); err != nil {
return fmt.Errorf("failed to store blob sidecars: %w", err)
}

d.log.
WithFields(logrus.Fields{
"slot": slot,
"node": node.Config.Name,
}).
Infof("Downloaded and stored blob sidecar for slot %d", slot)

return nil
}
3 changes: 3 additions & 0 deletions pkg/beacon/finality_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/beacon/pkg/beacon/api/types"
"github.com/ethpandaops/beacon/pkg/beacon/state"
Expand Down Expand Up @@ -47,6 +48,8 @@ type FinalityProvider interface {
GetBeaconStateByStateRoot(ctx context.Context, root phase0.Root) (*[]byte, error)
// GetBeaconStateByRoot returns the beacon sate with the given root.
GetBeaconStateByRoot(ctx context.Context, root phase0.Root) (*[]byte, error)
// GetBlobSidecarsBySlot returns the blob sidecars for the given slot.
GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error)
// ListFinalizedSlots returns a slice of finalized slots.
ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error)
// GetEpochBySlot returns the epoch for the given slot.
Expand Down
63 changes: 63 additions & 0 deletions pkg/beacon/store/blob_sidecars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package store

import (
"errors"
"time"

"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/checkpointz/pkg/cache"
"github.com/ethpandaops/checkpointz/pkg/eth"
"github.com/sirupsen/logrus"
)

type BlobSidecar struct {
store *cache.TTLMap
log logrus.FieldLogger
}

func NewBlobSidecar(log logrus.FieldLogger, config Config, namespace string) *BlobSidecar {
d := &BlobSidecar{
log: log.WithField("component", "beacon/store/blob_sidecar"),
store: cache.NewTTLMap(config.MaxItems, "blob_sidecar", namespace),
}

d.store.OnItemDeleted(func(key string, value interface{}, expiredAt time.Time) {
d.log.WithField("key", key).WithField("expired_at", expiredAt.String()).Debug("Blob sidecar was deleted from the cache")
})

d.store.EnableMetrics(namespace)

return d
}

func (d *BlobSidecar) Add(slot phase0.Slot, sidecars []*deneb.BlobSidecar, expiresAt time.Time) error {
d.store.Add(eth.SlotAsString(slot), sidecars, expiresAt, false)

d.log.WithFields(
logrus.Fields{
"slot": eth.SlotAsString(slot),
"expires_at": expiresAt.String(),
},
).Debug("Added blob sidecar")

return nil
}

func (d *BlobSidecar) GetBySlot(slot phase0.Slot) ([]*deneb.BlobSidecar, error) {
data, _, err := d.store.Get(eth.SlotAsString(slot))
if err != nil {
return nil, err
}

return d.parseSidecar(data)
}

func (d *BlobSidecar) parseSidecar(data interface{}) ([]*deneb.BlobSidecar, error) {
sidecar, ok := data.([]*deneb.BlobSidecar)
if !ok {
return nil, errors.New("invalid blob sidecar type")
}

return sidecar, nil
}
47 changes: 47 additions & 0 deletions pkg/beacon/store/blob_sidecars_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package store

import (
"testing"
"time"

"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

func TestBlobSidecarAddAndGet(t *testing.T) {
logger, _ := test.NewNullLogger()
config := Config{MaxItems: 10}
namespace := "test_a"
blobSidecarStore := NewBlobSidecar(logger, config, namespace)

slot := phase0.Slot(100)
expiresAt := time.Now().Add(10 * time.Minute)
sidecars := []*deneb.BlobSidecar{
{
Blob: deneb.Blob{},
},
}

err := blobSidecarStore.Add(slot, sidecars, expiresAt)
assert.NoError(t, err)

retrievedSidecars, err := blobSidecarStore.GetBySlot(slot)
assert.NoError(t, err)
assert.NotNil(t, retrievedSidecars)
assert.Equal(t, sidecars, retrievedSidecars)
}

func TestBlobSidecarGetBySlotNotFound(t *testing.T) {
logger, _ := test.NewNullLogger()
config := Config{MaxItems: 10}
namespace := "test_b"
blobSidecarStore := NewBlobSidecar(logger, config, namespace)

slot := phase0.Slot(200)

retrievedSidecars, err := blobSidecarStore.GetBySlot(slot)
assert.Error(t, err)
assert.Nil(t, retrievedSidecars)
}
Loading

0 comments on commit c7cf5df

Please sign in to comment.