Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Release pipeline #97

Merged
merged 14 commits into from
Oct 10, 2024
Merged
89 changes: 52 additions & 37 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,54 +1,69 @@
name: Rust
name: Tests

on:
push:
branches: [ main ]
branches:
- main
- release-*
pull_request:
branches: [ main ]
branches:
- main

defaults:
run:
shell: bash

jobs:
build:
name: Unit Tests
name: Clippy and Unit Tests
runs-on: ubuntu-24.04
timeout-minutes: 10
steps:
- name: Check out code
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Set up Rust
uses: actions-rs/toolchain@v1
- name: Install Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: stable
override: true
cache-workspaces: |
. -> target
./examples/batchmap-cat -> target
./examples/batchmap-flatmap -> target
./examples/map-cat -> target
./examples/map-tickgen-serde -> target
./examples/mapt-event-time-filter -> target
./examples/reduce-counter -> target
./examples/sideinput -> target
./examples/sideinput/udf -> target
./examples/simple-source -> target
./examples/sink-log -> target
./examples/source-transformer-now -> target

- name: Cache Cargo dependencies
uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Install protoc (for Protocol Buffers)
- name: Configure sccache
run: |
set -eux -o pipefail
PROTOC_VERSION=3.19.4
PROTOC_ZIP=protoc-$PROTOC_VERSION-linux-x86_64.zip
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v$PROTOC_VERSION/$PROTOC_ZIP
sudo unzip -o $PROTOC_ZIP -d /usr/local bin/protoc
sudo unzip -o $PROTOC_ZIP -d /usr/local 'include/*'
sudo chmod +x /usr/local/bin/protoc
sudo find /usr/local/include -type f | xargs sudo chmod a+r
sudo find /usr/local/include -type d | xargs sudo chmod a+rx
rm -f $PROTOC_ZIP

- name: Build
run: cargo build --verbose
echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV
echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV

- name: Run sccache-cache
uses: mozilla-actions/sccache-action@v0.0.5

- name: Install dependencies
run: sudo apt-get install -y protobuf-compiler

- name: Code Generation
run: make codegen

- name: Ensure generated code is checked in
run: git diff --exit-code

- name: Lint
run: make lint

- name: Run tests
run: cargo test --verbose
run: make test

- name: Documentation generation
run: RUSTFLAGS="-D warnings" cargo doc

- name: Check formatting
run: cargo fmt --all -- --check
- name: Dry run Cargo Publish
run: cargo publish --dry-run
46 changes: 46 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Release

on:
release:
types: [ created ]

defaults:
run:
shell: bash

jobs:
build:
name: Publish to crates.io
runs-on: ubuntu-24.04
timeout-minutes: 12
# run workflow only on numaproj/numaflow-rs repository
if: ${{ github.repository }} == "numaproj/numaflow-rs"
steps:
- name: Check out code
uses: actions/checkout@v4

- name: Install Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
cache: false

- name: Install dependencies
run: sudo apt-get install -y protobuf-compiler

- name: Code Generation
run: make codegen

- name: Ensure generated code is checked in
run: git diff --exit-code

- name: Lint
run: make lint

- name: Run tests
run: make test

- name: Documentation generation
run: RUSTFLAGS="-D warnings" cargo doc

- name: Dry run Cargo Publish
run: CARGO_REGISTRY_TOKEN=${{ secrets.CARGO_PUBLISH }} cargo publish
15 changes: 11 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@ homepage = "https://numaproj.github.io/numaflow/"
repository = "https://github.com/numaproj/numaflow-rs"
keywords = ["numaflow", "streaming", "messaging", "event-driven"]
categories = ["network-programming", "api-bindings"]
exclude = [
".github/*",
".gitignore",
".dockerignore",
"hack/*",
"Makefile",
]


[lib]
name = "numaflow"
path = "src/lib.rs"

[dependencies]
tonic = "0.12.2"
prost = "0.13.2"
prost-types = "0.13.2"
tonic = "0.12.3"
prost = "0.13.3"
prost-types = "0.13.3"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.12"
tokio-stream = { version = "0.1.16", features = ["net"] }
Expand All @@ -34,7 +41,7 @@ thiserror = "1.0"
hyper-util = "0.1.7"

[build-dependencies]
tonic-build = "0.12.2"
tonic-build = "0.12.3"

[dev-dependencies]
tempfile = "3.9.0"
Expand Down
28 changes: 26 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,39 @@
# perform a cargo fmt on all directories containing a Cargo.toml file
.PHONY: lint
# find all directories containing Cargo.toml files
DIRS := $(shell find . -type f -name Cargo.toml -exec dirname {} \; | sort -u)
DIRS := $(shell find . -type f -name Cargo.toml -not -path "./target/*" -exec dirname {} \; | sort -u)
$(info Included directories: $(DIRS))
lint:
fmt:
@for dir in $(DIRS); do \
echo "Formatting code in $$dir"; \
cargo fmt --all --manifest-path "$$dir/Cargo.toml"; \
done

# Check if all files are formatted and run clippy on all directories containing a Cargo.toml file
.PHONY: lint
lint: test-fmt clippy

.PHONY: test-fmt
test-fmt:
@for dir in $(DIRS); do \
echo "Checking if code is formatted in directory: $$dir"; \
cargo fmt --all --check --manifest-path "$$dir/Cargo.toml" || { echo "Code is not formatted in $$dir"; exit 1; }; \
done

.PHONY: clippy
clippy:
@for dir in $(DIRS); do \
echo "Running clippy in directory: $$dir"; \
cargo clippy --workspace --manifest-path "$$dir/Cargo.toml" -- -D warnings || { echo "Clippy warnings/errors found in $$dir"; exit 1; }; \
done

# run cargo test on the repository root
.PHONY: test
test:
cargo test --workspace

.PHONY: codegen
codegen:
# Change timestamps so that tonic_build code generation will always be triggered.
touch proto/*
PROTO_CODE_GEN=1 cargo build
8 changes: 7 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::env;

fn main() {
if env::var("PROTO_CODE_GEN").unwrap_or("0".to_string()) != "1" {
return;
}
tonic_build::configure()
.build_server(true)
.compile(
.out_dir("src/servers")
.compile_protos(
&[
"proto/source.proto",
"proto/sourcetransform.proto",
Expand Down
2 changes: 1 addition & 1 deletion examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod counter {
&self,
keys: Vec<String>,
mut input: Receiver<ReduceRequest>,
md: &Metadata,
_md: &Metadata,
) -> Vec<Message> {
let mut counter = 0;
// the loop exits when input is closed which will happen only on close of book.
Expand Down
5 changes: 4 additions & 1 deletion examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ impl sourcetransform::SourceTransformer for NowCat {
&self,
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
vec![sourcetransform::Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys.clone())]
vec![
sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys.clone()),
]
}
}
7 changes: 2 additions & 5 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ use crate::batchmap::proto::batch_map_server::BatchMap;
use crate::error::Error;
use crate::error::Error::BatchMapError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::batchmap as proto;
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
const DROP: &str = "U+005C__DROP__";
/// Numaflow Batch Map Proto definitions.
pub mod proto {
tonic::include_proto!("batchmap.v1");
}

struct BatchMapService<T: BatchMapper> {
handler: Arc<T>,
Expand Down Expand Up @@ -106,7 +103,7 @@ impl From<proto::BatchMapRequest> for Datum {
}
}
}
/// Message is the response struct from the [`Mapper::map`] .
/// Message is the response struct from the [`Mapper::map`][`crate::map::Mapper::map`] .
#[derive(Debug, PartialEq)]
pub struct Message {
/// Keys are a collection of strings which will be passed on to the next vertex as is. It can
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub mod sideinput;
/// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers.
pub mod batchmap;

mod servers;

// Error handling on Numaflow SDKs!
//
// Any non-recoverable error will cause the process to shutdown with a non-zero exit status. All errors are non-recoverable.
Expand Down
7 changes: 1 addition & 6 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{error, info};

use crate::error::{Error, ErrorKind};
use crate::map::proto::MapResponse;
use crate::servers::map::{self as proto, MapResponse};
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_CHANNEL_SIZE: usize = 1000;
Expand All @@ -21,11 +21,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
const DROP: &str = "U+005C__DROP__";

/// Numaflow Map Proto definitions.
pub mod proto {
tonic::include_proto!("map.v1");
}

struct MapService<T> {
handler: Arc<T>,
shutdown_tx: mpsc::Sender<()>,
Expand Down
10 changes: 3 additions & 7 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tonic::{async_trait, Request, Response, Status};
use crate::error::Error;
use crate::error::Error::ReduceError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
pub use crate::servers::reduce as proto;
use crate::shared::{self, prost_timestamp_from_utc, ContainerType};

const KEY_JOIN_DELIMITER: &str = ":";
Expand All @@ -22,11 +23,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info";
const DROP: &str = "U+005C__DROP__";

/// Numaflow Reduce Proto definitions.
pub mod proto {
tonic::include_proto!("reduce.v1");
}

struct ReduceService<C> {
creator: Arc<C>,
shutdown_tx: Sender<()>,
Expand Down Expand Up @@ -80,7 +76,7 @@ pub trait ReducerCreator {
/// Reducer trait for implementing Reduce handler.
#[async_trait]
pub trait Reducer {
/// reduce_handle is provided with a set of keys, a channel of [`Datum`], and [`Metadata`]. It
/// reduce_handle is provided with a set of keys, a channel of [`ReduceRequest`], and [`Metadata`]. It
/// returns 0, 1, or more results as a [`Vec`] of [`Message`]. Reduce is a stateful operation and
/// the channel is for the collection of keys and for that time [Window].
/// You can read more about reduce [here](https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/).
Expand All @@ -103,7 +99,7 @@ pub trait Reducer {
/// use numaflow::reduce::{Reducer, Metadata};
/// use tokio::sync::mpsc::Receiver;
/// use tonic::async_trait;
/// use numaflow::reduce::proto::reduce_server::Reduce;
/// use numaflow::reduce::proto::reduce_server::Reduce;
/// pub(crate) struct Counter {}
///
/// pub(crate) struct CounterCreator {}
Expand Down
27 changes: 27 additions & 0 deletions src/servers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#[path = "servers/batchmap.v1.rs"]
#[rustfmt::skip]
pub mod batchmap;

#[path = "servers/map.v1.rs"]
#[rustfmt::skip]
pub mod map;

#[path = "servers/reduce.v1.rs"]
#[rustfmt::skip]
pub mod reduce;

#[path = "servers/sideinput.v1.rs"]
#[rustfmt::skip]
pub mod sideinput;

#[path = "servers/sink.v1.rs"]
#[rustfmt::skip]
pub mod sink;

#[path = "servers/source.v1.rs"]
#[rustfmt::skip]
pub mod source;

#[path = "servers/sourcetransformer.v1.rs"]
#[rustfmt::skip]
pub mod sourcetransformer;
Loading
Loading