diff --git a/consensus/checks.go b/consensus/checks.go index 28da66ad7b..32f59fb938 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -68,7 +68,7 @@ func (consensus *Consensus) isRightBlockNumAndViewID(recvMsg *FBFTMessage) bool } func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool { - logMsgs := consensus.FBFTLog.GetMessagesByTypeSeqView( + logMsgs := consensus.fBFTLog.GetMessagesByTypeSeqView( msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID, ) if len(logMsgs) > 0 { diff --git a/consensus/consensus.go b/consensus/consensus.go index ddbea9ec46..09bdef51ae 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -46,7 +46,7 @@ type VerifyBlockFunc func(*types.Block) error type Consensus struct { Decider quorum.Decider // FBFTLog stores the pbft messages and blocks during FBFT process - FBFTLog *FBFTLog + fBFTLog *FBFTLog // phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc phase FBFTPhase // current indicates what state a node is in @@ -84,7 +84,7 @@ type Consensus struct { // IgnoreViewIDCheck determines whether to ignore viewID check IgnoreViewIDCheck *abool.AtomicBool // consensus mutex - mutex sync.RWMutex + mutex *sync.RWMutex // ViewChange struct vc *viewChange // Signal channel for proposing a new block and start new consensus @@ -140,6 +140,13 @@ func (consensus *Consensus) Blockchain() core.BlockChain { return consensus.registry.GetBlockchain() } +func (consensus *Consensus) FBFTLog() FBFT { + return threadsafeFBFTLog{ + log: consensus.fBFTLog, + mu: consensus.mutex, + } +} + // ChainReader returns the chain reader. // This is mostly the same as Blockchain, but it returns only read methods, so we assume it's safe for concurrent use. func (consensus *Consensus) ChainReader() engine.ChainReader { @@ -165,11 +172,11 @@ func (consensus *Consensus) Beaconchain() core.BlockChain { // VerifyBlock is a function used to verify the block and keep trace of verified blocks. func (consensus *Consensus) verifyBlock(block *types.Block) error { - if !consensus.FBFTLog.IsBlockVerified(block.Hash()) { + if !consensus.fBFTLog.IsBlockVerified(block.Hash()) { if err := consensus.BlockVerifier(block); err != nil { return errors.Errorf("Block verification failed: %s", err) } - consensus.FBFTLog.MarkBlockVerified(block) + consensus.fBFTLog.MarkBlockVerified(block) } return nil } @@ -261,21 +268,21 @@ func New( Decider quorum.Decider, minPeers int, aggregateSig bool, ) (*Consensus, error) { consensus := Consensus{ - ShardID: shard, + mutex: &sync.RWMutex{}, + ShardID: shard, + fBFTLog: NewFBFTLog(), + phase: FBFTAnnounce, + current: State{mode: Normal}, + Decider: Decider, + registry: registry, + MinPeers: minPeers, + AggregateSig: aggregateSig, + host: host, + msgSender: NewMessageSender(host), + BlockNumLowChan: make(chan struct{}, 1), + // FBFT timeout + consensusTimeout: createTimeout(), } - consensus.Decider = Decider - consensus.registry = registry - consensus.MinPeers = minPeers - consensus.AggregateSig = aggregateSig - consensus.host = host - consensus.msgSender = NewMessageSender(host) - consensus.BlockNumLowChan = make(chan struct{}, 1) - // FBFT related - consensus.FBFTLog = NewFBFTLog() - consensus.phase = FBFTAnnounce - consensus.current = State{mode: Normal} - // FBFT timeout - consensus.consensusTimeout = createTimeout() if multiBLSPriKey != nil { consensus.priKey = multiBLSPriKey diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index af40c42bcb..5229f0ed31 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -575,7 +575,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error { copy(blockHash[:], payload[:32]) // Leader sign and add commit message - block := consensus.FBFTLog.GetBlockByHash(blockHash) + block := consensus.fBFTLog.GetBlockByHash(blockHash) if block == nil { return errGetPreparedBlock } diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index d2397a64aa..697ba49525 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -22,7 +22,6 @@ func TestConsensusInitialization(t *testing.T) { assert.NoError(t, err) messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} - fbtLog := NewFBFTLog() state := State{mode: Normal} timeouts := createTimeout() @@ -37,7 +36,7 @@ func TestConsensusInitialization(t *testing.T) { assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan) // FBFTLog - assert.Equal(t, fbtLog, consensus.FBFTLog) + assert.NotNil(t, consensus.FBFTLog()) assert.Equal(t, FBFTAnnounce, consensus.phase) diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 262cbe37d6..38bb06ff4a 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -161,10 +161,10 @@ func (consensus *Consensus) finalCommit() { network.Bytes, network.FBFTMsg commitSigAndBitmap := FBFTMsg.Payload - consensus.FBFTLog.AddVerifiedMessage(FBFTMsg) + consensus.fBFTLog.AddVerifiedMessage(FBFTMsg) // find correct block content curBlockHash := consensus.blockHash - block := consensus.FBFTLog.GetBlockByHash(curBlockHash) + block := consensus.fBFTLog.GetBlockByHash(curBlockHash) if block == nil { consensus.getLogger().Warn(). Str("blockHash", hex.EncodeToString(curBlockHash[:])). @@ -278,7 +278,7 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) { lastCommits, err := consensus.Blockchain().ReadCommitSig(blockNum) if err != nil || len(lastCommits) < bls.BLSSignatureSizeInBytes { - msgs := consensus.FBFTLog.GetMessagesByTypeSeq( + msgs := consensus.FBFTLog().GetMessagesByTypeSeq( msg_pb.MessageType_COMMITTED, blockNum, ) if len(msgs) != 1 { @@ -482,7 +482,7 @@ func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *L } return cb(&LastMileBlockIter{ blockCandidates: blocks, - fbftLog: consensus.FBFTLog, + fbftLog: consensus.fBFTLog, verify: consensus.BlockVerifier, curIndex: 0, logger: consensus.getLogger(), @@ -513,7 +513,7 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl msgs []*FBFTMessage ) for blockNum := bnStart; ; blockNum++ { - blk, msg, err := consensus.FBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger()) + blk, msg, err := consensus.fBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger()) if err != nil { if err == errFBFTLogNotFound { break @@ -551,7 +551,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { network.Bytes, network.FBFTMsg bareMinimumCommit := FBFTMsg.Payload - consensus.FBFTLog.AddVerifiedMessage(FBFTMsg) + consensus.fBFTLog.AddVerifiedMessage(FBFTMsg) if err := consensus.verifyLastCommitSig(bareMinimumCommit, blk); err != nil { return errors.Wrap(err, "[preCommitAndPropose] failed verifying last commit sig") @@ -567,16 +567,16 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)), }, p2p.ConstructMessage(msgToSend)); err != nil { - consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message") + consensus.GetLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message") } else { - consensus.getLogger().Info(). + consensus.GetLogger().Info(). Str("blockHash", blk.Hash().Hex()). Uint64("blockNum", consensus.BlockNum()). Hex("lastCommitSig", bareMinimumCommit). Msg("[preCommitAndPropose] Sent Committed Message") } - if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil { + if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog().IsBlockVerified(blk.Hash())); err != nil { consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain") return } @@ -661,7 +661,7 @@ func (consensus *Consensus) tryCatchup() error { func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() { - if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil { + if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.fBFTLog.IsBlockVerified(blk.Hash())); err != nil { consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain") return err } @@ -785,7 +785,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg if blk.IsLastBlockInEpoch() { consensus.setMode(consensus.updateConsensusInformation()) } - consensus.FBFTLog.PruneCacheBeforeBlock(blk.NumberU64()) + consensus.fBFTLog.PruneCacheBeforeBlock(blk.NumberU64()) consensus.resetState() } @@ -920,19 +920,12 @@ func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool { func (consensus *Consensus) DeleteBlocksLessThan(number uint64) { consensus.mutex.Lock() defer consensus.mutex.Unlock() - consensus.FBFTLog.deleteBlocksLessThan(number) + consensus.fBFTLog.deleteBlocksLessThan(number) } // DeleteMessagesLessThan deletes messages less than given block number. func (consensus *Consensus) DeleteMessagesLessThan(number uint64) { consensus.mutex.Lock() defer consensus.mutex.Unlock() - consensus.FBFTLog.deleteMessagesLessThan(number) -} - -// DeleteBlockByNumber deletes block by given block number. -func (consensus *Consensus) DeleteBlockByNumber(number uint64) { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - consensus.FBFTLog.deleteBlockByNumber(number) + consensus.fBFTLog.deleteMessagesLessThan(number) } diff --git a/consensus/double_sign.go b/consensus/double_sign.go index b9bf818cbd..3a8d559fd6 100644 --- a/consensus/double_sign.go +++ b/consensus/double_sign.go @@ -22,7 +22,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool { ); alreadyCastBallot != nil { for _, pubKey1 := range alreadyCastBallot.SignerPubKeys { if bytes.Compare(pubKey2.Bytes[:], pubKey1[:]) == 0 { - for _, blk := range consensus.FBFTLog.GetBlocksByNumber(recvMsg.BlockNum) { + for _, blk := range consensus.fBFTLog.GetBlocksByNumber(recvMsg.BlockNum) { firstSignedHeader := blk.Header() areHeightsEqual := firstSignedHeader.Number().Uint64() == recvMsg.BlockNum areViewIDsEqual := firstSignedHeader.ViewID().Uint64() == recvMsg.ViewID @@ -138,8 +138,8 @@ func (consensus *Consensus) couldThisBeADoubleSigner( recvMsg *FBFTMessage, ) bool { num, hash := consensus.BlockNum(), recvMsg.BlockHash - suspicious := !consensus.FBFTLog.HasMatchingAnnounce(num, hash) || - !consensus.FBFTLog.HasMatchingPrepared(num, hash) + suspicious := !consensus.fBFTLog.HasMatchingAnnounce(num, hash) || + !consensus.fBFTLog.HasMatchingPrepared(num, hash) if suspicious { consensus.getLogger().Debug(). Str("message", recvMsg.String()). diff --git a/consensus/fbft_log.go b/consensus/fbft_log.go index 5a4fe21889..982aecab75 100644 --- a/consensus/fbft_log.go +++ b/consensus/fbft_log.go @@ -3,6 +3,7 @@ package consensus import ( "encoding/binary" "fmt" + "sync" "github.com/ethereum/go-ethereum/common" bls_core "github.com/harmony-one/bls/ffi/go/bls" @@ -97,6 +98,16 @@ func (m *FBFTMessage) id() fbftMsgID { return id } +type FBFT interface { + GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage + IsBlockVerified(hash common.Hash) bool + DeleteBlockByNumber(number uint64) + GetBlockByHash(hash common.Hash) *types.Block + AddVerifiedMessage(msg *FBFTMessage) + AddBlock(block *types.Block) + GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage +} + // FBFTLog represents the log stored by a node during FBFT process type FBFTLog struct { blocks map[common.Hash]*types.Block // store blocks received in FBFT @@ -157,7 +168,7 @@ func (log *FBFTLog) deleteBlocksLessThan(number uint64) { } // DeleteBlockByNumber deletes block of specific number -func (log *FBFTLog) deleteBlockByNumber(number uint64) { +func (log *FBFTLog) DeleteBlockByNumber(number uint64) { for h, block := range log.blocks { if block.NumberU64() == number { delete(log.blocks, h) @@ -360,3 +371,50 @@ func (log *FBFTLog) PruneCacheBeforeBlock(bn uint64) { log.deleteBlocksLessThan(bn - 1) log.deleteMessagesLessThan(bn - 1) } + +type threadsafeFBFTLog struct { + log *FBFTLog + mu *sync.RWMutex +} + +func (t threadsafeFBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage { + t.mu.RLock() + defer t.mu.RUnlock() + return t.log.GetMessagesByTypeSeq(typ, blockNum) +} + +func (t threadsafeFBFTLog) IsBlockVerified(hash common.Hash) bool { + t.mu.RLock() + defer t.mu.RUnlock() + return t.log.IsBlockVerified(hash) +} + +func (t threadsafeFBFTLog) DeleteBlockByNumber(number uint64) { + t.mu.Lock() + defer t.mu.Unlock() + t.log.DeleteBlockByNumber(number) +} + +func (t threadsafeFBFTLog) GetBlockByHash(hash common.Hash) *types.Block { + t.mu.RLock() + defer t.mu.RUnlock() + return t.log.GetBlockByHash(hash) +} + +func (t threadsafeFBFTLog) AddVerifiedMessage(msg *FBFTMessage) { + t.mu.Lock() + defer t.mu.Unlock() + t.log.AddVerifiedMessage(msg) +} + +func (t threadsafeFBFTLog) AddBlock(block *types.Block) { + t.mu.Lock() + defer t.mu.Unlock() + t.log.AddBlock(block) +} + +func (t threadsafeFBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage { + t.mu.RLock() + defer t.mu.RUnlock() + return t.log.GetMessagesByTypeSeqHash(typ, blockNum, blockHash) +} diff --git a/consensus/leader.go b/consensus/leader.go index 1b10c96086..82ba3069bb 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -44,13 +44,13 @@ func (consensus *Consensus) announce(block *types.Block) { } msgToSend, FPBTMsg := networkMessage.Bytes, networkMessage.FBFTMsg - consensus.FBFTLog.AddVerifiedMessage(FPBTMsg) + consensus.fBFTLog.AddVerifiedMessage(FPBTMsg) consensus.getLogger().Debug(). Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()). Uint64("MsgViewID", FPBTMsg.ViewID). Uint64("MsgBlockNum", FPBTMsg.BlockNum). Msg("[Announce] Added Announce message in FPBT") - consensus.FBFTLog.AddBlock(block) + consensus.fBFTLog.AddBlock(block) // Leader sign the block hash itself for i, key := range consensus.priKey { @@ -94,7 +94,7 @@ func (consensus *Consensus) announce(block *types.Block) { func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { // TODO(audit): make FBFT lookup using map instead of looping through all items. - if !consensus.FBFTLog.HasMatchingViewAnnounce( + if !consensus.fBFTLog.HasMatchingViewAnnounce( consensus.getBlockNum(), consensus.getCurBlockViewID(), recvMsg.BlockHash, ) { consensus.getLogger().Debug(). @@ -226,7 +226,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { } // Must have the corresponding block to verify commit signature. - blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) + blockObj := consensus.fBFTLog.GetBlockByHash(recvMsg.BlockHash) if blockObj == nil { consensus.getLogger().Info(). Uint64("blockNum", recvMsg.BlockNum). @@ -295,7 +295,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { if !quorumWasMet && quorumIsMet { logger.Info().Msg("[OnCommit] 2/3 Enough commits received") - consensus.FBFTLog.MarkBlockVerified(blockObj) + consensus.fBFTLog.MarkBlockVerified(blockObj) if !blockObj.IsLastBlockInEpoch() { // only do early commit if it's not epoch block to avoid problems diff --git a/consensus/threshold.go b/consensus/threshold.go index 11e65709e6..e611eaedcd 100644 --- a/consensus/threshold.go +++ b/consensus/threshold.go @@ -36,7 +36,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error { networkMessage.OptionalAggregateSignature consensus.aggregatedPrepareSig = aggSig - consensus.FBFTLog.AddVerifiedMessage(FBFTMsg) + consensus.fBFTLog.AddVerifiedMessage(FBFTMsg) // Leader add commit phase signature var blockObj types.Block if err := rlp.DecodeBytes(consensus.block, &blockObj); err != nil { diff --git a/consensus/validator.go b/consensus/validator.go index c67765cc8a..ee30d8c006 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -37,7 +37,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgBlockNum", recvMsg.BlockNum). Msg("[OnAnnounce] Announce message Added") - consensus.FBFTLog.AddVerifiedMessage(recvMsg) + consensus.fBFTLog.AddVerifiedMessage(recvMsg) consensus.blockHash = recvMsg.BlockHash // we have already added message and block, skip check viewID // and send prepare message if is in ViewChanging mode @@ -77,10 +77,10 @@ func (consensus *Consensus) ValidateNewBlock(recvMsg *FBFTMessage) (*types.Block return consensus.validateNewBlock(recvMsg) } func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) { - if consensus.FBFTLog.IsBlockVerified(recvMsg.BlockHash) { + if consensus.fBFTLog.IsBlockVerified(recvMsg.BlockHash) { var blockObj *types.Block - blockObj = consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) + blockObj = consensus.fBFTLog.GetBlockByHash(recvMsg.BlockHash) if blockObj == nil { var blockObj2 types.Block if err := rlp.DecodeBytes(recvMsg.Block, &blockObj2); err != nil { @@ -106,7 +106,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block return nil, errors.New("Failed parsing new block") } - consensus.FBFTLog.AddBlock(&blockObj) + consensus.fBFTLog.AddBlock(&blockObj) // let this handle it own logs if !consensus.newBlockSanityChecks(&blockObj, recvMsg) { @@ -118,7 +118,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block copy(blockPayload[:], recvMsg.Block[:]) consensus.block = blockPayload recvMsg.Block = []byte{} // save memory space - consensus.FBFTLog.AddVerifiedMessage(recvMsg) + consensus.fBFTLog.AddVerifiedMessage(recvMsg) consensus.getLogger().Debug(). Uint64("MsgViewID", recvMsg.ViewID). Uint64("MsgBlockNum", recvMsg.BlockNum). @@ -272,7 +272,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { curBlockNum := consensus.BlockNum() consensus.mutex.Lock() defer consensus.mutex.Unlock() - for _, committedMsg := range consensus.FBFTLog.GetNotVerifiedCommittedMessages(blockObj.NumberU64(), blockObj.Header().ViewID().Uint64(), blockObj.Hash()) { + for _, committedMsg := range consensus.fBFTLog.GetNotVerifiedCommittedMessages(blockObj.NumberU64(), blockObj.Header().ViewID().Uint64(), blockObj.Hash()) { if committedMsg != nil { consensus.onCommitted(committedMsg) } @@ -309,10 +309,10 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { } // Optimistically add committedMessage in case of receiving committed before prepared - consensus.FBFTLog.AddNotVerifiedMessage(recvMsg) + consensus.fBFTLog.AddNotVerifiedMessage(recvMsg) // Must have the corresponding block to verify committed message. - blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) + blockObj := consensus.fBFTLog.GetBlockByHash(recvMsg.BlockHash) if blockObj == nil { consensus.getLogger().Info(). Uint64("blockNum", recvMsg.BlockNum). @@ -345,7 +345,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { consensus.getLogger().Error().Err(err).Msg("[OnCommitted] readSignatureBitmapPayload failed") return } - consensus.FBFTLog.AddVerifiedMessage(recvMsg) + consensus.fBFTLog.AddVerifiedMessage(recvMsg) consensus.aggregatedCommitSig = aggSig consensus.commitBitmap = mask diff --git a/consensus/view_change.go b/consensus/view_change.go index 151074817a..3927714e9a 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -274,7 +274,7 @@ func (consensus *Consensus) startViewChange() { // init my own payload if err := consensus.vc.InitPayload( - consensus.FBFTLog, + consensus.fBFTLog, nextViewID, consensus.getBlockNum(), consensus.priKey, @@ -394,7 +394,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { consensus.vc.AddViewIDKeyIfNotExist(recvMsg.ViewID, members) // do it once only per viewID/Leader - if err := consensus.vc.InitPayload(consensus.FBFTLog, + if err := consensus.vc.InitPayload(consensus.fBFTLog, recvMsg.ViewID, recvMsg.BlockNum, consensus.priKey, @@ -403,7 +403,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { return } - err = consensus.vc.ProcessViewChangeMsg(consensus.FBFTLog, consensus.Decider, recvMsg) + err = consensus.vc.ProcessViewChangeMsg(consensus.fBFTLog, consensus.Decider, recvMsg) if err != nil { consensus.getLogger().Error().Err(err). Uint64("viewID", recvMsg.ViewID). @@ -517,10 +517,10 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { copy(preparedMsg.Payload[:], recvMsg.Payload[32:]) preparedMsg.SenderPubkeys = []*bls.PublicKeyWrapper{senderKey} - consensus.FBFTLog.AddVerifiedMessage(&preparedMsg) + consensus.fBFTLog.AddVerifiedMessage(&preparedMsg) if preparedBlock != nil { - consensus.FBFTLog.AddBlock(preparedBlock) + consensus.fBFTLog.AddBlock(preparedBlock) } } diff --git a/consensus/view_change_msg.go b/consensus/view_change_msg.go index c241450306..fdb3f3afe2 100644 --- a/consensus/view_change_msg.go +++ b/consensus/view_change_msg.go @@ -32,14 +32,14 @@ func (consensus *Consensus) constructViewChangeMessage(priKey *bls.PrivateKeyWra }, } - preparedMsgs := consensus.FBFTLog.GetMessagesByTypeSeq( + preparedMsgs := consensus.fBFTLog.GetMessagesByTypeSeq( msg_pb.MessageType_PREPARED, consensus.getBlockNum(), ) - preparedMsg := consensus.FBFTLog.FindMessageByMaxViewID(preparedMsgs) + preparedMsg := consensus.fBFTLog.FindMessageByMaxViewID(preparedMsgs) var encodedBlock []byte if preparedMsg != nil { - block := consensus.FBFTLog.GetBlockByHash(preparedMsg.BlockHash) + block := consensus.fBFTLog.GetBlockByHash(preparedMsg.BlockHash) consensus.getLogger().Info(). Interface("Block", block). Interface("preparedMsg", preparedMsg). @@ -115,7 +115,7 @@ func (consensus *Consensus) constructNewViewMessage(viewID uint64, priKey *bls.P } vcMsg := message.GetViewchange() - vcMsg.Payload, vcMsg.PreparedBlock = consensus.vc.GetPreparedBlock(consensus.FBFTLog) + vcMsg.Payload, vcMsg.PreparedBlock = consensus.vc.GetPreparedBlock(consensus.fBFTLog) vcMsg.M2Aggsigs, vcMsg.M2Bitmap = consensus.vc.GetM2Bitmap(viewID) vcMsg.M3Aggsigs, vcMsg.M3Bitmap = consensus.vc.GetM3Bitmap(viewID) if vcMsg.M3Bitmap == nil || vcMsg.M3Aggsigs == nil { diff --git a/node/node_explorer.go b/node/node_explorer.go index 0dd7319761..fbb5b88985 100644 --- a/node/node_explorer.go +++ b/node/node_explorer.go @@ -58,13 +58,13 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag return nil } - block := node.Consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash) + block := node.Consensus.FBFTLog().GetBlockByHash(recvMsg.BlockHash) if block == nil { utils.Logger().Info(). Uint64("msgBlock", recvMsg.BlockNum). Msg("[Explorer] Haven't received the block before the committed msg") - node.Consensus.FBFTLog.AddVerifiedMessage(recvMsg) + node.Consensus.FBFTLog().AddVerifiedMessage(recvMsg) return errBlockBeforeCommit } @@ -94,9 +94,9 @@ func (node *Node) explorerMessageHandler(ctx context.Context, msg *msg_pb.Messag return err } // Add the block into FBFT log. - node.Consensus.FBFTLog.AddBlock(blockObj) + node.Consensus.FBFTLog().AddBlock(blockObj) // Try to search for MessageType_COMMITTED message from pbft log. - msgs := node.Consensus.FBFTLog.GetMessagesByTypeSeqHash( + msgs := node.Consensus.FBFTLog().GetMessagesByTypeSeqHash( msg_pb.MessageType_COMMITTED, blockObj.NumberU64(), blockObj.Hash(), @@ -148,7 +148,7 @@ func (node *Node) TraceLoopForExplorer() { // AddNewBlockForExplorer add new block for explorer. func (node *Node) AddNewBlockForExplorer(block *types.Block) { if node.HarmonyConfig.General.RunElasticMode && node.HarmonyConfig.TiKV.Role == tikv.RoleReader { - node.Consensus.DeleteBlockByNumber(block.NumberU64()) + node.Consensus.FBFTLog().DeleteBlockByNumber(block.NumberU64()) return } @@ -159,7 +159,7 @@ func (node *Node) AddNewBlockForExplorer(block *types.Block) { node.Consensus.UpdateConsensusInformation() } // Clean up the blocks to avoid OOM. - node.Consensus.DeleteBlockByNumber(block.NumberU64()) + node.Consensus.FBFTLog().DeleteBlockByNumber(block.NumberU64()) // if in tikv mode, only master writer node need dump all explorer block if !node.HarmonyConfig.General.RunElasticMode || node.Blockchain().IsTikvWriterMaster() {