Skip to content

Commit

Permalink
use crossbeam queue
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Jul 27, 2023
1 parent 8f14f66 commit 71e15da
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 62 deletions.
104 changes: 69 additions & 35 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ boring-sys = "2.1.0"
bytes = "1.2.1"
clap = "4.1.6"
clocksource = "0.6.0"
crossbeam = "0.8.2"
foreign-types-shared = "0.3.1"
flume = "0.10.14"
futures = "0.3.28"
http-body-util = "0.1.0-rc.2"
hyper = { version = "1.0.0-rc.3", features = ["http1", "http2", "client"]}
Expand Down
4 changes: 1 addition & 3 deletions src/clients/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

let mut s = session.take().unwrap();

let work_item = work_receiver
.recv_async()
.await
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

REQUEST.increment();
Expand Down
4 changes: 1 addition & 3 deletions src/clients/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ async fn task(

let mut s = sender.take().unwrap();

let work_item = work_receiver
.recv_async()
.await
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

REQUEST.increment();
Expand Down
4 changes: 1 addition & 3 deletions src/clients/memcache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

let mut s = stream.take().unwrap();

let work_item = work_receiver
.recv_async()
.await
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

REQUEST.increment();
Expand Down
2 changes: 1 addition & 1 deletion src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::workload::ClientRequest;
use crate::workload::ClientWorkItem as WorkItem;
use crate::*;
use ::momento::MomentoError;
use flume::Receiver;
use crossbeam::channel::Receiver;
use std::io::{Error, ErrorKind, Result};
use tokio::io::*;
use tokio::runtime::Runtime;
Expand Down
4 changes: 1 addition & 3 deletions src/clients/momento/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ async fn task(
});

while RUNNING.load(Ordering::Relaxed) {
let work_item = work_receiver
.recv_async()
.await
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

REQUEST.increment();
Expand Down
4 changes: 1 addition & 3 deletions src/clients/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

let mut s = stream.take().unwrap();

let work_item = work_receiver
.recv_async()
.await
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

REQUEST.increment();
Expand Down
4 changes: 1 addition & 3 deletions src/clients/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
}

let mut con = connection.take().unwrap();
let work_item = work_receiver
.recv_async()
.await
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

REQUEST.increment();
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use flume::bounded;
use crate::clients::launch_clients;
use crate::pubsub::launch_pubsub;
use crate::workload::{launch_workload, Generator};
use crossbeam::channel::bounded;
// use async_channel::{bounded, Sender};
use backtrace::Backtrace;
use clap::{Arg, Command};
Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::clients::*;
use crate::workload::Component;
use crate::workload::PublisherWorkItem as WorkItem;
use crate::*;
use flume::Receiver;
use crossbeam::channel::Receiver;
use std::io::{Error, ErrorKind, Result};
use tokio::runtime::Runtime;

Expand Down
7 changes: 4 additions & 3 deletions src/pubsub/momento.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,11 @@ async fn publisher_task(
);

while RUNNING.load(Ordering::Relaxed) {
let work_item = work_receiver
.recv_async()
.await
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;
// .recv_async()
// .await
// ;

REQUEST.increment();
let start = Instant::now();
Expand Down
4 changes: 2 additions & 2 deletions src/workload/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use flume::Sender;
use super::*;
use config::{Command, ValueKind, Verb};
use crossbeam::channel::Sender;
use rand::distributions::{Alphanumeric, Uniform};
use rand::{Rng, RngCore, SeedableRng};
use rand_distr::Distribution as RandomDistribution;
Expand Down Expand Up @@ -667,7 +667,7 @@ pub async fn reconnect(work_sender: Sender<ClientWorkItem>, config: Config) -> R
while RUNNING.load(Ordering::Relaxed) {
match ratelimiter.try_wait() {
Ok(_) => {
let _ = work_sender.send_async(ClientWorkItem::Reconnect).await;
let _ = tokio::task::block_in_place(|| work_sender.send(ClientWorkItem::Reconnect));
}
Err(d) => {
std::thread::sleep(d);
Expand Down

0 comments on commit 71e15da

Please sign in to comment.