Skip to content

Commit

Permalink
feat: simplified sourcer server API changes (#32)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
  • Loading branch information
BulkBeing authored Feb 29, 2024
1 parent 4e2764a commit 19f975a
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 73 deletions.
1 change: 1 addition & 0 deletions examples/simple-source/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
2 changes: 1 addition & 1 deletion examples/simple-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "server"
path = "src/main.rs"

[dependencies]
tonic = "0.9"
tonic = "0.10.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
chrono = "0.4.30"
4 changes: 2 additions & 2 deletions examples/simple-source/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.70 as build
FROM rust:1.76-bookworm as build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand All @@ -16,7 +16,7 @@ COPY ./Cargo.lock ./Cargo.lock
RUN cargo build --release

# our final base
FROM rust
FROM debian:bookworm

# copy the build artifact from the build stage
COPY --from=build /examples/target/release/server .
Expand Down
27 changes: 15 additions & 12 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
//! An example of a simple User Defined Source. It generates a continuously increasing sequence of offsets and some data for each call to [`numaflow::source::Sourcer::read`].

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let source_handle = simple_source::SimpleSource::new();
numaflow::source::start_uds_server(source_handle).await?;

Ok(())
numaflow::source::Server::new(source_handle).start().await
}

pub(crate) mod simple_source {
use std::{
collections::HashMap,
collections::HashSet,
sync::atomic::{AtomicUsize, Ordering},
sync::RwLock,
};
Expand All @@ -24,21 +22,24 @@ pub(crate) mod simple_source {
/// does not provide a mutable reference as explained in [`numaflow::source::Sourcer`]
pub(crate) struct SimpleSource {
read_idx: AtomicUsize,
yet_to_ack: RwLock<HashMap<u32, bool>>,
yet_to_ack: RwLock<HashSet<u32>>,
}

impl SimpleSource {
pub fn new() -> Self {
Self {
read_idx: AtomicUsize::new(0),
yet_to_ack: RwLock::new(HashMap::new()),
yet_to_ack: RwLock::new(HashSet::new()),
}
}
}

#[async_trait]
impl Sourcer for SimpleSource {
async fn read(&self, source_request: SourceReadRequest, transmitter: Sender<Message>) {
if !self.yet_to_ack.read().unwrap().is_empty() {
return;
}
let start = Instant::now();

for i in 1..=source_request.count {
Expand All @@ -58,7 +59,7 @@ pub(crate) mod simple_source {
value: format!("{i} at {offset}").into_bytes(),
offset: Offset {
offset: offset.to_be_bytes().to_vec(),
partition_id: "0".to_string(),
partition_id: 0,
},
event_time: chrono::offset::Utc::now(),
keys: vec![],
Expand All @@ -67,10 +68,8 @@ pub(crate) mod simple_source {
.unwrap();

// add the offset to the set to mark it as pending acknowledgement
match self.yet_to_ack.write() {
Ok(mut guard) => guard.insert(offset as u32, true),
Err(_) => panic!("lock has been poisoned!"),
};
let mut yet_to_ack = self.yet_to_ack.write().expect("lock has been poisoned");
yet_to_ack.insert(offset as u32);
}
}

Expand All @@ -87,5 +86,9 @@ pub(crate) mod simple_source {
// pending for simple source is zero since we are not reading from any external source
0
}

/// Reports the partitions served by this source.
///
/// Every offset emitted by `read` is stamped with `partition_id: 0`, so the
/// advertised partition list must contain `0` for the two to agree.
async fn partitions(&self) -> Option<Vec<i32>> {
    // Keep in sync with the `partition_id` written into each `Offset` in `read`.
    Some(vec![0])
}
}
}
2 changes: 1 addition & 1 deletion src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl<T> Server<T> {
self.sock_addr.as_path()
}

/// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 4MB.
/// Set the maximum size of an encoded and decoded gRPC message. The value of `message_size` is in bytes. Default value is 64MB.
pub fn with_max_message_size(mut self, message_size: usize) -> Self {
self.max_message_size = message_size;
self
Expand Down
Loading

0 comments on commit 19f975a

Please sign in to comment.