diff --git a/mod/engine-primitives/pkg/errors/errors.go b/mod/engine-primitives/pkg/errors/errors.go index 3ec896712c..c2971f8ebd 100644 --- a/mod/engine-primitives/pkg/errors/errors.go +++ b/mod/engine-primitives/pkg/errors/errors.go @@ -128,4 +128,8 @@ var ( ErrNilPayloadStatus = errors.New( "nil payload status received from execution client", ) + + // ErrExecutionClientIsSyncing indicates that the execution client is + // syncing. + ErrExecutionClientIsSyncing = errors.New("execution client is syncing") ) diff --git a/mod/execution/pkg/client/client.go b/mod/execution/pkg/client/client.go index 8c86272981..d8056d3a74 100644 --- a/mod/execution/pkg/client/client.go +++ b/mod/execution/pkg/client/client.go @@ -37,6 +37,7 @@ import ( "time" engineprimitives "github.com/berachain/beacon-kit/mod/engine-primitives/pkg/engine-primitives" + engineerrors "github.com/berachain/beacon-kit/mod/engine-primitives/pkg/errors" "github.com/berachain/beacon-kit/mod/errors" "github.com/berachain/beacon-kit/mod/execution/pkg/client/cache" "github.com/berachain/beacon-kit/mod/execution/pkg/client/ethclient" @@ -132,6 +133,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) Start( go s.jwtRefreshLoop(ctx) }() } + go s.syncCheck(ctx) return s.initializeConnection(ctx) } @@ -140,7 +142,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) Start( func (s *EngineClient[ExecutionPayloadDenebT]) Status() error { s.statusErrMu.RLock() defer s.statusErrMu.RUnlock() - return s.status(context.Background()) + return s.status() } // WaitForHealthy waits for the engine client to be healthy. @@ -150,7 +152,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) WaitForHealthy( s.statusErrMu.Lock() defer s.statusErrMu.Unlock() - for s.status(ctx) != nil { + for s.status() != nil { go s.refreshUntilHealthy(ctx) select { case <-ctx.Done(): @@ -185,6 +187,39 @@ func (s *EngineClient[ExecutionPayloadDenebT]) VerifyChainID( // ============================== HELPERS ============================== +// syncCheck checks the sync status of the execution client. +func (s *EngineClient[ExecutionPayloadDenebT]) syncCheck(ctx context.Context) { + ticker := time.NewTicker(s.cfg.SyncCheckInterval) + defer ticker.Stop() + for { + s.logger.Info( + "starting sync check rountine", + "interval", + s.cfg.SyncCheckInterval, + ) + select { + case <-ticker.C: + syncProgress, err := s.SyncProgress(ctx) + if err != nil { + s.logger.Error("failed to get sync progress", "err", err) + continue + } + + s.statusErrMu.Lock() + if syncProgress == nil || syncProgress.Done() { + s.logger.Info("execution client is in sync 🍻") + s.statusErr = nil + } else { + s.logger.Warn("execution client is syncing", "sync_progress", syncProgress) + s.statusErr = engineerrors.ErrExecutionClientIsSyncing + } + s.statusErrMu.Unlock() + case <-ctx.Done(): + return + } + } +} + func (s *EngineClient[ExecutionPayloadDenebT]) initializeConnection( ctx context.Context, ) error { @@ -429,21 +464,12 @@ func (s *EngineClient[ExecutionPayloadDenebT]) startIPCServer( // ================================ Info ================================ // status returns the status of the engine client. -func (s *EngineClient[ExecutionPayloadDenebT]) status( - ctx context.Context, -) error { +func (s *EngineClient[ExecutionPayloadDenebT]) status() error { // If the client is not started, we return an error. if s.Eth1Client.Client == nil { return ErrNotStarted } - if s.statusErr == nil { - // If we have an error, we will attempt - // to verify the chain ID again. - //#nosec:G703 wtf is even this problem here. - s.statusErr = s.VerifyChainID(ctx) - } - if s.statusErr == nil { s.statusErrCond.Broadcast() } @@ -464,7 +490,7 @@ func (s *EngineClient[ExecutionPayloadDenebT]) refreshUntilHealthy( case <-ctx.Done(): return case <-ticker.C: - if err := s.status(ctx); err == nil { + if err := s.status(); err == nil { return } } diff --git a/mod/execution/pkg/client/config.go b/mod/execution/pkg/client/config.go index ffb0af0ea1..476765c02f 100644 --- a/mod/execution/pkg/client/config.go +++ b/mod/execution/pkg/client/config.go @@ -37,6 +37,7 @@ const ( defaultRPCTimeout = 2 * time.Second defaultRPCStartupCheckInterval = 3 * time.Second defaultRPCJWTRefreshInterval = 30 * time.Second + defaultSyncCheckInterval = 5 * time.Second //#nosec:G101 // false positive. defaultJWTSecretPath = "./jwt.hex" ) @@ -51,6 +52,7 @@ func DefaultConfig() Config { RPCTimeout: defaultRPCTimeout, RPCStartupCheckInterval: defaultRPCStartupCheckInterval, RPCJWTRefreshInterval: defaultRPCJWTRefreshInterval, + SyncCheckInterval: defaultSyncCheckInterval, JWTSecretPath: defaultJWTSecretPath, } } @@ -70,6 +72,8 @@ type Config struct { RPCStartupCheckInterval time.Duration `mapstructure:"rpc-startup-check-interval"` // JWTRefreshInterval is the Interval for the JWT refresh. RPCJWTRefreshInterval time.Duration `mapstructure:"rpc-jwt-refresh-interval"` + // SyncCheckInterval is the Interval for the sync check. + SyncCheckInterval time.Duration `mapstructure:"sync-check-interval"` // JWTSecretPath is the path to the JWT secret. JWTSecretPath string `mapstructure:"jwt-secret-path"` } diff --git a/mod/execution/pkg/client/engine.go b/mod/execution/pkg/client/engine.go index ab910e7f38..fbec3fa543 100644 --- a/mod/execution/pkg/client/engine.go +++ b/mod/execution/pkg/client/engine.go @@ -112,6 +112,15 @@ func (s *EngineClient[ExecutionPayloadDenebT]) ForkchoiceUpdated( dctx, cancel := context.WithTimeout(ctx, s.cfg.RPCTimeout) defer cancel() + // If the execution client is syncing, sanitize the payload attributes + // as trying to build a block while syncing can be problematic. + if errors.Is(s.status(), engineerrors.ErrExecutionClientIsSyncing) { + s.logger.Warn( + "execution client is syncing, sanitizing payload attributes", + ) + attrs = nil + } + // If the suggested fee recipient is not set, log a warning. if attrs != nil && !attrs.IsNil() && attrs.GetSuggestedFeeRecipient() == (common.ZeroAddress) { diff --git a/mod/execution/pkg/engine/engine.go b/mod/execution/pkg/engine/engine.go index 0e8341c10a..819d4729f7 100644 --- a/mod/execution/pkg/engine/engine.go +++ b/mod/execution/pkg/engine/engine.go @@ -75,7 +75,6 @@ func (ee *Engine[ ctx context.Context, ) error { go func() { - // TODO: handle better if err := ee.ec.Start(ctx); err != nil { panic(err) } @@ -116,7 +115,6 @@ func (ee *Engine[ req.PayloadAttributes != nil && !req.PayloadAttributes.IsNil(), ) - // Notify the execution engine of the forkchoice update. payloadID, latestValidHash, err := ee.ec.ForkchoiceUpdated( ctx, diff --git a/mod/node-builder/pkg/components/engine_client.go b/mod/node-builder/pkg/components/engine_client.go index fc6f3a88e5..b5460a6899 100644 --- a/mod/node-builder/pkg/components/engine_client.go +++ b/mod/node-builder/pkg/components/engine_client.go @@ -61,7 +61,7 @@ func ProvideEngineClient( ) *engineclient.EngineClient[*types.ExecutableDataDeneb] { return engineclient.New[*types.ExecutableDataDeneb]( &in.Config.Engine, - in.Logger.With("service", "engine.client"), + in.Logger.With("service", "engine.client").With("module", "beacon-kit"), in.JWTSecret, in.TelemetrySink, new(big.Int).SetUint64(in.ChainSpec.DepositEth1ChainID()), diff --git a/mod/node-builder/pkg/config/template.go b/mod/node-builder/pkg/config/template.go index 00c6dbdb02..109263cb47 100644 --- a/mod/node-builder/pkg/config/template.go +++ b/mod/node-builder/pkg/config/template.go @@ -47,6 +47,9 @@ rpc-startup-check-interval = "{{ .BeaconKit.Engine.RPCStartupCheckInterval }}" # Interval for the JWT refresh. rpc-jwt-refresh-interval = "{{ .BeaconKit.Engine.RPCJWTRefreshInterval }}" +# Interval for checking client sync status +sync-check-interval = "{{ .BeaconKit.Engine.SyncCheckInterval }}" + # Path to the execution client JWT-secret jwt-secret-path = "{{.BeaconKit.Engine.JWTSecretPath}}" diff --git a/testing/e2e/config/config.go b/testing/e2e/config/config.go index 76c68411bd..99f5e703dc 100644 --- a/testing/e2e/config/config.go +++ b/testing/e2e/config/config.go @@ -93,18 +93,18 @@ func DefaultE2ETestConfig() *E2ETestConfig { ClType: "beaconkit", Replicas: 1, }, - // { - // ElType: "erigon", - // ClImage: "beacond:kurtosis-local", - // ClType: "beaconkit", - // Replicas: 1, - // }, - // { - // ElType: "besu", - // ClImage: "beacond:kurtosis-local", - // ClType: "beaconkit", - // Replicas: 1, - // }, + { + ElType: "erigon", + ClImage: "beacond:kurtosis-local", + ClType: "beaconkit", + Replicas: 1, + }, + { + ElType: "besu", + ClImage: "beacond:kurtosis-local", + ClType: "beaconkit", + Replicas: 1, + }, }, FullNodes: []Node{ { @@ -125,18 +125,18 @@ func DefaultE2ETestConfig() *E2ETestConfig { ClType: "beaconkit", Replicas: 1, }, - // { - // ElType: "erigon", - // ClImage: "beacond:kurtosis-local", - // ClType: "beaconkit", - // Replicas: 1, - // }, - // { - // ElType: "besu", - // ClImage: "beacond:kurtosis-local", - // ClType: "beaconkit", - // Replicas: 1, - // }, + { + ElType: "erigon", + ClImage: "beacond:kurtosis-local", + ClType: "beaconkit", + Replicas: 1, + }, + { + ElType: "besu", + ClImage: "beacond:kurtosis-local", + ClType: "beaconkit", + Replicas: 1, + }, }, BootSequence: map[string]string{ "type": "parallel",