Skip to content

Commit

Permalink
remove gossip sync
Browse files Browse the repository at this point in the history
  • Loading branch information
countvonzero committed Aug 5, 2023
1 parent 6a4208b commit 4d8dd30
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 149 deletions.
3 changes: 1 addition & 2 deletions syncer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ var (
nodeSyncState = metrics.NewGauge(
"sync_state",
namespace,
"node sync state in [not_synced, gossip, synced]",
"node sync state in [not, synced, atx_synced]",
[]string{"state"},
)
nodeNotSynced = nodeSyncState.WithLabelValues("not")
nodeGossip = nodeSyncState.WithLabelValues("gossip")
nodeSynced = nodeSyncState.WithLabelValues("synced")
atxSynced = nodeSyncState.WithLabelValues("atx_synced")

Expand Down
73 changes: 17 additions & 56 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,13 @@ func DefaultConfig() Config {
}
}

const (
outOfSyncThreshold uint32 = 3 // see notSynced
numGossipSyncLayers uint32 = 2 // see gossipSync
)
const outOfSyncThreshold uint32 = 3 // see notSynced

type syncState uint32

const (
// notSynced is the state where the node is outOfSyncThreshold layers or more behind the current layer.
notSynced syncState = iota
// gossipSync is the state in which a node listens to at least one full layer of gossip before participating
// in the protocol. this is to protect the node from participating in the consensus without full information.
// for example, when a node wakes up in the middle of layer N, since it didn't receive all relevant messages and
// blocks of layer N, it shouldn't vote or produce blocks in layer N+1. it instead listens to gossip for all
// through layer N+1 and starts producing blocks and participates in hare committee in layer N+2.
gossipSync
// synced is the state where the node is in sync with its peers.
synced
)
Expand All @@ -64,8 +55,6 @@ func (s syncState) String() string {
switch s {
case notSynced:
return "notSynced"
case gossipSync:
return "gossipSync"
case synced:
return "synced"
default:
Expand Down Expand Up @@ -126,8 +115,8 @@ type Syncer struct {
isBusy atomic.Value
syncTimer *time.Ticker
validateTimer *time.Ticker
// targetSyncedLayer is used to signal at which layer we can set this node to synced state
targetSyncedLayer atomic.Value
// layerTurnedSynced is the layer when the node becomes synced
layerTurnedSynced atomic.Value
lastLayerSynced atomic.Value
lastEpochSynced atomic.Value

Expand Down Expand Up @@ -176,7 +165,7 @@ func NewSyncer(
s.syncState.Store(notSynced)
s.atxSyncState.Store(notSynced)
s.isBusy.Store(0)
s.targetSyncedLayer.Store(types.LayerID(0))
s.layerTurnedSynced.Store(types.LayerID(0))
s.lastLayerSynced.Store(s.mesh.ProcessedLayer())
s.lastEpochSynced.Store(types.GetEffectiveGenesis().GetEpoch() - 1)
return s
Expand All @@ -199,7 +188,7 @@ func (s *Syncer) RegisterForATXSynced() chan struct{} {

// ListenToGossip returns true if the node is listening to gossip for blocks/TXs data.
func (s *Syncer) ListenToGossip() bool {
return s.getSyncState() >= gossipSync
return s.getSyncState() == synced
}

// ListenToATXGossip returns true if the node is listening to gossip for ATXs data.
Expand All @@ -214,7 +203,7 @@ func (s *Syncer) IsSynced(ctx context.Context) bool {

// SyncedBefore returns true if the node became synced before `epoch` starts.
func (s *Syncer) SyncedBefore(epoch types.EpochID) bool {
return s.getSyncState() == synced && s.getTargetSyncedLayer() < epoch.FirstLayer()
return s.getSyncState() == synced && s.LayerTurnedSynced() < epoch.FirstLayer()

Check warning on line 206 in syncer/syncer.go

View check run for this annotation

Codecov / codecov/patch

syncer/syncer.go#L206

Added line #L206 was not covered by tests
}

func (s *Syncer) IsBeaconSynced(epoch types.EpochID) bool {
Expand Down Expand Up @@ -285,12 +274,13 @@ func (s *Syncer) getSyncState() syncState {
}

func (s *Syncer) setSyncState(ctx context.Context, newState syncState) {
current := s.ticker.CurrentLayer()
oldState := s.syncState.Swap(newState).(syncState)
if oldState != newState {
s.logger.WithContext(ctx).With().Info("sync state change",
log.String("from state", oldState.String()),
log.String("to state", newState.String()),
log.Stringer("current", s.ticker.CurrentLayer()),
log.Stringer("current", current),
log.Stringer("last synced", s.getLastSyncedLayer()),
log.Stringer("latest", s.mesh.LatestLayer()),
log.Stringer("processed", s.mesh.ProcessedLayer()))
Expand All @@ -299,16 +289,11 @@ func (s *Syncer) setSyncState(ctx context.Context, newState syncState) {
switch newState {
case notSynced:
nodeNotSynced.Set(1)
nodeGossip.Set(0)
nodeSynced.Set(0)
case gossipSync:
nodeNotSynced.Set(0)
nodeGossip.Set(1)
nodeSynced.Set(0)
case synced:
nodeNotSynced.Set(0)
nodeGossip.Set(0)
nodeSynced.Set(1)
s.setLayerTurnedSynced(current)
}
}

Expand All @@ -322,19 +307,13 @@ func (s *Syncer) setSyncerIdle() {
s.isBusy.Store(0)
}

// targetSyncedLayer is used to signal at which layer we can set this node to synced state.
func (s *Syncer) setTargetSyncedLayer(ctx context.Context, layerID types.LayerID) {
oldSyncLayer := s.targetSyncedLayer.Swap(layerID).(types.LayerID)
s.logger.WithContext(ctx).With().Debug("target synced layer changed",
log.Uint32("from_layer", oldSyncLayer.Uint32()),
log.Uint32("to_layer", layerID.Uint32()),
log.Stringer("current", s.ticker.CurrentLayer()),
log.Stringer("latest", s.mesh.LatestLayer()),
log.Stringer("processed", s.mesh.ProcessedLayer()))
// setLayerTurnedSynced records the layer at which the node turned synced.
func (s *Syncer) setLayerTurnedSynced(lid types.LayerID) {
s.layerTurnedSynced.Store(lid)
}

func (s *Syncer) getTargetSyncedLayer() types.LayerID {
return s.targetSyncedLayer.Load().(types.LayerID)
func (s *Syncer) LayerTurnedSynced() types.LayerID {
return s.layerTurnedSynced.Load().(types.LayerID)

Check warning on line 316 in syncer/syncer.go

View check run for this annotation

Codecov / codecov/patch

syncer/syncer.go#L315-L316

Added lines #L315 - L316 were not covered by tests
}

func (s *Syncer) setLastSyncedLayer(lid types.LayerID) {
Expand Down Expand Up @@ -410,8 +389,7 @@ func (s *Syncer) synchronize(ctx context.Context) bool {
if s.ticker.CurrentLayer() <= types.GetEffectiveGenesis() {
return true
}
// always sync to currentLayer-1 to reduce race with gossip and hare/tortoise
for layerID := s.getLastSyncedLayer().Add(1); layerID.Before(s.ticker.CurrentLayer()); layerID = layerID.Add(1) {
for layerID := s.getLastSyncedLayer() + 1; layerID <= s.ticker.CurrentLayer(); layerID++ {
if err := s.syncLayer(ctx, layerID); err != nil {
s.logger.WithContext(ctx).With().Warning("failed to fetch layer", layerID, log.Err(err))
return false
Expand Down Expand Up @@ -510,37 +488,20 @@ func (s *Syncer) setStateBeforeSync(ctx context.Context) {

func (s *Syncer) dataSynced() bool {
current := s.ticker.CurrentLayer()
return current.Uint32() <= 1 || !s.getLastSyncedLayer().Before(current.Sub(1))
return current.Uint32() <= 1 || s.getLastSyncedLayer() == current
}

func (s *Syncer) setStateAfterSync(ctx context.Context, success bool) {
currSyncState := s.getSyncState()
current := s.ticker.CurrentLayer()

// for the gossipSync/notSynced states, we check if the mesh state is on target before we advance sync state.
// but for the synced state, we don't check the mesh state because gossip+hare+tortoise are in charge of
// advancing processed/verified layers. syncer is just auxiliary that fetches data in case of a temporary
// network outage.
switch currSyncState {
case synced:
if !success && isTooFarBehind(ctx, s.logger, current, s.getLastSyncedLayer()) {
s.setSyncState(ctx, notSynced)
}
case gossipSync:
if !success || !s.dataSynced() {
// push out the target synced layer
s.setTargetSyncedLayer(ctx, current.Add(numGossipSyncLayers))
break
}
// if we have gossip-synced to the target synced layer, we are ready to participate in consensus
if !s.getTargetSyncedLayer().After(current) {
s.setSyncState(ctx, synced)
}
case notSynced:
if success && s.dataSynced() {
// wait till s.ticker.GetCurrentLayer() + numGossipSyncLayers to participate in consensus
s.setSyncState(ctx, gossipSync)
s.setTargetSyncedLayer(ctx, current.Add(numGossipSyncLayers))
s.setSyncState(ctx, synced)
}
}
}
Expand Down
105 changes: 14 additions & 91 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestSynchronize_OnlyOneSynchronize(t *testing.T) {
return nil
},
)
for lid := gLayer.Add(2); lid.Before(current); lid = lid.Add(1) {
for lid := gLayer.Add(2); lid <= current; lid++ {
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid)
}
var wg sync.WaitGroup
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestSynchronize_AllGood(t *testing.T) {
ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch)
}
ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any())
for lid := gLayer.Add(1); lid.Before(current); lid = lid.Add(1) {
for lid := gLayer.Add(1); lid <= current; lid++ {
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid)
}

Expand All @@ -202,12 +202,12 @@ func TestSynchronize_AllGood(t *testing.T) {
}()

require.True(t, ts.syncer.synchronize(context.Background()))
require.Equal(t, current.Sub(1), ts.syncer.getLastSyncedLayer())
require.Equal(t, current, ts.syncer.getLastSyncedLayer())
require.Equal(t, current.GetEpoch(), ts.syncer.lastAtxEpoch())
require.True(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))
require.True(t, ts.syncer.IsSynced(context.Background()))

wg.Add(1)
go func() {
Expand Down Expand Up @@ -312,8 +312,8 @@ func startWithSyncedState(t *testing.T, ts *testSyncer) types.LayerID {

current := gLayer.Add(2)
ts.mTicker.advanceToLayer(current)
lyr := current.Sub(1)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), gLayer+1)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), current)

require.True(t, ts.syncer.synchronize(context.Background()))
require.True(t, ts.syncer.ListenToATXGossip())
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestSyncAtxs(t *testing.T) {
for epoch := lyr.GetEpoch(); epoch <= tc.lastSyncEpoch; epoch++ {
ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), epoch)
}
for lid := lyr; lid < tc.current; lid++ {
for lid := lyr + 1; lid <= tc.current; lid++ {
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid)
}

Expand All @@ -401,7 +401,7 @@ func TestSynchronize_StaySyncedUponFailure(t *testing.T) {
current := lyr.Add(1)
ts.mTicker.advanceToLayer(current)
ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), current.GetEpoch())
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr).Return(errors.New("doh"))
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), current).Return(errors.New("doh"))

require.False(t, ts.syncer.synchronize(context.Background()))
require.False(t, ts.syncer.dataSynced())
Expand All @@ -415,7 +415,7 @@ func TestSynchronize_BecomeNotSyncedUponFailureIfNoGossip(t *testing.T) {
lyr := startWithSyncedState(t, ts)
current := lyr.Add(outOfSyncThreshold)
ts.mTicker.advanceToLayer(current)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr).Return(errors.New("boo"))
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr+1).Return(errors.New("boo"))

require.False(t, ts.syncer.synchronize(context.Background()))
require.False(t, ts.syncer.dataSynced())
Expand All @@ -440,58 +440,14 @@ func TestFromNotSyncedToSynced(t *testing.T) {
require.False(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

for lid := lyr; lid.Before(current); lid = lid.Add(1) {
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid)
}
require.True(t, ts.syncer.synchronize(context.Background()))
// node should be in gossip sync state
require.True(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

waitOutGossipSync(t, current, ts)
}

// test the case where the node originally starts from notSynced, advances to gossipSync, but falls behind
// to notSynced.
func TestFromGossipSyncToNotSynced(t *testing.T) {
ts := newSyncerWithoutSyncTimer(t)
ts.mDataFetcher.EXPECT().GetEpochATXs(gomock.Any(), gomock.Any()).AnyTimes()
lyr := types.GetEffectiveGenesis().Add(1)
current := lyr.Add(1)
ts.mTicker.advanceToLayer(current)
ts.mDataFetcher.EXPECT().PollMaliciousProofs(gomock.Any())
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr)

require.True(t, ts.syncer.synchronize(context.Background()))
// node should be in gossip sync state
require.True(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

lyr = lyr.Add(1)
current = current.Add(outOfSyncThreshold)
ts.mTicker.advanceToLayer(current)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr).Return(errors.New("baa-ram-ewe"))
require.False(t, ts.syncer.synchronize(context.Background()))
require.False(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.False(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

for lid := lyr; lid.Before(current); lid = lid.Add(1) {
for lid := lyr; lid <= current; lid++ {
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid)
}
require.True(t, ts.syncer.synchronize(context.Background()))
// the node should enter gossipSync again
require.True(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

waitOutGossipSync(t, current, ts)
require.True(t, ts.syncer.IsSynced(context.Background()))
}

func TestNetworkHasNoData(t *testing.T) {
Expand Down Expand Up @@ -535,46 +491,13 @@ func TestFromSyncedToNotSynced(t *testing.T) {
require.False(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

for lid := lyr; lid.Before(current); lid = lid.Add(1) {
for lid := lyr; lid <= current; lid++ {
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lid)
}
require.True(t, ts.syncer.synchronize(context.Background()))
require.True(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

waitOutGossipSync(t, current, ts)
}

func waitOutGossipSync(t *testing.T, current types.LayerID, ts *testSyncer) {
require.True(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

// next layer will be still gossip syncing
require.Equal(t, types.LayerID(2).Uint32(), numGossipSyncLayers)
require.Equal(t, current.Add(numGossipSyncLayers), ts.syncer.getTargetSyncedLayer())

lyr := current
current = current.Add(1)
ts.mTicker.advanceToLayer(current)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr)
require.True(t, ts.syncer.synchronize(context.Background()))
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.False(t, ts.syncer.IsSynced(context.Background()))

// done one full layer of gossip sync, now it is synced
lyr = lyr.Add(1)
current = current.Add(1)
ts.mTicker.advanceToLayer(current)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr)
require.True(t, ts.syncer.synchronize(context.Background()))
require.True(t, ts.syncer.dataSynced())
require.True(t, ts.syncer.ListenToATXGossip())
require.True(t, ts.syncer.ListenToGossip())
require.True(t, ts.syncer.IsSynced(context.Background()))
}

Expand All @@ -600,9 +523,9 @@ func TestSync_AlsoSyncProcessedLayer(t *testing.T) {
// no data sync should happen
require.Equal(t, types.GetEffectiveGenesis(), ts.syncer.getLastSyncedLayer())
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), lyr)
ts.mDataFetcher.EXPECT().PollLayerData(gomock.Any(), current)
require.True(t, ts.syncer.synchronize(context.Background()))
// but last synced is updated
require.Equal(t, lyr, ts.syncer.getLastSyncedLayer())
require.Equal(t, current, ts.syncer.getLastSyncedLayer())
}

func TestSyncer_setATXSyncedTwice_NoError(t *testing.T) {
Expand Down

0 comments on commit 4d8dd30

Please sign in to comment.