diff --git a/src/main.rs b/src/main.rs index 39777795..47aae04f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -101,8 +101,8 @@ struct Client { /// 'socks5://[::1]:1212' => listen locally with socks5 on port 1212 and forward dynamically requested tunnel /// 'socks5://[::1]:1212?login=admin&password=admin' => listen locally with socks5 on port 1212 and only accept connection with login=admin and password=admin /// - /// 'httpproxy://[::1]:1212' => start a http proxy on port 1212 and forward dynamically requested tunnel - /// 'httpproxy://[::1]:1212?login=admin&password=admin' => start a http proxy on port 1212 and only accept connection with login=admin and password=admin + /// 'http://[::1]:1212' => start a http proxy on port 1212 and forward dynamically requested tunnel + /// 'http://[::1]:1212?login=admin&password=admin' => start a http proxy on port 1212 and only accept connection with login=admin and password=admin /// /// 'tproxy+tcp://[::1]:1212' => listen locally on tcp on port 1212 as a *transparent proxy* and forward dynamically requested tunnel /// 'tproxy+udp://[::1]:1212?timeout_sec=10' listen locally on udp on port 1212 as a *transparent proxy* and forward dynamically requested tunnel @@ -119,7 +119,7 @@ struct Client { /// 'tcp://1212:google.com:443' => listen on server for incoming tcp cnx on port 1212 and forward to google.com on port 443 from local machine /// 'udp://1212:1.1.1.1:53' => listen on server for incoming udp on port 1212 and forward to cloudflare dns 1.1.1.1 on port 53 from local machine /// 'socks5://[::1]:1212' => listen on server for incoming socks5 request on port 1212 and forward dynamically request from local machine (login/password is supported) - /// 'httpproxy://[::1]:1212' => listen on server for incoming http proxy request on port 1212 and forward dynamically request from local machine (login/password is supported) + /// 'http://[::1]:1212' => listen on server for incoming http proxy request on port 1212 and forward dynamically request from local machine (login/password is supported) /// 'unix://wstunnel.sock:g.com:443' => listen on server for incoming data from unix socket of path wstunnel.sock and forward to g.com:443 from local machine #[arg(short='R', long, value_name = "{tcp,udp,socks5,unix}://[BIND:]PORT:HOST:PORT", value_parser = parse_tunnel_arg, verbatim_doc_comment)] remote_to_local: Vec, @@ -550,6 +550,29 @@ fn parse_tunnel_arg(arg: &str) -> Result { remote: (dest_host, dest_port), }) } + "http:/" => { + let (local_bind, remaining) = parse_local_bind(&arg["http://".len()..])?; + let x = format!("0.0.0.0:0?{}", remaining); + let (dest_host, dest_port, options) = parse_tunnel_dest(&x)?; + let proxy_protocol = options.contains_key("proxy_protocol"); + let timeout = options + .get("timeout_sec") + .and_then(|x| x.parse::().ok()) + .map(|d| if d == 0 { None } else { Some(Duration::from_secs(d)) }) + .unwrap_or(Some(Duration::from_secs(30))); + let credentials = options + .get("login") + .and_then(|login| options.get("password").map(|p| (login.to_string(), p.to_string()))); + Ok(LocalToRemote { + local_protocol: LocalProtocol::HttpProxy { + timeout, + credentials, + proxy_protocol, + }, + local: local_bind, + remote: (dest_host, dest_port), + }) + } _ => match &arg[..8] { "socks5:/" => { let (local_bind, remaining) = parse_local_bind(&arg["socks5://".len()..])?; @@ -569,29 +592,6 @@ fn parse_tunnel_arg(arg: &str) -> Result { remote: (dest_host, dest_port), }) } - "httpprox" => { - let (local_bind, remaining) = parse_local_bind(&arg["httpproxy://".len()..])?; - let x = format!("0.0.0.0:0?{}", remaining); - let (dest_host, dest_port, options) = parse_tunnel_dest(&x)?; - let proxy_protocol = options.contains_key("proxy_protocol"); - let timeout = options - .get("timeout_sec") - .and_then(|x| x.parse::().ok()) - .map(|d| if d == 0 { None } else { Some(Duration::from_secs(d)) }) - .unwrap_or(Some(Duration::from_secs(30))); - let credentials = options - .get("login") - .and_then(|login| options.get("password").map(|p| (login.to_string(), p.to_string()))); - Ok(LocalToRemote { - local_protocol: LocalProtocol::HttpProxy { - timeout, - credentials, - proxy_protocol, - }, - local: local_bind, - remote: (dest_host, dest_port), - }) - } "stdio://" => { let (dest_host, dest_port, _options) = parse_tunnel_dest(&arg["stdio://".len()..])?; Ok(LocalToRemote { diff --git a/src/protocols/http_proxy/server.rs b/src/protocols/http_proxy/server.rs index 81af0762..be99330f 100644 --- a/src/protocols/http_proxy/server.rs +++ b/src/protocols/http_proxy/server.rs @@ -5,6 +5,7 @@ use bytes::Bytes; use log::{debug, error}; use std::net::{Ipv4Addr, SocketAddr}; use std::pin::Pin; +use std::sync::Arc; use base64::Engine; use futures_util::{future, stream, Stream}; @@ -17,6 +18,8 @@ use hyper_util::rt::TokioTimer; use parking_lot::Mutex; use std::time::Duration; use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::task::JoinSet; use tracing::log::info; use url::Host; @@ -98,29 +101,61 @@ pub async fn run_server( }; let auth_header = credentials.map(|(user, pass)| base64::engine::general_purpose::STANDARD.encode(format!("{}:{}", user, pass))); + let tasks = JoinSet::>::new(); - let listener = stream::unfold((listener, http1, auth_header), |(listener, http1, auth_header)| async { + let proxy_cfg = Arc::new((auth_header, http1)); + let listener = stream::unfold((listener, tasks, proxy_cfg), |(listener, mut tasks, proxy_cfg)| async { loop { - let (mut stream, _) = match listener.accept().await { - Ok(v) => v, - Err(err) => { - error!("Error while accepting connection {:?}", err); - continue; + let (mut stream, forward_to) = select! { + biased; + + cnx = tasks.join_next(), if !tasks.is_empty() => { + match cnx { + Some(Ok(Some((stream, f)))) => (stream, Some(f)), + None | Some(Ok(None)) => continue, + Some(Err(err)) => { + error!("Error while joinning tasks {:?}", err); + continue + }, + } + }, + + stream = listener.accept() => { + match stream { + Ok((stream, _)) => (stream, None), + Err(err) => { + error!("Error while accepting connection {:?}", err); + continue; + } + } } }; - let forward_to = Mutex::new((Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)), 0)); - let conn_fut = http1.serve_connection( - hyper_util::rt::TokioIo::new(&mut stream), - service_fn(|req| handle_request(&auth_header, &forward_to, req)), - ); - match conn_fut.await { - Ok(_) => return Some((Ok((stream, forward_to.into_inner())), (listener, http1, auth_header))), - Err(err) => { - info!("Error while serving connection: {}", err); - continue; - } + if let Some(forward_to) = forward_to { + return Some((Ok((stream, forward_to)), (listener, tasks, proxy_cfg))); } + + let handle_new_cnx = { + let proxy_cfg = proxy_cfg.clone(); + async move { + let http1 = &proxy_cfg.1; + let auth_header = &proxy_cfg.0; + let forward_to = Mutex::new((Host::Ipv4(Ipv4Addr::new(0, 0, 0, 0)), 0)); + let conn_fut = http1.serve_connection( + hyper_util::rt::TokioIo::new(&mut stream), + service_fn(|req| handle_request(auth_header, &forward_to, req)), + ); + + match conn_fut.await { + Ok(_) => Some((stream, forward_to.into_inner())), + Err(err) => { + info!("Error while serving connection: {}", err); + None + } + } + } + }; + tasks.spawn(handle_new_cnx); } });