A0-4318: network rate-limiter for the substrate network #1780

Closed
44 commits
ef86447
revamped TokenBucket for rate-limiter:
fixxxedpoint Jul 10, 2024
3dd1522
rate_limiter:
fixxxedpoint Jul 10, 2024
46879b3
RateLimitedAsyncRead and Write moved to rate_limiter crate
fixxxedpoint Jul 10, 2024
fb77024
custom implementation of Transport with rate-limiting (only read) for…
fixxxedpoint Jul 10, 2024
7d52827
integrated rate-limiting for substrate [`sc_client::network::Network…
fixxxedpoint Jul 10, 2024
ddb93c0
increased limits for the rate-limiter, both for alephbft and substrate
fixxxedpoint Jul 10, 2024
c90cb7d
- s/substrate_bit_rate/substrate_network_bit_rate
fixxxedpoint Jul 10, 2024
1e51b73
Disable devnet cron (#1779)
bartoszjedrzejewski Jul 10, 2024
054d731
renamed local variable in `build/transport.rs` - it no longer referen…
fixxxedpoint Jul 10, 2024
414d077
Merge remote-tracking branch 'origin/main' into A0-4318_rate_limited_…
fixxxedpoint Jul 10, 2024
ce4ae15
removed unnecessary pub-use to [`TokenBucket`] in rate_limiter/lib.rs
fixxxedpoint Jul 10, 2024
ab3a4f9
removed unnecessary `pub` in `pub mod transport;` in [`finality-aleph/…
fixxxedpoint Jul 10, 2024
f360758
removed newline in [`finality-aleph/src/network/build/mod.rs`]
fixxxedpoint Jul 10, 2024
5f7a518
added `impl Default for DefaultTimeProvider` in [`token_bucket.rs`]
fixxxedpoint Jul 10, 2024
f977951
removed unnecessary `.into()` conversion of [`SleepingRateLimiter`] in…
fixxxedpoint Jul 10, 2024
3e7ff62
- `build_transport` moved to [`finality-aleph/src/network/build/trans…
fixxxedpoint Jul 10, 2024
93721ea
Default macro for DefaultTimeProvider - review changes
fixxxedpoint Jul 12, 2024
b6d72cb
using `parking_lot::Mutex` in rate_limiter instead of `std::sync::Mutex`
fixxxedpoint Jul 12, 2024
61bc2e7
Merge remote-tracking branch 'origin/main' into A0-4318_rate_limited_…
fixxxedpoint Jul 12, 2024
4fdc89f
simplified TokenBucket algorithm: using only `requested` field instea…
fixxxedpoint Jul 17, 2024
f24955a
TokenBucket per connection instead of a shared one
fixxxedpoint Aug 5, 2024
c04224c
using new API of `substrate::NetworkWorker::new_with_custom_transport`
fixxxedpoint Aug 5, 2024
fbb08ff
TokenBucket return None when rate_limit is 0 instead of some arbitrar…
fixxxedpoint Aug 5, 2024
6ac5497
Merge remote-tracking branch 'origin/main' into A0-4318_rate_limited_…
fixxxedpoint Aug 5, 2024
24169a6
default rate-limit for the `sync` network is now 1 MiB per connection…
fixxxedpoint Aug 6, 2024
0a7c9d4
fmt for rate_limiter.rs
fixxxedpoint Aug 6, 2024
c53f42a
fixed unit-tests for token-bucket after recent changes of its api
fixxxedpoint Aug 6, 2024
97c400f
rust-fmt after changes in rate-limiter
fixxxedpoint Aug 7, 2024
d351fe4
added type alias for `Box<dyn sc_network::config::NotificationService…
fixxxedpoint Aug 13, 2024
65adb85
Merge remote-tracking branch 'origin/main' into A0-4318_rate_limited_…
fixxxedpoint Aug 13, 2024
ac25b7d
s/substrate_network_bit_rate_per_connection/bit_rate_per_connection i…
fixxxedpoint Aug 16, 2024
18aa9e1
better readability for TokenBucket.rs
fixxxedpoint Aug 16, 2024
21b4b73
better readability in TokenBucket.rs ++
fixxxedpoint Aug 16, 2024
8c106dd
rust-fmt in `bin/node/src/service.rs`
fixxxedpoint Aug 16, 2024
961e6b3
using `enum Deadline {Never, Instant}` instead of `Option<Option<Inst…
fixxxedpoint Aug 16, 2024
0265a27
rust-fmt in tests for TokenBucket
fixxxedpoint Aug 16, 2024
3586bd7
Merge remote-tracking branch 'origin/main' into A0-4318_rate_limited_…
fixxxedpoint Oct 30, 2024
dd5a600
- simpler version of the TokenBucket (rate-limiter)
fixxxedpoint Oct 30, 2024
b18d9a7
added bunch of unit-tests for TokenBucket and SharedTokenBucket
fixxxedpoint Oct 30, 2024
ade5dde
- integration of the new SharedTokenBucket with both Substrate-based …
fixxxedpoint Oct 30, 2024
e39e000
added ALEPHBFT_NETWORK_BIT_RATE SUBSTRATE_NETWORK_BIT_RATE params to …
fixxxedpoint Oct 30, 2024
f61dffa
slightly refactored token_bucket.rs
fixxxedpoint Nov 4, 2024
629d8d4
fixed and updated unit tests in token_bucket.rs
fixxxedpoint Nov 4, 2024
a9678a0
rust-fmt for token_bucket and related
fixxxedpoint Nov 4, 2024
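
Several commits above refer to a `TokenBucket` that tracks a `requested` amount and tells the caller how long to wait. The token_bucket.rs source is not part of this excerpt, so the following is only a minimal sketch of the general token-bucket technique, not the PR's implementation. The name `SketchTokenBucket`, the one-second burst capacity, and the `Option<Duration>` return type are all assumptions made for illustration.

```rust
use std::convert::TryFrom;
use std::num::NonZeroU64;
use std::time::{Duration, Instant};

/// Hypothetical, simplified token bucket (NOT the PR's `TokenBucket`).
/// Assumption: the burst capacity equals one second's worth of tokens.
struct SketchTokenBucket {
    rate_per_second: NonZeroU64,
    /// Tokens requested so far that elapsed time has not yet paid off.
    requested: u64,
    last_update: Instant,
}

impl SketchTokenBucket {
    fn new(rate_per_second: NonZeroU64, now: Instant) -> Self {
        Self {
            rate_per_second,
            requested: 0,
            last_update: now,
        }
    }

    /// Records a request for `tokens` at time `now`.
    /// Returns `None` if the caller may proceed at once, or `Some(delay)`
    /// with the time it should wait before proceeding.
    fn rate_limit(&mut self, tokens: u64, now: Instant) -> Option<Duration> {
        let rate = u128::from(self.rate_per_second.get());

        // Pay off outstanding requests with tokens replenished since the last call.
        let elapsed = now.saturating_duration_since(self.last_update);
        let earned = elapsed.as_nanos().saturating_mul(rate) / 1_000_000_000;
        let earned = u64::try_from(earned).unwrap_or(u64::MAX);
        self.requested = self.requested.saturating_sub(earned);
        self.last_update = now;

        // Account for the new request.
        self.requested = self.requested.saturating_add(tokens);
        let capacity = self.rate_per_second.get();
        if self.requested <= capacity {
            return None;
        }

        // Time needed to replenish the tokens that exceed the capacity.
        let excess = u128::from(self.requested - capacity);
        let nanos = excess * 1_000_000_000 / rate;
        Some(Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX)))
    }
}
```

Passing `now` explicitly, rather than calling `Instant::now()` inside, keeps the sketch deterministic in tests; the commit list hints that the PR uses a time-provider abstraction for a similar purpose, but its exact shape is not shown here.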
3 changes: 3 additions & 0 deletions Cargo.lock

18 changes: 13 additions & 5 deletions bin/node/src/aleph_cli.rs
@@ -47,9 +47,13 @@ pub struct AlephCli {
#[clap(long, default_value_t = false)]
enable_pruning: bool,

/// Maximum bit-rate per node in bytes per second of the alephbft validator network.
#[clap(long, default_value_t = 64 * 1024)]
alephbft_bit_rate_per_connection: u64,
/// Maximum bit-rate in bits per second of the alephbft validator network.
#[clap(long, default_value_t = 768 * 1024)]
alephbft_network_bit_rate: u64,

/// Maximum bit-rate in bits per second of the substrate network.
#[clap(long, default_value_t = 5*1024*1024)]
substrate_network_bit_rate: u64,

/// Don't spend some extra time to collect more debugging data (e.g. validator network details).
/// By default collecting is enabled, as the impact on performance is negligible, if any.
@@ -93,8 +97,12 @@ impl AlephCli {
self.enable_pruning
}

pub fn alephbft_bit_rate_per_connection(&self) -> u64 {
self.alephbft_bit_rate_per_connection
pub fn alephbft_network_bit_rate(&self) -> u64 {
self.alephbft_network_bit_rate
}

pub fn substrate_network_bit_rate(&self) -> u64 {
self.substrate_network_bit_rate
}

pub fn no_collection_of_extra_debugging_data(&self) -> bool {
15 changes: 8 additions & 7 deletions bin/node/src/service.rs
@@ -234,10 +234,8 @@ fn get_proposer_factory(

fn get_rate_limit_config(aleph_config: &AlephCli) -> RateLimiterConfig {
RateLimiterConfig {
alephbft_bit_rate_per_connection: aleph_config
.alephbft_bit_rate_per_connection()
.try_into()
.unwrap_or(usize::MAX),
alephbft_network_bit_rate: aleph_config.alephbft_network_bit_rate(),
substrate_network_bit_rate: aleph_config.substrate_network_bit_rate(),
}
}
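
For orientation, the hunk above drops a saturating `u64` to `usize` conversion and passes the two new `u64` settings through unchanged. The sketch below spells out the new defaults from `aleph_cli.rs` and the behaviour of the removed conversion. The constant and function names are hypothetical and exist only for this illustration.

```rust
use std::convert::TryFrom;

/// Default of `--alephbft-network-bit-rate` in this diff (hypothetical constant name).
const DEFAULT_ALEPHBFT_NETWORK_BIT_RATE: u64 = 768 * 1024;
/// Default of `--substrate-network-bit-rate` in this diff (hypothetical constant name).
const DEFAULT_SUBSTRATE_NETWORK_BIT_RATE: u64 = 5 * 1024 * 1024;

/// Equivalent of the removed `.try_into().unwrap_or(usize::MAX)`:
/// saturate instead of failing when the value does not fit into `usize`
/// (which can only happen on targets where `usize` is narrower than 64 bits).
fn saturating_to_usize(value: u64) -> usize {
    usize::try_from(value).unwrap_or(usize::MAX)
}
```

Keeping the setting as `u64` end to end makes the conversion, and its platform-dependent saturation, unnecessary.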

@@ -296,6 +294,11 @@ pub fn new_authority(
)?;

let import_queue_handle = BlockImporter::new(service_components.import_queue.service());
let rate_limiter_config = get_rate_limit_config(&aleph_config);
let network_config = finality_aleph::SubstrateNetworkConfig {
substrate_network_bit_rate: rate_limiter_config.substrate_network_bit_rate,
network_config: config.network.clone(),
};

let BuildNetworkOutput {
network,
@@ -305,7 +308,7 @@ pub fn new_authority(
tx_handler_controller,
system_rpc_tx,
} = build_network(
&config.network,
network_config,
config.protocol_id(),
service_components.client.clone(),
major_sync,
@@ -370,8 +373,6 @@ pub fn new_authority(
.spawn_essential_handle()
.spawn_blocking("aura", None, aura);

let rate_limiter_config = get_rate_limit_config(&aleph_config);

let AlephRuntimeVars {
millisecs_per_block,
session_period,
45 changes: 12 additions & 33 deletions clique/src/rate_limiting.rs
@@ -1,46 +1,22 @@
use rate_limiter::{RateLimiter, SleepingRateLimiter};
use tokio::io::AsyncRead;
use rate_limiter::{RateLimitedAsyncRead, RateLimiterImpl, SharingRateLimiter};

use crate::{ConnectionInfo, Data, Dialer, Listener, PeerAddressInfo, Splittable, Splitted};

pub struct RateLimitedAsyncRead<Read> {
rate_limiter: RateLimiter,
read: Read,
}

impl<Read> RateLimitedAsyncRead<Read> {
pub fn new(read: Read, rate_limiter: RateLimiter) -> Self {
Self { rate_limiter, read }
}
}

impl<Read: AsyncRead + Unpin> AsyncRead for RateLimitedAsyncRead<Read> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
let this = self.get_mut();
let read = std::pin::Pin::new(&mut this.read);
this.rate_limiter.rate_limit(read, cx, buf)
}
}

impl<Read: ConnectionInfo> ConnectionInfo for RateLimitedAsyncRead<Read> {
fn peer_address_info(&self) -> PeerAddressInfo {
self.read.peer_address_info()
self.inner().peer_address_info()
}
}

/// Implementation of the [Dialer] trait governing all returned [Dialer::Connection] instances by a rate-limiting wrapper.
#[derive(Clone)]
pub struct RateLimitingDialer<D> {
dialer: D,
rate_limiter: SleepingRateLimiter,
rate_limiter: SharingRateLimiter,
}

impl<D> RateLimitingDialer<D> {
pub fn new(dialer: D, rate_limiter: SleepingRateLimiter) -> Self {
pub fn new(dialer: D, rate_limiter: SharingRateLimiter) -> Self {
Self {
dialer,
rate_limiter,
@@ -66,7 +42,7 @@ where
let connection = self.dialer.connect(address).await?;
let (sender, receiver) = connection.split();
Ok(Splitted(
RateLimitedAsyncRead::new(receiver, RateLimiter::new(self.rate_limiter.clone())),
RateLimitedAsyncRead::new(receiver, RateLimiterImpl::new(self.rate_limiter.clone())),
sender,
))
}
@@ -75,11 +51,11 @@ where
/// Implementation of the [Listener] trait governing all returned [Listener::Connection] instances by a rate-limiting wrapper.
pub struct RateLimitingListener<L> {
listener: L,
rate_limiter: SleepingRateLimiter,
rate_limiter: SharingRateLimiter,
}

impl<L> RateLimitingListener<L> {
pub fn new(listener: L, rate_limiter: SleepingRateLimiter) -> Self {
pub fn new(listener: L, rate_limiter: SharingRateLimiter) -> Self {
Self {
listener,
rate_limiter,
@@ -88,7 +64,10 @@ impl<L> RateLimitingListener<L> {
}

#[async_trait::async_trait]
impl<L: Listener + Send> Listener for RateLimitingListener<L> {
impl<L> Listener for RateLimitingListener<L>
where
L: Listener + Send,
{
type Connection = Splitted<
RateLimitedAsyncRead<<L::Connection as Splittable>::Receiver>,
<L::Connection as Splittable>::Sender,
@@ -99,7 +78,7 @@ impl<L: Listener + Send> Listener for RateLimitingListener<L> {
let connection = self.listener.accept().await?;
let (sender, receiver) = connection.split();
Ok(Splitted(
RateLimitedAsyncRead::new(receiver, RateLimiter::new(self.rate_limiter.clone())),
RateLimitedAsyncRead::new(receiver, RateLimiterImpl::new(self.rate_limiter.clone())),
sender,
))
}
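
In the hunks above, both the dialer and the listener clone a single `SharingRateLimiter` into every connection they return, so all connections draw from one budget. The internals of `SharingRateLimiter` are not shown in this excerpt; the following is only a sketch of that sharing pattern using `Arc<Mutex<_>>`. `SharedBudget` and `Limited` are hypothetical stand-ins, and the sketch omits replenishment and async I/O entirely.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical byte budget shared by all clones (stand-in for a shared rate limiter).
#[derive(Clone)]
struct SharedBudget {
    remaining: Arc<Mutex<u64>>,
}

impl SharedBudget {
    fn new(total: u64) -> Self {
        Self {
            remaining: Arc::new(Mutex::new(total)),
        }
    }

    /// Takes up to `wanted` bytes from the shared budget and returns the amount granted.
    fn take(&self, wanted: u64) -> u64 {
        let mut remaining = self.remaining.lock().unwrap();
        let granted = wanted.min(*remaining);
        *remaining -= granted;
        granted
    }
}

/// Hypothetical per-connection wrapper holding its own clone of the shared budget.
struct Limited<R> {
    inner: R,
    budget: SharedBudget,
}

impl<R> Limited<R> {
    fn new(inner: R, budget: SharedBudget) -> Self {
        Self { inner, budget }
    }

    /// Access to the wrapped value, similar in spirit to the `inner()` call in the diff.
    fn inner(&self) -> &R {
        &self.inner
    }

    fn take(&self, wanted: u64) -> u64 {
        self.budget.take(wanted)
    }
}
```

Because every clone points at the same `Arc`, traffic on one connection reduces what the others may consume, which is the property the network-wide limit relies on.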
10 changes: 10 additions & 0 deletions docker/docker_entrypoint.sh
@@ -35,6 +35,8 @@ MAX_RUNTIME_INSTANCES=${MAX_RUNTIME_INSTANCES:-8}
BACKUP_PATH=${BACKUP_PATH:-${BASE_PATH}/backup-stash}
DATABASE_ENGINE=${DATABASE_ENGINE:-}
PRUNING_ENABLED=${PRUNING_ENABLED:-false}
ALEPHBFT_NETWORK_BIT_RATE=${ALEPHBFT_NETWORK_BIT_RATE:-}
SUBSTRATE_NETWORK_BIT_RATE=${SUBSTRATE_NETWORK_BIT_RATE:-}

if [[ "true" == "$PURGE_BEFORE_START" ]]; then
echo "Purging chain (${CHAIN}) at path ${BASE_PATH}"
@@ -141,4 +143,12 @@ if [[ -n "${MAX_SUBSCRIPTIONS_PER_CONNECTION:-}" ]]; then
ARGS+=(--rpc-max-subscriptions-per-connection ${MAX_SUBSCRIPTIONS_PER_CONNECTION})
fi

if [[ -n "${ALEPHBFT_NETWORK_BIT_RATE}" ]]; then
ARGS+=(--alephbft-network-bit-rate ${ALEPHBFT_NETWORK_BIT_RATE})
fi

if [[ -n "${SUBSTRATE_NETWORK_BIT_RATE}" ]]; then
ARGS+=(--substrate-network-bit-rate ${SUBSTRATE_NETWORK_BIT_RATE})
fi

echo "${CUSTOM_ARGS}" | xargs aleph-node "${ARGS[@]}"
1 change: 1 addition & 0 deletions finality-aleph/Cargo.toml
@@ -44,6 +44,7 @@ serde = { workspace = true }
static_assertions = { workspace = true }
tiny-bip39 = { workspace = true }
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
libp2p = { workspace = true }

substrate-prometheus-endpoint = { workspace = true }

9 changes: 6 additions & 3 deletions finality-aleph/src/lib.rs
@@ -72,7 +72,8 @@ pub use crate::{
justification::AlephJustification,
network::{
address_cache::{ValidatorAddressCache, ValidatorAddressingInfo},
build_network, BuildNetworkOutput, ProtocolNetwork, SubstratePeerId,
build_network, BuildNetworkOutput, ProtocolNetwork, SubstrateNetworkConfig,
SubstratePeerId,
},
nodes::run_validator_node,
session::SessionPeriod,
@@ -255,8 +256,10 @@ type Hasher = abft::HashWrapper<BlakeTwo256>;

#[derive(Clone)]
pub struct RateLimiterConfig {
/// Maximum bit-rate per node in bytes per second of the alephbft validator network.
pub alephbft_bit_rate_per_connection: usize,
/// Maximum bit-rate in bits per second of the alephbft validator network.
pub alephbft_network_bit_rate: u64,
/// Maximum bit-rate in bits per second of the substrate network (shared by sync, gossip, etc.).
pub substrate_network_bit_rate: u64,
}

pub struct AlephConfig<C, T> {
15 changes: 13 additions & 2 deletions finality-aleph/src/network/build/base.rs
@@ -1,5 +1,6 @@
use std::sync::Arc;

use libp2p::{core::StreamMuxer, PeerId, Transport};
use sc_client_api::Backend;
use sc_network::{
config::{
@@ -8,6 +9,7 @@ use sc_network::{
},
error::Error as NetworkError,
peer_store::PeerStore,
transport::NetworkConfig,
NetworkService, NetworkWorker,
};
use sc_network_light::light_client_requests::handler::LightClientRequestHandler;
@@ -69,8 +71,9 @@ type BaseNetworkOutput<B> = (
);

/// Create a base network with all the protocols already included. Also spawn (almost) all the necessary services.
pub fn network<B, BE, C>(
pub fn network<B, BE, C, T, SM>(
network_config: &NetworkConfiguration,
transport_builder: impl FnOnce(NetworkConfig) -> T,
protocol_id: ProtocolId,
client: Arc<C>,
spawn_handle: &SpawnTaskHandle,
@@ -82,6 +85,13 @@ where
B::Header: Header<Number = BlockNumber>,
BE: Backend<B>,
C: ClientForAleph<B, BE>,
T: Transport<Output = (PeerId, SM)> + Send + Unpin + 'static,
T::Dial: Send,
T::ListenerUpgrade: Send,
T::Error: Send + Sync,
SM: StreamMuxer + Unpin + Send + 'static,
SM::Substream: Unpin + Send,
SM::Error: Send + Sync,
{
let mut full_network_config = FullNetworkConfiguration::new(network_config);
let genesis_hash = client
@@ -135,7 +145,8 @@ where
block_announce_config: base_protocol_config,
};

let network_service = NetworkWorker::new(network_params)?;
let network_service =
NetworkWorker::new_with_custom_transport(network_params, transport_builder)?;
let network = network_service.service().clone();
spawn_handle.spawn_blocking("network-worker", SPAWN_CATEGORY, network_service.run());
Ok((network, networks, transactions_prototype))
21 changes: 18 additions & 3 deletions finality-aleph/src/network/build/mod.rs
@@ -1,6 +1,7 @@
use std::sync::{atomic::AtomicBool, Arc};

use log::error;
use rate_limiter::SharingRateLimiter;
use sc_client_api::Backend;
use sc_network::{
config::{NetworkConfiguration, ProtocolId},
@@ -28,6 +29,7 @@ mod base;
mod own_protocols;
mod rpc;
mod transactions;
mod transport;

use base::network as base_network;
use own_protocols::Networks;
@@ -47,10 +49,17 @@ pub struct NetworkOutput<TP: TransactionPool + 'static> {
pub system_rpc_tx: TracingUnboundedSender<RpcRequest<TP::Block>>,
}

pub struct SubstrateNetworkConfig {
/// Maximum bit-rate in bits per second of the substrate network (shared by sync, gossip, etc.).
pub substrate_network_bit_rate: u64,
/// Configuration of the network service.
pub network_config: NetworkConfiguration,
}

/// Start everything necessary to run the inter-node network and return the interfaces for it.
/// This includes everything in the base network, the base protocol service, and services for handling transactions and RPCs.
pub fn network<TP, BE, C>(
network_config: &NetworkConfiguration,
network_config: SubstrateNetworkConfig,
protocol_id: ProtocolId,
client: Arc<C>,
major_sync: Arc<AtomicBool>,
@@ -72,6 +81,11 @@ where
.expect("Genesis block exists.");
let (base_protocol_config, events_from_network) =
setup_base_protocol::<TP::Block>(genesis_hash);

let network_rate_limit = network_config.substrate_network_bit_rate;
let rate_limiter = SharingRateLimiter::new(network_rate_limit.into());
let transport_builder = |config| transport::build_transport(rate_limiter, config);

let (
network,
Networks {
@@ -80,7 +94,8 @@ where
},
transaction_prototype,
) = base_network(
network_config,
&network_config.network_config,
transport_builder,
protocol_id,
client.clone(),
spawn_handle,
@@ -91,7 +106,7 @@ where
let (base_service, syncing_service) = BaseProtocolService::new(
major_sync,
genesis_hash,
network_config,
&network_config.network_config,
protocol_names,
network.clone(),
events_from_network,
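
The changes to `build/mod.rs` and `build/base.rs` thread a `transport_builder` closure, typed as `impl FnOnce(NetworkConfig) -> T`, down to the network worker; the closure captures the rate limiter by move. The sketch below shows only that closure-passing shape with standard-library types. `SketchConfig`, `SketchLimiter`, `SketchTransport`, `build_sketch_transport`, and `start_network` are hypothetical stand-ins for the real `sc_network` and libp2p types, which are not reproduced here.

```rust
/// Hypothetical stand-in for the transport configuration handed to the builder.
struct SketchConfig {
    name: String,
}

/// Hypothetical stand-in for the rate limiter; deliberately not `Copy`,
/// so the closure has to take ownership of it.
struct SketchLimiter {
    bit_rate: u64,
}

/// Hypothetical stand-in for the constructed transport.
struct SketchTransport {
    description: String,
}

/// Plays the role of a `build_transport(rate_limiter, config)` function.
fn build_sketch_transport(rate_limiter: SketchLimiter, config: SketchConfig) -> SketchTransport {
    SketchTransport {
        description: format!("{} limited to {}", config.name, rate_limiter.bit_rate),
    }
}

/// Mirrors the shape of a function that accepts the builder: the callee owns
/// the configuration and invokes the builder exactly once.
fn start_network<T>(transport_builder: impl FnOnce(SketchConfig) -> T) -> T {
    let config = SketchConfig {
        name: "substrate".to_string(),
    };
    transport_builder(config)
}
```

Since the builder consumes the limiter when called, `FnOnce` is the natural bound: the callee cannot accidentally build two transports from one limiter value.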