Skip to content

Commit

Permalink
Merge branch 'feat/serve-blob-sidecars' into release/serving-bundle-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Feb 20, 2024
2 parents 7523028 + e05677e commit ebfe6f7
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 5 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.1
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.4
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/ferranbt/fastssz v0.1.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
Expand All @@ -46,6 +48,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
Expand All @@ -68,4 +71,5 @@ require (
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
38 changes: 35 additions & 3 deletions pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (h *Handler) Register(ctx context.Context, router *httprouter.Router) error
router.GET("/eth/v1/beacon/blocks/:block_id/root", h.wrappedHandler(h.handleEthV1BeaconBlocksRoot))
router.GET("/eth/v1/beacon/states/:state_id/finality_checkpoints", h.wrappedHandler(h.handleEthV1BeaconStatesFinalityCheckpoints))
router.GET("/eth/v1/beacon/deposit_snapshot", h.wrappedHandler(h.handleEthV1BeaconDepositSnapshot))
router.GET("/eth/v1/beacon/blob_sidecars/:block_id", h.wrappedHandler(h.handleEthV1BeaconBlobSidecars))

router.GET("/eth/v1/config/spec", h.wrappedHandler(h.handleEthV1ConfigSpec))
router.GET("/eth/v1/config/deposit_contract", h.wrappedHandler(h.handleEthV1ConfigDepositContract))
Expand Down Expand Up @@ -439,7 +440,7 @@ func (h *Handler) handleCheckpointzStatus(ctx context.Context, r *http.Request,
},
})

rsp.SetCacheControl("public, s-max-age=30")
rsp.SetCacheControl("public, s-max-age=5")

return rsp, nil
}
Expand Down Expand Up @@ -483,7 +484,7 @@ func (h *Handler) handleCheckpointzBeaconSlots(ctx context.Context, r *http.Requ
},
})

rsp.SetCacheControl("public, s-max-age=30")
rsp.SetCacheControl("public, s-max-age=5")

return rsp, nil
}
Expand All @@ -509,7 +510,7 @@ func (h *Handler) handleCheckpointzBeaconSlot(ctx context.Context, r *http.Reque
},
})

rsp.SetCacheControl("public, s-max-age=60")
rsp.SetCacheControl("public, s-max-age=5")

return rsp, nil
}
Expand Down Expand Up @@ -587,3 +588,34 @@ func (h *Handler) handleEthV1BeaconDepositSnapshot(ctx context.Context, r *http.
},
}), nil
}

func (h *Handler) handleEthV1BeaconBlobSidecars(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) {
if err := ValidateContentType(contentType, []ContentType{ContentTypeJSON}); err != nil {
return NewUnsupportedMediaTypeResponse(nil), err
}

id, err := eth.NewBlockIdentifier(p.ByName("block_id"))
if err != nil {
return NewBadRequestResponse(nil), err
}

sidecars, err := h.eth.BlobSidecars(ctx, id)
if err != nil {
return NewInternalServerErrorResponse(nil), err
}

rsp := NewSuccessResponse(ContentTypeResolvers{
ContentTypeJSON: func() ([]byte, error) {
return json.Marshal(sidecars)
},
})

switch id.Type() {
case eth.BlockIDFinalized, eth.BlockIDRoot:
rsp.SetCacheControl("public, s-max-age=6000")
default:
rsp.SetCacheControl("public, s-max-age=15")
}

return rsp, nil
}
6 changes: 4 additions & 2 deletions pkg/beacon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ type Config struct {
// Cache configuration holds configuration for the caches.
type CacheConfig struct {
// Blocks holds the block cache configuration.
Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 200}"`
Blocks store.Config `yaml:"blocks" default:"{\"MaxItems\": 30}"`
// States holds the state cache configuration.
States store.Config `yaml:"states" default:"{\"MaxItems\": 5}"`
// DepositSnapshots holds the deposit snapshot cache configuration.
DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 50}"`
DepositSnapshots store.Config `yaml:"deposit_snapshots" default:"{\"MaxItems\": 30}"`
// BlobSidecars holds the blob sidecar cache configuration.
BlobSidecars store.Config `yaml:"blob_sidecars" default:"{\"MaxItems\": 30}"`
}

type FrontendConfig struct {
Expand Down
7 changes: 7 additions & 0 deletions pkg/beacon/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/chuckpreslar/emission"
"github.com/ethpandaops/beacon/pkg/beacon"
Expand Down Expand Up @@ -38,6 +39,7 @@ type Default struct {
blocks *store.Block
states *store.BeaconState
depositSnapshots *store.DepositSnapshot
blobSidecars *store.BlobSidecar

spec *state.Spec
genesis *v1.Genesis
Expand Down Expand Up @@ -79,6 +81,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C
blocks: store.NewBlock(log, config.Caches.Blocks, namespace),
states: store.NewBeaconState(log, config.Caches.States, namespace),
depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace),
blobSidecars: store.NewBlobSidecar(log, config.Caches.BlobSidecars, namespace),

servingMutex: sync.Mutex{},
historicalMutex: sync.Mutex{},
Expand Down Expand Up @@ -560,6 +563,10 @@ func (d *Default) GetBlockByStateRoot(ctx context.Context, stateRoot phase0.Root
return block, nil
}

func (d *Default) GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error) {
return d.blobSidecars.GetBySlot(slot)
}

func (d *Default) GetBeaconStateBySlot(ctx context.Context, slot phase0.Slot) (*[]byte, error) {
block, err := d.GetBlockBySlot(ctx, slot)
if err != nil {
Expand Down
52 changes: 52 additions & 0 deletions pkg/beacon/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,24 @@ func (d *Default) fetchBundle(ctx context.Context, root phase0.Root, upstream *N
if err := d.downloadAndStoreDepositSnapshot(ctx, epoch, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store deposit snapshot: %w", err)
}

spec, err := upstream.Beacon.Spec()
if err != nil {
return nil, fmt.Errorf("failed to fetch spec from upstream node: %w", err)
}

denebFork, err := spec.ForkEpochs.GetByName("DENEB")
if err != nil {
return nil, fmt.Errorf("failed to fetch deneb fork: %w", err)
}

if denebFork.Active(slot, spec.SlotsPerEpoch) {
// Download and store blob sidecars
if err := d.downloadAndStoreBlobSidecars(ctx, slot, upstream); err != nil {
return nil, fmt.Errorf("failed to download and store blob sidecars: %w", err)
}
}

}

d.log.Infof("Successfully fetched bundle from %s", upstream.Config.Name)
Expand Down Expand Up @@ -386,3 +404,37 @@ func (d *Default) downloadAndStoreDepositSnapshot(ctx context.Context, epoch pha

return nil
}

func (d *Default) downloadAndStoreBlobSidecars(ctx context.Context, slot phase0.Slot, node *Node) error {
// Check if we already have the blob sidecars.
if _, err := d.blobSidecars.GetBySlot(slot); err == nil {
return nil
}

// Download the blob sidecars from our upstream.
blobSidecars, err := node.Beacon.FetchBeaconBlockBlobs(ctx, eth.SlotAsString(slot))
if err != nil {
return err
}

if blobSidecars == nil {
return errors.New("invalid blob sidecars")
}

// Store for the FinalityHaltedServingPeriod to ensure we have them in case of non-finality.
// We'll let the store handle purging old items.
expiresAt := time.Now().Add(FinalityHaltedServingPeriod)

if err := d.blobSidecars.Add(slot, blobSidecars, expiresAt); err != nil {
return fmt.Errorf("failed to store blob sidecars: %w", err)
}

d.log.
WithFields(logrus.Fields{
"slot": slot,
"node": node.Config.Name,
}).
Infof("Downloaded and stored blob sidecar for slot %d", slot)

return nil
}
3 changes: 3 additions & 0 deletions pkg/beacon/expire_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ func TestExpireMultiple(t *testing.T) {
for _, test := range tests {
t.Run(fmt.Sprintf("%v", test.slot), func(t *testing.T) {
test := test

t.Parallel()

expirySlot := CalculateSlotExpiration(test.slot, slotsOfHistory)

expiryTime := GetSlotTime(expirySlot, defaultSecondsPerSlot, genesis)

if expiryTime.String() != test.expect {
Expand Down
3 changes: 3 additions & 0 deletions pkg/beacon/finality_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/beacon/pkg/beacon/api/types"
"github.com/ethpandaops/beacon/pkg/beacon/state"
Expand Down Expand Up @@ -47,6 +48,8 @@ type FinalityProvider interface {
GetBeaconStateByStateRoot(ctx context.Context, root phase0.Root) (*[]byte, error)
// GetBeaconStateByRoot returns the beacon sate with the given root.
GetBeaconStateByRoot(ctx context.Context, root phase0.Root) (*[]byte, error)
// GetBlobSidecarsBySlot returns the blob sidecars for the given slot.
GetBlobSidecarsBySlot(ctx context.Context, slot phase0.Slot) ([]*deneb.BlobSidecar, error)
// ListFinalizedSlots returns a slice of finalized slots.
ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error)
// GetEpochBySlot returns the epoch for the given slot.
Expand Down
63 changes: 63 additions & 0 deletions pkg/beacon/store/blob_sidecars.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package store

import (
"errors"
"time"

"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/checkpointz/pkg/cache"
"github.com/ethpandaops/checkpointz/pkg/eth"
"github.com/sirupsen/logrus"
)

type BlobSidecar struct {
store *cache.TTLMap
log logrus.FieldLogger
}

func NewBlobSidecar(log logrus.FieldLogger, config Config, namespace string) *BlobSidecar {
d := &BlobSidecar{
log: log.WithField("component", "beacon/store/blob_sidecar"),
store: cache.NewTTLMap(config.MaxItems, "blob_sidecar", namespace),
}

d.store.OnItemDeleted(func(key string, value interface{}, expiredAt time.Time) {
d.log.WithField("key", key).WithField("expired_at", expiredAt.String()).Debug("Blob sidecar was deleted from the cache")
})

d.store.EnableMetrics(namespace)

return d
}

func (d *BlobSidecar) Add(slot phase0.Slot, sidecars []*deneb.BlobSidecar, expiresAt time.Time) error {
d.store.Add(eth.SlotAsString(slot), sidecars, expiresAt, false)

d.log.WithFields(
logrus.Fields{
"slot": eth.SlotAsString(slot),
"expires_at": expiresAt.String(),
},
).Debug("Added blob sidecar")

return nil
}

func (d *BlobSidecar) GetBySlot(slot phase0.Slot) ([]*deneb.BlobSidecar, error) {
data, _, err := d.store.Get(eth.SlotAsString(slot))
if err != nil {
return nil, err
}

return d.parseSidecar(data)
}

func (d *BlobSidecar) parseSidecar(data interface{}) ([]*deneb.BlobSidecar, error) {
sidecar, ok := data.([]*deneb.BlobSidecar)
if !ok {
return nil, errors.New("invalid blob sidecar type")
}

return sidecar, nil
}
47 changes: 47 additions & 0 deletions pkg/beacon/store/blob_sidecars_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package store

import (
"testing"
"time"

"github.com/attestantio/go-eth2-client/spec/deneb"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

func TestBlobSidecarAddAndGet(t *testing.T) {
logger, _ := test.NewNullLogger()
config := Config{MaxItems: 10}
namespace := "test"
blobSidecarStore := NewBlobSidecar(logger, config, namespace)

slot := phase0.Slot(100)
expiresAt := time.Now().Add(10 * time.Minute)
sidecars := []*deneb.BlobSidecar{
{
Blob: deneb.Blob{},
},
}

err := blobSidecarStore.Add(slot, sidecars, expiresAt)
assert.NoError(t, err)

retrievedSidecars, err := blobSidecarStore.GetBySlot(slot)
assert.NoError(t, err)
assert.NotNil(t, retrievedSidecars)
assert.Equal(t, sidecars, retrievedSidecars)
}

func TestBlobSidecarGetBySlotNotFound(t *testing.T) {
logger, _ := test.NewNullLogger()
config := Config{MaxItems: 10}
namespace := "test"
blobSidecarStore := NewBlobSidecar(logger, config, namespace)

slot := phase0.Slot(200)

retrievedSidecars, err := blobSidecarStore.GetBySlot(slot)
assert.Error(t, err)
assert.Nil(t, retrievedSidecars)
}
Loading

0 comments on commit ebfe6f7

Please sign in to comment.