Skip to content

Commit

Permalink
feat(engine): Check that execution client is not syncing (#1196)
Browse files Browse the repository at this point in the history
* bet

* x

* x

* bet

* bet
  • Loading branch information
itsdevbear authored May 27, 2024
1 parent c8a0f7c commit f87c5f1
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 40 deletions.
4 changes: 4 additions & 0 deletions mod/engine-primitives/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,8 @@ var (
ErrNilPayloadStatus = errors.New(
"nil payload status received from execution client",
)

// ErrExecutionClientIsSyncing indicates that the execution client is
// syncing.
ErrExecutionClientIsSyncing = errors.New("execution client is syncing")
)
52 changes: 39 additions & 13 deletions mod/execution/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"time"

engineprimitives "github.com/berachain/beacon-kit/mod/engine-primitives/pkg/engine-primitives"
engineerrors "github.com/berachain/beacon-kit/mod/engine-primitives/pkg/errors"
"github.com/berachain/beacon-kit/mod/errors"
"github.com/berachain/beacon-kit/mod/execution/pkg/client/cache"
"github.com/berachain/beacon-kit/mod/execution/pkg/client/ethclient"
Expand Down Expand Up @@ -132,6 +133,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) Start(
go s.jwtRefreshLoop(ctx)
}()
}
go s.syncCheck(ctx)
return s.initializeConnection(ctx)
}

Expand All @@ -140,7 +142,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) Start(
func (s *EngineClient[ExecutionPayloadDenebT]) Status() error {
s.statusErrMu.RLock()
defer s.statusErrMu.RUnlock()
return s.status(context.Background())
return s.status()
}

// WaitForHealthy waits for the engine client to be healthy.
Expand All @@ -150,7 +152,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) WaitForHealthy(
s.statusErrMu.Lock()
defer s.statusErrMu.Unlock()

for s.status(ctx) != nil {
for s.status() != nil {
go s.refreshUntilHealthy(ctx)
select {
case <-ctx.Done():
Expand Down Expand Up @@ -185,6 +187,39 @@ func (s *EngineClient[ExecutionPayloadDenebT]) VerifyChainID(

// ============================== HELPERS ==============================

// syncCheck checks the sync status of the execution client.
func (s *EngineClient[ExecutionPayloadDenebT]) syncCheck(ctx context.Context) {
ticker := time.NewTicker(s.cfg.SyncCheckInterval)
defer ticker.Stop()
for {
s.logger.Info(
"starting sync check rountine",
"interval",
s.cfg.SyncCheckInterval,
)
select {
case <-ticker.C:
syncProgress, err := s.SyncProgress(ctx)
if err != nil {
s.logger.Error("failed to get sync progress", "err", err)
continue
}

s.statusErrMu.Lock()
if syncProgress == nil || syncProgress.Done() {
s.logger.Info("execution client is in sync 🍻")
s.statusErr = nil
} else {
s.logger.Warn("execution client is syncing", "sync_progress", syncProgress)
s.statusErr = engineerrors.ErrExecutionClientIsSyncing
}
s.statusErrMu.Unlock()
case <-ctx.Done():
return
}
}
}

func (s *EngineClient[ExecutionPayloadDenebT]) initializeConnection(
ctx context.Context,
) error {
Expand Down Expand Up @@ -429,21 +464,12 @@ func (s *EngineClient[ExecutionPayloadDenebT]) startIPCServer(
// ================================ Info ================================

// status returns the status of the engine client.
func (s *EngineClient[ExecutionPayloadDenebT]) status(
ctx context.Context,
) error {
func (s *EngineClient[ExecutionPayloadDenebT]) status() error {
// If the client is not started, we return an error.
if s.Eth1Client.Client == nil {
return ErrNotStarted
}

if s.statusErr == nil {
// If we have an error, we will attempt
// to verify the chain ID again.
//#nosec:G703 wtf is even this problem here.
s.statusErr = s.VerifyChainID(ctx)
}

if s.statusErr == nil {
s.statusErrCond.Broadcast()
}
Expand All @@ -464,7 +490,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) refreshUntilHealthy(
case <-ctx.Done():
return
case <-ticker.C:
if err := s.status(ctx); err == nil {
if err := s.status(); err == nil {
return
}
}
Expand Down
4 changes: 4 additions & 0 deletions mod/execution/pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
defaultRPCTimeout = 2 * time.Second
defaultRPCStartupCheckInterval = 3 * time.Second
defaultRPCJWTRefreshInterval = 30 * time.Second
defaultSyncCheckInterval = 5 * time.Second
//#nosec:G101 // false positive.
defaultJWTSecretPath = "./jwt.hex"
)
Expand All @@ -51,6 +52,7 @@ func DefaultConfig() Config {
RPCTimeout: defaultRPCTimeout,
RPCStartupCheckInterval: defaultRPCStartupCheckInterval,
RPCJWTRefreshInterval: defaultRPCJWTRefreshInterval,
SyncCheckInterval: defaultSyncCheckInterval,
JWTSecretPath: defaultJWTSecretPath,
}
}
Expand All @@ -70,6 +72,8 @@ type Config struct {
RPCStartupCheckInterval time.Duration `mapstructure:"rpc-startup-check-interval"`
// JWTRefreshInterval is the Interval for the JWT refresh.
RPCJWTRefreshInterval time.Duration `mapstructure:"rpc-jwt-refresh-interval"`
// SyncCheckInterval is the Interval for the sync check.
SyncCheckInterval time.Duration `mapstructure:"sync-check-interval"`
// JWTSecretPath is the path to the JWT secret.
JWTSecretPath string `mapstructure:"jwt-secret-path"`
}
9 changes: 9 additions & 0 deletions mod/execution/pkg/client/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ func (s *EngineClient[ExecutionPayloadDenebT]) ForkchoiceUpdated(
dctx, cancel := context.WithTimeout(ctx, s.cfg.RPCTimeout)
defer cancel()

// If the execution client is syncing, sanitize the payload attributes
// as trying to build a block while syncing can be problematic.
if errors.Is(s.status(), engineerrors.ErrExecutionClientIsSyncing) {
s.logger.Warn(
"execution client is syncing, sanitizing payload attributes",
)
attrs = nil
}

// If the suggested fee recipient is not set, log a warning.
if attrs != nil && !attrs.IsNil() &&
attrs.GetSuggestedFeeRecipient() == (common.ZeroAddress) {
Expand Down
2 changes: 0 additions & 2 deletions mod/execution/pkg/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (ee *Engine[
ctx context.Context,
) error {
go func() {
// TODO: handle better
if err := ee.ec.Start(ctx); err != nil {
panic(err)
}
Expand Down Expand Up @@ -116,7 +115,6 @@ func (ee *Engine[
req.PayloadAttributes != nil &&
!req.PayloadAttributes.IsNil(),
)

// Notify the execution engine of the forkchoice update.
payloadID, latestValidHash, err := ee.ec.ForkchoiceUpdated(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion mod/node-builder/pkg/components/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ProvideEngineClient(
) *engineclient.EngineClient[*types.ExecutableDataDeneb] {
return engineclient.New[*types.ExecutableDataDeneb](
&in.Config.Engine,
in.Logger.With("service", "engine.client"),
in.Logger.With("service", "engine.client").With("module", "beacon-kit"),
in.JWTSecret,
in.TelemetrySink,
new(big.Int).SetUint64(in.ChainSpec.DepositEth1ChainID()),
Expand Down
3 changes: 3 additions & 0 deletions mod/node-builder/pkg/config/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ rpc-startup-check-interval = "{{ .BeaconKit.Engine.RPCStartupCheckInterval }}"
# Interval for the JWT refresh.
rpc-jwt-refresh-interval = "{{ .BeaconKit.Engine.RPCJWTRefreshInterval }}"
# Interval for checking client sync status
sync-check-interval = "{{ .BeaconKit.Engine.SyncCheckInterval }}"
# Path to the execution client JWT-secret
jwt-secret-path = "{{.BeaconKit.Engine.JWTSecretPath}}"
Expand Down
48 changes: 24 additions & 24 deletions testing/e2e/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,18 @@ func DefaultE2ETestConfig() *E2ETestConfig {
ClType: "beaconkit",
Replicas: 1,
},
// {
// ElType: "erigon",
// ClImage: "beacond:kurtosis-local",
// ClType: "beaconkit",
// Replicas: 1,
// },
// {
// ElType: "besu",
// ClImage: "beacond:kurtosis-local",
// ClType: "beaconkit",
// Replicas: 1,
// },
{
ElType: "erigon",
ClImage: "beacond:kurtosis-local",
ClType: "beaconkit",
Replicas: 1,
},
{
ElType: "besu",
ClImage: "beacond:kurtosis-local",
ClType: "beaconkit",
Replicas: 1,
},
},
FullNodes: []Node{
{
Expand All @@ -125,18 +125,18 @@ func DefaultE2ETestConfig() *E2ETestConfig {
ClType: "beaconkit",
Replicas: 1,
},
// {
// ElType: "erigon",
// ClImage: "beacond:kurtosis-local",
// ClType: "beaconkit",
// Replicas: 1,
// },
// {
// ElType: "besu",
// ClImage: "beacond:kurtosis-local",
// ClType: "beaconkit",
// Replicas: 1,
// },
{
ElType: "erigon",
ClImage: "beacond:kurtosis-local",
ClType: "beaconkit",
Replicas: 1,
},
{
ElType: "besu",
ClImage: "beacond:kurtosis-local",
ClType: "beaconkit",
Replicas: 1,
},
},
BootSequence: map[string]string{
"type": "parallel",
Expand Down

0 comments on commit f87c5f1

Please sign in to comment.