From a09ec64d149b400de16f144ca02f0fa958d2bb13 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Tue, 4 Jun 2024 13:05:34 +0300 Subject: [PATCH] Forward put_record requests to authorithy-discovery (#4683) Signed-off-by: Alexandru Gheorghe --- .../client/authority-discovery/src/error.rs | 6 + .../client/authority-discovery/src/worker.rs | 163 ++++++++++++++---- .../authority-discovery/src/worker/tests.rs | 128 ++++++++++++++ substrate/client/network/src/behaviour.rs | 37 +++- substrate/client/network/src/discovery.rs | 52 +++++- substrate/client/network/src/event.rs | 3 + .../client/network/src/litep2p/service.rs | 12 ++ substrate/client/network/src/service.rs | 57 ++++-- .../client/network/src/service/traits.rs | 28 ++- 9 files changed, 425 insertions(+), 61 deletions(-) diff --git a/substrate/client/authority-discovery/src/error.rs b/substrate/client/authority-discovery/src/error.rs index d2c567d77afc..3f395e47922e 100644 --- a/substrate/client/authority-discovery/src/error.rs +++ b/substrate/client/authority-discovery/src/error.rs @@ -75,4 +75,10 @@ pub enum Error { #[error("Unable to fetch best block.")] BestBlockFetchingError, + + #[error("Publisher not present.")] + MissingPublisher, + + #[error("Unknown authority.")] + UnknownAuthority, } diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index d89083100aa3..f20cf6aa2121 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -49,6 +49,7 @@ use sc_network_types::{ multihash::{Code, Multihash}, PeerId, }; +use schema::PeerSignature; use sp_api::{ApiError, ProvideRuntimeApi}; use sp_authority_discovery::{ AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature, @@ -111,7 +112,7 @@ pub enum Role { /// network peerset. /// /// 5. Allow querying of the collected addresses via the [`crate::Service`]. -pub struct Worker { +pub struct Worker { /// Channel receiver for messages send by a [`crate::Service`]. from_service: Fuse>, @@ -152,6 +153,12 @@ pub struct Worker { /// Queue of throttled lookups pending to be passed to the network. pending_lookups: Vec, + /// The list of all known authorities. + known_authorities: HashMap, + + /// The last time we requested the list of authorities. + authorities_queried_at: Option, + /// Set of in-flight lookups. in_flight_lookups: HashMap, @@ -268,6 +275,8 @@ where network, dht_event_rx, publish_interval, + known_authorities: Default::default(), + authorities_queried_at: None, publish_if_changed_interval, latest_published_keys: HashSet::new(), latest_published_kad_keys: HashSet::new(), @@ -482,6 +491,13 @@ where .filter(|id| !local_keys.contains(id.as_ref())) .collect::>(); + self.known_authorities = authorities + .clone() + .into_iter() + .map(|authority| (hash_authority_id(authority.as_ref()), authority)) + .collect::>(); + self.authorities_queried_at = Some(best_hash); + self.addr_cache.retain_ids(&authorities); authorities.shuffle(&mut thread_rng()); @@ -581,7 +597,112 @@ where debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash) }, + DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => { + if let Err(e) = self + .handle_put_record_requested(record_key, record_value, publisher, expires) + .await + { + debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e) + } + + if let Some(metrics) = &self.metrics { + metrics.dht_event_received.with_label_values(&["put_record_req"]).inc(); + } + }, + } + } + + async fn handle_put_record_requested( + &mut self, + record_key: KademliaKey, + record_value: Vec, + publisher: Option, + expires: Option, + ) -> Result<()> { + let publisher = publisher.ok_or(Error::MissingPublisher)?; + + // Make sure we don't ever work with an outdated set of authorities + // and that we do not update known_authorithies too often. + let best_hash = self.client.best_hash().await?; + if !self.known_authorities.contains_key(&record_key) && + self.authorities_queried_at + .map(|authorities_queried_at| authorities_queried_at != best_hash) + .unwrap_or(true) + { + let authorities = self + .client + .authorities(best_hash) + .await + .map_err(|e| Error::CallingRuntime(e.into()))? + .into_iter() + .collect::>(); + + self.known_authorities = authorities + .into_iter() + .map(|authority| (hash_authority_id(authority.as_ref()), authority)) + .collect::>(); + + self.authorities_queried_at = Some(best_hash); + } + + let authority_id = + self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?; + let signed_record = + Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?; + self.check_record_signed_with_network_key( + &signed_record.record, + signed_record.peer_signature, + publisher, + authority_id, + )?; + self.network.store_record(record_key, record_value, Some(publisher), expires); + Ok(()) + } + + fn check_record_signed_with_authority_id( + record: &[u8], + authority_id: &AuthorityId, + ) -> Result { + let signed_record: schema::SignedAuthorityRecord = + schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?; + + let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..]) + .map_err(Error::EncodingDecodingScale)?; + + if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) { + return Err(Error::VerifyingDhtPayload) } + + Ok(signed_record) + } + + fn check_record_signed_with_network_key( + &self, + record: &Vec, + peer_signature: Option, + remote_peer_id: PeerId, + authority_id: &AuthorityId, + ) -> Result<()> { + if let Some(peer_signature) = peer_signature { + match self.network.verify( + remote_peer_id.into(), + &peer_signature.public_key, + &peer_signature.signature, + record, + ) { + Ok(true) => {}, + Ok(false) => return Err(Error::VerifyingDhtPayload), + Err(error) => return Err(Error::ParsingLibp2pIdentity(error)), + } + } else if self.strict_record_validation { + return Err(Error::MissingPeerIdSignature) + } else { + debug!( + target: LOG_TARGET, + "Received unsigned authority discovery record from {}", authority_id + ); + } + Ok(()) } fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec)>) -> Result<()> { @@ -600,16 +721,8 @@ where let remote_addresses: Vec = values .into_iter() .map(|(_k, v)| { - let schema::SignedAuthorityRecord { record, auth_signature, peer_signature } = - schema::SignedAuthorityRecord::decode(v.as_slice()) - .map_err(Error::DecodingProto)?; - - let auth_signature = AuthoritySignature::decode(&mut &auth_signature[..]) - .map_err(Error::EncodingDecodingScale)?; - - if !AuthorityPair::verify(&auth_signature, &record, &authority_id) { - return Err(Error::VerifyingDhtPayload) - } + let schema::SignedAuthorityRecord { record, peer_signature, .. } = + Self::check_record_signed_with_authority_id(&v, &authority_id)?; let addresses: Vec = schema::AuthorityRecord::decode(record.as_slice()) .map(|a| a.addresses) @@ -638,26 +751,12 @@ where // At this point we know all the valid multiaddresses from the record, know that // each of them belong to the same PeerId, we just need to check if the record is // properly signed by the owner of the PeerId - - if let Some(peer_signature) = peer_signature { - match self.network.verify( - remote_peer_id.into(), - &peer_signature.public_key, - &peer_signature.signature, - &record, - ) { - Ok(true) => {}, - Ok(false) => return Err(Error::VerifyingDhtPayload), - Err(error) => return Err(Error::ParsingLibp2pIdentity(error)), - } - } else if self.strict_record_validation { - return Err(Error::MissingPeerIdSignature) - } else { - debug!( - target: LOG_TARGET, - "Received unsigned authority discovery record from {}", authority_id - ); - } + self.check_record_signed_with_network_key( + &record, + peer_signature, + remote_peer_id, + &authority_id, + )?; Ok(addresses) }) .collect::>>>()? @@ -870,7 +969,7 @@ impl Metrics { // Helper functions for unit testing. #[cfg(test)] -impl Worker { +impl Worker { pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec) { self.addr_cache.insert(authority, addresses); } diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index 70107c89a851..de7443d634fa 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -20,6 +20,7 @@ use std::{ collections::HashSet, sync::{Arc, Mutex}, task::Poll, + time::Instant, }; use futures::{ @@ -118,6 +119,7 @@ sp_api::mock_impl_runtime_apis! { pub enum TestNetworkEvent { GetCalled(KademliaKey), PutCalled(KademliaKey, Vec), + StoreRecordCalled(KademliaKey, Vec, Option, Option), } pub struct TestNetwork { @@ -128,6 +130,9 @@ pub struct TestNetwork { // vectors below. pub put_value_call: Arc)>>>, pub get_value_call: Arc>>, + pub store_value_call: + Arc, Option, Option)>>>, + event_sender: mpsc::UnboundedSender, event_receiver: Option>, } @@ -148,6 +153,7 @@ impl Default for TestNetwork { external_addresses: vec!["/ip6/2001:db8::/tcp/30333".parse().unwrap()], put_value_call: Default::default(), get_value_call: Default::default(), + store_value_call: Default::default(), event_sender: tx, event_receiver: Some(rx), } @@ -193,6 +199,25 @@ impl NetworkDHTProvider for TestNetwork { .unbounded_send(TestNetworkEvent::GetCalled(key.clone())) .unwrap(); } + + fn store_record( + &self, + key: KademliaKey, + value: Vec, + publisher: Option, + expires: Option, + ) { + self.store_value_call.lock().unwrap().push(( + key.clone(), + value.clone(), + publisher, + expires, + )); + self.event_sender + .clone() + .unbounded_send(TestNetworkEvent::StoreRecordCalled(key, value, publisher, expires)) + .unwrap(); + } } impl NetworkStateInfo for TestNetwork { @@ -871,3 +896,106 @@ fn lookup_throttling() { .boxed_local(), ); } + +#[test] +fn test_handle_put_record_request() { + let network = TestNetwork::default(); + let peer_id = network.peer_id; + + let remote_multiaddr = { + let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); + + address.with(multiaddr::Protocol::P2p(peer_id.into())) + }; + let remote_key_store = MemoryKeystore::new(); + let remote_public_keys: Vec = (0..20) + .map(|_| { + remote_key_store + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap() + .into() + }) + .collect(); + + let remote_non_authorithy_keys: Vec = (0..20) + .map(|_| { + remote_key_store + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap() + .into() + }) + .collect(); + + let (_dht_event_tx, dht_event_rx) = channel(1); + let (_to_worker, from_service) = mpsc::channel(0); + let network = Arc::new(network); + let mut worker = Worker::new( + from_service, + Arc::new(TestApi { authorities: remote_public_keys.clone() }), + network.clone(), + dht_event_rx.boxed(), + Role::Discover, + Some(default_registry().clone()), + Default::default(), + ); + + let mut pool = LocalPool::new(); + + let valid_authorithy_key = remote_public_keys.first().unwrap().clone(); + + let kv_pairs = build_dht_event( + vec![remote_multiaddr], + valid_authorithy_key.into(), + &remote_key_store, + Some(&TestSigner { keypair: &network.identity }), + ); + + pool.run_until( + async { + // Invalid format should return an error. + for authority in remote_public_keys.iter() { + let key = hash_authority_id(authority.as_ref()); + assert!(matches!( + worker.handle_put_record_requested(key, vec![0x0], Some(peer_id), None).await, + Err(Error::DecodingProto(_)) + )); + } + let prev_requested_authorithies = worker.authorities_queried_at; + + // Unknown authority should return an error. + for authority in remote_non_authorithy_keys.iter() { + let key = hash_authority_id(authority.as_ref()); + assert!(matches!( + worker.handle_put_record_requested(key, vec![0x0], Some(peer_id), None).await, + Err(Error::UnknownAuthority) + )); + assert!(prev_requested_authorithies == worker.authorities_queried_at); + } + assert_eq!(network.store_value_call.lock().unwrap().len(), 0); + + // Valid authority should return Ok. + for (key, value) in kv_pairs.clone() { + assert!(worker + .handle_put_record_requested(key, value, Some(peer_id), None) + .await + .is_ok()); + } + assert_eq!(network.store_value_call.lock().unwrap().len(), 1); + + let another_authorithy_id = remote_public_keys.get(3).unwrap().clone(); + let key = hash_authority_id(another_authorithy_id.as_ref()); + + // Valid record signed with a different key should return error. + for (_, value) in kv_pairs { + assert!(matches!( + worker + .handle_put_record_requested(key.clone(), value, Some(peer_id), None) + .await, + Err(Error::VerifyingDhtPayload) + )); + } + assert_eq!(network.store_value_call.lock().unwrap().len(), 1); + } + .boxed_local(), + ); +} diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 833ff5d09e5e..68deac0f47bc 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -37,7 +37,11 @@ use libp2p::{ use parking_lot::Mutex; use sp_runtime::traits::Block as BlockT; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + sync::Arc, + time::{Duration, Instant}, +}; pub use crate::request_responses::{InboundFailure, OutboundFailure, ResponseFailure}; @@ -157,9 +161,10 @@ pub enum BehaviourOut { /// We have learned about the existence of a node on the default set. Discovered(PeerId), - /// Events generated by a DHT as a response to get_value or put_value requests as well as the - /// request duration. - Dht(DhtEvent, Duration), + /// Events generated by a DHT as a response to get_value or put_value requests with the + /// request duration. Or events generated by the DHT as a consequnce of receiving a record + /// to store from peers. + Dht(DhtEvent, Option), /// Ignored event generated by lower layers. None, @@ -279,6 +284,17 @@ impl Behaviour { pub fn put_value(&mut self, key: RecordKey, value: Vec) { self.discovery.put_value(key, value); } + + /// Stores value in DHT + pub fn store_record( + &mut self, + record_key: RecordKey, + record_value: Vec, + publisher: Option, + expires: Option, + ) { + self.discovery.store_record(record_key, record_value, publisher, expires); + } } impl From for BehaviourOut { @@ -344,13 +360,18 @@ impl From for BehaviourOut { }, DiscoveryOut::Discovered(peer_id) => BehaviourOut::Discovered(peer_id), DiscoveryOut::ValueFound(results, duration) => - BehaviourOut::Dht(DhtEvent::ValueFound(results), duration), + BehaviourOut::Dht(DhtEvent::ValueFound(results), Some(duration)), DiscoveryOut::ValueNotFound(key, duration) => - BehaviourOut::Dht(DhtEvent::ValueNotFound(key), duration), + BehaviourOut::Dht(DhtEvent::ValueNotFound(key), Some(duration)), DiscoveryOut::ValuePut(key, duration) => - BehaviourOut::Dht(DhtEvent::ValuePut(key), duration), + BehaviourOut::Dht(DhtEvent::ValuePut(key), Some(duration)), + DiscoveryOut::PutRecordRequest(record_key, record_value, publisher, expires) => + BehaviourOut::Dht( + DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires), + None, + ), DiscoveryOut::ValuePutFailed(key, duration) => - BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), duration), + BehaviourOut::Dht(DhtEvent::ValuePutFailed(key), Some(duration)), DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted, } } diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs index 7d4481b0d06f..2c788ec713f3 100644 --- a/substrate/client/network/src/discovery.rs +++ b/substrate/client/network/src/discovery.rs @@ -80,7 +80,7 @@ use std::{ collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, num::NonZeroUsize, task::{Context, Poll}, - time::Duration, + time::{Duration, Instant}, }; /// Maximum number of known external addresses that we will cache. @@ -222,6 +222,9 @@ impl DiscoveryConfig { // https://github.com/paritytech/polkadot-sdk/issues/504 let kademlia_protocols = [kademlia_protocol.clone(), kademlia_legacy_protocol]; config.set_protocol_names(kademlia_protocols.into_iter().map(Into::into).collect()); + + config.set_record_filtering(libp2p::kad::KademliaStoreInserts::FilterBoth); + // By default Kademlia attempts to insert all peers into its routing table once a // dialing attempt succeeds. In order to control which peer is added, disable the // auto-insertion and instead add peers manually. @@ -427,6 +430,30 @@ impl DiscoveryBehaviour { } } + /// Store a record in the Kademlia record store. + pub fn store_record( + &mut self, + record_key: RecordKey, + record_value: Vec, + publisher: Option, + expires: Option, + ) { + if let Some(k) = self.kademlia.as_mut() { + if let Err(err) = k.store_mut().put(Record { + key: record_key, + value: record_value, + publisher: publisher.map(|publisher| publisher.into()), + expires, + }) { + debug!( + target: "sub-libp2p", + "Failed to store record with key: {:?}", + err + ); + } + } + } + /// Returns the number of nodes in each Kademlia kbucket for each Kademlia instance. /// /// Identifies Kademlia instances by their [`ProtocolId`] and kbuckets by the base 2 logarithm @@ -495,6 +522,14 @@ pub enum DiscoveryOut { /// Returning the result grouped in (key, value) pairs as well as the request duration. ValueFound(Vec<(RecordKey, Vec)>, Duration), + /// The DHT received a put record request. + PutRecordRequest( + RecordKey, + Vec, + Option, + Option, + ), + /// The record requested was not found in the DHT. /// /// Returning the corresponding key as well as the request duration. @@ -764,10 +799,21 @@ impl NetworkBehaviour for DiscoveryBehaviour { let ev = DiscoveryOut::Discovered(peer); return Poll::Ready(ToSwarm::GenerateEvent(ev)) }, - KademliaEvent::PendingRoutablePeer { .. } | - KademliaEvent::InboundRequest { .. } => { + KademliaEvent::PendingRoutablePeer { .. } => { // We are not interested in this event at the moment. }, + KademliaEvent::InboundRequest { request } => match request { + libp2p::kad::InboundRequest::PutRecord { record: Some(record), .. } => + return Poll::Ready(ToSwarm::GenerateEvent( + DiscoveryOut::PutRecordRequest( + record.key, + record.value, + record.publisher.map(Into::into), + record.expires, + ), + )), + _ => {}, + }, KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetClosestPeers(res), .. diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index dc4fd53a49aa..d0ccbd8622b8 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -41,6 +41,9 @@ pub enum DhtEvent { /// An error has occurred while putting a record into the DHT. ValuePutFailed(Key), + + /// The DHT received a put record request. + PutRecordRequest(Key, Vec, Option, Option), } /// Type for events generated by networking layer. diff --git a/substrate/client/network/src/litep2p/service.rs b/substrate/client/network/src/litep2p/service.rs index 09b869abdf5f..8f36b0828bd3 100644 --- a/substrate/client/network/src/litep2p/service.rs +++ b/substrate/client/network/src/litep2p/service.rs @@ -51,6 +51,7 @@ use sc_utils::mpsc::TracingUnboundedSender; use std::{ collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc}, + time::Instant, }; /// Logging target for the file. @@ -236,6 +237,17 @@ impl NetworkDHTProvider for Litep2pNetworkService { fn put_value(&self, key: KademliaKey, value: Vec) { let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::PutValue { key, value }); } + + fn store_record( + &self, + _key: KademliaKey, + _value: Vec, + _publisher: Option, + _expires: Option, + ) { + // Will be added once litep2p is released with: https://github.com/paritytech/litep2p/pull/135 + log::warn!(target: LOG_TARGET, "Store record is not implemented for litep2p"); + } } #[async_trait::async_trait] diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 27de12bc1ec9..2cf4564e312c 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -89,6 +89,10 @@ use sc_network_common::{ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; +pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; +pub use libp2p::identity::{DecodingError, Keypair, PublicKey}; +pub use metrics::NotificationMetrics; +pub use protocol::NotificationsSink; use std::{ cmp, collections::{HashMap, HashSet}, @@ -101,14 +105,9 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - time::Duration, + time::{Duration, Instant}, }; -pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; -pub use libp2p::identity::{DecodingError, Keypair, PublicKey}; -pub use metrics::NotificationMetrics; -pub use protocol::NotificationsSink; - pub(crate) mod metrics; pub(crate) mod out_events; @@ -957,6 +956,21 @@ where fn put_value(&self, key: KademliaKey, value: Vec) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PutValue(key, value)); } + + fn store_record( + &self, + key: KademliaKey, + value: Vec, + publisher: Option, + expires: Option, + ) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StoreRecord( + key, + value, + publisher.map(Into::into), + expires, + )); + } } #[async_trait::async_trait] @@ -1311,6 +1325,7 @@ impl<'a> NotificationSenderReadyT for NotificationSenderReady<'a> { enum ServiceToWorkerMsg { GetValue(KademliaKey), PutValue(KademliaKey, Vec), + StoreRecord(KademliaKey, Vec, Option, Option), AddKnownAddress(PeerId, Multiaddr), EventStream(out_events::Sender), Request { @@ -1438,6 +1453,10 @@ where self.network_service.behaviour_mut().get_value(key), ServiceToWorkerMsg::PutValue(key, value) => self.network_service.behaviour_mut().put_value(key, value), + ServiceToWorkerMsg::StoreRecord(key, value, publisher, expires) => self + .network_service + .behaviour_mut() + .store_record(key, value, publisher, expires), ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => self.network_service.behaviour_mut().add_known_address(peer_id, addr), ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender), @@ -1639,17 +1658,21 @@ where .report_notification_received(remote, notification); }, SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => { - if let Some(metrics) = self.metrics.as_ref() { - let query_type = match event { - DhtEvent::ValueFound(_) => "value-found", - DhtEvent::ValueNotFound(_) => "value-not-found", - DhtEvent::ValuePut(_) => "value-put", - DhtEvent::ValuePutFailed(_) => "value-put-failed", - }; - metrics - .kademlia_query_duration - .with_label_values(&[query_type]) - .observe(duration.as_secs_f64()); + match (self.metrics.as_ref(), duration) { + (Some(metrics), Some(duration)) => { + let query_type = match event { + DhtEvent::ValueFound(_) => "value-found", + DhtEvent::ValueNotFound(_) => "value-not-found", + DhtEvent::ValuePut(_) => "value-put", + DhtEvent::ValuePutFailed(_) => "value-put-failed", + DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request", + }; + metrics + .kademlia_query_duration + .with_label_values(&[query_type]) + .observe(duration.as_secs_f64()); + }, + _ => {}, } self.event_streams.send(Event::Dht(event)); diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs index d1ea9a2ed568..fe06141f7e3b 100644 --- a/substrate/client/network/src/service/traits.rs +++ b/substrate/client/network/src/service/traits.rs @@ -39,7 +39,14 @@ use sc_network_common::{role::ObservedRole, ExHashT}; use sc_network_types::{multiaddr::Multiaddr, PeerId}; use sp_runtime::traits::Block as BlockT; -use std::{collections::HashSet, fmt::Debug, future::Future, pin::Pin, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + fmt::Debug, + future::Future, + pin::Pin, + sync::Arc, + time::{Duration, Instant}, +}; pub use libp2p::{identity::SigningError, kad::record::Key as KademliaKey}; @@ -209,6 +216,15 @@ pub trait NetworkDHTProvider { /// Start putting a value in the DHT. fn put_value(&self, key: KademliaKey, value: Vec); + + /// Store a record in the DHT memory store. + fn store_record( + &self, + key: KademliaKey, + value: Vec, + publisher: Option, + expires: Option, + ); } impl NetworkDHTProvider for Arc @@ -223,6 +239,16 @@ where fn put_value(&self, key: KademliaKey, value: Vec) { T::put_value(self, key, value) } + + fn store_record( + &self, + key: KademliaKey, + value: Vec, + publisher: Option, + expires: Option, + ) { + T::store_record(self, key, value, publisher, expires) + } } /// Provides an ability to set a fork sync request for a particular block.