From 036fb02814ed44086ffe6ae51ec0b8528226ceb2 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Mon, 29 Jan 2024 14:56:43 -0800 Subject: [PATCH 01/11] test: channel ID impact on max bandwidth utilization in MConnection (#1189) This PR implements a benchmark test to evaluate how channel IDs influence maximum bandwidth utilization. Part of #1162 --- p2p/conn/connection_bench_test.go | 99 +++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/p2p/conn/connection_bench_test.go b/p2p/conn/connection_bench_test.go index 323a3c8e5c..b0255842d8 100644 --- a/p2p/conn/connection_bench_test.go +++ b/p2p/conn/connection_bench_test.go @@ -467,3 +467,102 @@ func BenchmarkMConnection_ScalingPayloadSizes_LowSendRate(b *testing.B) { runBenchmarkTest(b, tt) } } + +// BenchmarkMConnection_Multiple_ChannelID assesses the max bw/send rate +// utilization of MConnection when configured with multiple channel IDs. +func BenchmarkMConnection_Multiple_ChannelID(b *testing.B) { + // These tests create two connections with two channels each, one channel having higher priority. + // Traffic is sent from the client to the server, split between the channels with varying proportions in each test case. + // All tests should complete in 2 seconds (calculated as totalTraffic* msgSize / sendRate), + // demonstrating that channel count doesn't affect max bandwidth utilization. 
+ totalTraffic := 100 + msgSize := 1 * kibibyte + sendRate := 50 * kibibyte + recRate := 50 * kibibyte + chDescs := []*ChannelDescriptor{ + {ID: 0x01, Priority: 1, SendQueueCapacity: 50}, + {ID: 0x02, Priority: 2, SendQueueCapacity: 50}, + } + type testCase struct { + name string + trafficMap map[byte]int // channel ID -> number of messages to be sent + } + var tests []testCase + for i := 0.0; i < 1; i += 0.1 { + tests = append(tests, testCase{ + name: fmt.Sprintf("2 channel IDs with traffic proportion %f %f", + i, 1-i), + trafficMap: map[byte]int{ // channel ID -> number of messages to be sent + 0x01: int(i * float64(totalTraffic)), + 0x02: totalTraffic - int(i*float64(totalTraffic)), // the rest of the traffic + }, + }) + } + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + for n := 0; n < b.N; n++ { + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + receivedLoad := 0 // number of messages received + onReceive := func(chID byte, msgBytes []byte) { + receivedLoad++ + if receivedLoad >= totalTraffic { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = int64(sendRate) + cnfg.RecvRate = int64(recRate) + + // mount the channel descriptors to the connections + clientMconn := NewMConnectionWithConfig(client, chDescs, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverMconn := NewMConnectionWithConfig(server, chDescs, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(b, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(b, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start 
measuring the time from here to exclude the time + // taken to set up the connections + b.StartTimer() + // start generating messages over the two channels, + // concurrently, with the specified proportions + for chID, trafficPortion := range tt.trafficMap { + go generateAndSendMessages(clientMconn, + time.Millisecond, + 1*time.Minute, + trafficPortion, + msgSize, + nil, + chID) + } + + // wait for all messages to be received + <-allReceived + b.StopTimer() + } + }) + } +} From be1f465cc14f344db5e832b2f69faa80bb8d66dc Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Wed, 31 Jan 2024 11:09:11 -0800 Subject: [PATCH 02/11] test: sequential receipt of messages in MConnection (#1193) Accomplishes the first task of #1192 --- p2p/conn/connection_bench_test.go | 151 ++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/p2p/conn/connection_bench_test.go b/p2p/conn/connection_bench_test.go index b0255842d8..c8ab74bdfe 100644 --- a/p2p/conn/connection_bench_test.go +++ b/p2p/conn/connection_bench_test.go @@ -65,6 +65,48 @@ func generateAndSendMessages(mc *MConnection, } } +// sendMessages sends the supplied messages `msgs` to the specified multiplex +// connection in order and according to the specified rate `messagingRate`. +// chIDs is the list of channel IDs to which the messages are sent. +// This process terminates after the duration `timeout` or when all +// messages are sent. 
+func sendMessages(mc *MConnection, + messagingRate time.Duration, + timeout time.Duration, + msgs [][]byte, chIDs []byte) { + + var i = 0 + total := len(msgs) + // message generation interval ticker + ticker := time.NewTicker(messagingRate) + defer ticker.Stop() + + // timer for the total duration + timer := time.NewTimer(timeout) + defer timer.Stop() + + // generating messages + for { + select { + case <-ticker.C: + // generate message + if mc.Send(chIDs[i], msgs[i]) { + i++ + if i >= total { + log.TestingLogger().Info("Completed the message generation as the" + + " total number of messages is reached") + return + } + } + case <-timer.C: + // time's up + log.TestingLogger().Info("Completed the message generation as the total " + + "duration is reached") + return + } + } +} + func BenchmarkMConnection(b *testing.B) { chID := byte(0x01) @@ -566,3 +608,112 @@ func BenchmarkMConnection_Multiple_ChannelID(b *testing.B) { }) } } + +func TestMConnection_Message_Order_ChannelID(t *testing.T) { + // This test involves two connections, each with two channels: + // channel ID 1 (high priority) and channel ID 2 (low priority). + // It sends 11 messages from client to server, with the first 10 on + // channel ID 2 and the final one on channel ID 1. + // The aim is to show that message order at the receiver is based solely + // on the send order, as the receiver does not prioritize channels. + // To enforce a specific send order, + // channel ID 2's send queue capacity is limited to 1; + // preventing message queuing on channel ID 2 that could otherwise give + // priority to channel ID 1's message (despite being the last message), + // disrupting the send order. 
+ totalMsgs := 11 + msgSize := 1 * kibibyte + sendRate := 50 * kibibyte + recRate := 50 * kibibyte + clientChDesc := []*ChannelDescriptor{ + {ID: 0x01, Priority: 1, SendQueueCapacity: 10, + RecvMessageCapacity: defaultRecvMessageCapacity, + RecvBufferCapacity: defaultRecvBufferCapacity}, + {ID: 0x02, Priority: 2, + // channel ID 2's send queue capacity is limited to 1; + // to enforce a specific send order. + SendQueueCapacity: 1, + RecvMessageCapacity: defaultRecvMessageCapacity, + RecvBufferCapacity: defaultRecvBufferCapacity}, + } + serverChDesc := []*ChannelDescriptor{ + {ID: 0x01, Priority: 1, SendQueueCapacity: 50, + RecvMessageCapacity: defaultRecvMessageCapacity, + RecvBufferCapacity: defaultRecvBufferCapacity}, + {ID: 0x02, Priority: 2, SendQueueCapacity: 50, + RecvMessageCapacity: defaultRecvMessageCapacity, + RecvBufferCapacity: defaultRecvBufferCapacity}, + } + + // prepare messages and channel IDs + // 10 messages on channel ID 2 and 1 message on channel ID 1 + msgs := make([][]byte, totalMsgs) + chIDs := make([]byte, totalMsgs) + for i := 0; i < totalMsgs-1; i++ { + msg := bytes.Repeat([]byte{'x'}, msgSize) + msgs[i] = msg + chIDs[i] = 0x02 + } + msgs[totalMsgs-1] = bytes.Repeat([]byte{'y'}, msgSize) + chIDs[totalMsgs-1] = 0x01 + + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + received := 0 // number of messages received + recvChIds := make([]byte, totalMsgs) // keep track of the order of channel IDs of received messages + onReceive := func(chID byte, msgBytes []byte) { + // wait for 100ms to simulate processing time + // Also, the added delay allows the receiver to buffer all 11 messages, + // testing if the message on channel ID 1 (high priority) is received last or + // prioritized among the 10 messages on 
channel ID 2. + time.Sleep(100 * time.Millisecond) + recvChIds[received] = chID + received++ + if received >= totalMsgs { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = int64(sendRate) + cnfg.RecvRate = int64(recRate) + + // mount the channel descriptors to the connections + clientMconn := NewMConnectionWithConfig(client, clientChDesc, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverMconn := NewMConnectionWithConfig(server, serverChDesc, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(t, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(t, err) + defer func() { + _ = serverMconn.Stop() + }() + + go sendMessages(clientMconn, + time.Millisecond, + 1*time.Minute, + msgs, chIDs) + + // wait for all messages to be received + <-allReceived + + require.Equal(t, chIDs, recvChIds) // assert that the order of received messages is the same as the order of sent messages +} From de08d3a3b4a722931fa8ef8ea0fa5899607bf94a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?nina=20/=20=E1=83=9C=E1=83=98=E1=83=9C=E1=83=90?= Date: Thu, 1 Feb 2024 15:39:58 +0100 Subject: [PATCH 03/11] chore: log already rejected txs to debug instead of info (#1206) reopened so we can merge https://github.com/celestiaorg/celestia-core/pull/1188 Co-authored-by: evan-forbes --- mempool/cat/reactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go index 399f9fb5ce..54e0bbe999 100644 --- a/mempool/cat/reactor.go +++ b/mempool/cat/reactor.go @@ -253,7 +253,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { } _, err = memR.mempool.TryAddNewTx(ntx, key, txInfo) if err != nil && err != ErrTxInMempool { - memR.Logger.Info("Could not add tx", "txKey", key, "err", err) + memR.Logger.Debug("Could not add tx", 
"txKey", key, "err", err) return } if !memR.opts.ListenOnly { From 40ee792bb2f878dfaba02f352c4951386eb59883 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Fri, 2 Feb 2024 10:17:07 -0800 Subject: [PATCH 04/11] feat!: traces gossiped votes in the InfluxDB (#1204) Closes #1200 --- consensus/reactor.go | 47 +++++++++++++++++++++---------- pkg/trace/schema/consensus.go | 52 +++++++++++++++++++++++++++++++++++ pkg/trace/schema/tables.go | 3 +- 3 files changed, 87 insertions(+), 15 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 12b08c6c0c..e8662f7a12 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -353,8 +353,12 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { case *VoteMessage: cs := conR.conS cs.mtx.RLock() - height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() + height, round, valSize, lastCommitSize := cs.Height, cs.Round, + cs.Validators.Size(), cs.LastCommit.Size() cs.mtx.RUnlock() + + schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.TransferTypeDownload) + ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) @@ -763,7 +767,7 @@ OUTER_LOOP: // Special catchup logic. // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && rs.Height == prs.Height+1 { - if ps.PickSendVote(rs.LastCommit) { + if conR.pickSendVoteAndTrace(rs.LastCommit, rs, ps) { logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } @@ -776,8 +780,11 @@ OUTER_LOOP: // Load the block commit for prs.Height, // which contains precommit signatures for prs.Height. 
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil { - if ps.PickSendVote(commit) { + vote := ps.PickSendVote(commit) + if vote != nil { logger.Debug("Picked Catchup commit to send", "height", prs.Height) + schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote, + ps.peer.ID(), schema.TransferTypeUpload) continue OUTER_LOOP } } @@ -799,6 +806,18 @@ OUTER_LOOP: } } +// pickSendVoteAndTrace picks a vote to send and traces it. +// It returns true if a vote is sent. +// Note that it is a wrapper around PickSendVote with the addition of tracing the vote. +func (conR *Reactor) pickSendVoteAndTrace(votes types.VoteSetReader, rs *cstypes.RoundState, ps *PeerState) bool { + vote := ps.PickSendVote(votes) + if vote != nil { // if a vote is sent, trace it + schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote, + ps.peer.ID(), schema.TransferTypeUpload) + return true + } + return false +} func (conR *Reactor) gossipVotesForHeight( logger log.Logger, rs *cstypes.RoundState, @@ -808,7 +827,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are lastCommits to send... if prs.Step == cstypes.RoundStepNewHeight { - if ps.PickSendVote(rs.LastCommit) { + if conR.pickSendVoteAndTrace(rs.LastCommit, rs, ps) { logger.Debug("Picked rs.LastCommit to send") return true } @@ -816,7 +835,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POL prevotes to send... if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if conR.pickSendVoteAndTrace(polPrevotes, rs, ps) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -825,21 +844,21 @@ func (conR *Reactor) gossipVotesForHeight( } // If there are prevotes to send... 
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if conR.pickSendVoteAndTrace(rs.Votes.Prevotes(prs.Round), rs, ps) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } } // If there are precommits to send... if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { + if conR.pickSendVoteAndTrace(rs.Votes.Precommits(prs.Round), rs, ps) { logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round) return true } } // If there are prevotes to send...Needed because of validBlock mechanism if prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if conR.pickSendVoteAndTrace(rs.Votes.Prevotes(prs.Round), rs, ps) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } @@ -847,7 +866,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POLPrevotes to send... if prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if conR.pickSendVoteAndTrace(polPrevotes, rs, ps) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -1163,8 +1182,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in } // PickSendVote picks a vote and sends it to the peer. -// Returns true if vote was sent. -func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { +// Returns the vote if vote was sent. Otherwise, returns nil. 
+func (ps *PeerState) PickSendVote(votes types.VoteSetReader) *types.Vote { if vote, ok := ps.PickVoteToSend(votes); ok { ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) if p2p.SendEnvelopeShim(ps.peer, p2p.Envelope{ //nolint: staticcheck @@ -1174,11 +1193,11 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { }, }, ps.logger) { ps.SetHasVote(vote) - return true + return vote } - return false + return nil } - return false + return nil } // PickVoteToSend picks a vote to send to the peer. diff --git a/pkg/trace/schema/consensus.go b/pkg/trace/schema/consensus.go index 6609a8f363..e3d5371d8d 100644 --- a/pkg/trace/schema/consensus.go +++ b/pkg/trace/schema/consensus.go @@ -14,6 +14,7 @@ func ConsensusTables() []string { RoundStateTable, BlockPartsTable, BlockTable, + VoteTable, } } @@ -128,3 +129,54 @@ func WriteBlock(client *trace.Client, block *types.Block, size int) { LastCommitRoundFieldKey: block.LastCommit.Round, }) } + +// Schema constants for the consensus votes tracing database. +const ( + // VoteTable is the name of the table that stores the consensus + // voting traces. Follows this schema: + // + // | time | height | round | vote_type | vote_height | vote_round + // | vote_block_id| vote_unix_millisecond_timestamp + // | vote_validator_address | vote_validator_index | peer + // | transfer_type | + VoteTable = "consensus_vote" + + VoteTypeFieldKey = "vote_type" + VoteHeightFieldKey = "vote_height" + VoteRoundFieldKey = "vote_round" + VoteBlockIDFieldKey = "vote_block_id" + VoteTimestampFieldKey = "vote_unix_millisecond_timestamp" + ValidatorAddressFieldKey = "vote_validator_address" + ValidatorIndexFieldKey = "vote_validator_index" +) + +// WriteVote writes a tracing point for a vote using the predetermined +// schema for consensus vote tracing. 
+// This is used to create a table in the following +// schema: +// +// | time | height | round | vote_type | vote_height | vote_round +// | vote_block_id| vote_unix_millisecond_timestamp +// | vote_validator_address | vote_validator_index | peer +// | transfer_type | +func WriteVote(client *trace.Client, + height int64, // height of the current peer when it received/sent the vote + round int32, // round of the current peer when it received/sent the vote + vote *types.Vote, // vote received by the current peer + peer p2p.ID, // the peer from which it received the vote or the peer to which it sent the vote + transferType string, // download (received) or upload(sent) +) { + client.WritePoint(VoteTable, map[string]interface{}{ + HeightFieldKey: height, + RoundFieldKey: round, + VoteTypeFieldKey: vote.Type.String(), + VoteHeightFieldKey: vote.Height, + VoteRoundFieldKey: vote.Round, + VoteBlockIDFieldKey: vote.BlockID.Hash.String(), + VoteTimestampFieldKey: vote.Timestamp.UnixMilli(), + ValidatorAddressFieldKey: vote.ValidatorAddress.String(), + ValidatorIndexFieldKey: vote.ValidatorIndex, + PeerFieldKey: peer, + TransferTypeFieldKey: transferType, + }) +} diff --git a/pkg/trace/schema/tables.go b/pkg/trace/schema/tables.go index e67d8f7a64..f229c4a50c 100644 --- a/pkg/trace/schema/tables.go +++ b/pkg/trace/schema/tables.go @@ -24,7 +24,8 @@ const ( // value. PeerFieldKey = "peer" - // TransferTypeFieldKey is the tracing field key for the class of a tx. + // TransferTypeFieldKey is the tracing field key for the class of a tx + // and votes. 
TransferTypeFieldKey = "transfer_type" // TransferTypeDownload is a tracing field value for receiving some From 0faeb5996f394bb38e03c20a2ae5b767adb25f8f Mon Sep 17 00:00:00 2001 From: Rootul P Date: Tue, 6 Feb 2024 10:23:28 -0500 Subject: [PATCH 05/11] chore: add more CODEOWNERS (#1212) Closes https://github.com/celestiaorg/celestia-core/issues/1211 --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 82516cf5d9..09e849ee7e 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -7,7 +7,7 @@ # global owners are only requested if there isn't a more specific # codeowner specified below. For this reason, the global codeowners # are often repeated in package-level definitions. -* @evan-forbes @cmwaters +* @evan-forbes @cmwaters @staheri14 @rach-id @ninabarbakadze @rootulp # Overrides for tooling packages docs/celestia-architecture @liamsi @adlerjohn From 2b5f6efe4353c117b58fbecfbd7686e90a546c59 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Tue, 6 Feb 2024 18:09:00 +0100 Subject: [PATCH 06/11] chore: cleanup go.mod file require sections (#1172) This PR does not change any of the dependencies; it only reorders the go.mod file into a direct and indirect import section, as go versions do since 1.17. The reason your go.mod file got so many require sections is because during the 1.16 to 1.17 go.mod styling transition you had different contributors updating the go.mod file with pre 1.17 and post 1.17 releases of go. See https://go.dev/doc/modules/gomod-ref: > At go 1.17 or higher: section for a description of what changed. 
--- go.mod | 53 +++++++++++++++++++---------------------------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/go.mod b/go.mod index d139c3087b..862d3e8972 100644 --- a/go.mod +++ b/go.mod @@ -5,24 +5,38 @@ go 1.21.5 require ( github.com/BurntSushi/toml v1.2.1 github.com/ChainSafe/go-schnorrkel v1.0.0 + github.com/Masterminds/semver/v3 v3.2.0 github.com/Workiva/go-datastructures v1.0.53 github.com/adlio/schema v1.3.3 + github.com/btcsuite/btcd/btcec/v2 v2.2.1 + github.com/btcsuite/btcd/btcutil v1.1.2 + github.com/bufbuild/buf v1.9.0 + github.com/celestiaorg/nmt v0.20.0 + github.com/cometbft/cometbft-db v0.7.0 + github.com/creachadair/taskgroup v0.3.2 github.com/fortytw2/leaktest v1.3.0 + github.com/go-git/go-git/v5 v5.11.0 github.com/go-kit/kit v0.12.0 github.com/go-kit/log v0.2.1 github.com/go-logfmt/logfmt v0.5.1 github.com/gofrs/uuid v4.3.0+incompatible + github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.3 github.com/golangci/golangci-lint v1.50.1 github.com/google/orderedcode v0.0.1 + github.com/google/uuid v1.3.1 github.com/gorilla/websocket v1.5.0 + github.com/grafana/pyroscope-go v1.0.3 github.com/gtank/merlin v0.1.1 + github.com/influxdata/influxdb-client-go/v2 v2.12.3 + github.com/informalsystems/tm-load-test v1.3.0 github.com/lib/pq v1.10.6 github.com/libp2p/go-buffer-pool v0.1.0 github.com/minio/highwayhash v1.0.2 github.com/ory/dockertest v3.3.5+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.14.0 + github.com/pyroscope-io/otel-profiling-go v0.4.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/rs/cors v1.8.2 github.com/sasha-s/go-deadlock v0.3.1 @@ -30,45 +44,16 @@ require ( github.com/spf13/cobra v1.6.1 github.com/spf13/viper v1.13.0 github.com/stretchr/testify v1.8.4 -) - -require ( - github.com/google/uuid v1.3.1 - golang.org/x/crypto v0.17.0 - golang.org/x/net v0.19.0 - google.golang.org/grpc v1.59.0 -) - -require ( - github.com/gogo/protobuf v1.3.2 - 
github.com/informalsystems/tm-load-test v1.3.0 -) - -require ( - github.com/bufbuild/buf v1.9.0 - github.com/creachadair/taskgroup v0.3.2 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 -) - -require ( - github.com/Masterminds/semver/v3 v3.2.0 - github.com/btcsuite/btcd/btcec/v2 v2.2.1 - github.com/btcsuite/btcd/btcutil v1.1.2 - github.com/celestiaorg/nmt v0.20.0 - github.com/cometbft/cometbft-db v0.7.0 - github.com/go-git/go-git/v5 v5.11.0 github.com/vektra/mockery/v2 v2.14.0 - gonum.org/v1/gonum v0.8.2 - google.golang.org/protobuf v1.31.0 -) - -require ( - github.com/grafana/pyroscope-go v1.0.3 - github.com/influxdata/influxdb-client-go/v2 v2.12.3 - github.com/pyroscope-io/otel-profiling-go v0.4.0 go.opentelemetry.io/otel v1.18.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.18.0 go.opentelemetry.io/otel/sdk v1.18.0 + golang.org/x/crypto v0.17.0 + golang.org/x/net v0.19.0 + gonum.org/v1/gonum v0.8.2 + google.golang.org/grpc v1.59.0 + google.golang.org/protobuf v1.31.0 ) require ( From 52b993cb7d493e1ce62f48849effe933c4a3a108 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Tue, 6 Feb 2024 11:33:25 -0600 Subject: [PATCH 07/11] fix: correctly prune peers from the seenTxSet when disconnecting (#1215) ## Description Closes: #1208 --- #### PR checklist - [ ] Tests written/updated - [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog) - [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments --- mempool/cat/cache.go | 24 ++++++++++---------- mempool/cat/cache_test.go | 6 ++--- mempool/cat/reactor.go | 3 +++ mempool/cat/reactor_test.go | 44 +++++++++++++++++++++++++++++++++++++ mempool/cat/requests.go | 3 ++- 5 files changed, 63 insertions(+), 17 deletions(-) diff --git a/mempool/cat/cache.go b/mempool/cat/cache.go index 75b4db452b..95f9393c8f 100644 --- a/mempool/cat/cache.go +++ b/mempool/cat/cache.go @@ -130,19 +130,6 @@ func (s *SeenTxSet) 
Add(txKey types.TxKey, peer uint16) { } } -func (s *SeenTxSet) Pop(txKey types.TxKey) uint16 { - s.mtx.Lock() - defer s.mtx.Unlock() - seenSet, exists := s.set[txKey] - if exists { - for peer := range seenSet.peers { - delete(seenSet.peers, peer) - return peer - } - } - return 0 -} - func (s *SeenTxSet) RemoveKey(txKey types.TxKey) { s.mtx.Lock() defer s.mtx.Unlock() @@ -162,6 +149,17 @@ func (s *SeenTxSet) Remove(txKey types.TxKey, peer uint16) { } } +func (s *SeenTxSet) RemovePeer(peer uint16) { + s.mtx.Lock() + defer s.mtx.Unlock() + for key, seenSet := range s.set { + delete(seenSet.peers, peer) + if len(seenSet.peers) == 0 { + delete(s.set, key) + } + } +} + func (s *SeenTxSet) Prune(limit time.Time) { s.mtx.Lock() defer s.mtx.Unlock() diff --git a/mempool/cat/cache_test.go b/mempool/cat/cache_test.go index 5c7f6453bf..539670ff0a 100644 --- a/mempool/cat/cache_test.go +++ b/mempool/cat/cache_test.go @@ -22,7 +22,7 @@ func TestSeenTxSet(t *testing.T) { ) seenSet := NewSeenTxSet() - require.Zero(t, seenSet.Pop(tx1Key)) + require.Nil(t, seenSet.Get(tx1Key)) seenSet.Add(tx1Key, peer1) seenSet.Add(tx1Key, peer1) @@ -36,8 +36,8 @@ func TestSeenTxSet(t *testing.T) { require.Equal(t, 3, seenSet.Len()) seenSet.RemoveKey(tx2Key) require.Equal(t, 2, seenSet.Len()) - require.Zero(t, seenSet.Pop(tx2Key)) - require.Equal(t, peer1, seenSet.Pop(tx3Key)) + require.Nil(t, seenSet.Get(tx2Key)) + require.True(t, seenSet.Has(tx3Key, peer1)) } func TestLRUTxCacheRemove(t *testing.T) { diff --git a/mempool/cat/reactor.go b/mempool/cat/reactor.go index 54e0bbe999..abfdfd56d9 100644 --- a/mempool/cat/reactor.go +++ b/mempool/cat/reactor.go @@ -189,6 +189,9 @@ func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { // peer it will find a new peer to rerequest the same transactions. 
func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { peerID := memR.ids.Reclaim(peer.ID()) + // clear all memory of seen txs by that peer + memR.mempool.seenByPeersSet.RemovePeer(peerID) + // remove and rerequest all pending outbound requests to that peer since we know // we won't receive any responses from them. outboundRequests := memR.requests.ClearAllRequestsFrom(peerID) diff --git a/mempool/cat/reactor_test.go b/mempool/cat/reactor_test.go index 15d67bfad5..2a9b8bd655 100644 --- a/mempool/cat/reactor_test.go +++ b/mempool/cat/reactor_test.go @@ -157,6 +157,50 @@ func TestReactorBroadcastsSeenTxAfterReceivingTx(t *testing.T) { peers[1].AssertExpectations(t) } +func TestRemovePeerRequestFromOtherPeer(t *testing.T) { + reactor, _ := setupReactor(t) + + tx := newDefaultTx("hello") + key := tx.Key() + peers := genPeers(2) + reactor.InitPeer(peers[0]) + reactor.InitPeer(peers[1]) + + seenMsg := &protomem.SeenTx{TxKey: key[:]} + + wantEnv := p2p.Envelope{ + Message: &protomem.Message{ + Sum: &protomem.Message_WantTx{WantTx: &protomem.WantTx{TxKey: key[:]}}, + }, + ChannelID: MempoolStateChannel, + } + peers[0].On("SendEnvelope", wantEnv).Return(true) + peers[1].On("SendEnvelope", wantEnv).Return(true) + + reactor.ReceiveEnvelope(p2p.Envelope{ + Src: peers[0], + Message: seenMsg, + ChannelID: MempoolStateChannel, + }) + time.Sleep(100 * time.Millisecond) + reactor.ReceiveEnvelope(p2p.Envelope{ + Src: peers[1], + Message: seenMsg, + ChannelID: MempoolStateChannel, + }) + + reactor.RemovePeer(peers[0], "test") + + peers[0].AssertExpectations(t) + peers[1].AssertExpectations(t) + + require.True(t, reactor.mempool.seenByPeersSet.Has(key, 2)) + // we should have automatically sent another request out for peer 2 + require.EqualValues(t, 2, reactor.requests.ForTx(key)) + require.True(t, reactor.requests.Has(2, key)) + require.False(t, reactor.mempool.seenByPeersSet.Has(key, 1)) +} + func TestMempoolVectors(t *testing.T) { testCases := []struct { testName 
string diff --git a/mempool/cat/requests.go b/mempool/cat/requests.go index 5fdb344a87..8d0b78778f 100644 --- a/mempool/cat/requests.go +++ b/mempool/cat/requests.go @@ -113,8 +113,9 @@ func (r *requestScheduler) ClearAllRequestsFrom(peer uint16) requestSet { if !ok { return requestSet{} } - for _, timer := range requests { + for tx, timer := range requests { timer.Stop() + delete(r.requestsByTx, tx) } delete(r.requestsByPeer, peer) return requests From 9f641e7ff15a73e7e6fa520fe15ce6288258adfc Mon Sep 17 00:00:00 2001 From: Hoa Nguyen Date: Fri, 9 Feb 2024 22:10:43 +0700 Subject: [PATCH 08/11] feat(mempool): add metric size of pool in bytes (#1195) This PR exposes a new metric SizeBytes for the total size of the mempool in bytes. We already keep track of this value internally, in the variable txsBytes. Currently there is the metric Size for the size of the mempool in number of transactions. --- mempool/cat/pool.go | 3 +++ mempool/metrics.go | 11 +++++++++++ mempool/v0/clist_mempool.go | 3 +++ mempool/v1/mempool.go | 3 +++ 4 files changed, 20 insertions(+) diff --git a/mempool/cat/pool.go b/mempool/cat/pool.go index efe644f809..a1a86e1ef4 100644 --- a/mempool/cat/pool.go +++ b/mempool/cat/pool.go @@ -499,6 +499,7 @@ func (txmp *TxPool) Update( // transactions are left. 
size := txmp.Size() txmp.metrics.Size.Set(float64(size)) + txmp.metrics.SizeBytes.Set(float64(txmp.SizeBytes())) if size > 0 { if txmp.config.Recheck { txmp.recheckTransactions() @@ -576,6 +577,7 @@ func (txmp *TxPool) addNewTransaction(wtx *wrappedTx, checkTxRes *abci.ResponseC txmp.metrics.TxSizeBytes.Observe(float64(wtx.size())) txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.SizeBytes.Set(float64(txmp.SizeBytes())) txmp.logger.Debug( "inserted new valid transaction", "priority", wtx.priority, @@ -628,6 +630,7 @@ func (txmp *TxPool) handleRecheckResult(wtx *wrappedTx, checkTxRes *abci.Respons } txmp.metrics.FailedTxs.Add(1) txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.SizeBytes.Set(float64(txmp.SizeBytes())) } // recheckTransactions initiates re-CheckTx ABCI calls for all the transactions diff --git a/mempool/metrics.go b/mempool/metrics.go index 6c00ce59bc..7c23a5f94b 100644 --- a/mempool/metrics.go +++ b/mempool/metrics.go @@ -30,6 +30,9 @@ type Metrics struct { // Size of the mempool. Size metrics.Gauge + // Total size of the mempool in bytes. + SizeBytes metrics.Gauge + // Histogram of transaction sizes, in bytes. 
TxSizeBytes metrics.Histogram @@ -81,6 +84,13 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Size of the mempool (number of uncommitted transactions).", }, labels).With(labelsAndValues...), + SizeBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "size_bytes", + Help: "Total size of the mempool in bytes.", + }, labels).With(labelsAndValues...), + TxSizeBytes: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -144,6 +154,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return &Metrics{ Size: discard.NewGauge(), + SizeBytes: discard.NewGauge(), TxSizeBytes: discard.NewHistogram(), FailedTxs: discard.NewCounter(), EvictedTxs: discard.NewCounter(), diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index 17dda10487..e81083cac8 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -277,6 +277,7 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { // update metrics mem.metrics.Size.Set(float64(mem.Size())) + mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) } // Request specific callback that should be set on individual reqRes objects @@ -304,6 +305,7 @@ func (mem *CListMempool) reqResCb( // update metrics mem.metrics.Size.Set(float64(mem.Size())) + mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) // passed in by the caller of CheckTx, eg. 
the RPC if externalCb != nil { @@ -639,6 +641,7 @@ func (mem *CListMempool) Update( // Update metrics mem.metrics.Size.Set(float64(mem.Size())) + mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) return nil } diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index d3736ef74f..c7191fad9c 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -439,6 +439,7 @@ func (txmp *TxMempool) Update( // transactions are left. size := txmp.Size() txmp.metrics.Size.Set(float64(size)) + txmp.metrics.SizeBytes.Set(float64(txmp.SizeBytes())) if size > 0 { if txmp.config.Recheck { txmp.recheckTransactions() @@ -608,6 +609,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.SizeBytes.Set(float64(txmp.SizeBytes())) txmp.logger.Debug( "inserted new valid transaction", "priority", wtx.Priority(), @@ -675,6 +677,7 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.Respons } } txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.SizeBytes.Set(float64(txmp.SizeBytes())) } // recheckTransactions initiates re-CheckTx ABCI calls for all the transactions From 938fb01ccbf0b9ac0f270fe5d6f9ce6b2aad7ab6 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Fri, 9 Feb 2024 13:50:38 -0600 Subject: [PATCH 09/11] feat: add more consensus metrics on rejected proposals (#1216) This adds two counters to the consensus metrics: - ApplicationRejectedProposals - TimedOutProposals It also adds a third `StartHeight` which is set once upon start so we know how many heights the metrics have been running for --- consensus/metrics.go | 51 ++++++++++++++++++++++++++++++++++---------- consensus/state.go | 5 +++++ 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/consensus/metrics.go b/consensus/metrics.go index e435a0ee92..cafa2ab6bc 100644 --- a/consensus/metrics.go +++ b/consensus/metrics.go @@ -22,6 +22,8 
@@ const ( type Metrics struct { // Height of the chain. Height metrics.Gauge + // The height when the metrics started from + StartHeight metrics.Gauge // ValidatorLastSignedHeight of a validator. ValidatorLastSignedHeight metrics.Gauge @@ -88,6 +90,12 @@ type Metrics struct { // timestamp and the timestamp of the latest prevote in a round where 100% // of the voting power on the network issued prevotes. FullPrevoteMessageDelay metrics.Gauge + + // The amount of proposals that were rejected by the application. + ApplicationRejectedProposals metrics.Counter + + // The amount of proposals that failed to be received in time + TimedOutProposals metrics.Counter } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -105,6 +113,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "height", Help: "Height of the chain.", }, labels).With(labelsAndValues...), + StartHeight: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "start_height", + Help: "Height that metrics began", + }, labels).With(labelsAndValues...), Rounds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -241,13 +255,26 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Help: "Difference in seconds between the proposal timestamp and the timestamp " + "of the latest prevote that achieved 100% of the voting power in the prevote step.", }, labels).With(labelsAndValues...), + ApplicationRejectedProposals: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "application_rejected_proposals", + Help: "Number of proposals rejected by the application", + }, labels).With(labelsAndValues...), + TimedOutProposals: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "timed_out_proposals", + 
Help: "Number of proposals that failed to be received in time", + }, labels).With(labelsAndValues...), } } // NopMetrics returns no-op Metrics. func NopMetrics() *Metrics { return &Metrics{ - Height: discard.NewGauge(), + Height: discard.NewGauge(), + StartHeight: discard.NewGauge(), ValidatorLastSignedHeight: discard.NewGauge(), @@ -265,16 +292,18 @@ func NopMetrics() *Metrics { BlockIntervalSeconds: discard.NewHistogram(), - NumTxs: discard.NewGauge(), - BlockSizeBytes: discard.NewGauge(), - TotalTxs: discard.NewGauge(), - CommittedHeight: discard.NewGauge(), - FastSyncing: discard.NewGauge(), - StateSyncing: discard.NewGauge(), - BlockParts: discard.NewCounter(), - BlockGossipPartsReceived: discard.NewCounter(), - QuorumPrevoteMessageDelay: discard.NewGauge(), - FullPrevoteMessageDelay: discard.NewGauge(), + NumTxs: discard.NewGauge(), + BlockSizeBytes: discard.NewGauge(), + TotalTxs: discard.NewGauge(), + CommittedHeight: discard.NewGauge(), + FastSyncing: discard.NewGauge(), + StateSyncing: discard.NewGauge(), + BlockParts: discard.NewCounter(), + BlockGossipPartsReceived: discard.NewCounter(), + QuorumPrevoteMessageDelay: discard.NewGauge(), + FullPrevoteMessageDelay: discard.NewGauge(), + ApplicationRejectedProposals: discard.NewCounter(), + TimedOutProposals: discard.NewCounter(), } } diff --git a/consensus/state.go b/consensus/state.go index 861319773b..9c6ed6ba63 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -315,6 +315,8 @@ func (cs *State) OnStart() error { } } + cs.metrics.StartHeight.Set(float64(cs.Height)) + // we need the timeoutRoutine for replay so // we don't block on the tick chan. // NOTE: we will get a build up of garbage go routines @@ -1281,6 +1283,7 @@ func (cs *State) defaultDoPrevote(height int64, round int32) { // If ProposalBlock is nil, prevote nil. 
if cs.ProposalBlock == nil { logger.Debug("prevote step: ProposalBlock is nil") + cs.metrics.TimedOutProposals.Add(1) cs.signAddVote(cmtproto.PrevoteType, nil, types.PartSetHeader{}) return } @@ -1297,12 +1300,14 @@ func (cs *State) defaultDoPrevote(height int64, round int32) { stateMachineValidBlock, err := cs.blockExec.ProcessProposal(cs.ProposalBlock, cs.state) if err != nil { cs.Logger.Error("state machine returned an error when trying to process proposal block", "err", err) + return } // Vote nil if application invalidated the block if !stateMachineValidBlock { // The app says we must vote nil logger.Error("prevote step: the application deems this block to be mustVoteNil", "err", err) + cs.metrics.ApplicationRejectedProposals.Add(1) cs.signAddVote(cmtproto.PrevoteType, nil, types.PartSetHeader{}) return } From edd9b9d8c38100ec0731ece4ac5f111e3a17ce32 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Mon, 12 Feb 2024 16:24:04 -0800 Subject: [PATCH 10/11] test: messages surpassing MConnection's receiving buffer capacity are dropped (#1199) Addresses the first item of #1190 Closes #1162 --- p2p/conn/connection_bench_test.go | 96 +++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/p2p/conn/connection_bench_test.go b/p2p/conn/connection_bench_test.go index c8ab74bdfe..5fa6a62942 100644 --- a/p2p/conn/connection_bench_test.go +++ b/p2p/conn/connection_bench_test.go @@ -91,6 +91,8 @@ func sendMessages(mc *MConnection, case <-ticker.C: // generate message if mc.Send(chIDs[i], msgs[i]) { + log.TestingLogger().Info("Sent message ", i, " on channel ", + chIDs[i]) i++ if i >= total { log.TestingLogger().Info("Completed the message generation as the" + @@ -717,3 +719,97 @@ func TestMConnection_Message_Order_ChannelID(t *testing.T) { require.Equal(t, chIDs, recvChIds) // assert that the order of received messages is the same as the order of sent messages } + +func 
TestMConnection_Failing_Large_Messages(t *testing.T) { + // This test evaluates how MConnection handles messages exceeding channel + // ID's receive message capacity i.e., `RecvMessageCapacity`. + // It involves two connections, each with two channels: Channel ID 1 ( + // capacity 1024 bytes) and Channel ID 2 (capacity 1023 bytes). + // All the other channel ID's and MConnection's configurations are set high + // enough to not be a limiting factor. + // A 1KB message is sent over the first and second channels in succession. + // Message on Channel ID 1 (capacity equal to message size) is received, + // while the message on Channel ID 2 (capacity less than message size) is dropped. + + totalMsgs := 2 + msgSize := 1 * kibibyte + sendRate := 50 * kibibyte + recRate := 50 * kibibyte + chDesc := []*ChannelDescriptor{ + {ID: 0x01, Priority: 1, SendQueueCapacity: 50, + RecvMessageCapacity: msgSize, + RecvBufferCapacity: defaultRecvBufferCapacity}, + {ID: 0x02, Priority: 1, SendQueueCapacity: 50, + RecvMessageCapacity: msgSize - 1, + RecvBufferCapacity: defaultRecvBufferCapacity}, + } + + // prepare messages and channel IDs + // 1 message on channel ID 1 and 1 message on channel ID 2 + msgs := make([][]byte, totalMsgs) + chIDs := make([]byte, totalMsgs) + msgs[0] = bytes.Repeat([]byte{'x'}, msgSize) + chIDs[0] = 0x01 + msgs[1] = bytes.Repeat([]byte{'y'}, msgSize) + chIDs[1] = 0x02 + + // set up two networked connections + // server, client := NetPipe() // can alternatively use this and comment out the line below + server, client := tcpNetPipe() + defer server.Close() + defer client.Close() + + // prepare callback to receive messages + allReceived := make(chan bool) + recvChIds := make(chan byte, totalMsgs) + onReceive := func(chID byte, msgBytes []byte) { + recvChIds <- chID + if len(recvChIds) >= totalMsgs { + allReceived <- true + } + } + + cnfg := DefaultMConnConfig() + cnfg.SendRate = int64(sendRate) + cnfg.RecvRate = int64(recRate) + + // mount the channel descriptors 
to the connections + clientMconn := NewMConnectionWithConfig(client, chDesc, + func(chID byte, msgBytes []byte) {}, + func(r interface{}) {}, + cnfg) + serverMconn := NewMConnectionWithConfig(server, chDesc, + onReceive, + func(r interface{}) {}, + cnfg) + clientMconn.SetLogger(log.TestingLogger()) + serverMconn.SetLogger(log.TestingLogger()) + + err := clientMconn.Start() + require.Nil(t, err) + defer func() { + _ = clientMconn.Stop() + }() + err = serverMconn.Start() + require.Nil(t, err) + defer func() { + _ = serverMconn.Stop() + }() + + // start sending messages + go sendMessages(clientMconn, + time.Millisecond, + 1*time.Second, + msgs, chIDs) + + // wait for messages to be received + select { + case <-allReceived: + require.Fail(t, "All messages should not have been received") // the message sent + // on channel ID 2 should have been dropped + case <-time.After(500 * time.Millisecond): + require.Equal(t, 1, len(recvChIds)) + require.Equal(t, chIDs[0], <-recvChIds) // the first message should be received + require.True(t, !serverMconn.IsRunning()) // the serverMconn should have stopped due to the error + } +} From 48abbeeb57d16a92b905d76fa2bfa0dcdfac6cc6 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Fri, 16 Feb 2024 14:25:27 +0100 Subject: [PATCH 11/11] fix: use genesis file app version if it is set (#1227) ## Description Closes: #1226 --- consensus/replay.go | 18 ++++++++---------- state/state.go | 26 +++++++++++++++----------- state/state_test.go | 16 ++++++++++++++++ 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index c6bf2860bd..b78ac05463 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -252,15 +252,10 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) (string, error) { } appHash := res.LastBlockAppHash - h.logger.Info("ABCI Handshake App Info", - "height", blockHeight, - "hash", appHash, - "software-version", res.Version, - "protocol-version", res.AppVersion, - ) - - // 
Only set the version if there is no existing state. - if h.initialState.LastBlockHeight == 0 { + appVersion := h.initialState.Version.Consensus.App + // set app version if it's not set via genesis + if h.initialState.LastBlockHeight == 0 && appVersion == 0 && res.AppVersion != 0 { + appVersion = res.AppVersion h.initialState.Version.Consensus.App = res.AppVersion } @@ -271,7 +266,10 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) (string, error) { } h.logger.Info("Completed ABCI Handshake - CometBFT and App are synced", - "appHeight", blockHeight, "appHash", appHash) + "appHeight", blockHeight, + "appHash", appHash, + "appVersion", appVersion, + ) // TODO: (on restart) replay mempool diff --git a/state/state.go b/state/state.go index da6b73e46d..27c5bcc1d6 100644 --- a/state/state.go +++ b/state/state.go @@ -23,16 +23,15 @@ var ( //----------------------------------------------------------------------------- -// InitStateVersion sets the Consensus.Block and Software versions, -// but leaves the Consensus.App version blank. -// The Consensus.App version will be set during the Handshake, once -// we hear from the app what protocol version it is running. 
-var InitStateVersion = cmtstate.Version{ - Consensus: cmtversion.Consensus{ - Block: version.BlockProtocol, - App: 0, - }, - Software: version.TMCoreSemVer, +// InitStateVersion sets the Consensus.Block, Consensus.App and Software versions +func InitStateVersion(appVersion uint64) cmtstate.Version { + return cmtstate.Version{ + Consensus: cmtversion.Consensus{ + Block: version.BlockProtocol, + App: appVersion, + }, + Software: version.TMCoreSemVer, + } } //----------------------------------------------------------------------------- @@ -331,8 +330,13 @@ func MakeGenesisState(genDoc *types.GenesisDoc) (State, error) { nextValidatorSet = types.NewValidatorSet(validators).CopyIncrementProposerPriority(1) } + appVersion := uint64(0) + if genDoc.ConsensusParams != nil { + appVersion = genDoc.ConsensusParams.Version.App + } + return State{ - Version: InitStateVersion, + Version: InitStateVersion(appVersion), ChainID: genDoc.ChainID, InitialHeight: genDoc.InitialHeight, diff --git a/state/state_test.go b/state/state_test.go index f4c698b0e8..a6adf71c61 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -21,6 +21,7 @@ import ( cmtstate "github.com/cometbft/cometbft/proto/tendermint/state" sm "github.com/cometbft/cometbft/state" "github.com/cometbft/cometbft/types" + "github.com/cometbft/cometbft/version" ) // setupTestCase does setup common to all test cases. 
@@ -73,6 +74,21 @@ func TestMakeGenesisStateNilValidators(t *testing.T) { require.Equal(t, 0, len(state.NextValidators.Validators)) } +func TestMakeGenesisStateSetsAppVersion(t *testing.T) { + cp := types.DefaultConsensusParams() + appVersion := uint64(5) + cp.Version.App = appVersion + doc := types.GenesisDoc{ + ChainID: "dummy", + ConsensusParams: cp, + } + require.Nil(t, doc.ValidateAndComplete()) + state, err := sm.MakeGenesisState(&doc) + require.Nil(t, err) + require.Equal(t, appVersion, state.Version.Consensus.App) + require.Equal(t, version.BlockProtocol, state.Version.Consensus.Block) +} + // TestStateSaveLoad tests saving and loading State from a db. func TestStateSaveLoad(t *testing.T) { tearDown, stateDB, state := setupTestCase(t)