Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use UDF specific names for server info file #35

Merged
merged 1 commit into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions src/map.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use chrono::{DateTime, Utc};
use std::future::Future;
use std::path::PathBuf;
use tokio::sync::oneshot;

use chrono::{DateTime, Utc};
use tonic::{async_trait, Request, Response, Status};

use crate::shared;
Expand Down Expand Up @@ -121,21 +122,16 @@ pub struct Server<T> {
sock_addr: PathBuf,
max_message_size: usize,
server_info_file: PathBuf,
map_svc: Option<T>,
svc: Option<T>,
}

impl<T> Server<T> {
pub fn new(map_svc: T) -> Self {
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
Server {
sock_addr: "/var/run/numaflow/map.sock".into(),
max_message_size: 64 * 1024 * 1024,
server_info_file: server_info_file.into(),
map_svc: Some(map_svc),
server_info_file: "/var/run/numaflow/mapper-server-info".into(),
svc: Some(map_svc),
}
}

Expand All @@ -157,42 +153,38 @@ impl<T> Server<T> {
self
}

/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB.
/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB.
pub fn max_message_size(&self) -> usize {
self.max_message_size
}

/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info`
/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/mapper-server-info`
pub fn with_server_info_file(mut self, file: impl Into<PathBuf>) -> Self {
self.server_info_file = file.into();
self
}

/// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info`
/// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/mapper-server-info`
pub fn server_info_file(&self) -> &std::path::Path {
self.server_info_file.as_path()
}

/// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated.
pub async fn start_with_shutdown(
pub async fn start_with_shutdown<F>(
&mut self,
shutdown: oneshot::Receiver<()>,
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Mapper + Send + Sync + 'static,
F: Future<Output = ()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.map_svc.take().unwrap();
let handler = self.svc.take().unwrap();
let map_svc = MapService { handler };
let map_svc = proto::map_server::MapServer::new(map_svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = async {
shutdown
.await
.expect("Receiving message from shutdown channel");
};
tonic::transport::Server::builder()
.add_service(map_svc)
.serve_with_incoming_shutdown(listener, shutdown)
Expand All @@ -205,9 +197,7 @@ impl<T> Server<T> {
where
T: Mapper + Send + Sync + 'static,
{
let (tx, rx) = oneshot::channel::<()>();
tokio::spawn(shared::wait_for_signal(tx));
self.start_with_shutdown(rx).await
self.start_with_shutdown(shared::shutdown_signal()).await
}
}

Expand Down Expand Up @@ -238,7 +228,7 @@ mod tests {

let tmp_dir = TempDir::new()?;
let sock_file = tmp_dir.path().join("map.sock");
let server_info_file = tmp_dir.path().join("server_info");
let server_info_file = tmp_dir.path().join("mapper-server-info");

let mut server = map::Server::new(Cat)
.with_server_info_file(&server_info_file)
Expand All @@ -250,7 +240,10 @@ mod tests {
assert_eq!(server.socket_file(), sock_file);

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });
let shutdown = async {
shutdown_rx.await.unwrap();
};
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await });

tokio::time::sleep(Duration::from_millis(50)).await;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i forget why we need this sleep.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was done just to ensure the server is started completely before sending requests to it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we write a small utility func to do a ping and use that? else there could be flaky tests in the future. we do not have to do this in this PR.

Expand Down
8 changes: 2 additions & 6 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,9 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error +
where
T: Reducer + Send + Sync + 'static,
{
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
let socket_file = "/var/run/numaflow/reduce.sock";
let listener = shared::create_listener_stream(socket_file, server_info_file)?;
let listener =
shared::create_listener_stream(socket_file, "/var/run/numaflow/reducer-server-info")?;
let reduce_svc = ReduceService {
handler: Arc::new(m),
};
Expand Down
30 changes: 16 additions & 14 deletions src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{collections::HashMap, io};

use chrono::{DateTime, TimeZone, Timelike, Utc};
use prost_types::Timestamp;
use tokio::sync::oneshot;
use tokio::signal;
use tokio_stream::wrappers::UnixListenerStream;
use tracing::info;

Expand Down Expand Up @@ -57,19 +57,21 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime<Utc>) -> Option<Timestamp> {
})
}

pub(crate) async fn wait_for_signal(tx: oneshot::Sender<()>) {
use tokio::signal::unix::{signal, SignalKind};
let mut interrupt =
signal(SignalKind::interrupt()).expect("Failed to register SIGINT interrupt handler");
let mut termination =
signal(SignalKind::terminate()).expect("Failed to register SIGTERM interrupt handler");
pub(crate) async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install SIGINT handler");
};

let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install SITERM handler")
.recv()
.await;
};
tokio::select! {
_ = interrupt.recv() => {
tracing::info!("Received SIGINT. Stopping gRPC server")
}
_ = termination.recv() => {
tracing::info!("Received SIGTERM. Stopping gRPC server")
}
_ = ctrl_c => {},
_ = terminate => {},
}
tx.send(()).expect("Sending shutdown signal to gRPC server");
}
10 changes: 4 additions & 6 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error +
where
T: SideInputer + Send + Sync + 'static,
{
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
let socket_file = "/var/run/numaflow/sideinput.sock";
let listener = crate::shared::create_listener_stream(socket_file, server_info_file)?;
let listener = crate::shared::create_listener_stream(
socket_file,
"/var/run/numaflow/sideinput-server-info",
)?;
let si_svc = SideInputService { handler: m };

tonic::transport::Server::builder()
Expand Down
43 changes: 18 additions & 25 deletions src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::future::Future;
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tonic::{Request, Status, Streaming};

use crate::shared;
Expand Down Expand Up @@ -178,15 +179,10 @@ pub struct Server<T> {

impl<T> Server<T> {
pub fn new(svc: T) -> Self {
let server_info_file = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
"/tmp/numaflow.server-info"
};
Self {
sock_addr: "/var/run/numaflow/sink.sock".into(),
max_message_size: 64 * 1024 * 1024,
server_info_file: server_info_file.into(),
server_info_file: "/var/run/numaflow/sinker-server-info".into(),
svc: Some(svc),
}
}
Expand All @@ -198,40 +194,41 @@ impl<T> Server<T> {
self
}

/// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/map.sock`
/// Get the unix domain socket file path where gRPC server listens for incoming connections. Default value is `/var/run/numaflow/sink.sock`
pub fn socket_file(&self) -> &std::path::Path {
self.sock_addr.as_path()
}

/// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 4MB.
/// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 64MB.
pub fn with_max_message_size(mut self, message_size: usize) -> Self {
self.max_message_size = message_size;
self
}

/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 4MB.
/// Get the maximum size of an encoded and decoded gRPC message in bytes. Default value is 64MB.
pub fn max_message_size(&self) -> usize {
self.max_message_size
}

/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/tmp/numaflow.server-info`
/// Change the file in which numflow server information is stored on start up to the new value. Default value is `/var/run/numaflow/sinker-server-info`
pub fn with_server_info_file(mut self, file: impl Into<PathBuf>) -> Self {
self.server_info_file = file.into();
self
}

/// Get the path to the file where numaflow server info is stored. Default value is `/tmp/numaflow.server-info`
/// Get the path to the file where numaflow server info is stored. Default value is `/var/run/numaflow/sinker-server-info`
pub fn server_info_file(&self) -> &std::path::Path {
self.server_info_file.as_path()
}

/// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated.
pub async fn start_with_shutdown(
pub async fn start_with_shutdown<F>(
&mut self,
shutdown: oneshot::Receiver<()>,
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Sinker + Send + Sync + 'static,
F: Future<Output = ()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
Expand All @@ -240,11 +237,6 @@ impl<T> Server<T> {
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = async {
shutdown
.await
.expect("Receiving message from shutdown channel");
};
tonic::transport::Server::builder()
.add_service(svc)
.serve_with_incoming_shutdown(listener, shutdown)
Expand All @@ -257,9 +249,7 @@ impl<T> Server<T> {
where
T: Sinker + Send + Sync + 'static,
{
let (tx, rx) = oneshot::channel::<()>();
tokio::spawn(shared::wait_for_signal(tx));
self.start_with_shutdown(rx).await
self.start_with_shutdown(shared::shutdown_signal()).await
}
}

Expand Down Expand Up @@ -315,8 +305,8 @@ mod tests {
}

let tmp_dir = TempDir::new()?;
let sock_file = tmp_dir.path().join("map.sock");
let server_info_file = tmp_dir.path().join("server_info");
let sock_file = tmp_dir.path().join("sink.sock");
let server_info_file = tmp_dir.path().join("sinker-server-info");

let mut server = sink::Server::new(Logger)
.with_server_info_file(&server_info_file)
Expand All @@ -328,7 +318,10 @@ mod tests {
assert_eq!(server.socket_file(), sock_file);

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });
let shutdown = async {
shutdown_rx.await.unwrap();
};
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown).await });

tokio::time::sleep(Duration::from_millis(50)).await;

Expand Down
Loading
Loading