Skip to content

Commit

Permalink
WIP: replace async-channel
Browse files Browse the repository at this point in the history
WIP to replace async-channel for performance reasons
  • Loading branch information
brayniac committed Jul 27, 2023
1 parent 65a5d02 commit 8f14f66
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 14 deletions.
36 changes: 35 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bytes = "1.2.1"
clap = "4.1.6"
clocksource = "0.6.0"
foreign-types-shared = "0.3.1"
flume = "0.10.14"
futures = "0.3.28"
http-body-util = "0.1.0-rc.2"
hyper = { version = "1.0.0-rc.3", features = ["http1", "http2", "client"]}
Expand Down
2 changes: 1 addition & 1 deletion src/clients/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
let mut s = session.take().unwrap();

let work_item = work_receiver
.recv()
.recv_async()
.await
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

Expand Down
2 changes: 1 addition & 1 deletion src/clients/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async fn task(
let mut s = sender.take().unwrap();

let work_item = work_receiver
.recv()
.recv_async()
.await
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

Expand Down
2 changes: 1 addition & 1 deletion src/clients/memcache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
let mut s = stream.take().unwrap();

let work_item = work_receiver
.recv()
.recv_async()
.await
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

Expand Down
2 changes: 1 addition & 1 deletion src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::workload::ClientRequest;
use crate::workload::ClientWorkItem as WorkItem;
use crate::*;
use ::momento::MomentoError;
use async_channel::Receiver;
use flume::Receiver;
use std::io::{Error, ErrorKind, Result};
use tokio::io::*;
use tokio::runtime::Runtime;
Expand Down
2 changes: 1 addition & 1 deletion src/clients/momento/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn task(

while RUNNING.load(Ordering::Relaxed) {
let work_item = work_receiver
.recv()
.recv_async()
.await
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

Expand Down
2 changes: 1 addition & 1 deletion src/clients/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
let mut s = stream.take().unwrap();

let work_item = work_receiver
.recv()
.recv_async()
.await
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

Expand Down
2 changes: 1 addition & 1 deletion src/clients/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi

let mut con = connection.take().unwrap();
let work_item = work_receiver
.recv()
.recv_async()
.await
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

Expand Down
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use flume::bounded;
use crate::clients::launch_clients;
use crate::pubsub::launch_pubsub;
use crate::workload::{launch_workload, Generator};
use async_channel::{bounded, Sender};
// use async_channel::{bounded, Sender};
use backtrace::Backtrace;
use clap::{Arg, Command};
use core::sync::atomic::{AtomicBool, Ordering};
Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::clients::*;
use crate::workload::Component;
use crate::workload::PublisherWorkItem as WorkItem;
use crate::*;
use async_channel::Receiver;
use flume::Receiver;
use std::io::{Error, ErrorKind, Result};
use tokio::runtime::Runtime;

Expand Down
2 changes: 1 addition & 1 deletion src/pubsub/momento.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ async fn publisher_task(

while RUNNING.load(Ordering::Relaxed) {
let work_item = work_receiver
.recv()
.recv_async()
.await
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;

Expand Down
7 changes: 4 additions & 3 deletions src/workload/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use flume::Sender;
use super::*;
use config::{Command, ValueKind, Verb};
use rand::distributions::{Alphanumeric, Uniform};
Expand Down Expand Up @@ -141,10 +142,10 @@ impl Generator {

match &self.components[self.component_dist.sample(rng)] {
Component::Keyspace(keyspace) => {
let _ = client_sender.send_blocking(self.generate_request(keyspace, rng));
let _ = client_sender.send(self.generate_request(keyspace, rng));
}
Component::Topics(topics) => {
let _ = pubsub_sender.send_blocking(self.generate_pubsub(topics, rng));
let _ = pubsub_sender.send(self.generate_pubsub(topics, rng));
}
}
}
Expand Down Expand Up @@ -666,7 +667,7 @@ pub async fn reconnect(work_sender: Sender<ClientWorkItem>, config: Config) -> R
while RUNNING.load(Ordering::Relaxed) {
match ratelimiter.try_wait() {
Ok(_) => {
let _ = work_sender.send(ClientWorkItem::Reconnect).await;
let _ = work_sender.send_async(ClientWorkItem::Reconnect).await;
}
Err(d) => {
std::thread::sleep(d);
Expand Down

0 comments on commit 8f14f66

Please sign in to comment.