Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple listeners #12

Merged
merged 7 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
876 changes: 554 additions & 322 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ console = "0.15"
http = "0.2"
http-body = "0.4.2"
hyper = "0.14"
prost = "0.12"
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {version="0.11.0", features = ["tls"]}
tonic-types = {version="0.11.0"}
tonic-reflection = {version="0.11.0"}
tokio-vsock = "0.5"
tonic = {version="0.12.2", features = ["tls"]}
tonic-types = {version="0.12.2"}
tonic-reflection = {version="0.12.2"}
tower = {version = "0.4"}
tracing = "0.1"
tracing-subscriber = {version = "0.3", features = ["env-filter", "tracing-log", "time", "local-time"]}
Expand All @@ -37,6 +37,8 @@ serde = { version = "1.0.202", features = ["derive"]}
serde_json = "1.0.120"
x509-parser = { version = "0.16" }

tokio-listener = { features = ["multi-listener", "tonic012", "vsock"], git = "https://github.com/avnik/tokio-listener", branch = "avnik/vsock-ghaf" }

# GIVC subparts
givc-common = { path="common" }
givc-client = { path="client" }
8 changes: 5 additions & 3 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ async-channel = "2.3.1"
async-stream = "0.3"
http = "0.2"
http-body = "0.4.2"
prost = "0.12"
hyper-util = { version = "0.1.4"}
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {version="0.11.0", features = ["tls"]}
tonic-types = {version="0.11.0"}
tokio-vsock = "*"
tonic = {version="0.12.2", features = ["tls"]}
tonic-types = {version="0.12.2"}
tower = {version = "0.4"}
tracing = "0.1"
serde = { version = "1.0.202", features = ["derive"]}

Expand Down
20 changes: 14 additions & 6 deletions client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::endpoint::{EndpointConfig, TlsConfig};
use anyhow::bail;
use async_channel::Receiver;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
use tracing::debug;

use givc_common::address::EndpointAddress;
use givc_common::pb;
pub use givc_common::query::{Event, QueryResult};
use givc_common::types::*;

use crate::endpoint::{EndpointConfig, TlsConfig};

type Client = pb::admin_service_client::AdminServiceClient<Channel>;

#[derive(Debug)]
Expand Down Expand Up @@ -40,6 +43,13 @@ impl AdminClient {
// New style api, not yet implemented, stub atm to make current code happy
// FIXME: Still doubt if constructor should be sync or async
pub fn new(addr: String, port: u16, tls_info: Option<(String, TlsConfig)>) -> Self {
Self::from_endpoint_address(EndpointAddress::Tcp { addr, port }, tls_info)
}

pub fn from_endpoint_address(
addr: EndpointAddress,
tls_info: Option<(String, TlsConfig)>,
) -> Self {
let (name, tls) = match tls_info {
Some((name, tls)) => (name, Some(tls)),
None => (String::from("bogus(no tls)"), None),
Expand All @@ -48,8 +58,6 @@ impl AdminClient {
endpoint: EndpointConfig {
transport: TransportConfig {
address: addr,
port: port,
protocol: String::from("bogus"),
tls_name: name,
},
tls: tls,
Expand Down
63 changes: 55 additions & 8 deletions client/src/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use anyhow::anyhow;
use givc_common::types::TransportConfig;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tonic::transport::Endpoint;

use anyhow::anyhow;
use hyper_util::rt::TokioIo;
use tokio::net::UnixStream;
use tokio_vsock::{VsockAddr, VsockStream};
use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity, ServerTlsConfig};
use tonic::transport::{Endpoint, Uri};
use tower::service_fn;
use tracing::info;

use givc_common::address::EndpointAddress;
use givc_common::types::TransportConfig;

#[derive(Debug, Clone)]
pub struct TlsConfig {
pub ca_cert_file_path: PathBuf,
Expand Down Expand Up @@ -50,25 +57,65 @@ impl TlsConfig {
}
}

fn transport_config_to_url(tc: &TransportConfig, with_tls: bool) -> String {
fn transport_config_to_url(ea: &EndpointAddress, with_tls: bool) -> String {
let scheme = match with_tls {
true => "https",
false => "http",
};
format!("{}://{}:{}", scheme, tc.address, tc.port)
match ea {
EndpointAddress::Tcp { addr, port } => format!("{}://{}:{}", scheme, addr, port),
_ => format!("{}://[::]:443", scheme), // Bogus url, to make tonic connector happy
}
}

async fn connect_unix_socket(endpoint: Endpoint, path: &String) -> anyhow::Result<Channel> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&str instead of &String

let mut path = Some(path.to_owned());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use the Arc based solution here that we discussed earlier.

let ch = endpoint
.connect_with_connector(service_fn(move |_: Uri| {
let path = path.take();
async move {
if let Some(path) = path {
// Connect to a Uds socket
Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(path).await?))
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Path already taken",
))
}
}
}))
.await?;
Ok(ch)
}

async fn connect_vsock_socket(endpoint: Endpoint, vs: &VsockAddr) -> anyhow::Result<Channel> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VsockAddr implements Copy, could just drop the reference and the let ....to_owned() (and use *vs in the caller)

let vs = vs.to_owned();
let ch = endpoint
.connect_with_connector(service_fn(move |_: Uri| async move {
let stream = VsockStream::connect(vs).await?;
Ok::<_, std::io::Error>(TokioIo::new(stream))
}))
.await?;
Ok(ch)
}

impl EndpointConfig {
pub async fn connect(&self) -> anyhow::Result<Channel> {
let url = transport_config_to_url(&self.transport, self.tls.is_some());
let url = transport_config_to_url(&self.transport.address, self.tls.is_some());
info!("Connecting to {url}, TLS name {:?}", &self.tls);
let mut endpoint = Endpoint::try_from(url)?
.timeout(Duration::from_secs(5))
.concurrency_limit(30);
if let Some(tls) = &self.tls {
endpoint = endpoint.tls_config(tls.client_config()?)?;
};
let channel = endpoint.connect().await?;
let channel = match &self.transport.address {
EndpointAddress::Tcp { .. } => endpoint.connect().await?,
EndpointAddress::Unix(unix) => connect_unix_socket(endpoint, unix).await?,
EndpointAddress::Abstract(abs) => connect_unix_socket(endpoint, abs).await?,
EndpointAddress::Vsock(vs) => connect_vsock_socket(endpoint, vs).await?,
};
Ok(channel)
}
}
7 changes: 4 additions & 3 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ anyhow = "1.0.86"
async-stream = "0.3"
http = "0.2"
http-body = "0.4.2"
prost = "0.12"
prost = "0.13"
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
tokio-stream = "0.1"
tonic = {version="0.11.0", features = ["tls"]}
tonic-types = {version="0.11.0"}
tokio-vsock = "*"
tonic = {version="0.12.2", features = ["tls"]}
tonic-types = {version="0.12.2"}
tracing = "0.1"
tracing-subscriber = {version = "0.3"}
serde = { version = "1.0.202", features = ["derive"]}
Expand Down
19 changes: 19 additions & 0 deletions common/src/address.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::convert::{Into, TryFrom};
//use std::net::SocketAddr;
use std::path::PathBuf;

use tokio_vsock::VsockAddr;

use crate::pb;

#[derive(Clone, Debug, PartialEq)]
pub enum EndpointAddress {
Tcp {
// IP + port (FIXME: should be SocketAddres)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using String has the benefit of being able to use names instead of IP-addresses.

addr: String,
port: u16,
},
Unix(String), // "/path/to/sock" (same host only)
Abstract(String), // "@abstract-socket-name" (same host only)
Vsock(VsockAddr), // cid+port. FIXME: cid have two magic numbers for host and local
}
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod address;
pub mod query;
pub mod types;

Expand Down
55 changes: 43 additions & 12 deletions common/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
// This module contain literal translations of types from internal/pkgs/types/types.go
// Some of them would be rewritten, replaced, or even removed
use super::address::EndpointAddress;
use crate::pb;
use anyhow::{anyhow, bail};
use std::convert::{Into, TryFrom};

use anyhow::{anyhow, bail};
use tokio_vsock::VsockAddr;

#[derive(Debug, Copy, Clone, PartialEq)]
pub struct UnitType {
pub vm: VmType,
Expand Down Expand Up @@ -186,9 +189,7 @@ impl Into<pb::UnitStatus> for UnitStatus {

#[derive(Debug, Clone, PartialEq)]
pub struct EndpointEntry {
pub protocol: String, // Bogus, should we drop it?
pub address: String,
pub port: u16,
pub address: EndpointAddress,
pub tls_name: String,
}

Expand All @@ -197,22 +198,52 @@ pub type TransportConfig = EndpointEntry;
impl TryFrom<pb::TransportConfig> for EndpointEntry {
type Error = anyhow::Error;
fn try_from(tc: pb::TransportConfig) -> Result<Self, Self::Error> {
let endpoint = match tc.protocol.as_str() {
"tcp" => EndpointAddress::Tcp {
addr: tc.address,
port: tc.port.parse()?,
},
"unix" => EndpointAddress::Unix(tc.address),
"abstract" => EndpointAddress::Abstract(tc.address),
"vsock" => {
EndpointAddress::Vsock(VsockAddr::new(tc.address.parse()?, tc.port.parse()?))
}
unknown => bail!("Unknown protocol: {unknown}"),
};
Ok(Self {
protocol: tc.protocol,
address: tc.address,
port: tc.port.parse()?,
address: endpoint,
tls_name: tc.name,
})
}
}

impl Into<pb::TransportConfig> for EndpointEntry {
fn into(self) -> pb::TransportConfig {
pb::TransportConfig {
protocol: self.protocol,
address: self.address,
port: self.port.to_string(),
name: self.tls_name,
match self.address {
EndpointAddress::Tcp { addr, port } => pb::TransportConfig {
protocol: "tcp".into(),
address: addr,
port: port.to_string(),
name: self.tls_name,
},
EndpointAddress::Unix(unix) => pb::TransportConfig {
protocol: "unix".into(),
address: unix,
port: "".into(),
name: self.tls_name,
},
EndpointAddress::Abstract(abstr) => pb::TransportConfig {
protocol: "abstract".into(),
address: abstr,
port: "".into(),
name: self.tls_name,
},
EndpointAddress::Vsock(vs) => pb::TransportConfig {
protocol: "vsock".into(),
address: vs.cid().to_string(),
port: vs.port().to_string(),
name: self.tls_name,
},
}
}
}
8 changes: 5 additions & 3 deletions src/admin/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ impl RegistryEntry {
#[cfg(test)]
impl RegistryEntry {
pub fn dummy(n: String) -> Self {
use givc_common::address::EndpointAddress;
Self {
name: n,
r#type: UnitType {
Expand All @@ -56,9 +57,10 @@ impl RegistryEntry {
path: "bogus".to_string(),
},
placement: Placement::Endpoint(EndpointEntry {
protocol: "bogus".to_string(),
address: "127.0.0.1".to_string(),
port: 42,
address: EndpointAddress::Tcp {
addr: "127.0.0.1".to_string(),
port: 42,
},
tls_name: "bogus".to_string(),
}),
watch: true,
Expand Down
27 changes: 26 additions & 1 deletion src/bin/givc-admin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clap::Parser;
use givc::admin;
use givc::endpoint::TlsConfig;
use givc::utils::vsock::parse_vsock_addr;
use givc_common::pb::reflection::ADMIN_DESCRIPTOR;
use std::net::SocketAddr;
use std::path::PathBuf;
Expand All @@ -16,6 +17,12 @@ struct Cli {
#[arg(long, env = "PORT", default_missing_value = "9000", value_parser = clap::value_parser!(u16).range(1..))]
port: u16,

#[arg(long, help = "Additionally listen UNIX socket (path)")]
unix: Option<String>,

#[arg(long, help = "Additionally listen Vsock socket (cid:port format)")]
vsock: Option<String>,

#[arg(long, env = "TLS", default_missing_value = "false")]
use_tls: bool,

Expand Down Expand Up @@ -70,10 +77,28 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
let admin_service_svc =
admin::server::AdminServiceServer::new(admin::server::AdminService::new(tls));

let sys_opts = tokio_listener::SystemOptions::default();
let user_opts = tokio_listener::UserOptions::default();
let tcp_addr = tokio_listener::ListenerAddress::Tcp(addr);

let mut addrs = vec![tcp_addr];

if let Some(unix_sock) = cli.unix {
let unix_sock_addr = tokio_listener::ListenerAddress::Path(unix_sock.into());
addrs.push(unix_sock_addr)
}

if let Some(vsock) = cli.vsock {
let vsock_addr = parse_vsock_addr(&vsock)?.into();
addrs.push(vsock_addr)
}

let listener = tokio_listener::Listener::bind_multiple(&addrs, &sys_opts, &user_opts).await?;

builder
.add_service(reflect)
.add_service(admin_service_svc)
.serve(addr)
.serve_with_incoming(listener)
.await?;

Ok(())
Expand Down
Loading