Skip to content

Commit

Permalink
removed old menu lock
Browse files Browse the repository at this point in the history
Signed-off-by: wadeking98 <wkingnumber2@gmail.com>
  • Loading branch information
wadeking98 committed Oct 15, 2023
1 parent b6a844d commit 2b10d43
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 80 deletions.
39 changes: 7 additions & 32 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ fn get_prompt() -> (String, String) {
fn input_loop(
shells: Arc<Mutex<HashMap<String, connection::Handle>>>,
menu: menu_list::MenuList,
mut menu_channel_acquire: mpsc::Receiver<()>,
menu_channel_release: mpsc::Sender<()>,
init_message: Option<String>,
) {
tokio::spawn(async move {
Expand Down Expand Up @@ -99,12 +97,10 @@ fn input_loop(
}
};

entry(shells.clone(), menu_channel_release.clone());

if !key.eq(&String::new()) {
if key.eq("l") {
menu_channel_acquire.recv().await.unwrap();
}
let handle_opt = entry(shells.clone());
if handle_opt.is_some() {
let join_handle = handle_opt.unwrap();
join_handle.await.unwrap_or_default();
}
}
});
Expand Down Expand Up @@ -133,7 +129,6 @@ async fn soc_is_shell(soc: &mut TcpStream, soc_key: String) -> bool {
async fn handle_new_shell(
mut soc: TcpStream,
connected_shells: Arc<Mutex<HashMap<String, connection::Handle>>>,
menu_channel_release: mpsc::Sender<()>,
skip_validation: Option<bool>,
) {
let (handle_to_soc_send, handle_to_soc_recv) = mpsc::channel::<String>(1024);
Expand All @@ -159,12 +154,7 @@ async fn handle_new_shell(
handle_to_soc_recv,
soc_kill_sig_recv,
);
handle.handle_listen(
handle_to_soc_send,
soc_to_handle_recv,
menu_channel_release,
stdout,
);
handle.handle_listen(handle_to_soc_send, soc_to_handle_recv, stdout);
shells.insert(soc_key, handle);
} else {
return;
Expand Down Expand Up @@ -198,7 +188,6 @@ async fn main() {
return;
}
let connected_shells = Arc::new(Mutex::new(HashMap::<String, connection::Handle>::new()));
let (menu_channel_release, menu_channel_acquire) = mpsc::channel::<()>(1024);
let menu = menu_list::new();

// get user input
Expand All @@ -207,25 +196,13 @@ async fn main() {
red = color::Fg(color::LightRed),
reset = color::Fg(color::Reset)
);
input_loop(
connected_shells.clone(),
menu,
menu_channel_acquire,
menu_channel_release.clone(),
Some(init_message),
);
input_loop(connected_shells.clone(), menu, Some(init_message));
let socket_stream = listener::catch_sockets(bound_addr, bound_port);
pin_mut!(socket_stream);

loop {
let soc = socket_stream.next().await.unwrap().unwrap();
handle_new_shell(
soc,
connected_shells.clone(),
menu_channel_release.clone(),
None,
)
.await;
handle_new_shell(soc, connected_shells.clone(), None).await;
}
}

Expand Down Expand Up @@ -265,11 +242,9 @@ mod tests {
let (soc, _) = listener.accept().await.unwrap();
let connected_shells =
Arc::new(Mutex::new(HashMap::<String, connection::Handle>::new()));
let (menu_channel_release, _) = mpsc::channel::<()>(1024);
handle_new_shell(
soc,
connected_shells.clone(),
menu_channel_release,
Some(true),
)
.await;
Expand Down
73 changes: 38 additions & 35 deletions src/menu/menu_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ use termion::event::Key;
use termion::input::TermRead;
use termion::raw::{IntoRawMode, RawTerminal};
use termion::{clear, color, cursor, terminal_size};
use tokio::sync::{mpsc, Mutex, MutexGuard};
use tokio::sync::{Mutex, MutexGuard};
use tokio::task::JoinHandle;

use crate::socket::connection;

pub type MenuListValue = Box<
dyn Fn(Arc<Mutex<HashMap<String, connection::Handle>>>, mpsc::Sender<()>)
dyn Fn(Arc<Mutex<HashMap<String, connection::Handle>>>) -> Option<JoinHandle<()>>
+ Send
+ Sync
+ 'static,
Expand All @@ -33,17 +34,11 @@ pub fn clear() {
}

pub fn exit() {
std::process::exit(0)
std::process::exit(0);
}

fn start(key: String, connected_shells: &MutexGuard<HashMap<String, connection::Handle>>) {
let handle = match connected_shells.get(&key) {
Some(val) => {val},
None => {
println!("Invalid session key!");
return;
}
};
/// Sends the satrt signal to a handle, await until the handle is paused
async fn start(handle: connection::Handle) {

//start handler
let mut stdout = stdout();
Expand All @@ -66,6 +61,13 @@ fn start(key: String, connected_shells: &MutexGuard<HashMap<String, connection::
.unwrap();
}
handle.tx.send("start").unwrap();

loop{
let sig = handle.tx.subscribe().recv().await.unwrap();
if sig == "quit"{
return;
}
}
}

fn delete(key: String, connected_shells: &mut MutexGuard<HashMap<String, connection::Handle>>) {
Expand Down Expand Up @@ -150,18 +152,13 @@ fn alias(
}

macro_rules! unlock_menu {
($menu_channel_release:expr) => {
() => {
println!(
"\r\n{show}{blink}{clear}",
show = cursor::Show,
blink = cursor::BlinkingBlock,
clear = clear::AfterCursor
);
let menu_esc_release = $menu_channel_release.clone();
tokio::spawn(async move {
menu_esc_release.send(()).await.unwrap();
return;
});
};
}

Expand Down Expand Up @@ -240,9 +237,8 @@ fn refresh_list_display(
pub fn new() -> MenuList {
let mut menu: MenuList = HashMap::new();

let list = |connected_shells: Arc<Mutex<HashMap<String, connection::Handle>>>,
menu_channel_release: mpsc::Sender<()>| {
tokio::spawn(async move {
let list = |connected_shells: Arc<Mutex<HashMap<String, connection::Handle>>>| -> Option<JoinHandle<()>> {
Some(tokio::spawn(async move {
let stdin = stdin();
let mut stdout = stdout().into_raw_mode().unwrap();
let mut shell_list: Vec<(String, connection::Handle)>;
Expand All @@ -268,7 +264,7 @@ pub fn new() -> MenuList {
{
match key.unwrap() {
Key::Esc => {
unlock_menu!(menu_channel_release);
unlock_menu!();
return;
}
Key::Up => {
Expand All @@ -283,12 +279,19 @@ pub fn new() -> MenuList {
}
Key::Char('\n') | Key::Char('\r') => {
let key = keys[cur_idx].to_owned();
start(key, &shells);
println!(
"\r\n{show}{blink}",
show = cursor::Show,
blink = cursor::BlinkingBlock
);
let handle_opt = shells.get(&key);
if handle_opt.is_some(){
let handle = handle_opt.unwrap().clone();
println!(
"\r\n{show}{blink}{clear}",
show = cursor::Show,
blink = cursor::BlinkingBlock,
clear = clear::AfterCursor
);
// drop the mutex guard so we're not holding and waiting
drop(shells);
start(handle).await;
}
return;
}
Key::Char('r') => {
Expand All @@ -311,7 +314,7 @@ pub fn new() -> MenuList {
stdout.flush().unwrap();

if shells.is_empty() {
unlock_menu!(menu_channel_release);
unlock_menu!();
return;
}
}
Expand All @@ -320,8 +323,7 @@ pub fn new() -> MenuList {

alias(
key,
(((start_pos + cur_idx as u16) as i16)
- (shells.len() as i16))
(((start_pos + cur_idx as u16) as i16) - (shells.len() as i16))
as u16,
&mut shells,
);
Expand All @@ -337,20 +339,21 @@ pub fn new() -> MenuList {
refresh_list_display(&mut stdout, cur_idx, shell_list);
}
}
unlock_menu!(menu_channel_release);
});
unlock_menu!();
}))
};
menu.insert("l", Box::new(list));

let clear = |_, _| {
let clear = |_| {
clear();
None
};

menu.insert("clear", Box::new(clear));

menu.insert("h", Box::new(|_, _| help()));
menu.insert("h", Box::new(|_| {help(); None}));

menu.insert("exit", Box::new(|_, _| exit()));
menu.insert("exit", Box::new(|_| {exit(); None}));

return menu;
}
16 changes: 5 additions & 11 deletions src/socket/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,10 @@ impl Handle {
&self,
handle_to_soc_send: Sender<String>,
mut soc_to_handle_recv: Receiver<String>,
menu_channel_release: Sender<()>,
mut stdout: RawTerminal<W>,
) where
W: Write + Send + 'static,
{
let menu_channel_release_1 = menu_channel_release.clone();
let tx = self.tx.clone();
let rl = self.rl.clone();
let tx_copy = self.tx.clone();
Expand All @@ -113,7 +111,7 @@ impl Handle {

loop {
if !active {
if listener::wait_for_signal(tx_copy.subscribe(), "start", &mut raw_mode)
if listener::wait_for_signal(tx_copy.subscribe(), "start", Some(&mut raw_mode))
.await
.is_err()
{
Expand All @@ -139,7 +137,7 @@ impl Handle {
continue;
}
}
_ = listener::wait_for_signal(tx_copy.subscribe(), "quit", &mut raw_mode) =>{
_ = listener::wait_for_signal(tx_copy.subscribe(), "quit", Some(&mut raw_mode)) =>{
stdout.suspend_raw_mode().unwrap();
active = false;
}
Expand All @@ -159,7 +157,7 @@ impl Handle {
// start writer
tokio::spawn(async move {
// wait for start signal
if listener::wait_for_signal(tx.subscribe(), "start", &mut raw_mode)
if listener::wait_for_signal(tx.subscribe(), "start", Some(&mut raw_mode))
.await
.is_err()
{
Expand All @@ -178,10 +176,9 @@ impl Handle {
println!("{clear}", clear = clear::BeforeCursor);
//notify the reader that we're pausing
tx.send("quit").unwrap();
menu_channel_release_1.send(()).await.unwrap();
// send a new line so we get a prompt when we return
content = String::from("\n");
if listener::wait_for_signal(tx.subscribe(), "start", &mut raw_mode)
if listener::wait_for_signal(tx.subscribe(), "start", Some(&mut raw_mode))
.await
.is_err()
{
Expand All @@ -203,8 +200,7 @@ impl Handle {
println!("{clear}", clear = clear::BeforeCursor);
tx.send("quit").unwrap();
raw_mode_tx.send(false).await.unwrap();
menu_channel_release_1.send(()).await.unwrap();
if listener::wait_for_signal(tx.subscribe(), "start", &mut raw_mode)
if listener::wait_for_signal(tx.subscribe(), "start", Some(&mut raw_mode))
.await
.is_err()
{
Expand Down Expand Up @@ -304,7 +300,6 @@ mod tests {
let (handle, cancel_token) = Handle::new();
let (handle_to_soc_send, handle_to_soc_recv) = mpsc::channel::<String>(1024);
let (soc_to_handle_send, soc_to_handle_recv) = watch::channel::<String>(String::from(""));
let (menu_channel_release, _) = mpsc::channel::<()>(1024);
let out = std::io::Cursor::new(Vec::new()).into_raw_mode().unwrap();
listener::start_socket(
stream,
Expand All @@ -315,7 +310,6 @@ mod tests {
handle.handle_listen(
handle_to_soc_send.clone(),
soc_to_handle_recv.clone(),
menu_channel_release,
out
);
let mut rx = handle.tx.subscribe();
Expand Down
5 changes: 3 additions & 2 deletions src/socket/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ pub fn catch_sockets(addr: String, port: u16) -> impl Stream<Item = io::Result<T
pub async fn wait_for_signal(
mut receiver: BroadcastReceiver<&str>,
signal: &str,
raw_mode: &mut bool,
mut raw_mode_opt: Option<&mut bool>,
) -> Result<(), Error> {
loop {
match receiver.recv().await {
Ok(val) => {
if val.eq(signal) {
break;
} else if val.eq("raw") {
} else if raw_mode_opt.is_some() && val.eq("raw") {
let raw_mode = raw_mode_opt.take().unwrap();
*raw_mode = !*raw_mode;
} else if val.eq("delete") {
return Err(Error::new(ErrorKind::Interrupted, "Delete signal received"));
Expand Down

0 comments on commit 2b10d43

Please sign in to comment.