diff --git a/Cargo.lock b/Cargo.lock index 08cb8043..04e2fc66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1776,13 +1776,13 @@ dependencies = [ [[package]] name = "ratelimit" -version = "0.7.0" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ab80ec32e625af41cd6ffc39fe4dd943e6f71b437d2e29df65ad2426d1268ef" +checksum = "4e9c124ab68f28501873b8c5ecfa0de688e37cbe4f1b5440ed1971135e7c84e8" dependencies = [ "clocksource", - "parking_lot", - "thiserror", + "rand", + "rand_distr", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3e1464ea..8e821b51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ protocol-ping = { git = "https://github.com/pelikan-io/pelikan" } rand = "0.8.5" rand_distr = "0.4.3" rand_xoshiro = "0.6.0" -ratelimit = "0.7.0" +ratelimit = "0.5.1" redis = { version = "0.22.3", features = ["tokio-comp"] } ringlog = "0.2.0" serde = "1.0.144" diff --git a/src/admin/mod.rs b/src/admin/mod.rs index b25c0394..a703d6f5 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -330,17 +330,7 @@ mod handlers { ratelimit: Option>, ) -> Result { if let Some(r) = ratelimit { - let amount = (rate as f64 / 1_000_000.0).ceil() as u64; - let interval = Duration::from_micros(1_000_000 / (rate / amount)); - let capacity = std::cmp::max(100, amount); - - r.set_max_tokens(capacity) - .expect("failed to set max tokens"); - r.set_refill_interval(interval) - .expect("failed to set refill interval"); - r.set_refill_amount(amount) - .expect("failed to set refill amount"); - + r.set_rate(rate); Ok(StatusCode::OK) } else { Ok(StatusCode::NOT_FOUND) diff --git a/src/workload/mod.rs b/src/workload/mod.rs index d671ad99..eab84ca0 100644 --- a/src/workload/mod.rs +++ b/src/workload/mod.rs @@ -76,18 +76,10 @@ pub struct Generator { impl Generator { pub fn new(config: &Config) -> Self { - let ratelimiter = config.workload().ratelimit().map(|rate| { - let amount = (rate.get() as f64 / 1_000_000.0).ceil() as u64; - let interval = Duration::from_micros(1_000_000 / (rate.get() / amount)); - let capacity = std::cmp::max(100, amount); - - Arc::new( - Ratelimiter::builder(amount, interval) - .max_tokens(capacity) - .build() - .expect("failed to initialize ratelimiter"), - ) - }); + let ratelimiter = config + .workload() + .ratelimit() + .map(|r| Arc::new(Ratelimiter::new(1000, 1, r.into()))); let mut components = Vec::new(); let mut component_weights = Vec::new(); @@ -642,18 +634,11 @@ pub async fn reconnect(work_sender: Sender, config: Config) -> R return Ok(()); } - let ratelimiter = config.client().unwrap().reconnect_rate().map(|rate| { - let amount = (rate.get() as f64 / 1_000_000.0).ceil() as u64; - let interval = Duration::from_micros(1_000_000 / (rate.get() / amount)); - let capacity = std::cmp::max(100, amount); - - Arc::new( - Ratelimiter::builder(amount, interval) - .max_tokens(capacity) - .build() - .expect("failed to initialize ratelimiter"), - ) - }); + let ratelimiter = config + .client() + .unwrap() + .reconnect_rate() + .map(|r| Arc::new(Ratelimiter::new(1000, 1, r.into()))); if ratelimiter.is_none() { return Ok(()); @@ -662,14 +647,8 @@ pub async fn reconnect(work_sender: Sender, config: Config) -> R let ratelimiter = ratelimiter.unwrap(); while RUNNING.load(Ordering::Relaxed) { - match ratelimiter.try_wait() { - Ok(_) => { - let _ = work_sender.send(ClientWorkItem::Reconnect).await; - } - Err(d) => { - std::thread::sleep(d); - } - } + ratelimiter.wait(); + let _ = work_sender.send(ClientWorkItem::Reconnect).await; } Ok(())