Skip to content

Commit

Permalink
Fixed concurrent map access. (#4469)
Browse files Browse the repository at this point in the history
  • Loading branch information
Frozen authored Jul 19, 2023
1 parent 9966c42 commit 8656c7a
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 76 deletions.
2 changes: 1 addition & 1 deletion consensus/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (consensus *Consensus) isRightBlockNumAndViewID(recvMsg *FBFTMessage) bool
}

func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
logMsgs := consensus.FBFTLog.GetMessagesByTypeSeqView(
logMsgs := consensus.fBFTLog.GetMessagesByTypeSeqView(
msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID,
)
if len(logMsgs) > 0 {
Expand Down
43 changes: 25 additions & 18 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type VerifyBlockFunc func(*types.Block) error
type Consensus struct {
Decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process
FBFTLog *FBFTLog
fBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
phase FBFTPhase
// current indicates what state a node is in
Expand Down Expand Up @@ -84,7 +84,7 @@ type Consensus struct {
// IgnoreViewIDCheck determines whether to ignore viewID check
IgnoreViewIDCheck *abool.AtomicBool
// consensus mutex
mutex sync.RWMutex
mutex *sync.RWMutex
// ViewChange struct
vc *viewChange
// Signal channel for proposing a new block and start new consensus
Expand Down Expand Up @@ -140,6 +140,13 @@ func (consensus *Consensus) Blockchain() core.BlockChain {
return consensus.registry.GetBlockchain()
}

func (consensus *Consensus) FBFTLog() FBFT {
return threadsafeFBFTLog{
log: consensus.fBFTLog,
mu: consensus.mutex,
}
}

// ChainReader returns the chain reader.
// This is mostly the same as Blockchain, but it returns only read methods, so we assume it's safe for concurrent use.
func (consensus *Consensus) ChainReader() engine.ChainReader {
Expand All @@ -165,11 +172,11 @@ func (consensus *Consensus) Beaconchain() core.BlockChain {

// VerifyBlock is a function used to verify the block and keep trace of verified blocks.
func (consensus *Consensus) verifyBlock(block *types.Block) error {
if !consensus.FBFTLog.IsBlockVerified(block.Hash()) {
if !consensus.fBFTLog.IsBlockVerified(block.Hash()) {
if err := consensus.BlockVerifier(block); err != nil {
return errors.Errorf("Block verification failed: %s", err)
}
consensus.FBFTLog.MarkBlockVerified(block)
consensus.fBFTLog.MarkBlockVerified(block)
}
return nil
}
Expand Down Expand Up @@ -261,21 +268,21 @@ func New(
Decider quorum.Decider, minPeers int, aggregateSig bool,
) (*Consensus, error) {
consensus := Consensus{
ShardID: shard,
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
Decider: Decider,
registry: registry,
MinPeers: minPeers,
AggregateSig: aggregateSig,
host: host,
msgSender: NewMessageSender(host),
BlockNumLowChan: make(chan struct{}, 1),
// FBFT timeout
consensusTimeout: createTimeout(),
}
consensus.Decider = Decider
consensus.registry = registry
consensus.MinPeers = minPeers
consensus.AggregateSig = aggregateSig
consensus.host = host
consensus.msgSender = NewMessageSender(host)
consensus.BlockNumLowChan = make(chan struct{}, 1)
// FBFT related
consensus.FBFTLog = NewFBFTLog()
consensus.phase = FBFTAnnounce
consensus.current = State{mode: Normal}
// FBFT timeout
consensus.consensusTimeout = createTimeout()

if multiBLSPriKey != nil {
consensus.priKey = multiBLSPriKey
Expand Down
2 changes: 1 addition & 1 deletion consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
copy(blockHash[:], payload[:32])

// Leader sign and add commit message
block := consensus.FBFTLog.GetBlockByHash(blockHash)
block := consensus.fBFTLog.GetBlockByHash(blockHash)
if block == nil {
return errGetPreparedBlock
}
Expand Down
3 changes: 1 addition & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.NoError(t, err)

messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
fbtLog := NewFBFTLog()
state := State{mode: Normal}

timeouts := createTimeout()
Expand All @@ -37,7 +36,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan)

// FBFTLog
assert.Equal(t, fbtLog, consensus.FBFTLog)
assert.NotNil(t, consensus.FBFTLog())

assert.Equal(t, FBFTAnnounce, consensus.phase)

Expand Down
33 changes: 13 additions & 20 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ func (consensus *Consensus) finalCommit() {
network.Bytes,
network.FBFTMsg
commitSigAndBitmap := FBFTMsg.Payload
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
// find correct block content
curBlockHash := consensus.blockHash
block := consensus.FBFTLog.GetBlockByHash(curBlockHash)
block := consensus.fBFTLog.GetBlockByHash(curBlockHash)
if block == nil {
consensus.getLogger().Warn().
Str("blockHash", hex.EncodeToString(curBlockHash[:])).
Expand Down Expand Up @@ -278,7 +278,7 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
lastCommits, err := consensus.Blockchain().ReadCommitSig(blockNum)
if err != nil ||
len(lastCommits) < bls.BLSSignatureSizeInBytes {
msgs := consensus.FBFTLog.GetMessagesByTypeSeq(
msgs := consensus.FBFTLog().GetMessagesByTypeSeq(
msg_pb.MessageType_COMMITTED, blockNum,
)
if len(msgs) != 1 {
Expand Down Expand Up @@ -482,7 +482,7 @@ func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *L
}
return cb(&LastMileBlockIter{
blockCandidates: blocks,
fbftLog: consensus.FBFTLog,
fbftLog: consensus.fBFTLog,
verify: consensus.BlockVerifier,
curIndex: 0,
logger: consensus.getLogger(),
Expand Down Expand Up @@ -513,7 +513,7 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl
msgs []*FBFTMessage
)
for blockNum := bnStart; ; blockNum++ {
blk, msg, err := consensus.FBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger())
blk, msg, err := consensus.fBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger())
if err != nil {
if err == errFBFTLogNotFound {
break
Expand Down Expand Up @@ -551,7 +551,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
network.Bytes,
network.FBFTMsg
bareMinimumCommit := FBFTMsg.Payload
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)

if err := consensus.verifyLastCommitSig(bareMinimumCommit, blk); err != nil {
return errors.Wrap(err, "[preCommitAndPropose] failed verifying last commit sig")
Expand All @@ -567,16 +567,16 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
consensus.GetLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
} else {
consensus.getLogger().Info().
consensus.GetLogger().Info().
Str("blockHash", blk.Hash().Hex()).
Uint64("blockNum", consensus.BlockNum()).
Hex("lastCommitSig", bareMinimumCommit).
Msg("[preCommitAndPropose] Sent Committed Message")
}

if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog().IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func (consensus *Consensus) tryCatchup() error {

func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.fBFTLog.IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain")
return err
}
Expand Down Expand Up @@ -785,7 +785,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
if blk.IsLastBlockInEpoch() {
consensus.setMode(consensus.updateConsensusInformation())
}
consensus.FBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.fBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.resetState()
}

Expand Down Expand Up @@ -920,19 +920,12 @@ func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool {
func (consensus *Consensus) DeleteBlocksLessThan(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteBlocksLessThan(number)
consensus.fBFTLog.deleteBlocksLessThan(number)
}

// DeleteMessagesLessThan deletes messages less than given block number.
func (consensus *Consensus) DeleteMessagesLessThan(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteMessagesLessThan(number)
}

// DeleteBlockByNumber deletes block by given block number.
func (consensus *Consensus) DeleteBlockByNumber(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteBlockByNumber(number)
consensus.fBFTLog.deleteMessagesLessThan(number)
}
6 changes: 3 additions & 3 deletions consensus/double_sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
); alreadyCastBallot != nil {
for _, pubKey1 := range alreadyCastBallot.SignerPubKeys {
if bytes.Compare(pubKey2.Bytes[:], pubKey1[:]) == 0 {
for _, blk := range consensus.FBFTLog.GetBlocksByNumber(recvMsg.BlockNum) {
for _, blk := range consensus.fBFTLog.GetBlocksByNumber(recvMsg.BlockNum) {
firstSignedHeader := blk.Header()
areHeightsEqual := firstSignedHeader.Number().Uint64() == recvMsg.BlockNum
areViewIDsEqual := firstSignedHeader.ViewID().Uint64() == recvMsg.ViewID
Expand Down Expand Up @@ -138,8 +138,8 @@ func (consensus *Consensus) couldThisBeADoubleSigner(
recvMsg *FBFTMessage,
) bool {
num, hash := consensus.BlockNum(), recvMsg.BlockHash
suspicious := !consensus.FBFTLog.HasMatchingAnnounce(num, hash) ||
!consensus.FBFTLog.HasMatchingPrepared(num, hash)
suspicious := !consensus.fBFTLog.HasMatchingAnnounce(num, hash) ||
!consensus.fBFTLog.HasMatchingPrepared(num, hash)
if suspicious {
consensus.getLogger().Debug().
Str("message", recvMsg.String()).
Expand Down
60 changes: 59 additions & 1 deletion consensus/fbft_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"encoding/binary"
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
bls_core "github.com/harmony-one/bls/ffi/go/bls"
Expand Down Expand Up @@ -97,6 +98,16 @@ func (m *FBFTMessage) id() fbftMsgID {
return id
}

type FBFT interface {
GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage
IsBlockVerified(hash common.Hash) bool
DeleteBlockByNumber(number uint64)
GetBlockByHash(hash common.Hash) *types.Block
AddVerifiedMessage(msg *FBFTMessage)
AddBlock(block *types.Block)
GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage
}

// FBFTLog represents the log stored by a node during FBFT process
type FBFTLog struct {
blocks map[common.Hash]*types.Block // store blocks received in FBFT
Expand Down Expand Up @@ -157,7 +168,7 @@ func (log *FBFTLog) deleteBlocksLessThan(number uint64) {
}

// DeleteBlockByNumber deletes block of specific number
func (log *FBFTLog) deleteBlockByNumber(number uint64) {
func (log *FBFTLog) DeleteBlockByNumber(number uint64) {
for h, block := range log.blocks {
if block.NumberU64() == number {
delete(log.blocks, h)
Expand Down Expand Up @@ -360,3 +371,50 @@ func (log *FBFTLog) PruneCacheBeforeBlock(bn uint64) {
log.deleteBlocksLessThan(bn - 1)
log.deleteMessagesLessThan(bn - 1)
}

type threadsafeFBFTLog struct {
log *FBFTLog
mu *sync.RWMutex
}

func (t threadsafeFBFTLog) GetMessagesByTypeSeq(typ msg_pb.MessageType, blockNum uint64) []*FBFTMessage {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.GetMessagesByTypeSeq(typ, blockNum)
}

func (t threadsafeFBFTLog) IsBlockVerified(hash common.Hash) bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.IsBlockVerified(hash)
}

func (t threadsafeFBFTLog) DeleteBlockByNumber(number uint64) {
t.mu.Lock()
defer t.mu.Unlock()
t.log.DeleteBlockByNumber(number)
}

func (t threadsafeFBFTLog) GetBlockByHash(hash common.Hash) *types.Block {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.GetBlockByHash(hash)
}

func (t threadsafeFBFTLog) AddVerifiedMessage(msg *FBFTMessage) {
t.mu.Lock()
defer t.mu.Unlock()
t.log.AddVerifiedMessage(msg)
}

func (t threadsafeFBFTLog) AddBlock(block *types.Block) {
t.mu.Lock()
defer t.mu.Unlock()
t.log.AddBlock(block)
}

func (t threadsafeFBFTLog) GetMessagesByTypeSeqHash(typ msg_pb.MessageType, blockNum uint64, blockHash common.Hash) []*FBFTMessage {
t.mu.RLock()
defer t.mu.RUnlock()
return t.log.GetMessagesByTypeSeqHash(typ, blockNum, blockHash)
}
10 changes: 5 additions & 5 deletions consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ func (consensus *Consensus) announce(block *types.Block) {
}
msgToSend, FPBTMsg := networkMessage.Bytes, networkMessage.FBFTMsg

consensus.FBFTLog.AddVerifiedMessage(FPBTMsg)
consensus.fBFTLog.AddVerifiedMessage(FPBTMsg)
consensus.getLogger().Debug().
Str("MsgBlockHash", FPBTMsg.BlockHash.Hex()).
Uint64("MsgViewID", FPBTMsg.ViewID).
Uint64("MsgBlockNum", FPBTMsg.BlockNum).
Msg("[Announce] Added Announce message in FPBT")
consensus.FBFTLog.AddBlock(block)
consensus.fBFTLog.AddBlock(block)

// Leader sign the block hash itself
for i, key := range consensus.priKey {
Expand Down Expand Up @@ -94,7 +94,7 @@ func (consensus *Consensus) announce(block *types.Block) {

func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) {
// TODO(audit): make FBFT lookup using map instead of looping through all items.
if !consensus.FBFTLog.HasMatchingViewAnnounce(
if !consensus.fBFTLog.HasMatchingViewAnnounce(
consensus.getBlockNum(), consensus.getCurBlockViewID(), recvMsg.BlockHash,
) {
consensus.getLogger().Debug().
Expand Down Expand Up @@ -226,7 +226,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {
}

// Must have the corresponding block to verify commit signature.
blockObj := consensus.FBFTLog.GetBlockByHash(recvMsg.BlockHash)
blockObj := consensus.fBFTLog.GetBlockByHash(recvMsg.BlockHash)
if blockObj == nil {
consensus.getLogger().Info().
Uint64("blockNum", recvMsg.BlockNum).
Expand Down Expand Up @@ -295,7 +295,7 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) {

if !quorumWasMet && quorumIsMet {
logger.Info().Msg("[OnCommit] 2/3 Enough commits received")
consensus.FBFTLog.MarkBlockVerified(blockObj)
consensus.fBFTLog.MarkBlockVerified(blockObj)

if !blockObj.IsLastBlockInEpoch() {
// only do early commit if it's not epoch block to avoid problems
Expand Down
2 changes: 1 addition & 1 deletion consensus/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (consensus *Consensus) didReachPrepareQuorum() error {
networkMessage.OptionalAggregateSignature

consensus.aggregatedPrepareSig = aggSig
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
// Leader add commit phase signature
var blockObj types.Block
if err := rlp.DecodeBytes(consensus.block, &blockObj); err != nil {
Expand Down
Loading

0 comments on commit 8656c7a

Please sign in to comment.