Skip to content

Commit

Permalink
feat: container-type level version compatibility check
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
  • Loading branch information
KeranYang committed Oct 1, 2024
1 parent 30d8ce1 commit 767af03
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tracing = "0.1.40"
uuid = { version = "1.10.0", features = ["v4"] }
thiserror = "1.0"
hyper-util = "0.1.7"
lazy_static = "1.5.0"

[build-dependencies]
tonic-build = "0.12.2"
Expand Down
9 changes: 8 additions & 1 deletion src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::error::Error;
use crate::error::Error::BatchMapError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::shared;
use crate::shared::shutdown_signal;
use crate::shared::{shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
Expand Down Expand Up @@ -472,6 +472,13 @@ impl<T> crate::batchmap::Server<T> {
T: BatchMapper + Send + Sync + 'static,
{
let mut info = shared::ServerInfo::default();
// set the minimum numaflow version for the map container
info.set_minimum_numaflow_version(
shared::MinimumNumaflowVersion
.get(&ContainerType::Map)
.copied()
.unwrap_or_default(),
);
// update the info json metadata field, and add the map mode
info.set_metadata(shared::MAP_MODE_KEY, shared::BATCH_MAP);
let listener =
Expand Down
9 changes: 8 additions & 1 deletion src/map.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::error::Error::MapError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::shared;
use crate::shared::shutdown_signal;
use crate::shared::{shutdown_signal, ContainerType};
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fs;
Expand Down Expand Up @@ -309,6 +309,13 @@ impl<T> Server<T> {
T: Mapper + Send + Sync + 'static,
{
let mut info = shared::ServerInfo::default();
// set the minimum numaflow version for the map container
info.set_minimum_numaflow_version(
shared::MinimumNumaflowVersion
.get(&ContainerType::Map)
.copied()
.unwrap_or_default(),
);
// update the info json metadata field, and add the map mode key value pair
info.set_metadata(shared::MAP_MODE_KEY, shared::UNARY_MAP);

Expand Down
17 changes: 11 additions & 6 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::error::Error;
use crate::error::Error::ReduceError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::shared;
use crate::shared::prost_timestamp_from_utc;
use crate::shared::{prost_timestamp_from_utc, ContainerType};

const KEY_JOIN_DELIMITER: &str = ":";
const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
Expand Down Expand Up @@ -817,11 +817,16 @@ impl<C> Server<C> {
where
C: ReducerCreator + Send + Sync + 'static,
{
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::ServerInfo::default(),
)?;
let mut info = shared::ServerInfo::default();
// set the minimum numaflow version for the reduce container
info.set_minimum_numaflow_version(
shared::MinimumNumaflowVersion
.get(&ContainerType::Reduce)
.copied()
.unwrap_or_default(),
);
let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let creator = self.creator.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = channel(1);
let cln_token = CancellationToken::new();
Expand Down
33 changes: 30 additions & 3 deletions src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::Path;
use std::{collections::HashMap, io};

use chrono::{DateTime, TimeZone, Timelike, Utc};
use lazy_static::lazy_static;
use prost_types::Timestamp;
use serde::{Deserialize, Serialize};
use tokio::net::UnixListener;
Expand All @@ -15,6 +16,16 @@ pub(crate) const MAP_MODE_KEY: &str = "MAP_MODE";
pub(crate) const UNARY_MAP: &str = "unary-map";
pub(crate) const BATCH_MAP: &str = "batch-map";

#[derive(Eq, PartialEq, Hash)]
pub(crate) enum ContainerType {
Map,
Reduce,
Sink,
Source,
SourceTransformer,
SideInput,
}

// Minimum version of Numaflow required by the current SDK version
//
// Updating this value:
Expand All @@ -32,7 +43,18 @@ pub(crate) const BATCH_MAP: &str = "batch-map";
// Therefore, we translate ">= a.b.c" into ">= a.b.c-z".
// The character 'z' is the largest in the ASCII table, ensuring that all RC versions are recognized as
// smaller than any stable version suffixed with '-z'.
const MINIMUM_NUMAFLOW_VERSION: &str = "1.3.1-z";
lazy_static! {
pub(crate) static ref MinimumNumaflowVersion: HashMap<ContainerType, &'static str> = {
let mut m = HashMap::new();
m.insert(ContainerType::Source, "1.3.1-z");
m.insert(ContainerType::Map, "1.3.1-z");
m.insert(ContainerType::Reduce, "1.3.1-z");
m.insert(ContainerType::Sink, "1.3.1-z");
m.insert(ContainerType::SourceTransformer, "1.3.1-z");
m.insert(ContainerType::SideInput, "1.3.1-z");
m
};
}
const SDK_VERSION: &str = env!("CARGO_PKG_VERSION");

// ServerInfo structure to store server-related information
Expand Down Expand Up @@ -61,7 +83,7 @@ impl ServerInfo {
ServerInfo {
protocol: "uds".to_string(),
language: "rust".to_string(),
minimum_numaflow_version: MINIMUM_NUMAFLOW_VERSION.to_string(),
minimum_numaflow_version: "".to_string(),
version: SDK_VERSION.to_string(),
metadata: Option::from(metadata),
}
Expand All @@ -86,6 +108,11 @@ impl ServerInfo {
self.metadata = Some(metadata);
}
}

// Set minimum numaflow version
pub fn set_minimum_numaflow_version(&mut self, version: &str) {
self.minimum_numaflow_version = version.to_string();
}
}

// #[tracing::instrument(skip(path), fields(path = ?path.as_ref()))]
Expand All @@ -100,7 +127,7 @@ fn write_info_file(path: impl AsRef<Path>, mut server_info: ServerInfo) -> io::R
server_info = ServerInfo::default();
}
// Convert to a string of JSON and print it out
let serialized = serde_json::to_string(&server_info).unwrap();
let serialized = serde_json::to_string(&server_info)?;
let content = format!("{}U+005C__END__", serialized);
info!(content, "Writing to file");
fs::write(path, content)
Expand Down
17 changes: 11 additions & 6 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::error::Error::SideInputError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::shared;
use crate::shared::shutdown_signal;
use crate::shared::{shutdown_signal, ContainerType};
use std::fs;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -194,11 +194,16 @@ impl<T> Server<T> {
where
T: SideInputer + Send + Sync + 'static,
{
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::ServerInfo::default(),
)?;
let mut info = shared::ServerInfo::default();
// set the minimum numaflow version for the side input container
info.set_minimum_numaflow_version(
shared::MinimumNumaflowVersion
.get(&ContainerType::SideInput)
.copied()
.unwrap_or_default(),
);
let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let cln_token = CancellationToken::new();
Expand Down
17 changes: 12 additions & 5 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::error::Error;
use crate::error::Error::SinkError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::shared;
use crate::shared::ContainerType;
use crate::sink::sink_pb::SinkResponse;

use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::path::PathBuf;
Expand Down Expand Up @@ -466,11 +468,16 @@ impl<T> Server<T> {
where
T: Sinker + Send + Sync + 'static,
{
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::ServerInfo::default(),
)?;
let mut info = shared::ServerInfo::default();
// set the minimum numaflow version for the sink container
info.set_minimum_numaflow_version(
shared::MinimumNumaflowVersion
.get(&ContainerType::Sink)
.copied()
.unwrap_or_default(),
);
let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
let cln_token = CancellationToken::new();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
Expand Down
18 changes: 12 additions & 6 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::time::Duration;

use crate::error::Error::SourceError;
use crate::error::{Error, ErrorKind};
use crate::shared::{self, prost_timestamp_from_utc};
use crate::shared::{self, prost_timestamp_from_utc, ContainerType};
use crate::source::proto::{AckRequest, AckResponse, ReadRequest, ReadResponse};

use chrono::{DateTime, Utc};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -492,11 +493,16 @@ impl<T> Server<T> {
where
T: Sourcer + Send + Sync + 'static,
{
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::ServerInfo::default(),
)?;
let mut info = shared::ServerInfo::default();
// set the minimum numaflow version for the source container
info.set_minimum_numaflow_version(
shared::MinimumNumaflowVersion
.get(&ContainerType::Source)
.copied()
.unwrap_or_default(),
);
let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let cln_token = CancellationToken::new();
Expand Down
31 changes: 18 additions & 13 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{error, info, warn};

use crate::error::Error::{self, SourceTransformerError};
use crate::error::ErrorKind;
use crate::shared::{self, prost_timestamp_from_utc, utc_from_timestamp};
use crate::shared::{self, prost_timestamp_from_utc, utc_from_timestamp, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sourcetransform.sock";
Expand Down Expand Up @@ -528,11 +528,16 @@ impl<T> Server<T> {
where
T: SourceTransformer + Send + Sync + 'static,
{
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::ServerInfo::default(),
)?;
let mut info = shared::ServerInfo::default();
// set the minimum numaflow version for the source transformer container
info.set_minimum_numaflow_version(
shared::MinimumNumaflowVersion
.get(&ContainerType::SourceTransformer)
.copied()
.unwrap_or_default(),
);
let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let cln_token = CancellationToken::new();
Expand Down Expand Up @@ -657,9 +662,9 @@ mod tests {
Duration::from_secs(2),
client.source_transform_fn(ReceiverStream::new(rx)),
)
.await
.map_err(|_| "timeout while getting stream for source_transform_fn")??
.into_inner();
.await
.map_err(|_| "timeout while getting stream for source_transform_fn")??
.into_inner();

let handshake_resp = stream.message().await?.unwrap();
assert!(
Expand Down Expand Up @@ -762,9 +767,9 @@ mod tests {
Duration::from_secs(2),
client.source_transform_fn(ReceiverStream::new(rx)),
)
.await
.map_err(|_| "timeout while getting stream for source_transform_fn")??
.into_inner();
.await
.map_err(|_| "timeout while getting stream for source_transform_fn")??
.into_inner();

let handshake_resp = stream.message().await?.unwrap();
assert!(
Expand Down Expand Up @@ -795,4 +800,4 @@ mod tests {
assert!(task.is_finished(), "gRPC server is still running");
Ok(())
}
}
}

0 comments on commit 767af03

Please sign in to comment.