diff --git a/.mergify.yml b/.mergify.yml new file mode 100644 index 0000000000..865a31d66a --- /dev/null +++ b/.mergify.yml @@ -0,0 +1,59 @@ +pull_request_rules: + - name: Squash merge rule to dev + conditions: + # applied for merge to the dev branch + - base=dev + # no unresolved threads + - "#review-threads-unresolved=0" + # Approved by two reviewers + - "#approved-reviews-by>=2" + # no unverified commit + - "#commits-unverified=0" + # Travis ci succeeded + - "check-success=Travis CI - Pull Request" + # git guardian succeeded + - "check-success=GitGuardian Security Checks" + # PR is not a draft + - -draft + # PR is not conflicting with the base branch + - -conflict + # conditions to avoid auto merge mistakes + # PR title doesn't have wip (not case sensitive) + - -title~=(?i)wip + # PR doesn't have WIP label (not case sensitive) + - label!=(?i)wip + # ready-to-merge is required to trigger the merge + - label=ready-to-merge + actions: + merge: + method: squash + - name: merge rule to main + conditions: + # from the dev branch : no direct PR to main + - head=dev + # applied for merge to the dev branch + - base=main + # no unresolved threads + - "#review-threads-unresolved=0" + # Approved by two reviewers + - "#approved-reviews-by>=2" + # no unverified commit + - "#commits-unverified=0" + # Travis ci succeeded + - "check-success=Travis CI - Pull Request" + # git guardian succeeded + - "check-success=GitGuardian Security Checks" + # PR is not a draft + - -draft + # PR is not conflicting with the base branch + - -conflict + # conditions to avoid auto merge mistakes + # PR title doesn't have wip (not case sensitive) + - -title~=(?i)wip + # PR doesn't have WIP label (not case sensitive) + - label!=(?i)wip + # ready-to-merge is required to trigger the merge + - label=ready-to-merge + actions: + merge: + method: merge \ No newline at end of file diff --git a/api/service/legacysync/syncing_test.go b/api/service/legacysync/syncing_test.go index bc17aeaec6..7d99051c76 100644 
--- a/api/service/legacysync/syncing_test.go +++ b/api/service/legacysync/syncing_test.go @@ -12,7 +12,7 @@ import ( "time" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - peer "github.com/libp2p/go-libp2p-core/peer" + peer "github.com/libp2p/go-libp2p/core/peer" "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/harmony/api/service/legacysync/downloader" diff --git a/api/service/stagedstreamsync/const.go b/api/service/stagedstreamsync/const.go index 0e6bc6e2cf..5c735d764b 100644 --- a/api/service/stagedstreamsync/const.go +++ b/api/service/stagedstreamsync/const.go @@ -14,7 +14,10 @@ const ( BlockByHashesUpperCap int = 10 // number of get blocks by hashes upper cap BlockByHashesLowerCap int = 3 // number of get blocks by hashes lower cap - LastMileBlocksThreshold int = 10 + LastMileBlocksThreshold int = 10 + SyncLoopBatchSize uint32 = 30 // maximum size for one query of block hashes + VerifyHeaderBatchSize uint64 = 100 // block chain header verification batch size (not used for now) + LastMileBlocksSize = 50 // SoftQueueCap is the soft cap of size in resultQueue. When the queue size is larger than this limit, // no more request will be assigned to workers to wait for InsertChain to finish. 
@@ -50,8 +53,15 @@ type ( // config for beacon config BHConfig *BeaconHelperConfig + // use memory db + UseMemDB bool + // log the stage progress LogProgress bool + + // logs every single process and error to help debugging stream sync + // DebugMode is not accessible to the end user and is only an aid for development + DebugMode bool } // BeaconHelperConfig is the extra config used for beaconHelper which uses diff --git a/api/service/stagedstreamsync/default_stages.go b/api/service/stagedstreamsync/default_stages.go index 6e4808738f..55986ff6e8 100644 --- a/api/service/stagedstreamsync/default_stages.go +++ b/api/service/stagedstreamsync/default_stages.go @@ -15,11 +15,13 @@ var DefaultForwardOrder = ForwardOrder{ BlockBodies, // Stages below don't use Internet States, + LastMile, Finish, } var DefaultRevertOrder = RevertOrder{ Finish, + LastMile, States, BlockBodies, ShortRange, @@ -29,6 +31,7 @@ var DefaultRevertOrder = RevertOrder{ var DefaultCleanUpOrder = CleanUpOrder{ Finish, + LastMile, States, BlockBodies, ShortRange, @@ -42,6 +45,7 @@ func DefaultStages(ctx context.Context, srCfg StageShortRangeCfg, bodiesCfg StageBodiesCfg, statesCfg StageStatesCfg, + lastMileCfg StageLastMileCfg, finishCfg StageFinishCfg, ) []*Stage { @@ -50,6 +54,7 @@ func DefaultStages(ctx context.Context, handlerStageEpochSync := NewStageEpoch(seCfg) handlerStageBodies := NewStageBodies(bodiesCfg) handlerStageStates := NewStageStates(statesCfg) + handlerStageLastMile := NewStageLastMile(lastMileCfg) handlerStageFinish := NewStageFinish(finishCfg) return []*Stage{ @@ -78,6 +83,11 @@ func DefaultStages(ctx context.Context, Description: "Update Blockchain State", Handler: handlerStageStates, }, + { + ID: LastMile, + Description: "update status for blocks after sync and update last mile blocks as well", + Handler: handlerStageLastMile, + }, { ID: Finish, Description: "Finalize Changes", diff --git a/api/service/stagedstreamsync/downloader.go b/api/service/stagedstreamsync/downloader.go 
index 668698f107..96add97fd3 100644 --- a/api/service/stagedstreamsync/downloader.go +++ b/api/service/stagedstreamsync/downloader.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/rs/zerolog" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" @@ -37,7 +38,7 @@ type ( ) // NewDownloader creates a new downloader -func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode bool, config Config) *Downloader { +func NewDownloader(host p2p.Host, bc core.BlockChain, consensus *consensus.Consensus, dbDir string, isBeaconNode bool, config Config) *Downloader { config.fixValues() sp := sync.NewProtocol(sync.Config{ @@ -67,8 +68,8 @@ func NewDownloader(host p2p.Host, bc core.BlockChain, dbDir string, isBeaconNode ctx, cancel := context.WithCancel(context.Background()) - //TODO: use mem db should be in config file - stagedSyncInstance, err := CreateStagedSync(ctx, bc, dbDir, false, isBeaconNode, sp, config, logger, config.LogProgress) + // create an instance of staged sync for the downloader + stagedSyncInstance, err := CreateStagedSync(ctx, bc, consensus, dbDir, isBeaconNode, sp, config, logger) if err != nil { cancel() return nil @@ -189,6 +190,7 @@ func (d *Downloader) waitForBootFinish() { func (d *Downloader) loop() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() + // for shard chain and beacon chain node, first we start with initSync=true to // make sure it goes through the long range sync first. 
// for epoch chain we do only need to go through epoch sync process @@ -208,7 +210,8 @@ func (d *Downloader) loop() { go trigger() case <-d.downloadC: - addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync) + bnBeforeSync := d.bc.CurrentBlock().NumberU64() + estimatedHeight, addedBN, err := d.stagedSyncInstance.doSync(d.ctx, initSync) if err != nil { //TODO: if there is a bad block which can't be resolved if d.stagedSyncInstance.invalidBlock.Active { @@ -216,13 +219,14 @@ func (d *Downloader) loop() { // if many streams couldn't solve it, then that's an unresolvable bad block if numTriedStreams >= d.config.InitStreams { if !d.stagedSyncInstance.invalidBlock.IsLogged { - fmt.Println("unresolvable bad block:", d.stagedSyncInstance.invalidBlock.Number) + d.logger.Error(). + Uint64("bad block number", d.stagedSyncInstance.invalidBlock.Number). + Msg(WrapStagedSyncMsg("unresolvable bad block")) d.stagedSyncInstance.invalidBlock.IsLogged = true } //TODO: if we don't have any new or untried stream in the list, sleep or panic } } - // If any error happens, sleep 5 seconds and retry d.logger.Error(). Err(err). @@ -242,16 +246,27 @@ func (d *Downloader) loop() { Uint32("shard", d.bc.ShardID()). Msg(WrapStagedSyncMsg("sync finished")) } - + // If block number has been changed, trigger another sync if addedBN != 0 { - // If block number has been changed, trigger another sync go trigger() + // try to add last mile from pub-sub (blocking) + if d.bh != nil { + d.bh.insertSync() + } } - // try to add last mile from pub-sub (blocking) - if d.bh != nil { - d.bh.insertSync() + // if last doSync needed only to add a few blocks less than LastMileBlocksThreshold and + // the node is fully synced now, then switch to short range + // the reason why we need to check distanceBeforeSync is because, if it was long distance, + // very likely, there are a couple of new blocks have been added to the other nodes which + // we should still stay in long range and check them. 
+ bnAfterSync := d.bc.CurrentBlock().NumberU64() + distanceBeforeSync := estimatedHeight - bnBeforeSync + distanceAfterSync := estimatedHeight - bnAfterSync + if estimatedHeight > 0 && addedBN > 0 && + distanceBeforeSync <= uint64(LastMileBlocksThreshold) && + distanceAfterSync <= uint64(LastMileBlocksThreshold) { + initSync = false } - initSync = false case <-d.closeC: return diff --git a/api/service/stagedstreamsync/downloaders.go b/api/service/stagedstreamsync/downloaders.go index 2df8b74aab..583f3e1523 100644 --- a/api/service/stagedstreamsync/downloaders.go +++ b/api/service/stagedstreamsync/downloaders.go @@ -2,6 +2,7 @@ package stagedstreamsync import ( "github.com/harmony-one/abool" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/p2p" ) @@ -15,7 +16,7 @@ type Downloaders struct { } // NewDownloaders creates Downloaders for sync of multiple blockchains -func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config Config) *Downloaders { +func NewDownloaders(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, dbDir string, config Config) *Downloaders { ds := make(map[uint32]*Downloader) isBeaconNode := len(bcs) == 1 for _, bc := range bcs { @@ -25,7 +26,7 @@ func NewDownloaders(host p2p.Host, bcs []core.BlockChain, dbDir string, config C if _, ok := ds[bc.ShardID()]; ok { continue } - ds[bc.ShardID()] = NewDownloader(host, bc, dbDir, isBeaconNode, config) + ds[bc.ShardID()] = NewDownloader(host, bc, consensus, dbDir, isBeaconNode, config) } return &Downloaders{ ds: ds, diff --git a/api/service/stagedstreamsync/service.go b/api/service/stagedstreamsync/service.go index 40fbf7097c..f7ffd7f2d9 100644 --- a/api/service/stagedstreamsync/service.go +++ b/api/service/stagedstreamsync/service.go @@ -1,6 +1,7 @@ package stagedstreamsync import ( + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/p2p" ) @@ 
-11,9 +12,9 @@ type StagedStreamSyncService struct { } // NewService creates a new downloader service -func NewService(host p2p.Host, bcs []core.BlockChain, config Config, dbDir string) *StagedStreamSyncService { +func NewService(host p2p.Host, bcs []core.BlockChain, consensus *consensus.Consensus, config Config, dbDir string) *StagedStreamSyncService { return &StagedStreamSyncService{ - Downloaders: NewDownloaders(host, bcs, dbDir, config), + Downloaders: NewDownloaders(host, bcs, consensus, dbDir, config), } } diff --git a/api/service/stagedstreamsync/short_range_helper.go b/api/service/stagedstreamsync/short_range_helper.go index e43b3c6916..42327c78df 100644 --- a/api/service/stagedstreamsync/short_range_helper.go +++ b/api/service/stagedstreamsync/short_range_helper.go @@ -207,6 +207,12 @@ func (sh *srHelper) removeStreams(sts []sttypes.StreamID) { } } +func (sh *srHelper) streamsFailed(sts []sttypes.StreamID, reason string) { + for _, st := range sts { + sh.syncProtocol.StreamFailed(st, reason) + } +} + // blameAllStreams only not to blame all whitelisted streams when the it's not the last block signature verification failed. 
func (sh *srHelper) blameAllStreams(blocks types.Blocks, errIndex int, err error) bool { if errors.As(err, &emptySigVerifyErr) && errIndex == len(blocks)-1 { diff --git a/api/service/stagedstreamsync/stage_bodies.go b/api/service/stagedstreamsync/stage_bodies.go index 62309b76df..4996ea78b7 100644 --- a/api/service/stagedstreamsync/stage_bodies.go +++ b/api/service/stagedstreamsync/stage_bodies.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" ) @@ -60,6 +61,11 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev return nil } + // shouldn't execute for epoch chain + if b.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + maxHeight := s.state.status.targetBN currentHead := b.configs.bc.CurrentBlock().NumberU64() if currentHead >= maxHeight { @@ -77,7 +83,7 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev return errV } - if currProgress == 0 { + if currProgress <= currentHead { if err := b.cleanAllBlockDBs(ctx); err != nil { return err } @@ -209,7 +215,7 @@ func (b *StageBodies) redownloadBadBlock(ctx context.Context, s *StageState) err isOneOfTheBadStreams := false for _, id := range s.state.invalidBlock.StreamID { if id == stid { - b.configs.protocol.RemoveStream(stid) + b.configs.protocol.StreamFailed(stid, "re-download bad block from this stream failed") isOneOfTheBadStreams = true break } diff --git a/api/service/stagedstreamsync/stage_epoch.go b/api/service/stagedstreamsync/stage_epoch.go index 394a1d5d69..2c51aa1f94 100644 --- a/api/service/stagedstreamsync/stage_epoch.go +++ b/api/service/stagedstreamsync/stage_epoch.go @@ -5,6 +5,7 @@ import ( "github.com/harmony-one/harmony/core" 
"github.com/harmony-one/harmony/internal/utils" + sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/pkg/errors" @@ -51,9 +52,12 @@ func (sr *StageEpoch) Exec(ctx context.Context, firstCycle bool, invalidBlockRev n, err := sr.doShortRangeSyncForEpochSync(ctx, s) s.state.inserted = n if err != nil { + utils.Logger().Info().Err(err).Msg("short range for epoch sync failed") return err } - + if n > 0 { + utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("epoch sync short range blocks inserted successfully") + } useInternalTx := tx == nil if useInternalTx { var err error @@ -108,30 +112,13 @@ func (sr *StageEpoch) doShortRangeSyncForEpochSync(ctx context.Context, s *Stage return 0, nil } - //////////////////////////////////////////////////////// - hashChain, whitelist, err := sh.getHashChain(ctx, bns) + blocks, streamID, err := sh.getBlocksChain(ctx, bns) if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return 0, nil + } return 0, errors.Wrap(err, "getHashChain") } - if len(hashChain) == 0 { - // short circuit for no sync is needed - return 0, nil - } - blocks, streamID, err := sh.getBlocksByHashes(ctx, hashChain, whitelist) - if err != nil { - utils.Logger().Warn().Err(err).Msg("epoch sync getBlocksByHashes failed") - if !errors.Is(err, context.Canceled) { - sh.removeStreams(whitelist) // Remote nodes cannot provide blocks with target hashes - } - return 0, errors.Wrap(err, "epoch sync getBlocksByHashes") - } - /////////////////////////////////////////////////////// - // TODO: check this - // blocks, streamID, err := sh.getBlocksChain(bns) - // if err != nil { - // return 0, errors.Wrap(err, "getHashChain") - // } - /////////////////////////////////////////////////////// if len(blocks) == 0 { // short circuit for no sync is needed return 0, nil @@ -141,12 +128,9 @@ func (sr *StageEpoch) 
doShortRangeSyncForEpochSync(ctx context.Context, s *Stage numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n)) if err != nil { utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("Insert block failed") - sh.removeStreams(streamID) // Data provided by remote nodes is corrupted + sh.streamsFailed([]sttypes.StreamID{streamID}, "corrupted data") return n, err } - if n > 0 { - utils.Logger().Info().Int("blocks inserted", n).Msg("Insert block success") - } return n, nil } diff --git a/api/service/stagedstreamsync/stage_lastmile.go b/api/service/stagedstreamsync/stage_lastmile.go new file mode 100644 index 0000000000..157dcc3680 --- /dev/null +++ b/api/service/stagedstreamsync/stage_lastmile.go @@ -0,0 +1,109 @@ +package stagedstreamsync + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/shard" + "github.com/ledgerwatch/erigon-lib/kv" +) + +type StageLastMile struct { + configs StageLastMileCfg +} + +type StageLastMileCfg struct { + ctx context.Context + bc core.BlockChain + db kv.RwDB +} + +func NewStageLastMile(cfg StageLastMileCfg) *StageLastMile { + return &StageLastMile{ + configs: cfg, + } +} + +func NewStageLastMileCfg(ctx context.Context, bc core.BlockChain, db kv.RwDB) StageLastMileCfg { + return StageLastMileCfg{ + ctx: ctx, + bc: bc, + db: db, + } +} + +func (lm *StageLastMile) Exec(ctx context.Context, firstCycle bool, invalidBlockRevert bool, s *StageState, reverter Reverter, tx kv.RwTx) (err error) { + + // no need to download the last mile blocks if we are redoing the stages because of bad block + if invalidBlockRevert { + return nil + } + + // shouldn't execute for epoch chain + if lm.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + + bc := lm.configs.bc + + // update last mile blocks if any + parentHash := bc.CurrentBlock().Hash() + var hashes []common.Hash + for { + block := 
s.state.getBlockFromLastMileBlocksByParentHash(parentHash) + if block == nil { + break + } + err = s.state.UpdateBlockAndStatus(block, bc, false) + if err != nil { + s.state.RollbackLastMileBlocks(ctx, hashes) + return err + } + hashes = append(hashes, block.Hash()) + parentHash = block.Hash() + } + s.state.purgeLastMileBlocksFromCache() + + return nil +} + +func (lm *StageLastMile) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { + useInternalTx := tx == nil + if useInternalTx { + tx, err = lm.configs.db.BeginRw(lm.configs.ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if err = u.Done(tx); err != nil { + return err + } + + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + +func (lm *StageLastMile) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { + useInternalTx := tx == nil + if useInternalTx { + tx, err = lm.configs.db.BeginRw(lm.configs.ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} diff --git a/api/service/stagedstreamsync/stage_short_range.go b/api/service/stagedstreamsync/stage_short_range.go index 8fb2f3059e..54534bfbb1 100644 --- a/api/service/stagedstreamsync/stage_short_range.go +++ b/api/service/stagedstreamsync/stage_short_range.go @@ -44,6 +44,7 @@ func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlo return nil } + // shouldn't execute for epoch chain if sr.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { return nil } @@ -52,8 +53,12 @@ func (sr *StageShortRange) Exec(ctx context.Context, firstCycle bool, invalidBlo n, err := sr.doShortRangeSync(ctx, s) s.state.inserted = n if err != nil { + utils.Logger().Info().Err(err).Msg("short range sync failed") return err } + if n > 0 { + utils.Logger().Info().Err(err).Int("blocks 
inserted", n).Msg("short range blocks inserted successfully") + } useInternalTx := tx == nil if useInternalTx { @@ -98,6 +103,9 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) blkNums := sh.prepareBlockHashNumbers(curBN) hashChain, whitelist, err := sh.getHashChain(ctx, blkNums) if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return 0, nil + } return 0, errors.Wrap(err, "getHashChain") } @@ -114,37 +122,34 @@ func (sr *StageShortRange) doShortRangeSync(ctx context.Context, s *StageState) s.state.status.setTargetBN(expEndBN) - s.state.status.startSyncing() - defer func() { - utils.Logger().Info().Msg("short range finished syncing") - s.state.status.finishSyncing() - }() - blocks, stids, err := sh.getBlocksByHashes(ctx, hashChain, whitelist) if err != nil { utils.Logger().Warn().Err(err).Msg("getBlocksByHashes failed") - if !errors.Is(err, context.Canceled) { - sh.removeStreams(whitelist) // Remote nodes cannot provide blocks with target hashes + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return 0, errors.Wrap(err, "getBlocksByHashes") } - return 0, errors.Wrap(err, "getBlocksByHashes") + sh.streamsFailed(whitelist, "remote nodes cannot provide blocks with target hashes") } - utils.Logger().Info().Int("num blocks", len(blocks)).Msg("getBlockByHashes result") - n, err := verifyAndInsertBlocks(sr.configs.bc, blocks) numBlocksInsertedShortRangeHistogramVec.With(s.state.promLabels()).Observe(float64(n)) if err != nil { utils.Logger().Warn().Err(err).Int("blocks inserted", n).Msg("Insert block failed") + // rollback all added new blocks + if rbErr := sr.configs.bc.Rollback(hashChain); rbErr != nil { + utils.Logger().Error().Err(rbErr).Msg("short range failed to rollback") + return 0, rbErr + } + // fail streams if sh.blameAllStreams(blocks, n, err) { - sh.removeStreams(whitelist) // Data provided by remote nodes is corrupted + 
sh.streamsFailed(whitelist, "data provided by remote nodes is corrupted") } else { // It is the last block gives a wrong commit sig. Blame the provider of the last block. st2Blame := stids[len(stids)-1] - sh.removeStreams([]sttypes.StreamID{st2Blame}) + sh.streamsFailed([]sttypes.StreamID{st2Blame}, "the last block provided by stream gives a wrong commit sig") } - return n, err + return 0, err } - utils.Logger().Info().Err(err).Int("blocks inserted", n).Msg("Insert block success") return n, nil } diff --git a/api/service/stagedstreamsync/stage_state.go b/api/service/stagedstreamsync/stage_state.go index 4b237c2916..b8dfb18288 100644 --- a/api/service/stagedstreamsync/stage_state.go +++ b/api/service/stagedstreamsync/stage_state.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/rs/zerolog" ) @@ -57,6 +58,11 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR return nil } + // shouldn't execute for epoch chain + if stg.configs.bc.ShardID() == shard.BeaconChainShardID && !s.state.isBeaconNode { + return nil + } + maxHeight := s.state.status.targetBN currentHead := stg.configs.bc.CurrentBlock().NumberU64() if currentHead >= maxHeight { @@ -144,8 +150,10 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR if block.NumberU64() != i { s.state.protocol.StreamFailed(streamID, "invalid block with unmatched number is received from stream") - invalidBlockHash := block.Hash() - reverter.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), i, invalidBlockHash, streamID) + if !invalidBlockRevert { + invalidBlockHash := block.Hash() + reverter.RevertTo(stg.configs.bc.CurrentBlock().NumberU64(), i, invalidBlockHash, streamID) + } return ErrInvalidBlockNumber } diff --git 
a/api/service/stagedstreamsync/staged_stream_sync.go b/api/service/stagedstreamsync/staged_stream_sync.go index 6c1eec4c37..3cd8756604 100644 --- a/api/service/stagedstreamsync/staged_stream_sync.go +++ b/api/service/stagedstreamsync/staged_stream_sync.go @@ -8,11 +8,16 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" + "github.com/harmony-one/harmony/consensus" + "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/chain" "github.com/harmony-one/harmony/internal/utils" syncproto "github.com/harmony-one/harmony/p2p/stream/protocols/sync" sttypes "github.com/harmony-one/harmony/p2p/stream/types" "github.com/ledgerwatch/erigon-lib/kv" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" ) @@ -54,19 +59,22 @@ func (ib *InvalidBlock) addBadStream(bsID sttypes.StreamID) { } type StagedStreamSync struct { - bc core.BlockChain - isBeacon bool - isExplorer bool - db kv.RwDB - protocol syncProtocol - isBeaconNode bool - gbm *blockDownloadManager // initialized when finished get block number - inserted int - config Config - logger zerolog.Logger - status *status //TODO: merge this with currentSyncCycle - initSync bool // if sets to true, node start long range syncing - UseMemDB bool + bc core.BlockChain + consensus *consensus.Consensus + isBeacon bool + isExplorer bool + db kv.RwDB + protocol syncProtocol + isBeaconNode bool + gbm *blockDownloadManager // initialized when finished get block number + lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus + lastMileMux sync.Mutex + inserted int + config Config + logger zerolog.Logger + status *status //TODO: merge this with currentSyncCycle + initSync bool // if sets to true, node start long range syncing + UseMemDB bool revertPoint *uint64 // used to run stages prevRevertPoint *uint64 // 
used to get value from outside of staged sync after cycle (for example to notify RPCDaemon) @@ -249,12 +257,12 @@ func (s *StagedStreamSync) cleanUp(ctx context.Context, fromStage int, db kv.RwD // New creates a new StagedStreamSync instance func New( bc core.BlockChain, + consensus *consensus.Consensus, db kv.RwDB, stagesList []*Stage, isBeacon bool, protocol syncProtocol, isBeaconNode bool, - useMemDB bool, config Config, logger zerolog.Logger, ) *StagedStreamSync { @@ -286,22 +294,24 @@ func New( status := newStatus() return &StagedStreamSync{ - bc: bc, - isBeacon: isBeacon, - db: db, - protocol: protocol, - isBeaconNode: isBeaconNode, - gbm: nil, - status: &status, - inserted: 0, - config: config, - logger: logger, - stages: stagesList, - currentStage: 0, - revertOrder: revertStages, - pruningOrder: pruneStages, - logPrefixes: logPrefixes, - UseMemDB: useMemDB, + bc: bc, + consensus: consensus, + isBeacon: isBeacon, + db: db, + protocol: protocol, + isBeaconNode: isBeaconNode, + lastMileBlocks: []*types.Block{}, + gbm: nil, + status: &status, + inserted: 0, + config: config, + logger: logger, + stages: stagesList, + currentStage: 0, + revertOrder: revertStages, + pruningOrder: pruneStages, + logPrefixes: logPrefixes, + UseMemDB: config.UseMemDB, } } @@ -583,3 +593,133 @@ func (s *StagedStreamSync) EnableStages(ids ...SyncStageID) { } } } + +func (ss *StagedStreamSync) purgeLastMileBlocksFromCache() { + ss.lastMileMux.Lock() + ss.lastMileBlocks = nil + ss.lastMileMux.Unlock() +} + +// AddLastMileBlock adds the latest a few block into queue for syncing +// only keep the latest blocks with size capped by LastMileBlocksSize +func (ss *StagedStreamSync) AddLastMileBlock(block *types.Block) { + ss.lastMileMux.Lock() + defer ss.lastMileMux.Unlock() + if ss.lastMileBlocks != nil { + if len(ss.lastMileBlocks) >= LastMileBlocksSize { + ss.lastMileBlocks = ss.lastMileBlocks[1:] + } + ss.lastMileBlocks = append(ss.lastMileBlocks, block) + } +} + +func (ss 
*StagedStreamSync) getBlockFromLastMileBlocksByParentHash(parentHash common.Hash) *types.Block { + ss.lastMileMux.Lock() + defer ss.lastMileMux.Unlock() + for _, block := range ss.lastMileBlocks { + ph := block.ParentHash() + if ph == parentHash { + return block + } + } + return nil +} + +func (ss *StagedStreamSync) addConsensusLastMile(bc core.BlockChain, cs *consensus.Consensus) ([]common.Hash, error) { + curNumber := bc.CurrentBlock().NumberU64() + var hashes []common.Hash + + err := cs.GetLastMileBlockIter(curNumber+1, func(blockIter *consensus.LastMileBlockIter) error { + for { + block := blockIter.Next() + if block == nil { + break + } + if _, err := bc.InsertChain(types.Blocks{block}, true); err != nil { + return errors.Wrap(err, "failed to InsertChain") + } + hashes = append(hashes, block.Header().Hash()) + } + return nil + }) + return hashes, err +} + +func (ss *StagedStreamSync) RollbackLastMileBlocks(ctx context.Context, hashes []common.Hash) error { + if len(hashes) == 0 { + return nil + } + utils.Logger().Info(). + Interface("block", ss.bc.CurrentBlock()). + Msg("[STAGED_STREAM_SYNC] Rolling back last mile blocks") + if err := ss.bc.Rollback(hashes); err != nil { + utils.Logger().Error().Err(err). + Msg("[STAGED_STREAM_SYNC] failed to rollback last mile blocks") + return err + } + return nil +} + +// UpdateBlockAndStatus updates block and its status in db +func (ss *StagedStreamSync) UpdateBlockAndStatus(block *types.Block, bc core.BlockChain, verifyAllSig bool) error { + if block.NumberU64() != bc.CurrentBlock().NumberU64()+1 { + utils.Logger().Debug(). + Uint64("curBlockNum", bc.CurrentBlock().NumberU64()). + Uint64("receivedBlockNum", block.NumberU64()). 
+ Msg("[STAGED_STREAM_SYNC] Inappropriate block number, ignore!") + return nil + } + + haveCurrentSig := len(block.GetCurrentCommitSig()) != 0 + // Verify block signatures + if block.NumberU64() > 1 { + // Verify signature every N blocks (which N is verifyHeaderBatchSize and can be adjusted in configs) + verifySeal := block.NumberU64()%VerifyHeaderBatchSize == 0 || verifyAllSig + verifyCurrentSig := verifyAllSig && haveCurrentSig + if verifyCurrentSig { + sig, bitmap, err := chain.ParseCommitSigAndBitmap(block.GetCurrentCommitSig()) + if err != nil { + return errors.Wrap(err, "parse commitSigAndBitmap") + } + + startTime := time.Now() + if err := bc.Engine().VerifyHeaderSignature(bc, block.Header(), sig, bitmap); err != nil { + return errors.Wrapf(err, "verify header signature %v", block.Hash().String()) + } + utils.Logger().Debug(). + Int64("elapsed time", time.Now().Sub(startTime).Milliseconds()). + Msg("[STAGED_STREAM_SYNC] VerifyHeaderSignature") + } + err := bc.Engine().VerifyHeader(bc, block.Header(), verifySeal) + if err == engine.ErrUnknownAncestor { + return nil + } else if err != nil { + utils.Logger().Error(). + Err(err). + Uint64("block number", block.NumberU64()). + Msgf("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: failed verifying signatures for new block") + return err + } + } + + _, err := bc.InsertChain([]*types.Block{block}, false /* verifyHeaders */) + if err != nil { + utils.Logger().Error(). + Err(err). + Uint64("block number", block.NumberU64()). + Uint32("shard", block.ShardID()). + Msgf("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: Error adding new block to blockchain") + return err + } + utils.Logger().Info(). + Uint64("blockHeight", block.NumberU64()). + Uint64("blockEpoch", block.Epoch().Uint64()). + Str("blockHex", block.Hash().Hex()). + Uint32("ShardID", block.ShardID()). 
+ Msg("[STAGED_STREAM_SYNC] UpdateBlockAndStatus: New Block Added to Blockchain") + + for i, tx := range block.StakingTransactions() { + utils.Logger().Info().Msgf("StakingTxn %d: %s, %v", i, tx.StakingType().String(), tx.StakingMessage()) + } + return nil +} diff --git a/api/service/stagedstreamsync/stages.go b/api/service/stagedstreamsync/stages.go index 55681d68f5..6a21fe7071 100644 --- a/api/service/stagedstreamsync/stages.go +++ b/api/service/stagedstreamsync/stages.go @@ -13,6 +13,7 @@ const ( SyncEpoch SyncStageID = "SyncEpoch" // epoch sync BlockBodies SyncStageID = "BlockBodies" // Block bodies are downloaded, TxHash and UncleHash are getting verified States SyncStageID = "States" // will construct most recent state from downloaded blocks + LastMile SyncStageID = "LastMile" // update blocks after sync and update last mile blocks as well Finish SyncStageID = "Finish" // Nominal stage after all other stages ) diff --git a/api/service/stagedstreamsync/syncing.go b/api/service/stagedstreamsync/syncing.go index ebef810b19..738f2f9203 100644 --- a/api/service/stagedstreamsync/syncing.go +++ b/api/service/stagedstreamsync/syncing.go @@ -4,13 +4,15 @@ import ( "context" "fmt" "path/filepath" + "runtime" + "strings" "sync" "time" + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" sttypes "github.com/harmony-one/harmony/p2p/stream/types" - "github.com/harmony-one/harmony/shard" "github.com/ledgerwatch/erigon-lib/kv" "github.com/ledgerwatch/erigon-lib/kv/mdbx" "github.com/ledgerwatch/erigon-lib/kv/memdb" @@ -38,41 +40,53 @@ var Buckets = []string{ // CreateStagedSync creates an instance of staged sync func CreateStagedSync(ctx context.Context, bc core.BlockChain, + consensus *consensus.Consensus, dbDir string, - UseMemDB bool, isBeaconNode bool, protocol syncProtocol, config Config, logger zerolog.Logger, - logProgress bool, ) (*StagedStreamSync, error) { - isBeacon := bc.ShardID() == 
shard.BeaconChainShardID + logger.Info(). + Uint32("shard", bc.ShardID()). + Bool("beaconNode", isBeaconNode). + Bool("memdb", config.UseMemDB). + Str("dbDir", dbDir). + Bool("serverOnly", config.ServerOnly). + Int("minStreams", config.MinStreams). + Msg(WrapStagedSyncMsg("creating staged sync")) var mainDB kv.RwDB dbs := make([]kv.RwDB, config.Concurrency) - if UseMemDB { - mainDB = memdb.New(getMemDbTempPath(dbDir, -1)) + if config.UseMemDB { + mainDB = memdb.New(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)) for i := 0; i < config.Concurrency; i++ { - dbs[i] = memdb.New(getMemDbTempPath(dbDir, i)) + dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir) + dbs[i] = memdb.New(dbPath) } } else { - mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(isBeacon, -1, dbDir)).MustOpen() + logger.Info(). + Str("path", getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)). + Msg(WrapStagedSyncMsg("creating main db")) + mainDB = mdbx.NewMDBX(log.New()).Path(getBlockDbPath(bc.ShardID(), isBeaconNode, -1, dbDir)).MustOpen() for i := 0; i < config.Concurrency; i++ { - dbPath := getBlockDbPath(isBeacon, i, dbDir) + dbPath := getBlockDbPath(bc.ShardID(), isBeaconNode, i, dbDir) dbs[i] = mdbx.NewMDBX(log.New()).Path(dbPath).MustOpen() } } if errInitDB := initDB(ctx, mainDB, dbs, config.Concurrency); errInitDB != nil { + logger.Error().Err(errInitDB).Msg("create staged sync instance failed") return nil, errInitDB } stageHeadsCfg := NewStageHeadersCfg(bc, mainDB) stageShortRangeCfg := NewStageShortRangeCfg(bc, mainDB) stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB) - stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeacon, logProgress) - stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, logger, logProgress) + stageBodiesCfg := NewStageBodiesCfg(bc, mainDB, dbs, config.Concurrency, protocol, isBeaconNode, config.LogProgress) + stageStatesCfg := NewStageStatesCfg(bc, mainDB, dbs, config.Concurrency, 
logger, config.LogProgress) + lastMileCfg := NewStageLastMileCfg(ctx, bc, mainDB) stageFinishCfg := NewStageFinishCfg(mainDB) stages := DefaultStages(ctx, @@ -81,17 +95,27 @@ func CreateStagedSync(ctx context.Context, stageShortRangeCfg, stageBodiesCfg, stageStatesCfg, + lastMileCfg, stageFinishCfg, ) + logger.Info(). + Uint32("shard", bc.ShardID()). + Bool("beaconNode", isBeaconNode). + Bool("memdb", config.UseMemDB). + Str("dbDir", dbDir). + Bool("serverOnly", config.ServerOnly). + Int("minStreams", config.MinStreams). + Msg(WrapStagedSyncMsg("staged sync created successfully")) + return New( bc, + consensus, mainDB, stages, - isBeacon, + isBeaconNode, protocol, isBeaconNode, - UseMemDB, config, logger, ), nil @@ -147,7 +171,7 @@ func getMemDbTempPath(dbDir string, dbIndex int) string { } // getBlockDbPath returns the path of the cache database which stores blocks -func getBlockDbPath(beacon bool, loopID int, dbDir string) string { +func getBlockDbPath(shardID uint32, beacon bool, loopID int, dbDir string) string { if beacon { if loopID >= 0 { return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/beacon_blocks_db"), loopID) @@ -156,30 +180,57 @@ func getBlockDbPath(beacon bool, loopID int, dbDir string) string { } } else { if loopID >= 0 { - return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/blocks_db"), loopID) + return fmt.Sprintf("%s_%d_%d", filepath.Join(dbDir, "cache/blocks_db"), shardID, loopID) } else { - return filepath.Join(dbDir, "cache/blocks_db_main") + return fmt.Sprintf("%s_%d", filepath.Join(dbDir, "cache/blocks_db_main"), shardID) } } } +func (s *StagedStreamSync) Debug(source string, msg interface{}) { + // only log the msg in debug mode + if !s.config.DebugMode { + return + } + pc, _, _, _ := runtime.Caller(1) + caller := runtime.FuncForPC(pc).Name() + callerParts := strings.Split(caller, "/") + if len(callerParts) > 0 { + caller = callerParts[len(callerParts)-1] + } + src := source + if src == "" { + src = "message" + } + // SSSD: STAGED 
STREAM SYNC DEBUG + if msg == nil { + fmt.Printf("[SSSD:%s] %s: nil or no error\n", caller, src) + } else if err, ok := msg.(error); ok { + fmt.Printf("[SSSD:%s] %s: %s\n", caller, src, err.Error()) + } else if str, ok := msg.(string); ok { + fmt.Printf("[SSSD:%s] %s: %s\n", caller, src, str) + } else { + fmt.Printf("[SSSD:%s] %s: %v\n", caller, src, msg) + } +} + // doSync does the long range sync. // One LongRangeSync consists of several iterations. // For each iteration, estimate the current block number, then fetch block & insert to blockchain -func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (int, error) { +func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bool) (uint64, int, error) { var totalInserted int s.initSync = initSync if err := s.checkPrerequisites(); err != nil { - return 0, err + return 0, 0, err } var estimatedHeight uint64 if initSync { if h, err := s.estimateCurrentNumber(downloaderContext); err != nil { - return 0, err + return 0, 0, err } else { estimatedHeight = h //TODO: use directly currentCycle var @@ -187,36 +238,70 @@ func (s *StagedStreamSync) doSync(downloaderContext context.Context, initSync bo } if curBN := s.bc.CurrentBlock().NumberU64(); estimatedHeight <= curBN { s.logger.Info().Uint64("current number", curBN).Uint64("target number", estimatedHeight). - Msg(WrapStagedSyncMsg("early return of long range sync")) - return 0, nil + Msg(WrapStagedSyncMsg("early return of long range sync (chain is already ahead of target height)")) + return estimatedHeight, 0, nil } - - s.startSyncing() - defer s.finishSyncing() } + s.startSyncing() + defer s.finishSyncing() + for { ctx, cancel := context.WithCancel(downloaderContext) n, err := s.doSyncCycle(ctx, initSync) if err != nil { + utils.Logger().Error(). + Err(err). + Bool("initSync", s.initSync). + Bool("isBeacon", s.isBeacon). + Uint32("shard", s.bc.ShardID()). 
+ Msg(WrapStagedSyncMsg("sync cycle failed")) + pl := s.promLabels() pl["error"] = err.Error() numFailedDownloadCounterVec.With(pl).Inc() cancel() - return totalInserted + n, err + return estimatedHeight, totalInserted + n, err } cancel() totalInserted += n // if it's not long range sync, skip loop - if n < LastMileBlocksThreshold || !initSync { - return totalInserted, nil + if n == 0 || !initSync { + break + } + } + + if totalInserted > 0 { + utils.Logger().Info(). + Bool("initSync", s.initSync). + Bool("isBeacon", s.isBeacon). + Uint32("shard", s.bc.ShardID()). + Int("blocks", totalInserted). + Msg(WrapStagedSyncMsg("sync cycle blocks inserted successfully")) + } + + // add consensus last mile blocks + if s.consensus != nil { + if hashes, err := s.addConsensusLastMile(s.Blockchain(), s.consensus); err != nil { + utils.Logger().Error().Err(err). + Msg("[STAGED_STREAM_SYNC] Add consensus last mile failed") + s.RollbackLastMileBlocks(downloaderContext, hashes) + return estimatedHeight, totalInserted, err + } else { + totalInserted += len(hashes) + } + // TODO: move this to explorer handler code. 
+ if s.isExplorer { + s.consensus.UpdateConsensusInformation() } } + s.purgeLastMileBlocksFromCache() + return estimatedHeight, totalInserted, nil } func (s *StagedStreamSync) doSyncCycle(ctx context.Context, initSync bool) (int, error) { diff --git a/api/service/stagedsync/stagedsync.go b/api/service/stagedsync/stagedsync.go index 83af6abf9f..f1de66f9fc 100644 --- a/api/service/stagedsync/stagedsync.go +++ b/api/service/stagedsync/stagedsync.go @@ -83,6 +83,9 @@ type StagedSync struct { StagedSyncTurboMode bool // log the full sync progress in console LogProgress bool + // log every single process and error to help to debug the syncing + // DebugMode is not accessible to the end user and is only an aid for development + DebugMode bool } // BlockWithSig the serialization structure for request DownloaderRequest_BLOCKWITHSIG @@ -258,7 +261,8 @@ func New(ctx context.Context, verifyAllSig bool, verifyHeaderBatchSize uint64, insertChainBatchSize int, - logProgress bool) *StagedSync { + logProgress bool, + debugMode bool) *StagedSync { revertStages := make([]*Stage, len(stagesList)) for i, stageIndex := range revertOrder { @@ -312,6 +316,7 @@ func New(ctx context.Context, VerifyHeaderBatchSize: verifyHeaderBatchSize, InsertChainBatchSize: insertChainBatchSize, LogProgress: logProgress, + DebugMode: debugMode, } } diff --git a/api/service/stagedsync/syncing.go b/api/service/stagedsync/syncing.go index 11147f6a62..a22a4e9253 100644 --- a/api/service/stagedsync/syncing.go +++ b/api/service/stagedsync/syncing.go @@ -64,6 +64,7 @@ func CreateStagedSync( verifyHeaderBatchSize uint64, insertChainBatchSize int, logProgress bool, + debugMode bool, ) (*StagedSync, error) { ctx := context.Background() @@ -134,6 +135,7 @@ func CreateStagedSync( verifyHeaderBatchSize, insertChainBatchSize, logProgress, + debugMode, ), nil } diff --git a/cmd/harmony/default.go b/cmd/harmony/default.go index 38ae3dacbb..4cc20cfdf4 100644 --- a/cmd/harmony/default.go +++ b/cmd/harmony/default.go @@ 
-178,6 +178,7 @@ var defaultStagedSyncConfig = harmonyconfig.StagedSyncConfig{ MaxMemSyncCycleSize: 1024, // max number of blocks to use a single transaction for staged sync UseMemDB: true, // it uses memory by default. set it to false to use disk LogProgress: false, // log the full sync progress in console + DebugMode: false, // log every single process and error to help to debug the syncing (DebugMode is not accessible to the end user and is only an aid for development) } var ( diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index e66d3c9c73..16f985beac 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -606,6 +606,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType, nodeConfig.VerifyHeaderBatchSize = hc.Sync.StagedSyncCfg.VerifyHeaderBatchSize nodeConfig.InsertChainBatchSize = hc.Sync.StagedSyncCfg.InsertChainBatchSize nodeConfig.LogProgress = hc.Sync.StagedSyncCfg.LogProgress + nodeConfig.DebugMode = hc.Sync.StagedSyncCfg.DebugMode // P2P private key is used for secure message transfer between p2p nodes. 
nodeConfig.P2PPriKey, _, err = utils.LoadKeyFromFile(hc.P2P.KeyFile) if err != nil { @@ -942,7 +943,9 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har SmHardLowCap: hc.Sync.DiscHardLowCap, SmHiCap: hc.Sync.DiscHighCap, SmDiscBatch: hc.Sync.DiscBatch, - LogProgress: node.NodeConfig.LogProgress, + UseMemDB: hc.Sync.StagedSyncCfg.UseMemDB, + LogProgress: hc.Sync.StagedSyncCfg.LogProgress, + DebugMode: hc.Sync.StagedSyncCfg.DebugMode, } // If we are running side chain, we will need to do some extra works for beacon @@ -954,7 +957,7 @@ func setupStagedSyncService(node *node.Node, host p2p.Host, hc harmonyconfig.Har } } //Setup stream sync service - s := stagedstreamsync.NewService(host, blockchains, sConfig, hc.General.DataDir) + s := stagedstreamsync.NewService(host, blockchains, node.Consensus, sConfig, hc.General.DataDir) node.RegisterService(service.StagedStreamSync, s) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 38bb06ff4a..3627f0b658 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -157,10 +157,11 @@ func (consensus *Consensus) finalCommit() { Msg("[finalCommit] Unable to construct Committed message") return } - msgToSend, FBFTMsg := - network.Bytes, - network.FBFTMsg - commitSigAndBitmap := FBFTMsg.Payload + var ( + msgToSend = network.Bytes + FBFTMsg = network.FBFTMsg + commitSigAndBitmap = FBFTMsg.Payload + ) consensus.fBFTLog.AddVerifiedMessage(FBFTMsg) // find correct block content curBlockHash := consensus.blockHash diff --git a/consensus/validator.go b/consensus/validator.go index ee30d8c006..0506f4359d 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -132,7 +132,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block if err := consensus.verifyBlock(&blockObj); err != nil { consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed") - return nil, errors.New("Block verification failed") + 
return nil, errors.Errorf("Block verification failed: %s", err.Error()) } return &blockObj, nil } diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index 698eaeebac..965dccd9a4 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -221,6 +221,7 @@ type BlockChainImpl struct { badBlocks *lru.Cache // Bad block cache pendingSlashes slash.Records maxGarbCollectedBlkNum int64 + leaderRotationMeta leaderRotationMeta options Options } @@ -359,6 +360,12 @@ func newBlockChainWithOptions( bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Hash()) } + curHeader := bc.CurrentBlock().Header() + err = bc.buildLeaderRotationMeta(curHeader) + if err != nil { + return nil, errors.WithMessage(err, "failed to build leader rotation meta") + } + // Take ownership of this particular state go bc.update() return bc, nil @@ -1479,8 +1486,11 @@ func (bc *BlockChainImpl) WriteBlockWithState( defer bc.mu.Unlock() currentBlock := bc.CurrentBlock() - if currentBlock == nil || block.ParentHash() != currentBlock.Hash() { - return NonStatTy, errors.New("Hash of parent block doesn't match the current block hash") + if currentBlock == nil { + return NonStatTy, errors.New("Current block is nil") + } + if block.ParentHash() != currentBlock.Hash() { + return NonStatTy, errors.Errorf("Hash of parent block %s doesn't match the current block hash %s", currentBlock.Hash().Hex(), block.ParentHash().Hex()) } // Commit state object changes to in-memory trie @@ -1650,20 +1660,52 @@ func (bc *BlockChainImpl) InsertChain(chain types.Blocks, verifyHeaders bool) (i return n, err } +// buildLeaderRotationMeta builds leader rotation meta if feature is activated. 
+func (bc *BlockChainImpl) buildLeaderRotationMeta(curHeader *block.Header) error { + if !bc.chainConfig.IsLeaderRotation(curHeader.Epoch()) { + return nil + } + if curHeader.NumberU64() == 0 { + return errors.New("current header is genesis") + } + curPubKey, err := bc.getLeaderPubKeyFromCoinbase(curHeader) + if err != nil { + return err + } + for i := curHeader.NumberU64() - 1; i >= 0; i-- { + header := bc.GetHeaderByNumber(i) + if header == nil { + return errors.New("header is nil") + } + blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(header) + if err != nil { + return err + } + if curPubKey.Bytes != blockPubKey.Bytes || curHeader.Epoch().Uint64() != header.Epoch().Uint64() { + for j := i; i <= curHeader.NumberU64(); j++ { + header := bc.GetHeaderByNumber(i) + if header == nil { + return errors.New("header is nil") + } + err := bc.saveLeaderRotationMeta(header) + if err != nil { + utils.Logger().Error().Err(err).Msg("save leader continuous blocks count error") + return err + } + } + return nil + } + } + return errors.New("no leader rotation meta to save") +} + func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error { blockPubKey, err := bc.getLeaderPubKeyFromCoinbase(h) if err != nil { return err } - type stored struct { - pub []byte - epoch uint64 - count uint64 - shifts uint64 - } - var s stored - // error is possible here only on the first iteration, so we can ignore it - s.pub, s.epoch, s.count, s.shifts, _ = rawdb.ReadLeaderRotationMeta(bc.db) + + var s = bc.leaderRotationMeta // increase counter only if the same leader and epoch if bytes.Equal(s.pub, blockPubKey.Bytes[:]) && s.epoch == h.Epoch().Uint64() { @@ -1679,11 +1721,9 @@ func (bc *BlockChainImpl) saveLeaderRotationMeta(h *block.Header) error { if s.epoch != h.Epoch().Uint64() { s.shifts = 0 } + s.epoch = h.Epoch().Uint64() + bc.leaderRotationMeta = s - err = rawdb.WriteLeaderRotationMeta(bc.db, blockPubKey.Bytes[:], h.Epoch().Uint64(), s.count, s.shifts) - if err != nil { - 
// leaderRotationMeta is the leader rotation bookkeeping that BlockChainImpl
// holds in memory (field leaderRotationMeta) and that saveLeaderRotationMeta
// updates for each header it is given.
type leaderRotationMeta struct {
	pub    []byte // leader public key bytes; compared against the key derived from each header's coinbase
	epoch  uint64 // epoch of the last header recorded
	count  uint64 // increased only while the leader and the epoch stay the same
	shifts uint64 // reset to 0 when the epoch changes; NOTE(review): presumably counts leader changes within an epoch — confirm in saveLeaderRotationMeta
}
"github.com/harmony-one/harmony/internal/shardchain" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/multibls" + "github.com/harmony-one/harmony/node" + "github.com/harmony-one/harmony/p2p" + "github.com/harmony-one/harmony/shard" + "github.com/stretchr/testify/require" +) + +var testDBFactory = &shardchain.MemDBFactory{} + +func TestAddNewBlock(t *testing.T) { + blsKey := bls.RandPrivateKey() + pubKey := blsKey.GetPublicKey() + leader := p2p.Peer{IP: "127.0.0.1", Port: "9882", ConsensusPubKey: pubKey} + priKey, _, _ := utils.GenKeyP2P("127.0.0.1", "9902") + host, err := p2p.NewHost(p2p.HostConfig{ + Self: &leader, + BLSKey: priKey, + }) + if err != nil { + t.Fatalf("newhost failure: %v", err) + } + engine := chain.NewEngine() + chainconfig := nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType().ChainConfig() + collection := shardchain.NewCollection( + nil, testDBFactory, &core.GenesisInitializer{NetworkType: nodeconfig.GetShardConfig(shard.BeaconChainShardID).GetNetworkType()}, engine, &chainconfig, + ) + decider := quorum.NewDecider( + quorum.SuperMajorityVote, shard.BeaconChainShardID, + ) + blockchain, err := collection.ShardChain(shard.BeaconChainShardID) + if err != nil { + t.Fatal("cannot get blockchain") + } + reg := registry.New().SetBlockchain(blockchain) + consensus, err := consensus.New( + host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, + ) + if err != nil { + t.Fatalf("Cannot craeate consensus: %v", err) + } + nodeconfig.SetNetworkType(nodeconfig.Testnet) + var block *types.Block + node := node.New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg) + commitSigs := make(chan []byte, 1) + commitSigs <- []byte{} + block, err = node.Worker.FinalizeNewBlock( + commitSigs, func() uint64 { return uint64(0) }, common.Address{}, nil, nil, + ) + if err != nil { + t.Fatal("cannot finalize new block") + } + + nn := node.Blockchain().CurrentBlock() + 
t.Log("[*]", nn.NumberU64(), nn.Hash().Hex(), nn.ParentHash()) + + _, err = blockchain.InsertChain([]*types.Block{block}, false) + require.NoError(t, err, "error when adding new block") + + pk, epoch, count, shifts, err := blockchain.LeaderRotationMeta() + fmt.Println("pk", pk, "epoch", epoch, "count", count, "shifts", shifts, "err", err) + + t.Log("#", block.Header().NumberU64(), node.Blockchain().CurrentBlock().NumberU64(), block.Hash().Hex(), block.ParentHash()) + + err = blockchain.Rollback([]common.Hash{block.Hash()}) + require.NoError(t, err, "error when rolling back") +} diff --git a/internal/configs/harmony/harmony.go b/internal/configs/harmony/harmony.go index f4f2ddb96f..5aca663a89 100644 --- a/internal/configs/harmony/harmony.go +++ b/internal/configs/harmony/harmony.go @@ -343,6 +343,7 @@ type StagedSyncConfig struct { VerifyHeaderBatchSize uint64 // batch size to verify header before insert to chain UseMemDB bool // it uses memory by default. set it to false to use disk LogProgress bool // log the full sync progress in console + DebugMode bool // log every single process and error to help to debug syncing issues (DebugMode is not accessible to the end user and is only an aid for development) } type PriceLimit int64 diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index 9f681fca9f..5370a2e52f 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -58,6 +58,31 @@ const ( Localnet = "localnet" ) +// ChainConfig returns the chain configuration for the network type. 
+func (t NetworkType) ChainConfig() params.ChainConfig { + switch t { + case Mainnet: + return *params.MainnetChainConfig + case Pangaea: + return *params.PangaeaChainConfig + case Partner: + return *params.PartnerChainConfig + case Stressnet: + return *params.StressnetChainConfig + case Localnet: + return *params.LocalnetChainConfig + default: + return *params.TestnetChainConfig + } +} + +func (n NetworkType) String() string { + if n == "" { + return Testnet // default to testnet + } + return string(n) +} + // Global is the index of the global node configuration const ( Global = 0 @@ -93,6 +118,7 @@ type ConfigType struct { VerifyAllSig bool // verify signatures for all blocks regardless of height and batch size VerifyHeaderBatchSize uint64 // batch size to verify header before insert to chain LogProgress bool // log the full sync progress in console + DebugMode bool // log every single process and error to help to debug the syncing issues NtpServer string StringRole string P2PPriKey p2p_crypto.PrivKey `json:"-"` @@ -351,21 +377,3 @@ func (conf *ConfigType) ValidateConsensusKeysForSameShard(pubkeys multibls.Publi } return nil } - -// ChainConfig returns the chain configuration for the network type. 
-func (t NetworkType) ChainConfig() params.ChainConfig { - switch t { - case Mainnet: - return *params.MainnetChainConfig - case Pangaea: - return *params.PangaeaChainConfig - case Partner: - return *params.PartnerChainConfig - case Stressnet: - return *params.StressnetChainConfig - case Localnet: - return *params.LocalnetChainConfig - default: - return *params.TestnetChainConfig - } -} diff --git a/node/node_handler_test.go b/node/node_handler_test.go index 3de2918398..307e21b11a 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -46,13 +46,11 @@ func TestAddNewBlock(t *testing.T) { t.Fatal("cannot get blockchain") } reg := registry.New().SetBlockchain(blockchain) - consensus, err := consensus.New( - host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false, - ) + consensus, err := consensus.New(host, shard.BeaconChainShardID, multibls.GetPrivateKeys(blsKey), reg, decider, 3, false) if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - nodeconfig.SetNetworkType(nodeconfig.Devnet) + nodeconfig.SetNetworkType(nodeconfig.Testnet) node := New(host, consensus, engine, collection, nil, nil, nil, nil, nil, reg) txs := make(map[common.Address]types.Transactions) diff --git a/node/node_newblock_test.go b/node/node_newblock_test.go index 39affacaba..963af2f55b 100644 --- a/node/node_newblock_test.go +++ b/node/node_newblock_test.go @@ -63,10 +63,8 @@ func TestFinalizeNewBlockAsync(t *testing.T) { node.Worker.CommitTransactions( txs, stks, common.Address{}, ) - commitSigs := make(chan []byte) - go func() { - commitSigs <- []byte{} - }() + commitSigs := make(chan []byte, 1) + commitSigs <- []byte{} block, _ := node.Worker.FinalizeNewBlock( commitSigs, func() uint64 { return 0 }, common.Address{}, nil, nil, diff --git a/node/node_syncing.go b/node/node_syncing.go index 84eb1256ff..68c3338362 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -122,7 +122,8 @@ func (node *Node) createStagedSync(bc 
core.BlockChain) *stagedsync.StagedSync { node.NodeConfig.VerifyAllSig, node.NodeConfig.VerifyHeaderBatchSize, node.NodeConfig.InsertChainBatchSize, - node.NodeConfig.LogProgress); err != nil { + node.NodeConfig.LogProgress, + node.NodeConfig.DebugMode); err != nil { return nil } else { return s @@ -352,7 +353,7 @@ func (node *Node) NodeSyncing() { if node.HarmonyConfig.TiKV.Role == tikv.RoleWriter { node.supportSyncing() // the writer needs to be in sync with it's other peers } - } else if !node.HarmonyConfig.General.IsOffline && node.HarmonyConfig.DNSSync.Client { + } else if !node.HarmonyConfig.General.IsOffline && (node.HarmonyConfig.DNSSync.Client || node.HarmonyConfig.Sync.Downloader) { node.supportSyncing() // for non-writer-reader mode a.k.a tikv nodes } } @@ -372,6 +373,11 @@ func (node *Node) supportSyncing() { go node.SendNewBlockToUnsync() } + // if stream sync client is running, don't create other sync client instances + if node.HarmonyConfig.Sync.Downloader { + return + } + if !node.NodeConfig.StagedSync && node.stateSync == nil { node.stateSync = node.createStateSync(node.Blockchain()) utils.Logger().Debug().Msg("[SYNC] initialized state sync") diff --git a/p2p/host.go b/p2p/host.go index 9395e1df45..246b598b22 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -12,17 +12,17 @@ import ( "time" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" libp2p_pubsub "github.com/libp2p/go-libp2p-pubsub" libp2p_config "github.com/libp2p/go-libp2p/config" libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" libp2p_host "github.com/libp2p/go-libp2p/core/host" libp2p_network "github.com/libp2p/go-libp2p/core/network" libp2p_peer "github.com/libp2p/go-libp2p/core/peer" libp2p_peerstore "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/core/routing" 
"github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/security/noise" diff --git a/p2p/stream/protocols/sync/protocol.go b/p2p/stream/protocols/sync/protocol.go index 3dcbd6f063..ca4590c972 100644 --- a/p2p/stream/protocols/sync/protocol.go +++ b/p2p/stream/protocols/sync/protocol.go @@ -2,7 +2,6 @@ package sync import ( "context" - "fmt" "strconv" "time" @@ -180,7 +179,8 @@ func (p *Protocol) HandleStream(raw libp2p_network.Stream) { Msg("failed to add new stream") return } - fmt.Println("Connected to", raw.Conn().RemotePeer().String(), "(", st.ProtoID(), ")", "my ID: ", raw.Conn().LocalPeer().String()) + //to get my ID use raw.Conn().LocalPeer().String() + p.logger.Info().Msgf("Connected to %s (%s)", raw.Conn().RemotePeer().String(), st.ProtoID()) st.run() } diff --git a/p2p/stream/types/utils.go b/p2p/stream/types/utils.go index c27d95d60f..72838222b4 100644 --- a/p2p/stream/types/utils.go +++ b/p2p/stream/types/utils.go @@ -11,7 +11,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/hashicorp/go-version" - libp2p_proto "github.com/libp2p/go-libp2p-core/protocol" + libp2p_proto "github.com/libp2p/go-libp2p/core/protocol" "github.com/pkg/errors" ) diff --git a/scripts/travis_go_checker.sh b/scripts/travis_go_checker.sh index 70638377a5..c6c7640caf 100755 --- a/scripts/travis_go_checker.sh +++ b/scripts/travis_go_checker.sh @@ -84,7 +84,7 @@ fi echo "Running go test..." # Fix https://github.com/golang/go/issues/44129#issuecomment-788351567 go get -t ./... -if go test -v -count=1 -vet=all -race ./... +if go test -count=1 -vet=all -race ./... then echo "go test succeeded." else