Skip to content

Commit

Permalink
Merge pull request #12 from avnik/avnik/listeners
Browse files Browse the repository at this point in the history
Support multiple listeners
  • Loading branch information
mbssrc authored Sep 29, 2024
2 parents c50f663 + d760e69 commit 3344005
Show file tree
Hide file tree
Showing 18 changed files with 786 additions and 388 deletions.
876 changes: 554 additions & 322 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ console = "0.15"
http = "0.2"
http-body = "0.4.2"
hyper = "0.14"
prost = "0.12"
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {version="0.11.0", features = ["tls"]}
tonic-types = {version="0.11.0"}
tonic-reflection = {version="0.11.0"}
tokio-vsock = "0.5"
tonic = {version="0.12.2", features = ["tls"]}
tonic-types = {version="0.12.2"}
tonic-reflection = {version="0.12.2"}
tower = {version = "0.4"}
tracing = "0.1"
tracing-subscriber = {version = "0.3", features = ["env-filter", "tracing-log", "time", "local-time"]}
Expand All @@ -37,6 +37,8 @@ serde = { version = "1.0.202", features = ["derive"]}
serde_json = "1.0.120"
x509-parser = { version = "0.16" }

tokio-listener = { features = ["multi-listener", "tonic012", "vsock"], git = "https://github.com/avnik/tokio-listener", branch = "avnik/vsock-ghaf" }

# GIVC subparts
givc-common = { path="common" }
givc-client = { path="client" }
8 changes: 5 additions & 3 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ async-channel = "2.3.1"
async-stream = "0.3"
http = "0.2"
http-body = "0.4.2"
prost = "0.12"
hyper-util = { version = "0.1.4"}
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {version="0.11.0", features = ["tls"]}
tonic-types = {version="0.11.0"}
tokio-vsock = "*"
tonic = {version="0.12.2", features = ["tls"]}
tonic-types = {version="0.12.2"}
tower = {version = "0.4"}
tracing = "0.1"
serde = { version = "1.0.202", features = ["derive"]}

Expand Down
20 changes: 14 additions & 6 deletions client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::endpoint::{EndpointConfig, TlsConfig};
use anyhow::bail;
use async_channel::Receiver;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tracing::debug;

use givc_common::address::EndpointAddress;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;

use crate::endpoint::{EndpointConfig, TlsConfig};

type Client = pb::admin_service_client::AdminServiceClient<Channel>;

#[derive(Debug)]
Expand Down Expand Up @@ -40,6 +43,13 @@ impl AdminClient {
// New style api, not yet implemented, stub atm to make current code happy
// FIXME: Still doubt if constructor should be sync or async
pub fn new(addr: String, port: u16, tls_info: Option<(String, TlsConfig)>) -> Self {
Self::from_endpoint_address(EndpointAddress::Tcp { addr, port }, tls_info)
}

pub fn from_endpoint_address(
addr: EndpointAddress,
tls_info: Option<(String, TlsConfig)>,
) -> Self {
let (name, tls) = match tls_info {
Some((name, tls)) => (name, Some(tls)),
None => (String::from("bogus(no tls)"), None),
Expand All @@ -48,8 +58,6 @@ impl AdminClient {
endpoint: EndpointConfig {
transport: TransportConfig {
address: addr,
port: port,
protocol: String::from("bogus"),
tls_name: name,
},
tls: tls,
Expand Down
63 changes: 55 additions & 8 deletions client/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use anyhow::anyhow;
use givc_common::types::TransportConfig;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tonic::transport::Endpoint;

use anyhow::anyhow;
use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use tokio_vsock::{VsockAddr, VsockStream};
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity, ServerTlsConfig};
use tonic::transport::{Endpoint, Uri};
use tower::service_fn;
use tracing::info;

use givc_common::address::EndpointAddress;
use givc_common::types::TransportConfig;

#[derive(Debug, Clone)]
pub struct TlsConfig {
pub ca_cert_file_path: PathBuf,
Expand Down Expand Up @@ -50,25 +57,65 @@ impl TlsConfig {
}
}

fn transport_config_to_url(tc: &TransportConfig, with_tls: bool) -> String {
fn transport_config_to_url(ea: &EndpointAddress, with_tls: bool) -> String {
let scheme = match with_tls {
true => "https",
false => "http",
};
format!("{}://{}:{}", scheme, tc.address, tc.port)
match ea {
EndpointAddress::Tcp { addr, port } => format!("{}://{}:{}", scheme, addr, port),
_ => format!("{}://[::]:443", scheme), // Bogus url, to make tonic connector happy
}
}

async fn connect_unix_socket(endpoint: Endpoint, path: &String) -> anyhow::Result<Channel> {
let mut path = Some(path.to_owned());
let ch = endpoint
.connect_with_connector(service_fn(move |_: Uri| {
let path = path.take();
async move {
if let Some(path) = path {
// Connect to a Uds socket
Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Path already taken",
))
}
}
}))
.await?;
Ok(ch)
}

async fn connect_vsock_socket(endpoint: Endpoint, vs: &VsockAddr) -> anyhow::Result<Channel> {
let vs = vs.to_owned();
let ch = endpoint
.connect_with_connector(service_fn(move |_: Uri| async move {
let stream = VsockStream::connect(vs).await?;
Ok::<_, std::io::Error>(TokioIo::new(stream))
}))
.await?;
Ok(ch)
}

impl EndpointConfig {
pub async fn connect(&self) -> anyhow::Result<Channel> {
let url = transport_config_to_url(&self.transport, self.tls.is_some());
let url = transport_config_to_url(&self.transport.address, self.tls.is_some());
info!("Connecting to {url}, TLS name {:?}", &self.tls);
let mut endpoint = Endpoint::try_from(url)?
.timeout(Duration::from_secs(5))
.concurrency_limit(30);
if let Some(tls) = &self.tls {
endpoint = endpoint.tls_config(tls.client_config()?)?;
};
let channel = endpoint.connect().await?;
let channel = match &self.transport.address {
EndpointAddress::Tcp { .. } => endpoint.connect().await?,
EndpointAddress::Unix(unix) => connect_unix_socket(endpoint, unix).await?,
EndpointAddress::Abstract(abs) => connect_unix_socket(endpoint, abs).await?,
EndpointAddress::Vsock(vs) => connect_vsock_socket(endpoint, vs).await?,
};
Ok(channel)
}
}
7 changes: 4 additions & 3 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ anyhow = "1.0.86"
async-stream = "0.3"
http = "0.2"
http-body = "0.4.2"
prost = "0.12"
prost = "0.13"
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {version="0.11.0", features = ["tls"]}
tonic-types = {version="0.11.0"}
tokio-vsock = "*"
tonic = {version="0.12.2", features = ["tls"]}
tonic-types = {version="0.12.2"}
tracing = "0.1"
tracing-subscriber = {version = "0.3"}
serde = { version = "1.0.202", features = ["derive"]}
Expand Down
19 changes: 19 additions & 0 deletions common/src/address.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::convert::{Into, TryFrom};
//use std::net::SocketAddr;
use std::path::PathBuf;

use tokio_vsock::VsockAddr;

use crate::pb;

#[derive(Clone, Debug, PartialEq)]
pub enum EndpointAddress {
Tcp {
// IP + port (FIXME: should be SocketAddres)
addr: String,
port: u16,
},
Unix(String), // "/path/to/sock" (same host only)
Abstract(String), // "@abstract-socket-name" (same host only)
Vsock(VsockAddr), // cid+port. FIXME: cid have two magic numbers for host and local
}
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod address;
pub mod query;
pub mod types;

Expand Down
55 changes: 43 additions & 12 deletions common/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// This module contain literal translations of types from internal/pkgs/types/types.go
// Some of them would be rewritten, replaced, or even removed
use super::address::EndpointAddress;
use crate::pb;
use anyhow::{anyhow, bail};
use std::convert::{Into, TryFrom};

use anyhow::{anyhow, bail};
use tokio_vsock::VsockAddr;

#[derive(Debug, Copy, Clone, PartialEq)]
pub struct UnitType {
pub vm: VmType,
Expand Down Expand Up @@ -186,9 +189,7 @@ impl Into<pb::UnitStatus> for UnitStatus {

#[derive(Debug, Clone, PartialEq)]
pub struct EndpointEntry {
pub protocol: String, // Bogus, should we drop it?
pub address: String,
pub port: u16,
pub address: EndpointAddress,
pub tls_name: String,
}

Expand All @@ -197,22 +198,52 @@ pub type TransportConfig = EndpointEntry;
impl TryFrom<pb::TransportConfig> for EndpointEntry {
type Error = anyhow::Error;
fn try_from(tc: pb::TransportConfig) -> Result<Self, Self::Error> {
let endpoint = match tc.protocol.as_str() {
"tcp" => EndpointAddress::Tcp {
addr: tc.address,
port: tc.port.parse()?,
},
"unix" => EndpointAddress::Unix(tc.address),
"abstract" => EndpointAddress::Abstract(tc.address),
"vsock" => {
EndpointAddress::Vsock(VsockAddr::new(tc.address.parse()?, tc.port.parse()?))
}
unknown => bail!("Unknown protocol: {unknown}"),
};
Ok(Self {
protocol: tc.protocol,
address: tc.address,
port: tc.port.parse()?,
address: endpoint,
tls_name: tc.name,
})
}
}

impl Into<pb::TransportConfig> for EndpointEntry {
fn into(self) -> pb::TransportConfig {
pb::TransportConfig {
protocol: self.protocol,
address: self.address,
port: self.port.to_string(),
name: self.tls_name,
match self.address {
EndpointAddress::Tcp { addr, port } => pb::TransportConfig {
protocol: "tcp".into(),
address: addr,
port: port.to_string(),
name: self.tls_name,
},
EndpointAddress::Unix(unix) => pb::TransportConfig {
protocol: "unix".into(),
address: unix,
port: "".into(),
name: self.tls_name,
},
EndpointAddress::Abstract(abstr) => pb::TransportConfig {
protocol: "abstract".into(),
address: abstr,
port: "".into(),
name: self.tls_name,
},
EndpointAddress::Vsock(vs) => pb::TransportConfig {
protocol: "vsock".into(),
address: vs.cid().to_string(),
port: vs.port().to_string(),
name: self.tls_name,
},
}
}
}
8 changes: 5 additions & 3 deletions src/admin/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl RegistryEntry {
#[cfg(test)]
impl RegistryEntry {
pub fn dummy(n: String) -> Self {
use givc_common::address::EndpointAddress;
Self {
name: n,
r#type: UnitType {
Expand All @@ -56,9 +57,10 @@ impl RegistryEntry {
path: "bogus".to_string(),
},
placement: Placement::Endpoint(EndpointEntry {
protocol: "bogus".to_string(),
address: "127.0.0.1".to_string(),
port: 42,
address: EndpointAddress::Tcp {
addr: "127.0.0.1".to_string(),
port: 42,
},
tls_name: "bogus".to_string(),
}),
watch: true,
Expand Down
27 changes: 26 additions & 1 deletion src/bin/givc-admin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clap::Parser;
use givc::admin;
use givc::endpoint::TlsConfig;
use givc::utils::vsock::parse_vsock_addr;
use givc_common::pb::reflection::ADMIN_DESCRIPTOR;
use std::net::SocketAddr;
use std::path::PathBuf;
Expand All @@ -16,6 +17,12 @@ struct Cli {
#[arg(long, env = "PORT", default_missing_value = "9000", value_parser = clap::value_parser!(u16).range(1..))]
port: u16,

#[arg(long, help = "Additionally listen UNIX socket (path)")]
unix: Option<String>,

#[arg(long, help = "Additionally listen Vsock socket (cid:port format)")]
vsock: Option<String>,

#[arg(long, env = "TLS", default_missing_value = "false")]
use_tls: bool,

Expand Down Expand Up @@ -70,10 +77,28 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let admin_service_svc =
admin::server::AdminServiceServer::new(admin::server::AdminService::new(tls));

let sys_opts = tokio_listener::SystemOptions::default();
let user_opts = tokio_listener::UserOptions::default();
let tcp_addr = tokio_listener::ListenerAddress::Tcp(addr);

let mut addrs = vec![tcp_addr];

if let Some(unix_sock) = cli.unix {
let unix_sock_addr = tokio_listener::ListenerAddress::Path(unix_sock.into());
addrs.push(unix_sock_addr)
}

if let Some(vsock) = cli.vsock {
let vsock_addr = parse_vsock_addr(&vsock)?.into();
addrs.push(vsock_addr)
}

let listener = tokio_listener::Listener::bind_multiple(&addrs, &sys_opts, &user_opts).await?;

builder
.add_service(reflect)
.add_service(admin_service_svc)
.serve(addr)
.serve_with_incoming(listener)
.await?;

Ok(())
Expand Down
Loading

0 comments on commit 3344005

Please sign in to comment.