From eb80aa5ceed8c422f6f07172b18393c98a81c496 Mon Sep 17 00:00:00 2001 From: erhant Date: Thu, 29 Aug 2024 12:12:42 +0300 Subject: [PATCH] limit send queue size --- Cargo.lock | 52 ++++++++++++++++++++++---------------------- Cargo.toml | 2 +- src/p2p/behaviour.rs | 20 +++++++++++++---- 3 files changed, 43 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2d04418..2c1baab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2366,7 +2366,7 @@ checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" [[package]] name = "libp2p" version = "0.54.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "bytes 1.7.1", "either", @@ -2402,7 +2402,7 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "libp2p-core", "libp2p-identity", @@ -2413,7 +2413,7 @@ dependencies = [ [[package]] name = "libp2p-autonat" version = "0.13.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "async-trait", "asynchronous-codec", @@ -2439,7 +2439,7 @@ dependencies = [ [[package]] name = "libp2p-connection-limits" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "libp2p-core", "libp2p-identity", @@ -2450,7 +2450,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "fnv", @@ -2477,7 +2477,7 @@ dependencies = [ [[package]] name = "libp2p-dcutr" version = "0.12.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "either", @@ -2499,7 +2499,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "async-trait", "futures", @@ -2514,7 +2514,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.47.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "base64 0.22.1", @@ -2545,7 +2545,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "either", @@ -2587,7 +2587,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.46.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "arrayvec", "asynchronous-codec", @@ -2615,7 +2615,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.46.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "data-encoding", "futures", @@ -2635,7 +2635,7 @@ dependencies = [ [[package]] name = "libp2p-metrics" version = "0.14.2" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "libp2p-core", @@ -2655,7 +2655,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -2680,7 +2680,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "futures", @@ -2697,7 +2697,7 @@ dependencies = [ [[package]] name = "libp2p-quic" version = "0.11.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "bytes 1.7.1", "futures", @@ -2720,7 +2720,7 @@ dependencies = [ [[package]] name = "libp2p-relay" version = "0.18.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -2744,7 +2744,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" version = "0.27.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "async-trait", "futures", @@ -2763,7 +2763,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.45.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "fnv", @@ -2786,7 +2786,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" version = "0.35.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2797,7 +2797,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "futures-timer", @@ -2813,7 +2813,7 @@ dependencies = [ [[package]] name = "libp2p-tls" version = "0.5.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "futures-rustls", @@ -2831,7 +2831,7 @@ dependencies = [ [[package]] name = "libp2p-upnp" version = "0.3.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "futures-timer", @@ -2846,7 +2846,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.46.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "either", "futures", @@ -3223,7 +3223,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.13.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "bytes 1.7.1", "futures", @@ -3944,7 +3944,7 @@ dependencies = [ [[package]] name = "quick-protobuf-codec" version = "0.3.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -4520,7 +4520,7 @@ checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" [[package]] name = "rw-stream-sink" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=62f910e#62f910e30096718d0849a3b35b062c4a9b89ab4f" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=3c55e95#3c55e954856d0bb95039d04f4001d923d109e220" dependencies = [ "futures", "pin-project", diff --git a/Cargo.toml b/Cargo.toml index 9392924..dc64d5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,7 +48,7 @@ fastbloom-rs = "0.5.9" ollama-workflows = { git = "https://github.com/andthattoo/ollama-workflows", rev = "d6b2e1e" } # peer-to-peer -libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "62f910e", features = [ +libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "3c55e95", features = [ # libp2p = { version = "0.54.1", features = [ "dcutr", "ping", diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 5a510e5..e5f04c8 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -98,6 +98,12 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour { /// and check their fields based on whether they exist or not. const VALIDATION_MODE: ValidationMode = ValidationMode::Permissive; + /// Heartbeat interval in seconds + const HEARTBEAT_INTERVAL_SECS: u64 = 10; + + /// Duplicate cache time in seconds + const DUPLICATE_CACHE_TIME_SECS: u64 = 120; + /// Gossip cache TTL in seconds const GOSSIP_TTL_SECS: u64 = 100; @@ -111,6 +117,10 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour { /// because we don't need historic messages at all const MAX_IHAVE_LENGTH: usize = 100; + /// Max size of the send queue + /// This helps to avoid memory exhaustion during high load + const MAX_SEND_QUEUE_SIZE: usize = 400; + // message id's are simply hashes of the message data let message_id_fn = |message: &Message| { let mut hasher = hash_map::DefaultHasher::new(); @@ -123,15 +133,17 @@ fn create_gossipsub_behavior(author: PeerId) -> gossipsub::Behaviour { Behaviour::new( MessageAuthenticity::Author(author), ConfigBuilder::default() - .heartbeat_interval(Duration::from_secs(10)) - .max_transmit_size(MAX_TRANSMIT_SIZE) // 256 KB + .heartbeat_interval(Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) + .max_transmit_size(MAX_TRANSMIT_SIZE) .message_id_fn(message_id_fn) + .message_capacity(MESSAGE_CAPACITY) .message_ttl(Duration::from_secs(MESSAGE_TTL_SECS)) .gossip_ttl(Duration::from_secs(GOSSIP_TTL_SECS)) - .message_capacity(MESSAGE_CAPACITY) + .duplicate_cache_time(Duration::from_secs(DUPLICATE_CACHE_TIME_SECS)) + .max_ihave_length(MAX_IHAVE_LENGTH) + .send_queue_size(MAX_SEND_QUEUE_SIZE) .validation_mode(VALIDATION_MODE) .validate_messages() - .max_ihave_length(MAX_IHAVE_LENGTH) .build() .expect("Valid config"), // TODO: better error handling )