Skip to content

Commit

Permalink
feat: source and sink implementation in Rust (blocking implementation) (
Browse files Browse the repository at this point in the history
numaproj#2190)

Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Yashash H L <yashashhl25@gmail.com>
Co-authored-by: Sreekanth <prsreekanth920@gmail.com>
  • Loading branch information
3 people authored Oct 27, 2024
1 parent e98ff98 commit 5b77782
Show file tree
Hide file tree
Showing 44 changed files with 3,245 additions and 809 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN chmod +x /bin/numaflow-rs
####################################################################################################
# Rust binary
####################################################################################################
FROM lukemathwalker/cargo-chef:latest-rust-1.80 AS chef
FROM lukemathwalker/cargo-chef:latest-rust-1.81 AS chef
ARG TARGETPLATFORM
WORKDIR /numaflow
RUN apt-get update && apt-get install -y protobuf-compiler
Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,17 @@ build-rust-in-docker:
DOCKER_BUILDKIT=1 $(DOCKER) build --build-arg "BASE_IMAGE=$(DEV_BASE_IMAGE)" $(DOCKER_BUILD_ARGS) -t $(IMAGE_NAMESPACE)/$(BINARY_NAME)-rust-builder:$(VERSION) --target rust-builder -f $(DOCKERFILE) .
export CTR=$$($(DOCKER) create $(IMAGE_NAMESPACE)/$(BINARY_NAME)-rust-builder:$(VERSION)) && $(DOCKER) cp $$CTR:/root/numaflow dist/numaflow-rs-linux-$(HOST_ARCH) && $(DOCKER) rm $$CTR && $(DOCKER) image rm $(IMAGE_NAMESPACE)/$(BINARY_NAME)-rust-builder:$(VERSION)

.PHONY: build-rust-in-docker-multi
build-rust-in-docker-multi:
mkdir -p dist
docker run -v ./dist/cargo:/root/.cargo -v ./rust/:/app/ -w /app --rm ubuntu:24.04 bash build.sh
cp -pv rust/target/aarch64-unknown-linux-gnu/release/numaflow dist/numaflow-rs-linux-arm64
cp -pv rust/target/x86_64-unknown-linux-gnu/release/numaflow dist/numaflow-rs-linux-amd64

image-multi: ui-build set-qemu dist/$(BINARY_NAME)-linux-arm64.gz dist/$(BINARY_NAME)-linux-amd64.gz
ifndef GITHUB_ACTIONS
$(MAKE) build-rust-in-docker-multi
endif
$(DOCKER) buildx build --sbom=false --provenance=false --build-arg "BASE_IMAGE=$(RELEASE_BASE_IMAGE)" $(DOCKER_BUILD_ARGS) -t $(IMAGE_NAMESPACE)/$(BINARY_NAME):$(VERSION) --target $(BINARY_NAME) --platform linux/amd64,linux/arm64 --file $(DOCKERFILE) ${PUSH_OPTION} .

set-qemu:
Expand Down
6 changes: 5 additions & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22890,6 +22890,9 @@
},
"type": "array"
},
"executeRustBinary": {
"type": "boolean"
},
"image": {
"type": "string"
},
Expand All @@ -22915,7 +22918,8 @@
"imagePullPolicy",
"image",
"volumeMounts",
"resources"
"resources",
"executeRustBinary"
],
"type": "object"
}
Expand Down
6 changes: 5 additions & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -22867,7 +22867,8 @@
"imagePullPolicy",
"image",
"volumeMounts",
"resources"
"resources",
"executeRustBinary"
],
"properties": {
"env": {
Expand All @@ -22876,6 +22877,9 @@
"$ref": "#/definitions/io.k8s.api.core.v1.EnvVar"
}
},
"executeRustBinary": {
"type": "boolean"
},
"image": {
"type": "string"
},
Expand Down
2 changes: 1 addition & 1 deletion examples/1-simple-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ spec:
- from: in
to: cat
- from: cat
to: out
to: out
20 changes: 11 additions & 9 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,17 @@ const (
EnvServingMinPipelineSpec = "NUMAFLOW_SERVING_MIN_PIPELINE_SPEC"
EnvServingHostIP = "NUMAFLOW_SERVING_HOST_IP"
EnvServingStoreTTL = "NUMAFLOW_SERVING_STORE_TTL"
PathVarRun = "/var/run/numaflow"
VertexMetricsPort = 2469
VertexMetricsPortName = "metrics"
VertexHTTPSPort = 8443
VertexHTTPSPortName = "https"
DaemonServicePort = 4327
MonoVertexMetricsPort = 2469
MonoVertexMetricsPortName = "metrics"
MonoVertexDaemonServicePort = 4327
EnvExecuteRustBinary = "NUMAFLOW_EXECUTE_RUST_BINARY"

PathVarRun = "/var/run/numaflow"
VertexMetricsPort = 2469
VertexMetricsPortName = "metrics"
VertexHTTPSPort = 8443
VertexHTTPSPortName = "https"
DaemonServicePort = 4327
MonoVertexMetricsPort = 2469
MonoVertexMetricsPortName = "metrics"
MonoVertexDaemonServicePort = 4327

DefaultRequeueAfter = 10 * time.Second

Expand Down
13 changes: 7 additions & 6 deletions pkg/apis/numaflow/v1alpha1/container_supplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package v1alpha1
import corev1 "k8s.io/api/core/v1"

type getContainerReq struct {
env []corev1.EnvVar
isbSvcType ISBSvcType
imagePullPolicy corev1.PullPolicy
image string
volumeMounts []corev1.VolumeMount
resources corev1.ResourceRequirements
env []corev1.EnvVar
isbSvcType ISBSvcType
imagePullPolicy corev1.PullPolicy
image string
volumeMounts []corev1.VolumeMount
resources corev1.ResourceRequirements
executeRustBinary bool
}

type containerSupplier interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (mvspec MonoVertexSpec) DeepCopyWithoutReplicas() MonoVertexSpec {

func (mvspec MonoVertexSpec) buildContainers(req getContainerReq) []corev1.Container {
mainContainer := containerBuilder{}.
init(req).command(NumaflowRustBinary).args("--monovertex").build()
init(req).command(NumaflowRustBinary).args("--rust").build()

containers := []corev1.Container{mainContainer}
if mvspec.Source.UDSource != nil { // Only support UDSource for now.
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func (s Sink) getContainers(req getContainerReq) ([]corev1.Container, error) {
}

func (s Sink) getMainContainer(req getContainerReq) corev1.Container {
if req.executeRustBinary {
return containerBuilder{}.init(req).command(NumaflowRustBinary).args("processor", "--type="+string(VertexTypeSink), "--isbsvc-type="+string(req.isbSvcType), "--rust").build()
}
return containerBuilder{}.init(req).args("processor", "--type="+string(VertexTypeSink), "--isbsvc-type="+string(req.isbSvcType)).build()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (s Source) getContainers(req getContainerReq) ([]corev1.Container, error) {
}

func (s Source) getMainContainer(req getContainerReq) corev1.Container {
if req.executeRustBinary {
return containerBuilder{}.init(req).command(NumaflowRustBinary).args("processor", "--type="+string(VertexTypeSink), "--isbsvc-type="+string(req.isbSvcType), "--rust").build()
}
return containerBuilder{}.init(req).args("processor", "--type="+string(VertexTypeSource), "--isbsvc-type="+string(req.isbSvcType)).build()
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/env"
"k8s.io/utils/ptr"
)

Expand Down Expand Up @@ -240,15 +241,17 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
},
}
volumeMounts := []corev1.VolumeMount{{Name: varVolumeName, MountPath: PathVarRun}}

executeRustBinary, _ := env.GetBool(EnvExecuteRustBinary, false)
containers, err := v.Spec.getType().getContainers(getContainerReq{
isbSvcType: req.ISBSvcType,
env: envVars,
image: req.Image,
imagePullPolicy: req.PullPolicy,
resources: req.DefaultResources,
volumeMounts: volumeMounts,
isbSvcType: req.ISBSvcType,
env: envVars,
image: req.Image,
imagePullPolicy: req.PullPolicy,
resources: req.DefaultResources,
volumeMounts: volumeMounts,
executeRustBinary: executeRustBinary,
})

if err != nil {
return nil, err
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount {
if partitionName == "" {
r.log.Warnf("[vertex name %s, pod name %s]: Partition name is not found for metric %s", vertexName, podName, readTotalMetricName)
} else {
partitionReadCount[partitionName] = ele.Counter.GetValue()
// https://github.com/prometheus/client_rust/issues/194
counterVal := ele.Counter.GetValue()
untypedVal := ele.Untyped.GetValue()
if counterVal == 0 && untypedVal != 0 {
counterVal = untypedVal
}
partitionReadCount[partitionName] = counterVal
}
}
podReadCount := &PodReadCount{podName, partitionReadCount}
Expand Down
1 change: 1 addition & 0 deletions pkg/mvtxdaemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount {
// from the results safely.
// We use Untyped here as the counter metric family shows up as untyped from the rust client
// TODO(MonoVertex): Check further on this to understand why not type is counter
// https://github.com/prometheus/client_rust/issues/194
podReadCount := &PodReadCount{podName, metricsList[0].Untyped.GetValue()}
return podReadCount
} else {
Expand Down
38 changes: 38 additions & 0 deletions rust/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/bash

set -xeuo pipefail

# Builds static rust binaries for both aarch64 and amd64
# Intended for building Linux binries from Mac OS host in a docker container.
# Usage: (from root directory of numaflow repo)
# docker run -v ./rust/:/app/ -w /app --rm ubuntu:24.04 bash build.sh

# Detect the host machine architecture
ARCH=$(uname -m)

apt update && apt install -y curl protobuf-compiler build-essential

if [ ! -f "$HOME/.cargo/env" ]; then
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
fi
. "$HOME/.cargo/env"

# Function to build for aarch64
build_aarch64() {
apt install -y gcc-aarch64-linux-gnu
sed -i "/^targets = \[/d" rust-toolchain.toml
RUSTFLAGS='-C target-feature=+crt-static -C linker=aarch64-linux-gnu-gcc' cargo build --release --target aarch64-unknown-linux-gnu
sed -i "/targets = \['aarch64-unknown-linux-gnu'\]/d" rust-toolchain.toml
}

# Function to build for x86_64
build_x86_64() {
apt install -y gcc-x86-64-linux-gnu
sed -i "/^targets = \[/d" rust-toolchain.toml
echo "targets = ['x86_64-unknown-linux-gnu']" >> rust-toolchain.toml
RUSTFLAGS='-C target-feature=+crt-static -C linker=x86_64-linux-gnu-gcc' cargo build --release --target x86_64-unknown-linux-gnu
sed -i "/targets = \['x86_64-unknown-linux-gnu'\]/d" rust-toolchain.toml
}

build_aarch64
build_x86_64
14 changes: 9 additions & 5 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::env;
use std::sync::OnceLock;

use monovertex::MonovertexConfig;

use crate::config::pipeline::PipelineConfig;
use crate::Error;
use crate::Result;
use monovertex::MonovertexConfig;

const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT";
Expand All @@ -28,6 +29,7 @@ pub fn config() -> &'static Settings {
})
}

/// CustomResources supported by Numaflow.
#[derive(Debug, Clone)]
pub(crate) enum CustomResourceType {
MonoVertex(MonovertexConfig),
Expand All @@ -53,7 +55,7 @@ impl Settings {
}

if let Ok(obj) = env::var(ENV_VERTEX_OBJ) {
let cfg = PipelineConfig::load(obj)?;
let cfg = PipelineConfig::load(obj, env::vars())?;
return Ok(Settings {
custom_resource_type: CustomResourceType::Pipeline(cfg),
});
Expand All @@ -64,12 +66,14 @@ impl Settings {

#[cfg(test)]
mod tests {
use crate::config::components::sink::OnFailureStrategy;
use crate::config::{CustomResourceType, Settings, ENV_MONO_VERTEX_OBJ};
use std::env;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use serde_json::json;
use std::env;

use crate::config::components::sink::OnFailureStrategy;
use crate::config::{CustomResourceType, Settings, ENV_MONO_VERTEX_OBJ};

#[test]
fn test_settings_load_combined() {
Expand Down
Loading

0 comments on commit 5b77782

Please sign in to comment.