Skip to content

Commit

Permalink
feat(reverse-tunnel): allow multiple waiters & auto-shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
erebe committed Aug 4, 2024
1 parent 811a1e6 commit abb3857
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 147 deletions.
80 changes: 65 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ base64 = "0.22.1"
scopeguard = "1.2.0"

bb8 = { version = "0.8", features = [] }
bytes = { version = "1.6.1", features = [] }
clap = { version = "4.5.11", features = ["derive", "env"] }
bytes = { version = "1.7.1", features = [] }
clap = { version = "4.5.13", features = ["derive", "env"] }
fast-socks5 = { version = "0.9.6", features = [] }
fastwebsockets = { version = "0.8.0", features = ["upgrade", "simd", "unstable-split"] }
futures-util = { version = "0.3.30" }
hickory-resolver = { version = "0.24.1", features = ["tokio", "dns-over-https-rustls", "dns-over-rustls", "native-certs"] }
ppp = { version = "2.2.0", features = [] }
async-channel = { version = "2.3.1", features = [] }

# For config file parsing
regex = { version = "1.10.5", default-features = false, features = ["std", "perf"] }
Expand Down Expand Up @@ -58,7 +59,7 @@ urlencoding = "2.1.3"
uuid = { version = "1.10.0", features = ["v7", "serde"] }

[target.'cfg(not(target_family = "unix"))'.dependencies]
crossterm = { version = "0.27.0" }
crossterm = { version = "0.28.1" }
tokio-util = { version = "0.7.11", features = ["io"] }

[target.'cfg(target_family = "unix")'.dependencies]
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::restrictions::types::RestrictionsRules;
use crate::tunnel::client::{TlsClientConfig, WsClient, WsClientConfig};
use crate::tunnel::connectors::{Socks5TunnelConnector, TcpTunnelConnector, UdpTunnelConnector};
use crate::tunnel::listeners::{
new_stdio_listener, new_udp_listener, HttpProxyTunnelListener, Socks5TunnelListener, TcpTunnelListener,
new_stdio_listener, HttpProxyTunnelListener, Socks5TunnelListener, TcpTunnelListener, UdpTunnelListener,
};
use crate::tunnel::server::{TlsServerConfig, WsServer, WsServerConfig};
use crate::tunnel::{to_host_port, LocalProtocol, RemoteAddr, TransportAddr, TransportScheme};
Expand Down Expand Up @@ -1004,7 +1004,7 @@ async fn main() -> anyhow::Result<()> {
panic!("Transparent proxy is not available for non Linux platform")
}
LocalProtocol::Udp { timeout } => {
let server = new_udp_listener(tunnel.local, tunnel.remote.clone(), *timeout).await?;
let server = UdpTunnelListener::new(tunnel.local, tunnel.remote.clone(), *timeout).await?;

tokio::spawn(async move {
if let Err(err) = client.run_tunnel(server).await {
Expand Down
4 changes: 1 addition & 3 deletions src/tunnel/listeners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use http_proxy::HttpProxyTunnelListener;
pub use socks5::Socks5TunnelListener;
pub use stdio::new_stdio_listener;
pub use tcp::TcpTunnelListener;
pub use udp::new_udp_listener;
pub use udp::UdpTunnelListener;

#[cfg(unix)]
pub use unix_sock::UnixTunnelListener;
Expand All @@ -30,7 +30,6 @@ use tokio_stream::Stream;
pub trait TunnelListener: Stream<Item = anyhow::Result<((Self::Reader, Self::Writer), RemoteAddr)>> {
type Reader: AsyncRead + Send + 'static;
type Writer: AsyncWrite + Send + 'static;
type OkReturn; // = ((Self::Reader, Self::Writer), RemoteAddr);
}

impl<T, R, W> TunnelListener for T
Expand All @@ -41,5 +40,4 @@ where
{
type Reader = R;
type Writer = W;
type OkReturn = ((R, W), RemoteAddr);
}
40 changes: 18 additions & 22 deletions src/tunnel/listeners/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,31 @@ use std::time::Duration;
use tokio_stream::Stream;
use url::Host;

pub struct UdpTunnelListener<S>
where
S: Stream<Item = io::Result<UdpStream>>,
{
listener: S,
pub struct UdpTunnelListener {
listener: Pin<Box<dyn Stream<Item = io::Result<UdpStream>> + Send>>,
dest: (Host, u16),
timeout: Option<Duration>,
}

pub async fn new_udp_listener(
bind_addr: SocketAddr,
dest: (Host, u16),
timeout: Option<Duration>,
) -> anyhow::Result<UdpTunnelListener<impl Stream<Item = io::Result<UdpStream>>>> {
let listener = udp::run_server(bind_addr, timeout, |_| Ok(()), |s| Ok(s.clone()))
.await
.with_context(|| anyhow!("Cannot start UDP server on {}", bind_addr))?;
impl UdpTunnelListener {
pub async fn new(
bind_addr: SocketAddr,
dest: (Host, u16),
timeout: Option<Duration>,
) -> anyhow::Result<UdpTunnelListener> {
let listener = udp::run_server(bind_addr, timeout, |_| Ok(()), |s| Ok(s.clone()))
.await
.with_context(|| anyhow!("Cannot start UDP server on {}", bind_addr))?;

Ok(UdpTunnelListener {
listener,
dest,
timeout,
})
Ok(UdpTunnelListener {
listener: Box::pin(listener),
dest,
timeout,
})
}
}

impl<S> Stream for UdpTunnelListener<S>
where
S: Stream<Item = io::Result<UdpStream>>,
{
impl Stream for UdpTunnelListener {
type Item = anyhow::Result<((UdpStream, UdpStreamWriter), RemoteAddr)>;

fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
1 change: 1 addition & 0 deletions src/tunnel/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![allow(clippy::module_inception)]
mod handler_http2;
mod handler_websocket;
mod reverse_tunnel;
mod server;
mod utils;

Expand Down
Loading

0 comments on commit abb3857

Please sign in to comment.