Forward put_record requests to authority-discovery (paritytech#4683)
Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
alexggh authored Jun 4, 2024
1 parent 3e84164 commit a09ec64
Showing 9 changed files with 425 additions and 61 deletions.
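
For orientation before the diff: this commit teaches the authority-discovery worker to handle forwarded DHT `PutRecordRequest` events. A record is stored only if its key maps to a currently known authority, its payload carries that authority's signature, and (when strict record validation is enabled) the publisher's network key has also signed it; the known-authority map is refreshed lazily, only when the key is unknown and the best block hash has changed since the last query. The snippet below is a minimal, self-contained Rust sketch of that gating logic using hypothetical stand-in types (`SignedRecord`, `validate_put_record`, plain byte/string keys); it is not the sc-authority-discovery API, whose actual implementation is in worker.rs below.

use std::collections::HashMap;

// Hypothetical stand-in types; the real worker uses KademliaKey, AuthorityId,
// PeerId and a protobuf-encoded SignedAuthorityRecord.
type Key = Vec<u8>;
type Authority = String;
type Peer = String;

struct SignedRecord {
    authority_sig_ok: bool,    // stands in for the authority-signature check
    peer_sig_ok: Option<bool>, // stands in for the libp2p peer-signature check
}

#[derive(Debug)]
enum RejectReason {
    MissingPublisher,
    UnknownAuthority,
    BadAuthoritySignature,
    BadPeerSignature,
    MissingPeerIdSignature,
}

fn validate_put_record(
    known_authorities: &HashMap<Key, Authority>,
    strict_record_validation: bool,
    key: &Key,
    record: &SignedRecord,
    publisher: Option<Peer>,
) -> Result<Authority, RejectReason> {
    // A publisher is required so the peer signature can be tied to a concrete peer.
    let _publisher = publisher.ok_or(RejectReason::MissingPublisher)?;

    // The record key must hash to an authority from the current set.
    let authority = known_authorities
        .get(key)
        .cloned()
        .ok_or(RejectReason::UnknownAuthority)?;

    // The payload must be signed by that authority.
    if !record.authority_sig_ok {
        return Err(RejectReason::BadAuthoritySignature);
    }

    // The publisher's network key must also have signed the record,
    // unless strict record validation is disabled.
    match record.peer_sig_ok {
        Some(true) => {},
        Some(false) => return Err(RejectReason::BadPeerSignature),
        None if strict_record_validation => return Err(RejectReason::MissingPeerIdSignature),
        None => {}, // accepted; the real worker only logs a debug message here
    }

    Ok(authority)
}

fn main() {
    let mut known = HashMap::new();
    known.insert(b"record-key".to_vec(), "authority-1".to_string());

    let record = SignedRecord { authority_sig_ok: true, peer_sig_ok: Some(true) };
    let verdict =
        validate_put_record(&known, true, &b"record-key".to_vec(), &record, Some("peer-1".into()));
    println!("{:?}", verdict);
}

In the real worker the two signature checks correspond to `check_record_signed_with_authority_id` and `check_record_signed_with_network_key`, and a record that passes both is handed to the network via `store_record`.
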
6 changes: 6 additions & 0 deletions substrate/client/authority-discovery/src/error.rs
@@ -75,4 +75,10 @@ pub enum Error {

#[error("Unable to fetch best block.")]
BestBlockFetchingError,

#[error("Publisher not present.")]
MissingPublisher,

#[error("Unknown authority.")]
UnknownAuthority,
}
163 changes: 131 additions & 32 deletions substrate/client/authority-discovery/src/worker.rs
@@ -49,6 +49,7 @@ use sc_network_types::{
multihash::{Code, Multihash},
PeerId,
};
use schema::PeerSignature;
use sp_api::{ApiError, ProvideRuntimeApi};
use sp_authority_discovery::{
AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature,
@@ -111,7 +112,7 @@ pub enum Role {
/// network peerset.
///
/// 5. Allow querying of the collected addresses via the [`crate::Service`].
pub struct Worker<Client, Block, DhtEventStream> {
pub struct Worker<Client, Block: BlockT, DhtEventStream> {
/// Channel receiver for messages sent by a [`crate::Service`].
from_service: Fuse<mpsc::Receiver<ServicetoWorkerMsg>>,

@@ -152,6 +153,12 @@ pub struct Worker<Client, Block, DhtEventStream> {
/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,

/// The list of all known authorities.
known_authorities: HashMap<KademliaKey, AuthorityId>,

/// The last time we requested the list of authorities.
authorities_queried_at: Option<Block::Hash>,

/// Set of in-flight lookups.
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

@@ -268,6 +275,8 @@ where
network,
dht_event_rx,
publish_interval,
known_authorities: Default::default(),
authorities_queried_at: None,
publish_if_changed_interval,
latest_published_keys: HashSet::new(),
latest_published_kad_keys: HashSet::new(),
@@ -482,6 +491,13 @@ where
.filter(|id| !local_keys.contains(id.as_ref()))
.collect::<Vec<_>>();

self.known_authorities = authorities
.clone()
.into_iter()
.map(|authority| (hash_authority_id(authority.as_ref()), authority))
.collect::<HashMap<_, _>>();
self.authorities_queried_at = Some(best_hash);

self.addr_cache.retain_ids(&authorities);

authorities.shuffle(&mut thread_rng());
@@ -581,7 +597,112 @@ where

debug!(target: LOG_TARGET, "Failed to put hash '{:?}' on Dht.", hash)
},
DhtEvent::PutRecordRequest(record_key, record_value, publisher, expires) => {
if let Err(e) = self
.handle_put_record_requested(record_key, record_value, publisher, expires)
.await
{
debug!(target: LOG_TARGET, "Failed to handle put record request: {}", e)
}

if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
}
},
}
}

async fn handle_put_record_requested(
&mut self,
record_key: KademliaKey,
record_value: Vec<u8>,
publisher: Option<PeerId>,
expires: Option<std::time::Instant>,
) -> Result<()> {
let publisher = publisher.ok_or(Error::MissingPublisher)?;

// Make sure we don't ever work with an outdated set of authorities
// and that we do not update known_authorities too often.
let best_hash = self.client.best_hash().await?;
if !self.known_authorities.contains_key(&record_key) &&
self.authorities_queried_at
.map(|authorities_queried_at| authorities_queried_at != best_hash)
.unwrap_or(true)
{
let authorities = self
.client
.authorities(best_hash)
.await
.map_err(|e| Error::CallingRuntime(e.into()))?
.into_iter()
.collect::<Vec<_>>();

self.known_authorities = authorities
.into_iter()
.map(|authority| (hash_authority_id(authority.as_ref()), authority))
.collect::<HashMap<_, _>>();

self.authorities_queried_at = Some(best_hash);
}

let authority_id =
self.known_authorities.get(&record_key).ok_or(Error::UnknownAuthority)?;
let signed_record =
Self::check_record_signed_with_authority_id(record_value.as_slice(), authority_id)?;
self.check_record_signed_with_network_key(
&signed_record.record,
signed_record.peer_signature,
publisher,
authority_id,
)?;
self.network.store_record(record_key, record_value, Some(publisher), expires);
Ok(())
}

fn check_record_signed_with_authority_id(
record: &[u8],
authority_id: &AuthorityId,
) -> Result<schema::SignedAuthorityRecord> {
let signed_record: schema::SignedAuthorityRecord =
schema::SignedAuthorityRecord::decode(record).map_err(Error::DecodingProto)?;

let auth_signature = AuthoritySignature::decode(&mut &signed_record.auth_signature[..])
.map_err(Error::EncodingDecodingScale)?;

if !AuthorityPair::verify(&auth_signature, &signed_record.record, &authority_id) {
return Err(Error::VerifyingDhtPayload)
}

Ok(signed_record)
}

fn check_record_signed_with_network_key(
&self,
record: &Vec<u8>,
peer_signature: Option<PeerSignature>,
remote_peer_id: PeerId,
authority_id: &AuthorityId,
) -> Result<()> {
if let Some(peer_signature) = peer_signature {
match self.network.verify(
remote_peer_id.into(),
&peer_signature.public_key,
&peer_signature.signature,
record,
) {
Ok(true) => {},
Ok(false) => return Err(Error::VerifyingDhtPayload),
Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
}
} else if self.strict_record_validation {
return Err(Error::MissingPeerIdSignature)
} else {
debug!(
target: LOG_TARGET,
"Received unsigned authority discovery record from {}", authority_id
);
}
Ok(())
}

fn handle_dht_value_found_event(&mut self, values: Vec<(KademliaKey, Vec<u8>)>) -> Result<()> {
@@ -600,16 +721,8 @@
let remote_addresses: Vec<Multiaddr> = values
.into_iter()
.map(|(_k, v)| {
let schema::SignedAuthorityRecord { record, auth_signature, peer_signature } =
schema::SignedAuthorityRecord::decode(v.as_slice())
.map_err(Error::DecodingProto)?;

let auth_signature = AuthoritySignature::decode(&mut &auth_signature[..])
.map_err(Error::EncodingDecodingScale)?;

if !AuthorityPair::verify(&auth_signature, &record, &authority_id) {
return Err(Error::VerifyingDhtPayload)
}
let schema::SignedAuthorityRecord { record, peer_signature, .. } =
Self::check_record_signed_with_authority_id(&v, &authority_id)?;

let addresses: Vec<Multiaddr> = schema::AuthorityRecord::decode(record.as_slice())
.map(|a| a.addresses)
@@ -638,26 +751,12 @@
// At this point we know all the valid multiaddresses from the record and that
// each of them belongs to the same PeerId; we just need to check that the record
// is properly signed by the owner of that PeerId.

if let Some(peer_signature) = peer_signature {
match self.network.verify(
remote_peer_id.into(),
&peer_signature.public_key,
&peer_signature.signature,
&record,
) {
Ok(true) => {},
Ok(false) => return Err(Error::VerifyingDhtPayload),
Err(error) => return Err(Error::ParsingLibp2pIdentity(error)),
}
} else if self.strict_record_validation {
return Err(Error::MissingPeerIdSignature)
} else {
debug!(
target: LOG_TARGET,
"Received unsigned authority discovery record from {}", authority_id
);
}
self.check_record_signed_with_network_key(
&record,
peer_signature,
remote_peer_id,
&authority_id,
)?;
Ok(addresses)
})
.collect::<Result<Vec<Vec<Multiaddr>>>>()?
@@ -870,7 +969,7 @@ impl Metrics {

// Helper functions for unit testing.
#[cfg(test)]
impl<Block, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
impl<Block: BlockT, Client, DhtEventStream> Worker<Client, Block, DhtEventStream> {
pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec<Multiaddr>) {
self.addr_cache.insert(authority, addresses);
}