fix(bug): sync from da and p2p when starting a node (#763)
Co-authored-by: danwt <30197399+danwt@users.noreply.github.com>
Co-authored-by: github-actions <github-actions@github.com>
3 people authored and omritoptix committed May 2, 2024
1 parent d61759e commit bafb117
Showing 12 changed files with 69 additions and 88 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -5,8 +5,10 @@

* **celestia test:** fix race in test ([#755](https://github.com/dymensionxyz/dymint/issues/755)) ([0b36781](https://github.com/dymensionxyz/dymint/commit/0b367818bf6aa8da4a4fd8e4e5c78223b60b44e0))
* **celestia:** impl retry on submit ([#748](https://github.com/dymensionxyz/dymint/issues/748)) ([61630eb](https://github.com/dymensionxyz/dymint/commit/61630eb458197abe2440a81426210000dff25d40))
* **celestia:** use fixed delay in repeat attempts ([#753](https://github.com/dymensionxyz/dymint/issues/753)) ([53002b0](https://github.com/dymensionxyz/dymint/commit/53002b0a070743811295a98580ba038cac40cc7d))
* **da:** fixed da path seperator and encoding issue ([#731](https://github.com/dymensionxyz/dymint/issues/731)) ([3a3b219](https://github.com/dymensionxyz/dymint/commit/3a3b21932750fee7eaaa9c186f78e36e3e597746))
* **DA:** use expo backoff in retries ([#739](https://github.com/dymensionxyz/dymint/issues/739)) ([848085f](https://github.com/dymensionxyz/dymint/commit/848085f70bcaae81fb80da3ab78c4d8b399e13b1))
* **doc:** manager cache comment ([#767](https://github.com/dymensionxyz/dymint/issues/767)) ([b88bf6e](https://github.com/dymensionxyz/dymint/commit/b88bf6e72820c944b290147724255cc8466ada50))
* **logging:** added reason for websocket closed debug msg ([#746](https://github.com/dymensionxyz/dymint/issues/746)) ([3aa7d80](https://github.com/dymensionxyz/dymint/commit/3aa7d80ace92b3b0f79e4f338f10bb94c96ab6dd))
* **logs:** make logs more readable in a couple places, fix race cond ([#749](https://github.com/dymensionxyz/dymint/issues/749)) ([f05ef39](https://github.com/dymensionxyz/dymint/commit/f05ef3957b754c05fbc90aa39eabce80bbe65933))
* **p2p:** validate block before applying and not before caching in p2p gossiping ([#723](https://github.com/dymensionxyz/dymint/issues/723)) ([98371b5](https://github.com/dymensionxyz/dymint/commit/98371b5220613e70f3274fab5593e02ba532f7db))
11 changes: 5 additions & 6 deletions block/manager.go
@@ -168,18 +168,20 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
}
}

if !isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err := m.syncBlockManager()
if err != nil {
err = fmt.Errorf("sync block manager: %w", err)
return err
return fmt.Errorf("sync block manager: %w", err)
}

if isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
}
@@ -238,7 +240,6 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
// onNewGossippedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.retrieverMutex.Lock() // needed to protect blockCache access

m.logger.Debug("Received new block via gossip", "n cachedBlocks", len(m.blockCache))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
@@ -252,9 +253,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
}

m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant

if block.Header.Height == nextHeight {
err := m.attemptApplyCachedBlocks()
if err != nil {
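Taken together, the manager.go hunks above reorder startup: a node now subscribes to gossiped blocks before running syncBlockManager, and the sync runs regardless of whether the node is an aggregator, so blocks gossiped while the node is still catching up from the settlement/DA layers are cached and applied afterwards instead of being missed. A toy, self-contained sketch of why that ordering matters (all names and heights here are illustrative, not dymint APIs):

```go
package main

import "fmt"

func main() {
	// Stand-in for the p2p gossip subscription; in the real code this is the
	// pubsub subscription that is now set up before syncBlockManager runs.
	gossiped := make(chan uint64, 8)

	// 1. Subscribe first (the fix): heights gossiped while we are still
	//    syncing land in the subscription instead of being lost.
	for h := uint64(5); h <= 7; h++ {
		gossiped <- h
	}

	// 2. Sync from the settlement/DA layers up to the last submitted height.
	syncedUpTo := uint64(4)
	fmt.Println("synced from DA/SL up to height", syncedUpTo)

	// 3. Apply the blocks that arrived over gossip during the sync window.
	close(gossiped)
	for h := range gossiped {
		fmt.Println("applying cached gossiped block at height", h)
	}
}
```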
3 changes: 0 additions & 3 deletions block/manager_test.go
@@ -95,7 +95,6 @@ func TestInitialState(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {

dalc := testutil.GetMockDALC(logger)
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
@@ -162,7 +161,6 @@ func TestRetrieveDaBatchesFailed(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, manager)

t.Log(manager.LastState.SLStateIndex)
daMetaData := &da.DASubmitMetaData{
Client: da.Mock,
Height: 1,
@@ -472,7 +470,6 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
}

func TestDAFetch(t *testing.T) {

require := require.New(t)
// Setup app
app := testutil.GetAppMock(testutil.Info, testutil.Commit)
99 changes: 41 additions & 58 deletions block/retriever.go
@@ -3,7 +3,6 @@ package block
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
@@ -12,8 +11,9 @@ import (
"github.com/dymensionxyz/dymint/da"
)

// RetrieveLoop listens for new sync messages written to a ring buffer and in turn
// runs syncUntilTarget on the latest message in the ring buffer.
// RetrieveLoop listens for new target sync heights and then syncs the chain by
// fetching batches from the settlement layer and then fetching the actual blocks
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) {
m.logger.Info("started retrieve loop")
syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx))
@@ -24,88 +24,71 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
return
default:
// Get only the latest sync target
syncTarget := syncTargetPoller.Next()
err := m.syncUntilTarget(*(*uint64)(syncTarget))
targetHeight := syncTargetPoller.Next()
err := m.syncUntilTarget(*(*uint64)(targetHeight))
if err != nil {
panic(fmt.Errorf("sync until target: %w", err))
}
}
}
}

// syncUntilTarget syncs the block until the syncTarget is reached.
// syncUntilTarget syncs blocks until the target height is reached.
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncUntilTarget(syncTarget uint64) error {
currentHeight := m.Store.Height()
func (m *Manager) syncUntilTarget(targetHeight uint64) error {
for currH := m.Store.Height(); currH < targetHeight; {

if currentHeight >= syncTarget {
m.logger.Info("Already synced", "current height", currentHeight, "syncTarget", syncTarget)
return nil
}

var stateIndex uint64
h := m.Store.Height()
err := retry.Do(
func() error {
res, err := m.SLClient.GetHeightState(h)
if err != nil {
m.logger.Debug("sl client get height state", "error", err)
return err
}
stateIndex = res.State.StateIndex
return nil
},
retry.Attempts(0),
retry.Delay(500*time.Millisecond),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
)
if err != nil {
return fmt.Errorf("get height state: %w", err)
}
m.updateStateIndex(stateIndex - 1)
m.logger.Debug("Sync until target: updated state index pre loop", "stateIndex", stateIndex, "height", h, "syncTarget", syncTarget)

for currentHeight < syncTarget {
currStateIdx := atomic.LoadUint64(&m.LastState.SLStateIndex) + 1
m.logger.Info("Syncing until target", "height", currentHeight, "state_index", currStateIdx, "syncTarget", syncTarget)
settlementBatch, err := m.SLClient.RetrieveBatch(currStateIdx)
// It's important that we query the state index before fetching the batch, rather
// than e.g. keep it and increment it, because we might be concurrently applying blocks
// and may require a higher index than expected.
stateIndex, err := m.queryStateIndex()
if err != nil {
return err
return fmt.Errorf("query state index: %w", err)
}

err = m.ProcessNextDABatch(settlementBatch.MetaData.DA)
settlementBatch, err := m.SLClient.RetrieveBatch(stateIndex)
if err != nil {
return err
return fmt.Errorf("retrieve batch: %w", err)
}

currentHeight = m.Store.Height()
m.logger.Info("Retrieved batch.", "state_index", stateIndex)

err = m.updateStateIndex(settlementBatch.StateIndex)
err = m.ProcessNextDABatch(settlementBatch.MetaData.DA)
if err != nil {
return err
return fmt.Errorf("process next DA batch: %w", err)
}

}
m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget)

// check for cached blocks
err = m.attemptApplyCachedBlocks()
m.logger.Info("Synced", "store height", m.Store.Height(), "target height", targetHeight)

err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying previous cached blocks", "err", err)
m.logger.Error("Attempt apply cached blocks.", "err", err)
}

return nil
}

func (m *Manager) updateStateIndex(stateIndex uint64) error {
atomic.StoreUint64(&m.LastState.SLStateIndex, stateIndex)
_, err := m.Store.UpdateState(m.LastState, nil)
if err != nil {
m.logger.Error("update state", "error", err)
return err
}
return nil
// TODO: we could encapsulate the retry in the SL client
func (m *Manager) queryStateIndex() (uint64, error) {
var stateIndex uint64
return stateIndex, retry.Do(
func() error {
res, err := m.SLClient.GetHeightState(m.Store.Height() + 1)
if err != nil {
m.logger.Debug("sl client get height state", "error", err)
return err
}
stateIndex = res.State.StateIndex
return nil
},
retry.Attempts(0), // try forever
retry.Delay(500*time.Millisecond),
retry.LastErrorOnly(true),
retry.DelayType(retry.FixedDelay),
)
}

func (m *Manager) ProcessNextDABatch(daMetaData *da.DASubmitMetaData) error {
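The new queryStateIndex helper leans on retry-go's fixed-delay, retry-forever mode, the same options that appear in the hunk above. A minimal standalone sketch of that pattern, separate from the Manager code (the failing-then-succeeding function is purely illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go/v4"
)

func main() {
	attempts := 0
	err := retry.Do(
		func() error {
			attempts++
			if attempts < 3 {
				return errors.New("state index not available yet")
			}
			return nil
		},
		retry.Attempts(0),                 // 0 = keep retrying until success
		retry.Delay(500*time.Millisecond), // fixed pause between attempts
		retry.DelayType(retry.FixedDelay), // no exponential backoff
		retry.LastErrorOnly(true),         // surface only the final error
	)
	fmt.Println("attempts:", attempts, "err:", err)
}
```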
1 change: 0 additions & 1 deletion block/state.go
@@ -37,7 +37,6 @@ func (e *Executor) updateState(state types.State, block *types.Block, abciRespon
Version: state.Version,
ChainID: state.ChainID,
InitialHeight: state.InitialHeight,
SLStateIndex: state.SLStateIndex,
LastBlockHeight: int64(block.Header.Height),
LastBlockTime: time.Unix(0, int64(block.Header.Time)),
LastBlockID: tmtypes.BlockID{
19 changes: 10 additions & 9 deletions block/submit.go
@@ -12,6 +12,7 @@ import (
"github.com/dymensionxyz/dymint/types"
)

// SubmitLoop submits a batch of blocks to the DA and SL layers on a time interval.
func (m *Manager) SubmitLoop(ctx context.Context) {
ticker := time.NewTicker(m.Conf.BatchSubmitMaxTime)
defer ticker.Stop()
@@ -38,26 +39,25 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
}
}

// HandleSubmissionTrigger processes the submission trigger event. It checks if there are new blocks produced since the last submission.
// HandleSubmissionTrigger processes the sublayer submission trigger event. It checks if there are new blocks produced since the last submission.
// If there are, it attempts to submit a batch of blocks. It then attempts to produce an empty block to ensure IBC messages
// pass through during the batch submission process due to proofs requires for ibc messages only exist on the next block.
// Finally, it submits the next batch of blocks and updates the sync target to the height of
// the last block in the submitted batch.
// Finally, it submits the next batch of blocks and updates the sync target to the height of the last block in the submitted batch.
func (m *Manager) HandleSubmissionTrigger(ctx context.Context) error {
if !m.submitBatchMutex.TryLock() {
return fmt.Errorf("batch submission already in process, skipping submission: %w", gerr.ErrAborted)
}
defer m.submitBatchMutex.Unlock() // Ensure unlocking at the end
defer m.submitBatchMutex.Unlock()

// Load current sync target and height to determine if new blocks are available for submission.
syncTarget, height := m.SyncTarget.Load(), m.Store.Height()
if height <= syncTarget { // Check if there are new blocks since last sync target.
return nil // Exit if no new blocks are produced.
if m.Store.Height() <= m.SyncTarget.Load() {
return nil // No new blocks have been produced
}

// We try and produce an empty block to make sure relevant ibc messages will pass through during the batch submission: https://github.com/dymensionxyz/research/issues/173.
err := m.ProduceAndGossipBlock(ctx, true)
if err != nil {
m.logger.Error("produce and gossip empty block", "error", err)
m.logger.Error("Produce and gossip empty block.", "error", err)
}

if m.pendingBatch == nil {
@@ -76,13 +76,14 @@ func (m *Manager) HandleSubmissionTrigger(ctx context.Context) error {
batch: nextBatch,
}
} else {
m.logger.Info("pending batch already exists", "startHeight", m.pendingBatch.batch.StartHeight, "endHeight", m.pendingBatch.batch.EndHeight)
m.logger.Info("Pending batch already exists.", "startHeight", m.pendingBatch.batch.StartHeight, "endHeight", m.pendingBatch.batch.EndHeight)
}

syncHeight, err := m.submitPendingBatchToSL(*m.pendingBatch)
if err != nil {
return fmt.Errorf("submit pending batch to sl: %w", err)
}

m.pendingBatch = nil

// Update the syncTarget to the height of the last block in the last batch as seen by this node.
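SubmitLoop above follows the standard ticker-plus-context shape: attempt one submission per tick, stop when the context is cancelled. A self-contained sketch of that structure (the interval, callback, and log wording are illustrative, not the dymint configuration):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// submitLoop runs one submission attempt per tick and stops when the
// context is cancelled, mirroring the structure of SubmitLoop above.
func submitLoop(ctx context.Context, interval time.Duration, submit func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := submit(); err != nil {
				fmt.Println("submission failed:", err)
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	submitLoop(ctx, 100*time.Millisecond, func() error {
		fmt.Println("submitting pending batch")
		return nil
	})
}
```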
5 changes: 3 additions & 2 deletions block/synctarget.go
@@ -7,8 +7,9 @@ import (
"github.com/dymensionxyz/dymint/settlement"
)

// SyncTargetLoop is responsible for getting real-time updates about settlement batch submissions.
// For a non-aggregator, it updates the sync target that retrieveLoop uses to sync until that target.
// It publishes new sync height targets, which are then synced by another process.
func (m *Manager) SyncTargetLoop(ctx context.Context) {
m.logger.Info("Started sync target loop")
subscription, err := m.Pubsub.Subscribe(ctx, "syncTargetLoop", settlement.EventQueryNewSettlementBatchAccepted)
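The sync targets published here are consumed by RetrieveLoop through a diode (a lossy, single-writer ring buffer), which is why the retriever code above can always "get only the latest sync target". A standalone sketch of that behaviour, assuming a capacity-1 go-diodes one-to-one diode like the one the polling code in retriever.go suggests:

```go
package main

import (
	"fmt"
	"unsafe"

	"code.cloudfoundry.org/go-diodes"
)

func main() {
	// Capacity-1 diode: each write overwrites the previous value, and the
	// alerter reports how many stale targets the reader skipped over.
	d := diodes.NewOneToOne(1, diodes.AlertFunc(func(missed int) {
		fmt.Println("skipped", missed, "stale sync targets")
	}))

	// Several sync targets arrive while the reader is busy elsewhere.
	for h := uint64(10); h <= 14; h++ {
		height := h
		d.Set(diodes.GenericDataType(unsafe.Pointer(&height)))
	}

	// The poller hands back only the most recent one.
	poller := diodes.NewPoller(d)
	latest := poller.Next()
	fmt.Println("sync until target:", *(*uint64)(latest))
}
```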
5 changes: 5 additions & 0 deletions da/celestia/celestia.go
@@ -2,6 +2,7 @@ package celestia

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
@@ -319,6 +320,7 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet
ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
defer cancel()

c.logger.Debug("Celestia DA getting blob", "height", daMetaData.Height, "namespace", hex.EncodeToString(daMetaData.Namespace), "commitment", hex.EncodeToString(daMetaData.Commitment))
var batches []*types.Batch
blob, err := c.rpc.Get(ctx, daMetaData.Height, daMetaData.Namespace, daMetaData.Commitment)
if err != nil {
@@ -345,6 +347,9 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet
if err != nil {
c.logger.Error("unmarshal block", "daHeight", daMetaData.Height, "error", err)
}

c.logger.Debug("Celestia DA get blob successful", "DA height", daMetaData.Height, "lastBlockHeight", batch.EndHeight)

parsedBatch := new(types.Batch)
err = parsedBatch.FromProto(&batch)
if err != nil {
4 changes: 2 additions & 2 deletions da/celestia/config.go
@@ -12,7 +12,7 @@ import (
)

const (
defaultRpcRetryDelay = 30 * time.Second
defaultRpcRetryDelay = 1 * time.Second
defaultRpcCheckAttempts = 10
namespaceVersion = 0
defaultGasPrices = 0.1
@@ -41,7 +41,7 @@ type Config struct {
var CelestiaDefaultConfig = Config{
BaseURL: "http://127.0.0.1:26658",
AppNodeURL: "",
Timeout: 30 * time.Second,
Timeout: 5 * time.Second,
Fee: 0,
GasLimit: 20000000,
GasPrices: defaultGasPrices,
2 changes: 0 additions & 2 deletions types/serialization.go
@@ -261,7 +261,6 @@ func (s *State) ToProto() (*pb.State, error) {
ChainId: s.ChainID,
InitialHeight: s.InitialHeight,
LastBlockHeight: s.LastBlockHeight,
SLStateIndex: s.SLStateIndex,
LastBlockID: s.LastBlockID.ToProto(),
LastBlockTime: s.LastBlockTime,
NextValidators: nextValidators,
@@ -292,7 +291,6 @@ func (s *State) FromProto(other *pb.State) error {
s.LastStoreHeight = other.LastStoreHeight
}
s.BaseHeight = other.BaseHeight
s.SLStateIndex = other.SLStateIndex
lastBlockID, err := types.BlockIDFromProto(&other.LastBlockID)
if err != nil {
return err
3 changes: 0 additions & 3 deletions types/state.go
@@ -37,9 +37,6 @@ type State struct {
LastBlockID types.BlockID
LastBlockTime time.Time

// SLStateIndex identifies the Settlement Layer state index we're synced with
SLStateIndex uint64

// In the MVP implementation, there will be only one Validator
NextValidators *types.ValidatorSet
Validators *types.ValidatorSet
3 changes: 1 addition & 2 deletions utils/event/funcs.go
@@ -20,9 +20,8 @@ func MustSubscribe(
eventQuery pubsub.Query,
callback func(event pubsub.Message),
logger types.Logger,
outCapacity ...int,
) {
subscription, err := pubsubServer.Subscribe(ctx, clientID, eventQuery, outCapacity...)
subscription, err := pubsubServer.SubscribeUnbuffered(ctx, clientID, eventQuery)
if err != nil {
logger.Error("subscribe to events")
panic(err)
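The helper now calls SubscribeUnbuffered instead of Subscribe with a caller-supplied outCapacity, i.e. it uses an unbuffered subscription: delivery blocks until the subscriber has received the event rather than landing in a fixed-size buffer. A toy illustration of that blocking hand-off using a plain Go channel (not the pubsub API itself):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// An unbuffered channel behaves like an unbuffered subscription: the send
	// below cannot complete until the consumer is ready, so the event is
	// handed over directly -- the publisher simply waits.
	events := make(chan string)
	done := make(chan struct{})

	go func() {
		time.Sleep(50 * time.Millisecond) // slow consumer
		fmt.Println("consumed:", <-events)
		close(done)
	}()

	start := time.Now()
	events <- "gossiped block" // blocks until the consumer receives it
	fmt.Println("publish waited", time.Since(start).Round(time.Millisecond))
	<-done
}
```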
