Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Introducing Factory to Create Reducer #40

Merged
merged 3 commits into from
May 7, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 83 additions & 24 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,55 @@ pub mod proto {
tonic::include_proto!("reduce.v1");
}

struct ReduceService<T> {
handler: Arc<T>,
struct ReduceService<C>
where
C: ReducerCreator + Send + Sync + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try to remove the bound?

{
creator: C,
}

/// `ReducerCreator` is a trait for creating a new instance of a `Reducer`.
pub trait ReducerCreator {
/// Each type that implements `ReducerCreator` must also specify an associated type `R` that implements the `Reducer` trait.
/// The `create` method is used to create a new instance of this `Reducer` type.
///
/// # Example
///
/// Below is an example of how to implement the `ReducerCreator` trait for a specific type `MyReducerCreator`.
/// `MyReducerCreator` creates instances of `MyReducer`, which is a type that implements the `Reducer` trait.
///
/// ```rust
/// use numaflow::reduce::{Reducer, ReducerCreator, ReduceRequest, Metadata, Message};
/// use tokio::sync::mpsc::Receiver;
/// use tonic::async_trait;
///
/// pub struct MyReducer;
///
/// #[async_trait]
/// impl Reducer for MyReducer {
/// async fn reduce(
/// &self,
/// keys: Vec<String>,
/// mut input: Receiver<ReduceRequest>,
/// md: &Metadata,
/// ) -> Vec<Message> {
/// // Implementation of the reduce method goes here.
/// vec![]
/// }
/// }
///
/// pub struct MyReducerCreator;
///
/// impl ReducerCreator for MyReducerCreator {
/// type R = MyReducer;
///
/// fn create(&self) -> Self::R {
/// MyReducer
/// }
/// }
/// ```
type R: Reducer + Send + Sync + 'static;
fn create(&self) -> Self::R;
}

/// Reducer trait for implementing Reduce handler.
Expand All @@ -46,8 +93,8 @@ pub trait Reducer {
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let reduce_handler = counter::Counter::new();
/// reduce::Server::new(reduce_handler).start().await?;
/// let handler_creator = counter::CounterCreator{};
/// reduce::Server::new(handler_creator).start().await?;
/// Ok(())
/// }
/// mod counter {
Expand All @@ -57,6 +104,17 @@ pub trait Reducer {
/// use tonic::async_trait;
/// use numaflow::reduce::proto::reduce_server::Reduce;
/// pub(crate) struct Counter {}
///
/// pub(crate) struct CounterCreator {}
///
/// impl numaflow::reduce::ReducerCreator for CounterCreator {
/// type R = Counter;
///
/// fn create(&self) -> Counter {
/// Counter::new()
/// }
/// }
///
/// impl Counter {
/// pub(crate) fn new() -> Self {
/// Self {}
Expand All @@ -68,7 +126,7 @@ pub trait Reducer {
/// &self,
/// keys: Vec<String>,
/// mut input: Receiver<ReduceRequest>,
/// md: Metadata,
/// md: &Metadata,
/// ) -> Vec<Message> {
/// let mut counter = 0;
/// // the loop exits when input is closed which will happen only on close of book.
Expand All @@ -89,12 +147,11 @@ pub trait Reducer {
&self,
keys: Vec<String>,
input: mpsc::Receiver<ReduceRequest>,
md: Metadata,
md: &Metadata,
) -> Vec<Message>;
}

/// IntervalWindow is the start and end boundary of the window.
#[derive(Clone)]
pub struct IntervalWindow {
// start time of the window
pub start_time: DateTime<Utc>,
Expand All @@ -114,7 +171,6 @@ impl Metadata {
}
}

#[derive(Clone)]
/// Metadata are additional information passed into the [`Reducer::reduce`].
pub struct Metadata {
pub interval_window: IntervalWindow
Expand Down Expand Up @@ -182,9 +238,9 @@ fn get_window_details(request: &MetadataMap) -> (DateTime<Utc>, DateTime<Utc>) {
}

#[async_trait]
impl<T> proto::reduce_server::Reduce for ReduceService<T>
impl<C> proto::reduce_server::Reduce for ReduceService<C>
where
T: Reducer + Send + Sync + 'static,
C: ReducerCreator + Send + Sync + 'static,
{
type ReduceFnStream = ReceiverStream<Result<proto::ReduceResponse, Status>>;
async fn reduce_fn(
Expand All @@ -193,7 +249,7 @@ where
) -> Result<Response<Self::ReduceFnStream>, Status> {
// get gRPC window from metadata
let (start_win, end_win) = get_window_details(request.metadata());
let md = Metadata::new(IntervalWindow::new(start_win, end_win));
let md = Arc::new(Metadata::new(IntervalWindow::new(start_win, end_win)));

let mut key_to_tx: HashMap<String, Sender<ReduceRequest>> = HashMap::new();

Expand All @@ -214,12 +270,12 @@ where
// since we are calling this in a loop, we need make sure that there is reference counting
// and the lifetime of self is more than the async function.
// try Arc<Self> https://doc.rust-lang.org/reference/items/associated-items.html#methods ?
let v = Arc::clone(&self.handler);
let handler = self.creator.create();
let m = Arc::clone(&md);

// spawn task for each unique key
let keys = rr.keys.clone();
let reduce_md = md.clone();
set.spawn(async move { v.reduce(keys, rx, reduce_md).await });
set.spawn(async move { handler.reduce(keys, rx, m.as_ref()).await });

// write data into the channel
tx.send(rr.into()).await.unwrap();
Expand Down Expand Up @@ -267,21 +323,27 @@ where

/// gRPC server to start a reduce service
#[derive(Debug)]
pub struct Server<T> {
pub struct Server<C>
where
C: ReducerCreator + Send + Sync + 'static,
{
sock_addr: PathBuf,
max_message_size: usize,
server_info_file: PathBuf,
svc: Option<T>,
creator: Option<C>,
}

impl<T> Server<T> {
impl<C> Server<C>
where
C: ReducerCreator + Send + Sync + 'static,
{
/// Create a new Server with the given reduce service
pub fn new(reduce_svc: T) -> Self {
pub fn new(creator: C) -> Self {
Server {
sock_addr: DEFAULT_SOCK_ADDR.into(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
server_info_file: DEFAULT_SERVER_INFO_FILE.into(),
svc: Some(reduce_svc),
creator: Some(creator),
}
}

Expand Down Expand Up @@ -325,12 +387,11 @@ impl<T> Server<T> {
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Reducer + Send + Sync + 'static,
F: Future<Output = ()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = Arc::new(self.svc.take().unwrap());
let reduce_svc = ReduceService { handler };
let creator = self.creator.take().unwrap();
let reduce_svc = ReduceService { creator };
let reduce_svc = proto::reduce_server::ReduceServer::new(reduce_svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);
Expand All @@ -344,8 +405,6 @@ impl<T> Server<T> {

/// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the signal arrives.
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Reducer + Send + Sync + 'static,
{
self.start_with_shutdown(shared::shutdown_signal()).await
}
Expand Down
Loading