Skip to content

Commit

Permalink
optimistically try to receive without using blocking task
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Jul 27, 2023
1 parent 71e15da commit 9698786
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/clients/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::workload::ClientRequest;
use crate::workload::ClientWorkItem as WorkItem;
use crate::*;
use ::momento::MomentoError;
use crossbeam::channel::Receiver;
use crossbeam::channel::{Receiver, TryRecvError};
use std::io::{Error, ErrorKind, Result};
use tokio::io::*;
use tokio::runtime::Runtime;
Expand Down
12 changes: 10 additions & 2 deletions src/clients/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,16 @@ async fn task(work_receiver: Receiver<WorkItem>, endpoint: String, config: Confi
}

let mut con = connection.take().unwrap();
let work_item = tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?;
let work_item = match work_receiver.try_recv() {
Ok(item) => item,
Err(TryRecvError::Empty) => {
tokio::task::block_in_place(|| work_receiver.recv())
.map_err(|_| Error::new(ErrorKind::Other, "channel closed"))?
}
Err(_) => {
return Err(Error::new(ErrorKind::Other, "channel closed"));
}
};

REQUEST.increment();
let start = Instant::now();
Expand Down

0 comments on commit 9698786

Please sign in to comment.