From 04c443f7532a289277b56e92e02edc2654d3e132 Mon Sep 17 00:00:00 2001 From: Brandon Kite Date: Mon, 11 Dec 2023 11:46:02 -0800 Subject: [PATCH] Show info about connected peers in the gql api (#1524) closes: #649 Show information about connected peers that may be useful for debugging purposes via the gql api. Note: in the future it may make sense to protect this API with a special authentication token. --------- Co-authored-by: xgreenx --- CHANGELOG.md | 1 + Cargo.lock | 1 + bin/e2e-test-client/src/test_context.rs | 1 + crates/client/assets/schema.sdl | 14 +++- crates/client/src/client/schema/chain.rs | 13 +++- ..._chain__tests__chain_gql_query_output.snap | 9 ++- crates/client/src/client/types/chain_info.rs | 37 ++++++++- crates/fuel-core/src/graphql_api/ports.rs | 6 ++ crates/fuel-core/src/graphql_api/service.rs | 4 + crates/fuel-core/src/schema/chain.rs | 61 ++++++++++++++- .../src/service/adapters/graphql_api.rs | 46 ++++++++++- crates/fuel-core/src/service/sub_services.rs | 3 +- .../p2p/src/peer_manager/heartbeat_data.rs | 8 +- crates/services/p2p/src/service.rs | 32 +++++++- crates/types/Cargo.toml | 1 + crates/types/src/services/p2p.rs | 48 +++++++++++- tests/tests/chain.rs | 77 +++++++++++++++++++ 17 files changed, 348 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36fb7797072..07b6f8ea9da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Added +- [#1524](https://github.com/FuelLabs/fuel-core/pull/1524): Adds information about connected peers to the GQL API. - [#1515](https://github.com/FuelLabs/fuel-core/pull/1515): Added support of `--version` command for `fuel-core-keygen` binary. - [#1504](https://github.com/FuelLabs/fuel-core/pull/1504): A `Success` or `Failure` variant of `TransactionStatus` returned by a query now contains the associated receipts generated by transaction execution. diff --git a/Cargo.lock b/Cargo.lock index 6c41cef1081..a5945090e5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3250,6 +3250,7 @@ name = "fuel-core-types" version = "0.21.0" dependencies = [ "anyhow", + "bs58 0.5.0", "derive_more", "fuel-vm", "secrecy", diff --git a/bin/e2e-test-client/src/test_context.rs b/bin/e2e-test-client/src/test_context.rs index d5dad40d237..c383c70264e 100644 --- a/bin/e2e-test-client/src/test_context.rs +++ b/bin/e2e-test-client/src/test_context.rs @@ -284,6 +284,7 @@ impl Wallet { .transfer_tx(destination, transfer_amount, asset_id) .await?; let tx_id = tx.id(&self.consensus_params.chain_id); + println!("submitting tx... {:?}", tx_id); let status = self.client.submit_and_await_commit(&tx).await?; // we know the transferred coin should be output 0 from above diff --git a/crates/client/assets/schema.sdl b/crates/client/assets/schema.sdl index e05091cc901..4735901cbfa 100644 --- a/crates/client/assets/schema.sdl +++ b/crates/client/assets/schema.sdl @@ -94,7 +94,7 @@ type ChainInfo { name: String! latestBlock: Block! daHeight: U64! - peerCount: Int! + peers: [PeerInfo!]! consensusParameters: ConsensusParameters! gasCosts: GasCosts! } @@ -650,6 +650,18 @@ type PageInfo { endCursor: String } +type PeerInfo { + id: String! + addresses: [String!]! + clientVersion: String + blockHeight: U32 + """ + The last heartbeat from this peer in unix epoch time ms + """ + lastHeartbeatMs: U64! + appScore: Float! +} + type PoAConsensus { """ Gets the signature of the block produced by `PoA` consensus. diff --git a/crates/client/src/client/schema/chain.rs b/crates/client/src/client/schema/chain.rs index edba6366522..12ca0b7415f 100644 --- a/crates/client/src/client/schema/chain.rs +++ b/crates/client/src/client/schema/chain.rs @@ -322,11 +322,22 @@ pub struct ChainQuery { pub struct ChainInfo { pub da_height: U64, pub name: String, - pub peer_count: i32, + pub peers: Vec, pub latest_block: Block, pub consensus_parameters: ConsensusParameters, } +#[derive(cynic::QueryFragment, Debug)] +#[cynic(schema_path = "./assets/schema.sdl", graphql_type = "PeerInfo")] +pub struct PeerInfo { + pub id: String, + pub addresses: Vec, + pub client_version: Option, + pub block_height: Option, + pub last_heartbeat_ms: U64, + pub app_score: f64, +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/client/src/client/schema/snapshots/fuel_core_client__client__schema__chain__tests__chain_gql_query_output.snap b/crates/client/src/client/schema/snapshots/fuel_core_client__client__schema__chain__tests__chain_gql_query_output.snap index f3ae948d31e..df06e076049 100644 --- a/crates/client/src/client/schema/snapshots/fuel_core_client__client__schema__chain__tests__chain_gql_query_output.snap +++ b/crates/client/src/client/schema/snapshots/fuel_core_client__client__schema__chain__tests__chain_gql_query_output.snap @@ -6,7 +6,14 @@ query { chain { daHeight name - peerCount + peers { + id + addresses + clientVersion + blockHeight + lastHeartbeatMs + appScore + } latestBlock { id header { diff --git a/crates/client/src/client/types/chain_info.rs b/crates/client/src/client/types/chain_info.rs index 034bbdc5c60..81ab508b795 100644 --- a/crates/client/src/client/types/chain_info.rs +++ b/crates/client/src/client/types/chain_info.rs @@ -2,12 +2,26 @@ use crate::client::{ schema, types::Block, }; -use fuel_core_types::fuel_tx::ConsensusParameters; +use fuel_core_types::{ + fuel_tx::ConsensusParameters, + services::p2p::{ + HeartbeatData, + PeerId, + PeerInfo, + }, +}; +use std::{ + str::FromStr, + time::{ + Duration, + UNIX_EPOCH, + }, +}; pub struct ChainInfo { pub da_height: u64, pub name: String, - pub peer_count: i32, + pub peers: Vec, pub latest_block: Block, pub consensus_parameters: ConsensusParameters, } @@ -19,9 +33,26 @@ impl From for ChainInfo { Self { da_height: value.da_height.into(), name: value.name, - peer_count: value.peer_count, + peers: value.peers.into_iter().map(|info| info.into()).collect(), latest_block: value.latest_block.into(), consensus_parameters: value.consensus_parameters.into(), } } } + +impl From for PeerInfo { + fn from(info: schema::chain::PeerInfo) -> Self { + Self { + id: PeerId::from_str(info.id.as_str()).unwrap_or_default(), + peer_addresses: info.addresses.into_iter().collect(), + client_version: info.client_version, + heartbeat_data: HeartbeatData { + block_height: info.block_height.map(|h| h.0.into()), + last_heartbeat: UNIX_EPOCH + .checked_add(Duration::from_millis(info.last_heartbeat_ms.0)) + .unwrap_or(UNIX_EPOCH), + }, + app_score: info.app_score, + } + } +} diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index 8edfcb42f35..b897acb2489 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -47,6 +47,7 @@ use fuel_core_types::{ }, services::{ graphql_api::ContractBalance, + p2p::PeerInfo, txpool::{ InsertionResult, TransactionStatus, @@ -203,3 +204,8 @@ pub trait DatabaseMessageProof: Send + Sync { commit_block_height: &BlockHeight, ) -> StorageResult; } + +#[async_trait::async_trait] +pub trait P2pPort: Send + Sync { + async fn all_peer_info(&self) -> anyhow::Result>; +} diff --git a/crates/fuel-core/src/graphql_api/service.rs b/crates/fuel-core/src/graphql_api/service.rs index 2ecb2c1457f..6c6879ae308 100644 --- a/crates/fuel-core/src/graphql_api/service.rs +++ b/crates/fuel-core/src/graphql_api/service.rs @@ -3,6 +3,7 @@ use crate::{ BlockProducerPort, ConsensusModulePort, DatabasePort, + P2pPort, TxPoolPort, }, graphql_api::{ @@ -81,6 +82,7 @@ pub type BlockProducer = Box; // use only `Database` to receive all information about transactions. pub type TxPool = Box; pub type ConsensusModule = Box; +pub type P2pService = Box; #[derive(Clone)] pub struct SharedState { @@ -165,6 +167,7 @@ pub fn new_service( txpool: TxPool, producer: BlockProducer, consensus_module: ConsensusModule, + p2p_service: P2pService, log_threshold_ms: Duration, request_timeout: Duration, ) -> anyhow::Result { @@ -176,6 +179,7 @@ pub fn new_service( .data(txpool) .data(producer) .data(consensus_module) + .data(p2p_service) .extension(async_graphql::extensions::Tracing) .extension(MetricsExtension::new(log_threshold_ms)) .finish(); diff --git a/crates/fuel-core/src/schema/chain.rs b/crates/fuel-core/src/schema/chain.rs index f75461e3ad9..82198a62d92 100644 --- a/crates/fuel-core/src/schema/chain.rs +++ b/crates/fuel-core/src/schema/chain.rs @@ -23,6 +23,7 @@ use async_graphql::{ Union, }; use fuel_core_types::fuel_tx; +use std::time::UNIX_EPOCH; pub struct ChainInfo; pub struct ConsensusParameters(fuel_tx::ConsensusParameters); @@ -658,6 +659,49 @@ impl GasCosts { } } +struct PeerInfo(fuel_core_types::services::p2p::PeerInfo); + +#[Object] +impl PeerInfo { + /// The libp2p peer id + async fn id(&self) -> String { + self.0.id.to_string() + } + + /// The advertised multi-addrs that can be used to connect to this peer + async fn addresses(&self) -> Vec { + self.0.peer_addresses.iter().cloned().collect() + } + + /// The self-reported version of the client the peer is using + async fn client_version(&self) -> Option { + self.0.client_version.clone() + } + + /// The last reported height of the peer + async fn block_height(&self) -> Option { + self.0 + .heartbeat_data + .block_height + .map(|height| (*height).into()) + } + + /// The last heartbeat from this peer in unix epoch time ms + async fn last_heartbeat_ms(&self) -> U64 { + let time = self.0.heartbeat_data.last_heartbeat; + let time = time + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + U64(time.try_into().unwrap_or_default()) + } + + /// The internal fuel p2p reputation of this peer + async fn app_score(&self) -> f64 { + self.0.app_score + } +} + #[Object] impl LightOperation { async fn base(&self) -> U64 { @@ -704,8 +748,21 @@ impl ChainInfo { height.0.into() } - async fn peer_count(&self) -> u16 { - 0 + async fn peers(&self, _ctx: &Context<'_>) -> anyhow::Result> { + #[cfg(feature = "p2p")] + { + let p2p: &crate::fuel_core_graphql_api::service::P2pService = + _ctx.data_unchecked(); + let peer_info = p2p.all_peer_info().await?; + let peers = peer_info.into_iter().map(PeerInfo).collect(); + Ok(peers) + } + #[cfg(not(feature = "p2p"))] + { + Err(anyhow::anyhow!( + "Peering is disabled in this build, try using the `p2p` feature flag." + )) + } } async fn consensus_parameters( diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index e55caf2c8c1..39ec466be1b 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -1,3 +1,4 @@ +use super::BlockProducerAdapter; use crate::{ database::{ transactions::OwnedTransactionIndexCursor, @@ -14,9 +15,13 @@ use crate::{ DatabasePort, DatabaseTransactions, DryRunExecution, + P2pPort, TxPoolPort, }, - service::adapters::TxPoolAdapter, + service::adapters::{ + P2PAdapter, + TxPoolAdapter, + }, }; use async_trait::async_trait; use fuel_core_services::stream::BoxStream; @@ -60,6 +65,7 @@ use fuel_core_types::{ }, services::{ graphql_api::ContractBalance, + p2p::PeerInfo, txpool::{ InsertionResult, TransactionStatus, @@ -264,4 +270,40 @@ impl DryRunExecution for BlockProducerAdapter { impl BlockProducerPort for BlockProducerAdapter {} -use super::BlockProducerAdapter; +#[async_trait::async_trait] +impl P2pPort for P2PAdapter { + async fn all_peer_info(&self) -> anyhow::Result> { + #[cfg(feature = "p2p")] + { + use fuel_core_types::services::p2p::HeartbeatData; + if let Some(service) = &self.service { + let peers = service.get_all_peers().await?; + Ok(peers + .into_iter() + .map(|(peer_id, peer_info)| PeerInfo { + id: fuel_core_types::services::p2p::PeerId::from( + peer_id.to_bytes(), + ), + peer_addresses: peer_info + .peer_addresses + .iter() + .map(|addr| addr.to_string()) + .collect(), + client_version: None, + heartbeat_data: HeartbeatData { + block_height: peer_info.heartbeat_data.block_height, + last_heartbeat: peer_info.heartbeat_data.last_heartbeat_sys, + }, + app_score: peer_info.score, + }) + .collect()) + } else { + Ok(vec![]) + } + } + #[cfg(not(feature = "p2p"))] + { + Ok(vec![]) + } + } +} diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index b68fae2a753..36abbf6c54b 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -175,7 +175,7 @@ pub fn init_sub_services( #[cfg(feature = "p2p")] let sync = fuel_core_sync::service::new_service( *last_block.header().height(), - p2p_adapter, + p2p_adapter.clone(), importer_adapter.clone(), verifier, config.sync, @@ -206,6 +206,7 @@ pub fn init_sub_services( Box::new(tx_pool_adapter), Box::new(producer_adapter), Box::new(poa_adapter), + Box::new(p2p_adapter), config.query_log_threshold_time, config.api_request_timeout, )?; diff --git a/crates/services/p2p/src/peer_manager/heartbeat_data.rs b/crates/services/p2p/src/peer_manager/heartbeat_data.rs index af3043549aa..c0edf86c684 100644 --- a/crates/services/p2p/src/peer_manager/heartbeat_data.rs +++ b/crates/services/p2p/src/peer_manager/heartbeat_data.rs @@ -1,7 +1,10 @@ use fuel_core_types::fuel_types::BlockHeight; use std::{ collections::VecDeque, - time::Duration, + time::{ + Duration, + SystemTime, + }, }; use tokio::time::Instant; @@ -9,6 +12,7 @@ use tokio::time::Instant; pub struct HeartbeatData { pub block_height: Option, pub last_heartbeat: Instant, + pub last_heartbeat_sys: SystemTime, // Size of moving average window pub window: u32, pub durations: VecDeque, @@ -19,6 +23,7 @@ impl HeartbeatData { Self { block_height: None, last_heartbeat: Instant::now(), + last_heartbeat_sys: SystemTime::now(), window, durations: VecDeque::with_capacity(window as usize), } @@ -53,6 +58,7 @@ impl HeartbeatData { self.block_height = Some(block_height); let old_hearbeat = self.last_heartbeat; self.last_heartbeat = Instant::now(); + self.last_heartbeat_sys = SystemTime::now(); let new_duration = self.last_heartbeat.saturating_duration_since(old_hearbeat); self.add_new_duration(new_duration); } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 8ab4659c92e..bff726fff8b 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -92,6 +92,10 @@ enum TaskRequest { BroadcastTransaction(Arc), // Request to get one-off data from p2p network GetPeerIds(oneshot::Sender>), + // Request to get information about all connected peers + GetAllPeerInfo { + channel: oneshot::Sender>, + }, GetBlock { height: BlockHeight, channel: oneshot::Sender>, @@ -138,6 +142,9 @@ impl Debug for TaskRequest { TaskRequest::RespondWithPeerReport { .. } => { write!(f, "TaskRequest::RespondWithPeerReport") } + TaskRequest::GetAllPeerInfo { .. } => { + write!(f, "TaskRequest::GetPeerInfo") + } } } } @@ -515,6 +522,13 @@ where Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => { let _ = self.p2p_service.report_peer(peer_id, score, reporting_service); } + Some(TaskRequest::GetAllPeerInfo { channel }) => { + let peers = self.p2p_service.get_all_peer_info() + .into_iter() + .map(|(id, info)| (*id, info.clone())) + .collect::>(); + let _ = channel.send(peers); + } None => { unreachable!("The `Task` is holder of the `Sender`, so it should not be possible"); } @@ -740,6 +754,16 @@ impl SharedState { receiver.await.map_err(|e| anyhow!("{}", e)) } + pub async fn get_all_peers(&self) -> anyhow::Result> { + let (sender, receiver) = oneshot::channel(); + + self.request_sender + .send(TaskRequest::GetAllPeerInfo { channel: sender }) + .await?; + + receiver.await.map_err(|e| anyhow!("{}", e)) + } + pub fn subscribe_tx(&self) -> broadcast::Receiver { self.tx_broadcast.subscribe() } @@ -845,7 +869,10 @@ pub mod tests { use fuel_core_storage::Result as StorageResult; use fuel_core_types::fuel_types::BlockHeight; use futures::FutureExt; - use std::collections::VecDeque; + use std::{ + collections::VecDeque, + time::SystemTime, + }; #[derive(Clone, Debug)] struct FakeDb; @@ -1049,6 +1076,7 @@ pub mod tests { let heartbeat_data = HeartbeatData { block_height: None, last_heartbeat: Instant::now(), + last_heartbeat_sys: SystemTime::now(), window: 0, durations, }; @@ -1122,12 +1150,14 @@ pub mod tests { // under the limit let last_duration = Duration::from_secs(5); let last_heartbeat = Instant::now() - Duration::from_secs(50); + let last_heartbeat_sys = SystemTime::now() - Duration::from_secs(50); let mut durations = VecDeque::new(); durations.push_front(last_duration); let heartbeat_data = HeartbeatData { block_height: None, last_heartbeat, + last_heartbeat_sys, window: 0, durations, }; diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index d3cde9bb4cf..0ad99859428 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -18,6 +18,7 @@ version = { workspace = true } [dependencies] anyhow = { workspace = true } +bs58 = "0.5" derive_more = { version = "0.99" } fuel-vm-private = { workspace = true, default-features = false, features = ["alloc"] } secrecy = "0.8" diff --git a/crates/types/src/services/p2p.rs b/crates/types/src/services/p2p.rs index 758ca83df2d..6907ba8e0eb 100644 --- a/crates/types/src/services/p2p.rs +++ b/crates/types/src/services/p2p.rs @@ -4,7 +4,16 @@ use crate::{ fuel_tx::Transaction, fuel_types::BlockHeight, }; -use std::fmt::Debug; +use std::{ + collections::HashSet, + fmt::{ + Debug, + Display, + Formatter, + }, + str::FromStr, + time::SystemTime, +}; /// Contains types and logic for Peer Reputation pub mod peer_reputation; @@ -134,6 +143,21 @@ impl From for Vec { } } +impl Display for PeerId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&bs58::encode(&self.0).into_string()) + } +} + +impl FromStr for PeerId { + type Err = String; + + fn from_str(s: &str) -> Result { + let bytes = bs58::decode(s).into_vec().map_err(|e| e.to_string())?; + Ok(Self(bytes)) + } +} + impl PeerId { /// Bind the PeerId and given data of type T together to generate a /// SourcePeer @@ -144,3 +168,25 @@ impl PeerId { } } } + +/// Contains metadata about a connected peer +pub struct PeerInfo { + /// The libp2p peer id + pub id: PeerId, + /// all known multi-addresses of the peer + pub peer_addresses: HashSet, + /// the version of fuel-core reported by the peer + pub client_version: Option, + /// recent heartbeat from the peer + pub heartbeat_data: HeartbeatData, + /// the current application reputation score of the peer + pub app_score: f64, +} + +/// Contains information from the most recent heartbeat received by the peer +pub struct HeartbeatData { + /// The currently reported block height of the peer + pub block_height: Option, + /// The instant representing when the latest heartbeat was received. + pub last_heartbeat: SystemTime, +} diff --git a/tests/tests/chain.rs b/tests/tests/chain.rs index a068b27338e..7fa52eee66f 100644 --- a/tests/tests/chain.rs +++ b/tests/tests/chain.rs @@ -24,3 +24,80 @@ async fn chain_info() { chain_info.consensus_parameters.gas_costs ); } + +#[cfg(feature = "p2p")] +#[tokio::test(flavor = "multi_thread")] +async fn test_peer_info() { + use fuel_core::p2p_test_helpers::{ + make_nodes, + BootstrapSetup, + Nodes, + ProducerSetup, + ValidatorSetup, + }; + use fuel_core_types::{ + fuel_tx::Input, + fuel_vm::SecretKey, + }; + use rand::{ + rngs::StdRng, + SeedableRng, + }; + use std::time::Duration; + + let mut rng = StdRng::seed_from_u64(line!() as u64); + + // Create a producer and a validator that share the same key pair. + let secret = SecretKey::random(&mut rng); + let pub_key = Input::owner(&secret.public_key()); + let Nodes { + mut producers, + mut validators, + bootstrap_nodes: _dont_drop, + } = make_nodes( + [Some(BootstrapSetup::new(pub_key))], + [Some( + ProducerSetup::new(secret).with_txs(1).with_name("Alice"), + )], + [Some(ValidatorSetup::new(pub_key).with_name("Bob"))], + None, + ) + .await; + + let producer = producers.pop().unwrap(); + let mut validator = validators.pop().unwrap(); + + // Insert the transactions into the tx pool and await them, + // to ensure we have a live p2p connection. + let expected = producer.insert_txs().await; + + // Wait up to 10 seconds for the validator to sync with the producer. + // This indicates we have a successful P2P connection. + validator.consistency_10s(&expected).await; + + let validator_peer_id = validator + .node + .shared + .config + .p2p + .unwrap() + .keypair + .public() + .to_peer_id(); + + // TODO: this needs to fetch peers from the GQL API, not the service directly. + // This is just a mock of what we should be able to do with GQL API. + let client = producer.node.bound_address; + let client = FuelClient::from(client); + let peers = client.chain_info().await.unwrap().peers; + assert_eq!(peers.len(), 2); + let info = peers + .iter() + .find(|info| info.id.to_string() == validator_peer_id.to_base58()) + .expect("Should be connected to validator"); + + let time_since_heartbeat = std::time::SystemTime::now() + .duration_since(info.heartbeat_data.last_heartbeat) + .unwrap(); + assert!(time_since_heartbeat < Duration::from_secs(10)); +}