Skip to content

Commit

Permalink
[NC-43] Route packets back to WG peer (#3965)
Browse files Browse the repository at this point in the history
* Initial work on reverse nat

* wip

* Refine key gen

* Rename to wg_tunnel

* Forward packet to peer

* Remove source_addr

* Check if allowed to write to tunnel

* Extract out network_table

* Move map struc definitions to udp_listener

* Delegate ip network table calls

* Fix mac compilation

* Add TunTaskTx type
  • Loading branch information
octol authored Oct 10, 2023
1 parent 1e900c3 commit 7577ec9
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 86 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/wireguard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ bytes = "1.5.0"
dashmap = "5.5.3"
etherparse = "0.13.0"
futures = "0.3.28"
ip_network = "0.4.1"
ip_network_table = "0.2.0"
log.workspace = true
nym-task = { path = "../task" }
tap.workspace = true
Expand Down
28 changes: 18 additions & 10 deletions common/wireguard/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,48 @@
#![cfg_attr(not(target_os = "linux"), allow(dead_code))]

use nym_task::TaskClient;

mod error;
mod event;
mod network_table;
mod platform;
mod setup;
mod tun;
mod udp_listener;
mod wg_tunnel;

// Currently the module related to setting up the virtual network device is platform specific.
#[cfg(target_os = "linux")]
use platform::linux::tun_device;

type ActivePeers =
dashmap::DashMap<std::net::SocketAddr, tokio::sync::mpsc::UnboundedSender<crate::event::Event>>;
#[derive(Clone)]
struct TunTaskTx(tokio::sync::mpsc::UnboundedSender<Vec<u8>>);

impl TunTaskTx {
fn send(&self, packet: Vec<u8>) -> Result<(), tokio::sync::mpsc::error::SendError<Vec<u8>>> {
self.0.send(packet)
}
}

#[cfg(target_os = "linux")]
pub async fn start_wireguard(
task_client: TaskClient,
task_client: nym_task::TaskClient,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
use std::sync::Arc;

// The set of active tunnels indexed by the peer's address
let active_peers = std::sync::Arc::new(ActivePeers::new());
let active_peers = Arc::new(udp_listener::ActivePeers::new());
let peers_by_ip = Arc::new(std::sync::Mutex::new(network_table::NetworkTable::new()));

// Start the tun device that is used to relay traffic outbound
let tun_task_tx = tun_device::start_tun_device(active_peers.clone());
let tun_task_tx = tun_device::start_tun_device(peers_by_ip.clone());

// Start the UDP listener that clients connect to
udp_listener::start_udp_listener(tun_task_tx, active_peers, task_client).await?;
udp_listener::start_udp_listener(tun_task_tx, active_peers, peers_by_ip, task_client).await?;

Ok(())
}

#[cfg(not(target_os = "linux"))]
pub async fn start_wireguard(
_task_client: TaskClient,
_task_client: nym_task::TaskClient,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
todo!("WireGuard is currently only supported on Linux")
}
25 changes: 25 additions & 0 deletions common/wireguard/src/network_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::net::IpAddr;

use ip_network::IpNetwork;
use ip_network_table::IpNetworkTable;

#[derive(Default)]
pub(crate) struct NetworkTable<T> {
ips: IpNetworkTable<T>,
}

impl<T> NetworkTable<T> {
pub(crate) fn new() -> Self {
Self {
ips: IpNetworkTable::new(),
}
}

pub fn insert<N: Into<IpNetwork>>(&mut self, network: N, data: T) -> Option<T> {
self.ips.insert(network, data)
}

pub fn longest_match<I: Into<IpAddr>>(&self, ip: I) -> Option<(IpNetwork, &T)> {
self.ips.longest_match(ip)
}
}
22 changes: 17 additions & 5 deletions common/wireguard/src/platform/linux/tun_device.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::{net::Ipv4Addr, sync::Arc};

use etherparse::{InternetSlice, SlicedPacket};
use tap::TapFallible;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc::{self, UnboundedSender},
sync::mpsc::{self},
};

use crate::{
event::Event,
setup::{TUN_BASE_NAME, TUN_DEVICE_ADDRESS, TUN_DEVICE_NETMASK},
ActivePeers,
udp_listener::PeersByIp,
TunTaskTx,
};

fn setup_tokio_tun_device(name: &str, address: Ipv4Addr, netmask: Ipv4Addr) -> tokio_tun::Tun {
Expand All @@ -25,7 +28,7 @@ fn setup_tokio_tun_device(name: &str, address: Ipv4Addr, netmask: Ipv4Addr) -> t
.expect("Failed to setup tun device, do you have permission?")
}

pub fn start_tun_device(_active_peers: Arc<ActivePeers>) -> UnboundedSender<Vec<u8>> {
pub(crate) fn start_tun_device(peers_by_ip: Arc<std::sync::Mutex<PeersByIp>>) -> TunTaskTx {
let tun = setup_tokio_tun_device(
format!("{}%d", TUN_BASE_NAME).as_str(),
TUN_DEVICE_ADDRESS.parse().unwrap(),
Expand All @@ -37,6 +40,7 @@ pub fn start_tun_device(_active_peers: Arc<ActivePeers>) -> UnboundedSender<Vec<

// Channels to communicate with the other tasks
let (tun_task_tx, mut tun_task_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let tun_task_tx = TunTaskTx(tun_task_tx);

tokio::spawn(async move {
let mut buf = [0u8; 1024];
Expand All @@ -55,8 +59,16 @@ pub fn start_tun_device(_active_peers: Arc<ActivePeers>) -> UnboundedSender<Vec<
};
log::info!("iface: read Packet({src_addr} -> {dst_addr}, {len} bytes)");

// TODO: route packet to the correct peer.
log::info!("...forward packet to the correct peer (NOT YET IMPLEMENTED)");
// Route packet to the correct peer.
if let Some(peer_tx) = peers_by_ip.lock().unwrap().longest_match(dst_addr).map(|(_, tx)| tx) {
log::info!("Forward packet to wg tunnel");
peer_tx
.send(Event::IpPacket(packet.to_vec().into()))
.tap_err(|err| log::error!("{err}"))
.unwrap();
} else {
log::info!("No peer found, packet dropped");
}
},
Err(err) => {
log::info!("iface: read error: {err}");
Expand Down
52 changes: 33 additions & 19 deletions common/wireguard/src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::IpAddr;

use base64::{engine::general_purpose, Engine as _};
use boringtun::x25519;
use log::info;
Expand All @@ -15,35 +17,47 @@ pub const TUN_DEVICE_NETMASK: &str = "255.255.255.0";
// Corresponding public key: "WM8s8bYegwMa0TJ+xIwhk+dImk2IpDUKslDBCZPizlE="
const PRIVATE_KEY: &str = "AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=";

// The public keys of the registered peers (clients)
const PEERS: &[&str; 1] = &[
// Corresponding private key: "ILeN6gEh6vJ3Ju8RJ3HVswz+sPgkcKtAYTqzQRhTtlo="
"NCIhkgiqxFx1ckKl3Zuh595DzIFl8mxju1Vg995EZhI=",
// Another key
// "mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=",
];
// The public keys of the registered peer (clients)
// Corresponding private key: "ILeN6gEh6vJ3Ju8RJ3HVswz+sPgkcKtAYTqzQRhTtlo="
const PEER: &str = "NCIhkgiqxFx1ckKl3Zuh595DzIFl8mxju1Vg995EZhI=";

pub fn init_static_dev_keys() -> (x25519::StaticSecret, x25519::PublicKey) {
// TODO: this is a temporary solution for development
let static_private_bytes: [u8; 32] = general_purpose::STANDARD
.decode(PRIVATE_KEY)
// The AllowedIPs for the connected peer, which is one a single IP and the same as the IP that the
// peer has configured on their side.
const ALLOWED_IPS: &str = "10.0.0.2";

fn decode_base64_key(base64_key: &str) -> [u8; 32] {
general_purpose::STANDARD
.decode(base64_key)
.unwrap()
.try_into()
.unwrap();
.unwrap()
}

pub fn server_static_private_key() -> x25519::StaticSecret {
// TODO: this is a temporary solution for development
let static_private_bytes: [u8; 32] = decode_base64_key(PRIVATE_KEY);
let static_private = x25519::StaticSecret::try_from(static_private_bytes).unwrap();
let static_public = x25519::PublicKey::from(&static_private);
info!(
"wg public key: {}",
general_purpose::STANDARD.encode(static_public)
);
static_private
}

// TODO: A single static public key is used for all peers during development
let peer_static_public_bytes: [u8; 32] = general_purpose::STANDARD
.decode(PEERS[0])
.unwrap()
.try_into()
.unwrap();
pub fn peer_static_public_key() -> x25519::PublicKey {
// A single static public key is used during development
let peer_static_public_bytes: [u8; 32] = decode_base64_key(PEER);
let peer_static_public = x25519::PublicKey::try_from(peer_static_public_bytes).unwrap();
info!(
"peer public key: {}",
general_purpose::STANDARD.encode(peer_static_public)
);
peer_static_public
}

(static_private, peer_static_public)
pub fn peer_allowed_ips() -> ip_network::IpNetwork {
let key: IpAddr = ALLOWED_IPS.parse().unwrap();
let cidr = 0u8;
ip_network::IpNetwork::new_truncate(key, cidr).unwrap()
}
35 changes: 28 additions & 7 deletions common/wireguard/src/udp_listener.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
use std::{net::SocketAddr, sync::Arc};

use dashmap::DashMap;
use futures::StreamExt;
use log::error;
use nym_task::TaskClient;
use tap::TapFallible;
use tokio::{net::UdpSocket, sync::mpsc::UnboundedSender};
use tokio::{
net::UdpSocket,
sync::mpsc::{self},
};

use crate::{
event::Event,
setup::{WG_ADDRESS, WG_PORT},
ActivePeers,
network_table::NetworkTable,
setup::{self, WG_ADDRESS, WG_PORT},
TunTaskTx,
};

const MAX_PACKET: usize = 65535;

pub async fn start_udp_listener(
tun_task_tx: UnboundedSender<Vec<u8>>,
pub(crate) type ActivePeers = DashMap<SocketAddr, mpsc::UnboundedSender<Event>>;
pub(crate) type PeersByIp = NetworkTable<mpsc::UnboundedSender<Event>>;

pub(crate) async fn start_udp_listener(
tun_task_tx: TunTaskTx,
active_peers: Arc<ActivePeers>,
peers_by_ip: Arc<std::sync::Mutex<PeersByIp>>,
mut task_client: TaskClient,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let wg_address = SocketAddr::new(WG_ADDRESS.parse().unwrap(), WG_PORT);
log::info!("Starting wireguard UDP listener on {wg_address}");
let udp_socket = Arc::new(UdpSocket::bind(wg_address).await?);

// Setup some static keys for development
let (static_private, peer_static_public) = crate::setup::init_static_dev_keys();
let static_private = setup::server_static_private_key();
let peer_static_public = setup::peer_static_public_key();
let peer_allowed_ips = setup::peer_allowed_ips();

tokio::spawn(async move {
// Each tunnel is run in its own task, and the task handle is stored here so we can remove
Expand All @@ -44,6 +55,7 @@ pub async fn start_udp_listener(
Ok(addr) => {
log::info!("Removing peer: {addr:?}");
active_peers.remove(&addr);
// TODO: remove from peers_by_ip
}
Err(err) => {
error!("WireGuard UDP listener: error receiving shutdown from peer: {err}");
Expand All @@ -61,13 +73,22 @@ pub async fn start_udp_listener(
.unwrap();
} else {
log::info!("udp: received {len} bytes from {addr} from unknown peer, starting tunnel");
let (join_handle, peer_tx) = crate::tun::start_wg_tunnel(
// TODO: this is a temporary solution for development since this
// assumes we know the peer_static_public this corresponds to.
// TODO: rework this before production! This is likely not secure!
log::warn!("Assuming peer_static_public is known");
log::warn!("SECURITY: Rework me to do proper handshake before creating the tunnel!");
let (join_handle, peer_tx) = crate::wg_tunnel::start_wg_tunnel(
addr,
udp_socket.clone(),
static_private.clone(),
peer_static_public,
peer_allowed_ips,
tun_task_tx.clone(),
);

peers_by_ip.lock().unwrap().insert(peer_allowed_ips, peer_tx.clone());

peer_tx.send(Event::WgPacket(buf[..len].to_vec().into()))
.tap_err(|err| log::error!("{err}"))
.unwrap();
Expand Down
Loading

0 comments on commit 7577ec9

Please sign in to comment.