Skip to content

Commit

Permalink
feature: dont keep persistent GatewayClient inside NMv1 (#5211)
Browse files Browse the repository at this point in the history
* removed overly complex logic for requesting mutex permits for packet processing

* dont keep persistent gateway connections. instead make them on demand
  • Loading branch information
jstuczyn committed Dec 4, 2024
1 parent df4ba70 commit 3850387
Show file tree
Hide file tree
Showing 15 changed files with 342 additions and 685 deletions.
4 changes: 3 additions & 1 deletion common/client-libs/gateway-client/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ impl ClientBandwidth {

if remaining < 0 {
tracing::warn!("OUT OF BANDWIDTH. remaining: {remaining_bi2}");
} else {
} else if remaining < 1_000_000 {
tracing::info!("remaining bandwidth: {remaining_bi2}");
} else {
tracing::debug!("remaining bandwidth: {remaining_bi2}");
}

self.inner
Expand Down
24 changes: 5 additions & 19 deletions common/client-libs/gateway-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ impl<C, St> GatewayClient<C, St> {
self.gateway_identity
}

pub fn shared_key(&self) -> Option<Arc<SharedGatewayKey>> {
self.shared_key.clone()
}

pub fn ws_fd(&self) -> Option<RawFd> {
match &self.connection {
SocketState::Available(conn) => ws_fd(conn.as_ref()),
Expand Down Expand Up @@ -408,7 +412,7 @@ impl<C, St> GatewayClient<C, St> {
}

Some(_) => {
info!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!");
debug!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!");
Ok(())
}
}
Expand Down Expand Up @@ -992,24 +996,6 @@ impl<C, St> GatewayClient<C, St> {
}
Ok(())
}

#[deprecated(note = "this method does not deal with upgraded keys for legacy clients")]
pub async fn authenticate_and_start(
&mut self,
) -> Result<AuthenticationResponse, GatewayClientError>
where
C: DkgQueryClient + Send + Sync,
St: CredentialStorage,
<St as CredentialStorage>::StorageError: Send + Sync + 'static,
{
let shared_key = self.perform_initial_authentication().await?;
self.claim_initial_bandwidth().await?;

// this call is NON-blocking
self.start_listening_for_mixnet_messages()?;

Ok(shared_key)
}
}

// type alias for an ease of use
Expand Down
5 changes: 5 additions & 0 deletions common/client-libs/gateway-client/src/socket_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ impl PartiallyDelegatedRouter {
}
};

if self.stream_return.is_canceled() {
// nothing to do, receiver has been dropped
return;
}

let return_res = match ret {
Err(err) => self.stream_return.send(Err(err)),
Ok(_) => {
Expand Down
9 changes: 9 additions & 0 deletions common/crypto/src/asymmetric/identity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use ed25519_dalek::{Signer, SigningKey};
pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH};
use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use thiserror::Error;
use zeroize::{Zeroize, ZeroizeOnDrop};
Expand Down Expand Up @@ -122,6 +123,14 @@ impl PemStorableKeyPair for KeyPair {
#[derive(Copy, Clone, Eq, PartialEq)]
pub struct PublicKey(ed25519_dalek::VerifyingKey);

impl Hash for PublicKey {
fn hash<H: Hasher>(&self, state: &mut H) {
// each public key has unique bytes representation which can be used
// for the hash implementation
self.to_bytes().hash(state)
}
}

impl Display for PublicKey {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
Display::fmt(&self.to_base58_string(), f)
Expand Down
20 changes: 9 additions & 11 deletions nym-api/src/network_monitor/gateways_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// SPDX-License-Identifier: GPL-3.0-only

use futures::Stream;
use nym_crypto::asymmetric::identity;
use nym_crypto::asymmetric::{ed25519, identity};
use nym_gateway_client::{AcknowledgementReceiver, MixnetMessageReceiver};
use nym_mixnet_contract_common::IdentityKey;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::StreamMap;
Expand All @@ -15,8 +14,8 @@ pub(crate) enum GatewayMessages {
}

pub(crate) struct GatewaysReader {
ack_map: StreamMap<IdentityKey, AcknowledgementReceiver>,
stream_map: StreamMap<IdentityKey, MixnetMessageReceiver>,
ack_map: StreamMap<ed25519::PublicKey, AcknowledgementReceiver>,
stream_map: StreamMap<ed25519::PublicKey, MixnetMessageReceiver>,
}

impl GatewaysReader {
Expand All @@ -33,19 +32,18 @@ impl GatewaysReader {
message_receiver: MixnetMessageReceiver,
ack_receiver: AcknowledgementReceiver,
) {
let channel_id = id.to_string();
self.stream_map.insert(channel_id.clone(), message_receiver);
self.ack_map.insert(channel_id, ack_receiver);
self.stream_map.insert(id, message_receiver);
self.ack_map.insert(id, ack_receiver);
}

pub fn remove_receivers(&mut self, id: &str) {
self.stream_map.remove(id);
self.ack_map.remove(id);
pub fn remove_receivers(&mut self, id: ed25519::PublicKey) {
self.stream_map.remove(&id);
self.ack_map.remove(&id);
}
}

impl Stream for GatewaysReader {
type Item = (IdentityKey, GatewayMessages);
type Item = (ed25519::PublicKey, GatewayMessages);

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.ack_map).poll_next(cx) {
Expand Down
46 changes: 22 additions & 24 deletions nym-api/src/network_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use crate::node_status_api::NodeStatusCache;
use crate::nym_contract_cache::cache::NymContractCache;
use crate::storage::NymApiStorage;
use crate::support::caching::cache::SharedCache;
use crate::support::{config, nyxd};
use crate::support::config::Config;
use crate::support::nyxd;
use futures::channel::mpsc;
use nym_bandwidth_controller::BandwidthController;
use nym_credential_storage::persistent_storage::PersistentStorage;
Expand All @@ -36,7 +37,7 @@ pub(crate) mod test_route;
pub(crate) const ROUTE_TESTING_TEST_NONCE: u64 = 0;

pub(crate) fn setup<'a>(
config: &'a config::NetworkMonitor,
config: &'a Config,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
Expand All @@ -54,7 +55,7 @@ pub(crate) fn setup<'a>(
}

pub(crate) struct NetworkMonitorBuilder<'a> {
config: &'a config::NetworkMonitor,
config: &'a Config,
nyxd_client: nyxd::Client,
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
Expand All @@ -64,7 +65,7 @@ pub(crate) struct NetworkMonitorBuilder<'a> {

impl<'a> NetworkMonitorBuilder<'a> {
pub(crate) fn new(
config: &'a config::NetworkMonitor,
config: &'a Config,
nyxd_client: nyxd::Client,
node_status_storage: NymApiStorage,
contract_cache: NymContractCache,
Expand All @@ -81,7 +82,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
}
}

pub(crate) async fn build<R: MessageReceiver + Send + 'static>(
pub(crate) async fn build<R: MessageReceiver + Send + Sync + 'static>(
self,
) -> NetworkMonitorRunnables<R> {
// TODO: those keys change constant throughout the whole execution of the monitor.
Expand All @@ -101,7 +102,7 @@ impl<'a> NetworkMonitorBuilder<'a> {
self.contract_cache,
self.described_cache,
self.node_status_cache,
self.config.debug.per_node_test_packets,
self.config.network_monitor.debug.per_node_test_packets,
Arc::clone(&ack_key),
*identity_keypair.public_key(),
*encryption_keypair.public_key(),
Expand All @@ -110,35 +111,38 @@ impl<'a> NetworkMonitorBuilder<'a> {
let bandwidth_controller = {
BandwidthController::new(
nym_credential_storage::initialise_persistent_storage(
&self.config.storage_paths.credentials_database_path,
&self
.config
.network_monitor
.storage_paths
.credentials_database_path,
)
.await,
self.nyxd_client.clone(),
)
};

let packet_sender = new_packet_sender(
self.config,
&self.config,
gateway_status_update_sender,
Arc::clone(&identity_keypair),
self.config.debug.gateway_sending_rate,
bandwidth_controller,
self.config.debug.disabled_credentials_mode,
);

let received_processor = new_received_processor(
received_processor_receiver_channel,
Arc::clone(&encryption_keypair),
ack_key,
);
let summary_producer = new_summary_producer(self.config.debug.per_node_test_packets);
let summary_producer =
new_summary_producer(self.config.network_monitor.debug.per_node_test_packets);
let packet_receiver = new_packet_receiver(
gateway_status_update_receiver,
received_processor_sender_channel,
);

let monitor = Monitor::new(
self.config,
&self.config.network_monitor,
packet_preparer,
packet_sender,
received_processor,
Expand All @@ -154,12 +158,12 @@ impl<'a> NetworkMonitorBuilder<'a> {
}
}

pub(crate) struct NetworkMonitorRunnables<R: MessageReceiver + Send + 'static> {
pub(crate) struct NetworkMonitorRunnables<R: MessageReceiver + Send + Sync + 'static> {
monitor: Monitor<R>,
packet_receiver: PacketReceiver,
}

impl<R: MessageReceiver + Send + 'static> NetworkMonitorRunnables<R> {
impl<R: MessageReceiver + Send + Sync + 'static> NetworkMonitorRunnables<R> {
// TODO: note, that is not exactly doing what we want, because when
// `ReceivedProcessor` is constructed, it already spawns a future
// this needs to be refactored!
Expand Down Expand Up @@ -194,22 +198,16 @@ fn new_packet_preparer(
}

fn new_packet_sender(
config: &config::NetworkMonitor,
config: &Config,
gateways_status_updater: GatewayClientUpdateSender,
local_identity: Arc<identity::KeyPair>,
max_sending_rate: usize,
bandwidth_controller: BandwidthController<nyxd::Client, PersistentStorage>,
disabled_credentials_mode: bool,
) -> PacketSender {
PacketSender::new(
config,
gateways_status_updater,
local_identity,
config.debug.gateway_response_timeout,
config.debug.gateway_connection_timeout,
config.debug.max_concurrent_gateway_clients,
max_sending_rate,
bandwidth_controller,
disabled_credentials_mode,
)
}

Expand All @@ -236,8 +234,8 @@ fn new_packet_receiver(

// TODO: 1) does it still have to have separate builder or could we get rid of it now?
// TODO: 2) how do we make it non-async as other 'start' methods?
pub(crate) async fn start<R: MessageReceiver + Send + 'static>(
config: &config::NetworkMonitor,
pub(crate) async fn start<R: MessageReceiver + Send + Sync + 'static>(
config: &Config,
nym_contract_cache: &NymContractCache,
described_cache: SharedCache<DescribedNodes>,
node_status_cache: NodeStatusCache,
Expand Down
54 changes: 54 additions & 0 deletions nym-api/src/network_monitor/monitor/gateway_client_handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only

use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender};
use crate::support::nyxd;
use nym_credential_storage::persistent_storage::PersistentStorage;
use nym_gateway_client::GatewayClient;
use std::ops::{Deref, DerefMut};
use tracing::warn;

pub(crate) struct GatewayClientHandle {
client: GatewayClient<nyxd::Client, PersistentStorage>,
gateways_status_updater: GatewayClientUpdateSender,
}

impl GatewayClientHandle {
pub(crate) fn new(
client: GatewayClient<nyxd::Client, PersistentStorage>,
gateways_status_updater: GatewayClientUpdateSender,
) -> Self {
GatewayClientHandle {
client,
gateways_status_updater,
}
}
}

impl Drop for GatewayClientHandle {
fn drop(&mut self) {
if self
.gateways_status_updater
.unbounded_send(GatewayClientUpdate::Disconnect(
self.client.gateway_identity(),
))
.is_err()
{
warn!("fail to cleanly shutdown gateway connection")
}
}
}

impl Deref for GatewayClientHandle {
type Target = GatewayClient<nyxd::Client, PersistentStorage>;

fn deref(&self) -> &Self::Target {
&self.client
}
}

impl DerefMut for GatewayClientHandle {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
Loading

0 comments on commit 3850387

Please sign in to comment.