Skip to content

Commit

Permalink
feat: add tracing for remaining consensus messages, abci calls, and peer updates (#1292)
Browse files Browse the repository at this point in the history

## Description

adds tracing for remaining consensus messages, abci calls (prepare,
process, and commit), along with adding and removing peers.
  • Loading branch information
evan-forbes authored Apr 23, 2024
1 parent c89f6f9 commit ac96be4
Show file tree
Hide file tree
Showing 18 changed files with 482 additions and 62 deletions.
194 changes: 175 additions & 19 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,21 +272,55 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
conR.conS.mtx.Lock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.Unlock()
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusNewRoundStep,
schema.Download,
fmt.Sprintf("%d", msg.Step),
)
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
return
}
ps.ApplyNewRoundStepMessage(msg)
case *NewValidBlockMessage:
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusNewValidBlock,
schema.Download,
)
ps.ApplyNewValidBlockMessage(msg)
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusHasVote,
schema.Download,
msg.Type.String(),
)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.Lock()
height, votes := cs.Height, cs.Votes
cs.mtx.Unlock()
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusVoteSet23Precommit,
schema.Download,
)
if height != msg.Height {
return
}
Expand Down Expand Up @@ -316,10 +350,20 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
if votes := ourVotes.ToProto(); votes != nil {
eMsg.Votes = *votes
}
p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ //nolint: staticcheck
ChannelID: VoteSetBitsChannel,
Message: eMsg,
}, conR.Logger)
}, conR.Logger) {
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.Round,
string(e.Src.ID()),
schema.ConsensusVoteSetBits,
schema.Upload,
msg.Type.String(),
)
}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
Expand All @@ -333,12 +377,27 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
schema.WriteProposal(
conR.traceClient,
msg.Proposal.Height,
msg.Proposal.Round,
string(e.Src.ID()),
schema.Download,
)
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
schema.WriteConsensusState(
conR.traceClient,
msg.Height,
msg.ProposalPOLRound,
string(e.Src.ID()),
schema.ConsensusPOL,
schema.Download,
)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.Download)
schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, msg.Part.Index, false, string(e.Src.ID()), schema.Download)
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
Expand All @@ -357,7 +416,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()

schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.Download)
schema.WriteVote(conR.traceClient, height, round, msg.Vote, string(e.Src.ID()), schema.Download)

ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
Expand Down Expand Up @@ -477,6 +536,15 @@ func (conR *Reactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
ChannelID: StateChannel,
Message: nrsMsg,
})
schema.WriteConsensusState(
conR.traceClient,
nrsMsg.Height,
nrsMsg.Round,
schema.Broadcast,
schema.ConsensusNewRoundStep,
schema.Upload,
fmt.Sprintf("%d", nrsMsg.Step),
)
}

func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
Expand All @@ -492,6 +560,14 @@ func (conR *Reactor) broadcastNewValidBlockMessage(rs *cstypes.RoundState) {
ChannelID: StateChannel,
Message: csMsg,
})
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
schema.Broadcast,
schema.ConsensusNewValidBlock,
schema.Upload,
)
}

// Broadcasts HasVoteMessage to peers that care.
Expand All @@ -506,6 +582,15 @@ func (conR *Reactor) broadcastHasVoteMessage(vote *types.Vote) {
ChannelID: StateChannel,
Message: msg,
})
schema.WriteConsensusState(
conR.traceClient,
vote.Height,
vote.Round,
schema.Broadcast,
schema.ConsensusHasVote,
schema.Upload,
vote.Type.String(),
)
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
Expand Down Expand Up @@ -544,10 +629,20 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *cmtcons.NewRoundStep)
func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) {
rs := conR.getRoundState()
nrsMsg := makeRoundStepMessage(rs)
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: nrsMsg,
}, conR.Logger)
}, conR.Logger) {
schema.WriteConsensusState(
conR.traceClient,
nrsMsg.Height,
nrsMsg.Round,
string(peer.ID()),
schema.ConsensusNewRoundStep,
schema.Upload,
fmt.Sprintf("%d", nrsMsg.Step),
)
}
}

func (conR *Reactor) updateRoundStateRoutine() {
Expand Down Expand Up @@ -599,7 +694,7 @@ OUTER_LOOP:
Part: *parts,
},
}, logger) {
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.Upload)
schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, part.Index, false, string(peer.ID()), schema.Upload)
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
Expand Down Expand Up @@ -653,6 +748,13 @@ OUTER_LOOP:
// NOTE[ZM]: A peer might have received different proposal msg so this Proposal msg will be rejected!
// TODO But yet we send block parts of this proposal to the peer (in the first if statement of the current function) while the proposal is rejected. This part of the protocol could be improved by sending the proposal block parts only if the proposal is accepted.
ps.SetHasProposal(rs.Proposal)
schema.WriteProposal(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.Upload,
)
}
}
// ProposalPOL: lets peer know which POL votes we have so far.
Expand All @@ -661,14 +763,23 @@ OUTER_LOOP:
// so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound).
if 0 <= rs.Proposal.POLRound {
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: DataChannel,
Message: &cmtcons.ProposalPOL{
Height: rs.Height,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray().ToProto(),
},
}, logger)
}, logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusPOL,
schema.Upload,
)
}
}
continue OUTER_LOOP
}
Expand Down Expand Up @@ -720,6 +831,15 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
},
}, logger) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
schema.WriteBlockPart(
conR.traceClient,
prs.Height,
prs.Round,
uint32(index),
true,
string(peer.ID()),
schema.Upload,
)
} else {
logger.Debug("Sending block part for catchup failed")
// sleep to avoid retrying too fast
Expand Down Expand Up @@ -784,7 +904,7 @@ OUTER_LOOP:
if vote != nil {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.Upload)
string(ps.peer.ID()), schema.Upload)
continue OUTER_LOOP
}
}
Expand Down Expand Up @@ -813,7 +933,7 @@ func (conR *Reactor) pickSendVoteAndTrace(votes types.VoteSetReader, rs *cstypes
vote := ps.PickSendVote(votes)
if vote != nil { // if a vote is sent, trace it
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.Upload)
string(ps.peer.ID()), schema.Upload)
return true
}
return false
Expand Down Expand Up @@ -895,15 +1015,24 @@ OUTER_LOOP:
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {

p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: cmtproto.PrevoteType,
BlockID: maj23.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusVoteSet23Prevote,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand All @@ -915,15 +1044,24 @@ OUTER_LOOP:
prs := ps.GetRoundState()
if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.Round,
Type: cmtproto.PrecommitType,
BlockID: maj23.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusVoteSet23Precommit,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand All @@ -936,15 +1074,24 @@ OUTER_LOOP:
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {

p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: prs.ProposalPOLRound,
Type: cmtproto.PrevoteType,
BlockID: maj23.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
rs.Height,
rs.Round,
string(peer.ID()),
schema.ConsensusPOL,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand All @@ -959,15 +1106,24 @@ OUTER_LOOP:
if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() &&
prs.Height >= conR.conS.blockStore.Base() {
if commit := conR.conS.LoadCommit(prs.Height); commit != nil {
p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
if p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck
ChannelID: StateChannel,
Message: &cmtcons.VoteSetMaj23{
Height: prs.Height,
Round: commit.Round,
Type: cmtproto.PrecommitType,
BlockID: commit.BlockID.ToProto(),
},
}, ps.logger)
}, ps.logger) {
schema.WriteConsensusState(
conR.traceClient,
prs.Height,
prs.Round,
string(peer.ID()),
schema.ConsensusVoteSet23Precommit,
schema.Upload,
)
}
time.Sleep(conR.conS.config.PeerQueryMaj23SleepDuration)
}
}
Expand Down
Loading

0 comments on commit ac96be4

Please sign in to comment.