Skip to content

Commit

Permalink
Merge branch 'main' into cal/fix-setting-data-root
Browse files Browse the repository at this point in the history
  • Loading branch information
cmwaters committed Apr 23, 2024
2 parents 2d21196 + ac96be4 commit 4ff558d
Show file tree
Hide file tree
Showing 18 changed files with 482 additions and 62 deletions.
194 changes: 175 additions & 19 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,21 +272,55 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
conR.conS.mtx.Lock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusNewRoundStep,
schema.Download,
fmt.Sprintf("%d", msg.Step),
)
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
case *NewValidBlockMessage:
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusNewValidBlock,
schema.Download,
)
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusHasVote,
schema.Download,
msg.Type.String(),
)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusVoteSet23Precommit,
schema.Download,
)
if height != msg.Height {
return
}
Expand Down Expand Up @@ -316,10 +350,20 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ //nolint: staticcheck
ChannelID: VoteSetBitsChannel,
Message: eMsg,
}, conR.Logger)
}, conR.Logger) {
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusVoteSetBits,
schema.Upload,
msg.Type.String(),
)
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
Expand All @@ -333,12 +377,27 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
schema.WriteProposal(
conR.traceClient,
msg.Proposal.Height,
msg.Proposal.Round,
string(e.Src.ID()),
schema.Download,
)
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.ProposalPOLRound,
string(e.Src.ID()),
schema.ConsensusPOL,
schema.Download,
)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.Download)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, msg.Part.Index, false, string(e.Src.ID()), schema.Download)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand All @@ -357,7 +416,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()

schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.Download)
schema.WriteVote(conR.traceClient, height, round, msg.Vote, string(e.Src.ID()), schema.Download)

ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
Expand Down Expand Up @@ -477,6 +536,15 @@ func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
ChannelID: StateChannel,
Message: nrsMsg,
})
schema.WriteConsensusState(
conR.traceClient,
nrsMsg.Height,
nrsMsg.Round,
schema.Broadcast,
schema.ConsensusNewRoundStep,
schema.Upload,
fmt.Sprintf("%d", nrsMsg.Step),
)
}

func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
Expand All @@ -492,6 +560,14 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
ChannelID: StateChannel,
Message: csMsg,
})
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
schema.Broadcast,
schema.ConsensusNewValidBlock,
schema.Upload,
)
}

// Broadcasts HasVoteMessage to peers that care.
Expand All @@ -506,6 +582,15 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
ChannelID: StateChannel,
Message: msg,
})
schema.WriteConsensusState(
conR.traceClient,
vote.Height,
vote.Round,
schema.Broadcast,
schema.ConsensusHasVote,
schema.Upload,
vote.Type.String(),
)
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
Expand Down Expand Up @@ -544,10 +629,20 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *cmtcons.NewRoundStep)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.getRoundState()
nrsMsg := makeRoundStepMessage(rs)
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: nrsMsg,
}, conR.Logger)
}, conR.Logger) {
schema.WriteConsensusState(
conR.traceClient,
nrsMsg.Height,
nrsMsg.Round,
string(peer.ID()),
schema.ConsensusNewRoundStep,
schema.Upload,
fmt.Sprintf("%d", nrsMsg.Step),
)
}
}

func (conR *Reactor) updateRoundStateRoutine() {
Expand Down Expand Up @@ -599,7 +694,7 @@ OUTER_LOOP:
Part: *parts,
},
}, logger) {
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.Upload)
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, part.Index, false, string(peer.ID()), schema.Upload)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
Expand Down Expand Up @@ -653,6 +748,13 @@ OUTER_LOOP:
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
// TODO But yet we send block parts of this proposal to the peer (in the first if statement of the current function) while the proposal is rejected. This part of the protocol could be improved by sending the proposal block parts only if the proposal is accepted.
ps.SetHasProposal(rs.Proposal)
schema.WriteProposal(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.Upload,
)
}
}
// ProposalPOL: lets peer know which POL votes we have so far.
Expand All @@ -661,14 +763,23 @@ OUTER_LOOP:
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: DataChannel,
Message: &cmtcons.ProposalPOL{
Height: rs.Height,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
},
}, logger)
}, logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusPOL,
schema.Upload,
)
}
}
continue OUTER_LOOP
}
Expand Down Expand Up @@ -720,6 +831,15 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
},
}, logger) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
schema.WriteBlockPart(
conR.traceClient,
prs.Height,
prs.Round,
uint32(index),
true,
string(peer.ID()),
schema.Upload,
)
} else {
logger.Debug("Sending block part for catchup failed")
// sleep to avoid retrying too fast
Expand Down Expand Up @@ -784,7 +904,7 @@ OUTER_LOOP:
if vote != nil {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.Upload)
string(ps.peer.ID()), schema.Upload)
continue OUTER_LOOP
}
}
Expand Down Expand Up @@ -813,7 +933,7 @@ func (conR *Reactor) pickSendVoteAndTrace(votes types.VoteSetReader, rs *cstypes
vote := ps.PickSendVote(votes)
if vote != nil { // if a vote is sent, trace it
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.Upload)
string(ps.peer.ID()), schema.Upload)
return true
}
return false
Expand Down Expand Up @@ -895,15 +1015,24 @@ OUTER_LOOP:
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {

p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: cmtproto.PrevoteType,
BlockID: maj23.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusVoteSet23Prevote,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand All @@ -915,15 +1044,24 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: cmtproto.PrecommitType,
BlockID: maj23.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusVoteSet23Precommit,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand All @@ -936,15 +1074,24 @@ OUTER_LOOP:
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {

p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: cmtproto.PrevoteType,
BlockID: maj23.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusPOL,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand All @@ -959,15 +1106,24 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: commit.Round,
Type: cmtproto.PrecommitType,
BlockID: commit.BlockID.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
prs.Height,
prs.Round,
string(peer.ID()),
schema.ConsensusVoteSet23Precommit,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand Down
Loading

0 comments on commit 4ff558d

Please sign in to comment.