Skip to content

Commit

Permalink
chore: code review
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <vigith@gmail.com>
  • Loading branch information
vigith committed Mar 24, 2024
1 parent da489b0 commit e6ba185
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
4 changes: 2 additions & 2 deletions examples/source-transformer-now/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.70 as build
FROM rust:1.75-bookworm as build

RUN apt-get update
RUN apt-get install protobuf-compiler -y
Expand All @@ -16,7 +16,7 @@ COPY ./Cargo.lock ./Cargo.lock
RUN cargo build --release

# our final base
FROM rust
FROM debian:bookworm

# copy the build artifact from the build stage
COPY --from=build /examples/target/release/server .
Expand Down
10 changes: 5 additions & 5 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct SourceTransformerService<T> {
#[async_trait]
pub trait SourceTransformer {
/// transform takes in an input element and can produce 0, 1, or more results. The input is a [`SourceTransformRequest`]
/// and the output is a ['Vec`] of [`Message`]. In a `transform` each element is processed independently
/// and the output is a [`Vec`] of [`Message`]. In a `transform` each element is processed independently
/// and there is no state associated with the elements. Source transformer can be used for transforming
/// and assigning event time to input messages. More about source transformer can be read
/// [here](https://numaflow.numaproj.io/user-guide/sources/transformer/overview/)
Expand Down Expand Up @@ -123,7 +123,7 @@ where
Ok(Response::new(proto::SourceTransformResponse {
results: messages
.into_iter()
.map(move |msg| msg.into())
.map(|msg| msg.into())
.collect::<Vec<_>>(),
}))
}
Expand All @@ -139,7 +139,7 @@ pub struct Server<T> {
sock_addr: PathBuf,
max_message_size: usize,
server_info_file: PathBuf,
sourcetrf_svc: Option<T>,
svc: Option<T>,
}

impl<T> Server<T> {
Expand All @@ -153,7 +153,7 @@ impl<T> Server<T> {
sock_addr: "/var/run/numaflow/sourcetransform.sock".into(),
max_message_size: 64 * 1024 * 1024,
server_info_file: server_info_file.into(),
sourcetrf_svc: Some(sourcetransformer_svc),
svc: Some(sourcetransformer_svc),
}
}

Expand Down Expand Up @@ -200,7 +200,7 @@ impl<T> Server<T> {
T: SourceTransformer + Send + Sync + 'static,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.sourcetrf_svc.take().unwrap();
let handler = self.svc.take().unwrap();
let sourcetrf_svc = SourceTransformerService { handler };
let sourcetrf_svc =
proto::source_transform_server::SourceTransformServer::new(sourcetrf_svc)
Expand Down

0 comments on commit e6ba185

Please sign in to comment.