Skip to content

Commit

Permalink
async
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Oct 15, 2024
1 parent c31fe10 commit 6a1d050
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions src/workload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,15 @@ pub fn launch_workload(
let mut seed = [0; 64];
rng.fill_bytes(&mut seed);

workload_rt.spawn_blocking(move || {
// since this seed is unique, each workload thread should produce
// requests in a different sequence
let mut rng = Xoshiro512PlusPlus::from_seed(Seed512(seed));

workload_rt.spawn(async move {
while RUNNING.load(Ordering::Relaxed) {
generator.generate(
&client_sender,
&pubsub_sender,
&store_sender,
&oltp_sender,
&mut rng,
);
Xoshiro512PlusPlus::from_seed(Seed512(seed)),
).await;
}
});
}
Expand Down Expand Up @@ -154,13 +150,13 @@ impl Generator {
self.ratelimiter.clone()
}

pub fn generate(
pub async fn generate(
&self,
client_sender: &Sender<ClientWorkItemKind<ClientRequest>>,
pubsub_sender: &Sender<PublisherWorkItem>,
store_sender: &Sender<ClientWorkItemKind<StoreClientRequest>>,
oltp_sender: &Sender<ClientWorkItemKind<OltpRequest>>,
rng: &mut dyn RngCore,
mut rng: Xoshiro512PlusPlus,
) {
if let Some(ref ratelimiter) = self.ratelimiter {
loop {
Expand All @@ -174,34 +170,38 @@ impl Generator {
}
}

match &self.components[self.component_dist.sample(rng)] {
match &self.components[self.component_dist.sample(&mut rng)] {
Component::Keyspace(keyspace) => {
if client_sender
.send(self.generate_request(keyspace, rng))
.send(self.generate_request(keyspace, &mut rng))
.await
.is_err()
{
REQUEST_DROPPED.increment();
}
}
Component::Topics(topics) => {
if pubsub_sender
.send(self.generate_pubsub(topics, rng))
.send(self.generate_pubsub(topics, &mut rng))
.await
.is_err()
{
REQUEST_DROPPED.increment();
}
}
Component::Store(store) => {
if store_sender
.send(self.generate_store_request(store, rng))
.send(self.generate_store_request(store, &mut rng))
.await
.is_err()
{
REQUEST_DROPPED.increment();
}
}
Component::Oltp(oltp) => {
if oltp_sender
.send(self.generate_oltp_request(oltp, rng))
.send(self.generate_oltp_request(oltp, &mut rng))
.await
.is_err()
{
REQUEST_DROPPED.increment();
Expand Down

0 comments on commit 6a1d050

Please sign in to comment.