diff --git a/src/workload/mod.rs b/src/workload/mod.rs index 8131730..bb19597 100644 --- a/src/workload/mod.rs +++ b/src/workload/mod.rs @@ -63,19 +63,15 @@ pub fn launch_workload( let mut seed = [0; 64]; rng.fill_bytes(&mut seed); - workload_rt.spawn_blocking(move || { - // since this seed is unique, each workload thread should produce - // requests in a different sequence - let mut rng = Xoshiro512PlusPlus::from_seed(Seed512(seed)); - + workload_rt.spawn(async move { while RUNNING.load(Ordering::Relaxed) { generator.generate( &client_sender, &pubsub_sender, &store_sender, &oltp_sender, - &mut rng, - ); + Xoshiro512PlusPlus::from_seed(Seed512(seed)), + ).await; } }); } @@ -154,13 +150,13 @@ impl Generator { self.ratelimiter.clone() } - pub fn generate( + pub async fn generate( &self, client_sender: &Sender>, pubsub_sender: &Sender, store_sender: &Sender>, oltp_sender: &Sender>, - rng: &mut dyn RngCore, + mut rng: Xoshiro512PlusPlus, ) { if let Some(ref ratelimiter) = self.ratelimiter { loop { @@ -174,10 +170,11 @@ impl Generator { } } - match &self.components[self.component_dist.sample(rng)] { + match &self.components[self.component_dist.sample(&mut rng)] { Component::Keyspace(keyspace) => { if client_sender - .send(self.generate_request(keyspace, rng)) + .send(self.generate_request(keyspace, &mut rng)) + .await .is_err() { REQUEST_DROPPED.increment(); @@ -185,7 +182,8 @@ impl Generator { } Component::Topics(topics) => { if pubsub_sender - .send(self.generate_pubsub(topics, rng)) + .send(self.generate_pubsub(topics, &mut rng)) + .await .is_err() { REQUEST_DROPPED.increment(); @@ -193,7 +191,8 @@ impl Generator { } Component::Store(store) => { if store_sender - .send(self.generate_store_request(store, rng)) + .send(self.generate_store_request(store, &mut rng)) + .await .is_err() { REQUEST_DROPPED.increment(); @@ -201,7 +200,8 @@ impl Generator { } Component::Oltp(oltp) => { if oltp_sender - .send(self.generate_oltp_request(oltp, rng)) + .send(self.generate_oltp_request(oltp, &mut rng)) + .await .is_err() { REQUEST_DROPPED.increment();