diff --git a/Cargo.toml b/Cargo.toml index 9908598..3810095 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ tracing = "0.1.40" uuid = { version = "1.10.0", features = ["v4"] } thiserror = "1.0" hyper-util = "0.1.7" +lazy_static = "1.5.0" [build-dependencies] tonic-build = "0.12.2" diff --git a/src/batchmap.rs b/src/batchmap.rs index 43f43e2..958cd75 100644 --- a/src/batchmap.rs +++ b/src/batchmap.rs @@ -15,7 +15,7 @@ use crate::error::Error; use crate::error::Error::BatchMapError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; use crate::shared; -use crate::shared::shutdown_signal; +use crate::shared::{shutdown_signal, ContainerType}; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock"; @@ -472,6 +472,13 @@ impl crate::batchmap::Server { T: BatchMapper + Send + Sync + 'static, { let mut info = shared::ServerInfo::default(); + // set the minimum numaflow version for the map container + info.set_minimum_numaflow_version( + shared::MinimumNumaflowVersion + .get(&ContainerType::Map) + .copied() + .unwrap_or_default(), + ); // update the info json metadata field, and add the map mode info.set_metadata(shared::MAP_MODE_KEY, shared::BATCH_MAP); let listener = diff --git a/src/map.rs b/src/map.rs index 3499828..97f755d 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,7 +1,7 @@ use crate::error::Error::MapError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; use crate::shared; -use crate::shared::shutdown_signal; +use crate::shared::{shutdown_signal, ContainerType}; use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::fs; @@ -309,6 +309,13 @@ impl Server { T: Mapper + Send + Sync + 'static, { let mut info = shared::ServerInfo::default(); + // set the minimum numaflow version for the map container + info.set_minimum_numaflow_version( + shared::MinimumNumaflowVersion + .get(&ContainerType::Map) + .copied() + .unwrap_or_default(), + ); // update the info json metadata field, and add the map mode key value pair info.set_metadata(shared::MAP_MODE_KEY, shared::UNARY_MAP); diff --git a/src/reduce.rs b/src/reduce.rs index 938e25c..a9a9ad0 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -15,7 +15,7 @@ use crate::error::Error; use crate::error::Error::ReduceError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; use crate::shared; -use crate::shared::prost_timestamp_from_utc; +use crate::shared::{prost_timestamp_from_utc, ContainerType}; const KEY_JOIN_DELIMITER: &str = ":"; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; @@ -817,11 +817,16 @@ impl Server { where C: ReducerCreator + Send + Sync + 'static, { - let listener = shared::create_listener_stream( - &self.sock_addr, - &self.server_info_file, - shared::ServerInfo::default(), - )?; + let mut info = shared::ServerInfo::default(); + // set the minimum numaflow version for the reduce container + info.set_minimum_numaflow_version( + shared::MinimumNumaflowVersion + .get(&ContainerType::Reduce) + .copied() + .unwrap_or_default(), + ); + let listener = + shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?; let creator = self.creator.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = channel(1); let cln_token = CancellationToken::new(); diff --git a/src/shared.rs b/src/shared.rs index af4c08e..b9dbbad 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::{collections::HashMap, io}; use chrono::{DateTime, TimeZone, Timelike, Utc}; +use lazy_static::lazy_static; use prost_types::Timestamp; use serde::{Deserialize, Serialize}; use tokio::net::UnixListener; @@ -15,6 +16,16 @@ pub(crate) const MAP_MODE_KEY: &str = "MAP_MODE"; pub(crate) const UNARY_MAP: &str = "unary-map"; pub(crate) const BATCH_MAP: &str = "batch-map"; +#[derive(Eq, PartialEq, Hash)] +pub(crate) enum ContainerType { + Map, + Reduce, + Sink, + Source, + SourceTransformer, + SideInput, +} + // Minimum version of Numaflow required by the current SDK version // // Updating this value: @@ -32,7 +43,18 @@ pub(crate) const BATCH_MAP: &str = "batch-map"; // Therefore, we translate ">= a.b.c" into ">= a.b.c-z". // The character 'z' is the largest in the ASCII table, ensuring that all RC versions are recognized as // smaller than any stable version suffixed with '-z'. -const MINIMUM_NUMAFLOW_VERSION: &str = "1.3.1-z"; +lazy_static! { + pub(crate) static ref MinimumNumaflowVersion: HashMap = { + let mut m = HashMap::new(); + m.insert(ContainerType::Source, "1.3.1-z"); + m.insert(ContainerType::Map, "1.3.1-z"); + m.insert(ContainerType::Reduce, "1.3.1-z"); + m.insert(ContainerType::Sink, "1.3.1-z"); + m.insert(ContainerType::SourceTransformer, "1.3.1-z"); + m.insert(ContainerType::SideInput, "1.3.1-z"); + m + }; +} const SDK_VERSION: &str = env!("CARGO_PKG_VERSION"); // ServerInfo structure to store server-related information @@ -61,7 +83,7 @@ impl ServerInfo { ServerInfo { protocol: "uds".to_string(), language: "rust".to_string(), - minimum_numaflow_version: MINIMUM_NUMAFLOW_VERSION.to_string(), + minimum_numaflow_version: "".to_string(), version: SDK_VERSION.to_string(), metadata: Option::from(metadata), } @@ -86,6 +108,11 @@ impl ServerInfo { self.metadata = Some(metadata); } } + + // Set minimum numaflow version + pub fn set_minimum_numaflow_version(&mut self, version: &str) { + self.minimum_numaflow_version = version.to_string(); + } } // #[tracing::instrument(skip(path), fields(path = ?path.as_ref()))] @@ -100,7 +127,7 @@ fn write_info_file(path: impl AsRef, mut server_info: ServerInfo) -> io::R server_info = ServerInfo::default(); } // Convert to a string of JSON and print it out - let serialized = serde_json::to_string(&server_info).unwrap(); + let serialized = serde_json::to_string(&server_info)?; let content = format!("{}U+005C__END__", serialized); info!(content, "Writing to file"); fs::write(path, content) diff --git a/src/sideinput.rs b/src/sideinput.rs index 3ea9bbf..cefd90b 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -1,7 +1,7 @@ use crate::error::Error::SideInputError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; use crate::shared; -use crate::shared::shutdown_signal; +use crate::shared::{shutdown_signal, ContainerType}; use std::fs; use std::path::PathBuf; use std::sync::Arc; @@ -194,11 +194,16 @@ impl Server { where T: SideInputer + Send + Sync + 'static, { - let listener = shared::create_listener_stream( - &self.sock_addr, - &self.server_info_file, - shared::ServerInfo::default(), - )?; + let mut info = shared::ServerInfo::default(); + // set the minimum numaflow version for the side input container + info.set_minimum_numaflow_version( + shared::MinimumNumaflowVersion + .get(&ContainerType::SideInput) + .copied() + .unwrap_or_default(), + ); + let listener = + shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); let cln_token = CancellationToken::new(); diff --git a/src/sink.rs b/src/sink.rs index 160a374..1282865 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -2,7 +2,9 @@ use crate::error::Error; use crate::error::Error::SinkError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; use crate::shared; +use crate::shared::ContainerType; use crate::sink::sink_pb::SinkResponse; + use chrono::{DateTime, Utc}; use std::collections::HashMap; use std::path::PathBuf; @@ -466,11 +468,16 @@ impl Server { where T: Sinker + Send + Sync + 'static, { - let listener = shared::create_listener_stream( - &self.sock_addr, - &self.server_info_file, - shared::ServerInfo::default(), - )?; + let mut info = shared::ServerInfo::default(); + // set the minimum numaflow version for the sink container + info.set_minimum_numaflow_version( + shared::MinimumNumaflowVersion + .get(&ContainerType::Sink) + .copied() + .unwrap_or_default(), + ); + let listener = + shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?; let handler = self.svc.take().unwrap(); let cln_token = CancellationToken::new(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); diff --git a/src/source.rs b/src/source.rs index 59c0b80..cd3f722 100644 --- a/src/source.rs +++ b/src/source.rs @@ -6,8 +6,9 @@ use std::time::Duration; use crate::error::Error::SourceError; use crate::error::{Error, ErrorKind}; -use crate::shared::{self, prost_timestamp_from_utc}; +use crate::shared::{self, prost_timestamp_from_utc, ContainerType}; use crate::source::proto::{AckRequest, AckResponse, ReadRequest, ReadResponse}; + use chrono::{DateTime, Utc}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::oneshot; @@ -492,11 +493,16 @@ impl Server { where T: Sourcer + Send + Sync + 'static, { - let listener = shared::create_listener_stream( - &self.sock_addr, - &self.server_info_file, - shared::ServerInfo::default(), - )?; + let mut info = shared::ServerInfo::default(); + // set the minimum numaflow version for the source container + info.set_minimum_numaflow_version( + shared::MinimumNumaflowVersion + .get(&ContainerType::Source) + .copied() + .unwrap_or_default(), + ); + let listener = + shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); let cln_token = CancellationToken::new(); diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 9d9a6b4..aed7cc5 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -14,7 +14,7 @@ use tracing::{error, info, warn}; use crate::error::Error::{self, SourceTransformerError}; use crate::error::ErrorKind; -use crate::shared::{self, prost_timestamp_from_utc, utc_from_timestamp}; +use crate::shared::{self, prost_timestamp_from_utc, utc_from_timestamp, ContainerType}; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sourcetransform.sock"; @@ -528,11 +528,16 @@ impl Server { where T: SourceTransformer + Send + Sync + 'static, { - let listener = shared::create_listener_stream( - &self.sock_addr, - &self.server_info_file, - shared::ServerInfo::default(), - )?; + let mut info = shared::ServerInfo::default(); + // set the minimum numaflow version for the source transformer container + info.set_minimum_numaflow_version( + shared::MinimumNumaflowVersion + .get(&ContainerType::SourceTransformer) + .copied() + .unwrap_or_default(), + ); + let listener = + shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); let cln_token = CancellationToken::new(); @@ -657,9 +662,9 @@ mod tests { Duration::from_secs(2), client.source_transform_fn(ReceiverStream::new(rx)), ) - .await - .map_err(|_| "timeout while getting stream for source_transform_fn")?? - .into_inner(); + .await + .map_err(|_| "timeout while getting stream for source_transform_fn")?? + .into_inner(); let handshake_resp = stream.message().await?.unwrap(); assert!( @@ -762,9 +767,9 @@ mod tests { Duration::from_secs(2), client.source_transform_fn(ReceiverStream::new(rx)), ) - .await - .map_err(|_| "timeout while getting stream for source_transform_fn")?? - .into_inner(); + .await + .map_err(|_| "timeout while getting stream for source_transform_fn")?? + .into_inner(); let handshake_resp = stream.message().await?.unwrap(); assert!( @@ -795,4 +800,4 @@ mod tests { assert!(task.is_finished(), "gRPC server is still running"); Ok(()) } -} +} \ No newline at end of file