From 0f27539db016320c3f3599f2d29c22f838de5194 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Wed, 31 Jan 2024 14:22:44 +0100 Subject: [PATCH 1/3] feat(core): Allow to pass custom user data through stages Signed-off-by: Miguel A. Cabrera Minagorri --- pipeless/src/data.rs | 20 ++++++- pipeless/src/stages/languages/python.rs | 73 ++++++++++++++++++++++++- 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/pipeless/src/data.rs b/pipeless/src/data.rs index 45daf63..cb15824 100644 --- a/pipeless/src/data.rs +++ b/pipeless/src/data.rs @@ -3,6 +3,17 @@ use ndarray; use uuid; use gstreamer as gst; +/// Custom data that the user can add to the frame in a stage +/// allowing to pass data to subsequent stages +pub enum UserData { + Empty, + Integer(i32), + Float(f64), + String(String), + Array(Vec), + Dictionary(Vec<(String, UserData)>), +} + pub struct RgbFrame { uuid: uuid::Uuid, original: ndarray::Array3, @@ -17,6 +28,7 @@ pub struct RgbFrame { inference_input: ndarray::ArrayBase, ndarray::Dim>, inference_output: ndarray::ArrayBase, ndarray::Dim>, pipeline_id: uuid::Uuid, + user_data: UserData, } impl RgbFrame { pub fn new( @@ -36,6 +48,7 @@ impl RgbFrame { inference_input: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])), inference_output: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])), pipeline_id, + user_data: UserData::Empty, } } @@ -49,6 +62,7 @@ impl RgbFrame { inference_input: ndarray::ArrayBase, ndarray::Dim>, inference_output: ndarray::ArrayBase, ndarray::Dim>, pipeline_id: &str, + user_data: UserData, ) -> Self { RgbFrame { uuid: uuid::Uuid::from_str(uuid).unwrap(), @@ -60,6 +74,7 @@ impl RgbFrame { fps, input_ts, inference_input, inference_output, pipeline_id: uuid::Uuid::from_str(pipeline_id).unwrap(), + user_data: user_data } } @@ -119,6 +134,9 @@ impl RgbFrame { pub fn set_pipeline_id(&mut self, pipeline_id: &str) { self.pipeline_id = uuid::Uuid::from_str(pipeline_id).unwrap(); } + pub fn get_user_data(&self) -> &UserData { + &self.user_data + } } pub enum Frame { @@ -170,4 +188,4 @@ impl Frame { Frame::RgbFrame(frame) => { frame.set_inference_output(output_data); }, } } -} \ No newline at end of file +} diff --git a/pipeless/src/stages/languages/python.rs b/pipeless/src/stages/languages/python.rs index 4fe5e04..ad365b7 100644 --- a/pipeless/src/stages/languages/python.rs +++ b/pipeless/src/stages/languages/python.rs @@ -1,8 +1,8 @@ use log::{error, warn}; -use pyo3::prelude::*; +use pyo3::{PyObject, prelude::*}; use numpy::{self, ToPyArray}; -use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store}; +use crate::{data::{Frame, RgbFrame, UserData}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store}; /// Allows a Frame to be converted from Rust to Python impl IntoPy> for Frame { @@ -41,6 +41,7 @@ impl IntoPy> for RgbFrame { dict.set_item("inference_input", self.get_inference_input().to_pyarray(py)).unwrap(); dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap(); dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap(); + dict.set_item("user_data", self.get_user_data()).unwrap(); dict.into() } } @@ -82,18 +83,84 @@ impl<'source> FromPyObject<'source> for RgbFrame { let inference_input = inference_input_ndarray; let inference_output =inference_output_ndarray; let pipeline_id = ob.get_item("pipeline_id").unwrap().extract()?; + let user_data = ob.get_item("user_data").unwrap().extract()?; let frame = RgbFrame::from_values( uuid, original, modified, width, height, pts, dts, duration, fps, input_ts, inference_input, inference_output, - pipeline_id, + pipeline_id, user_data ); Ok(frame) } } +/// Allows to pass the user data to python and back +impl ToPyObject for UserData { + fn to_object(&self, py: Python<'_>) -> PyObject { + match self { + UserData::Empty => py.None(), + UserData::Integer(i) => i.into_py(py), + UserData::Float(f) => f.into_py(py), + UserData::String(s) => s.into_py(py), + UserData::Array(arr) => { + let list = pyo3::types::PyList::empty(py); + for item in arr { + list.append(item.to_object(py)).unwrap(); + } + list.into_py(py) + } + UserData::Dictionary(dict) => { + let py_dict = pyo3::types::PyDict::new(py); + for (key, value) in dict { + py_dict.set_item(key, value.to_object(py)).unwrap(); + } + py_dict.into_py(py) + } + } + } +} + +/// Allows to pass the user data to python and back +impl<'source> FromPyObject<'source> for UserData { + fn extract(obj: &'source PyAny) -> PyResult { + if let Ok(integer) = obj.extract::() { + Ok(UserData::Integer(integer)) + } else if let Ok(float) = obj.extract::() { + Ok(UserData::Float(float)) + } else if let Ok(string) = obj.extract::() { + Ok(UserData::String(string)) + } else if obj.is_instance_of::() { + let array = obj.downcast::()?; + let array_data = array.into_iter() + .map(|elem| UserData::extract(elem)) + .collect::>>()?; + Ok(UserData::Array(array_data)) + } else if obj.is_instance_of::() { + let dict = obj.downcast::()?; + let dict_keys = dict.keys(); + let mut dict_items = Vec::new(); + for key in dict_keys { + let key_str = key.extract::()?; + let value = dict.get_item(key)?; + match value { + Some(v) => { + let value_data = UserData::extract(v)?; + dict_items.push((key_str, value_data)); + }, + None => { dict_items.push((key_str, UserData::Empty)); }, + } + } + Ok(UserData::Dictionary(dict_items)) + } else if obj.is_none() { + Ok(UserData::Empty) + } else { + Err(pyo3::exceptions::PyTypeError::new_err("Unsupported data type assigned to 'user_data'. Please check in the Pipeless the supported types.")) + } + } +} + /// Python context to maintain within a stage pub struct PythonStageContext { context: Py, From 9c0bb716b8edd4d08ee90a7ee567238847c61456 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Wed, 31 Jan 2024 14:33:50 +0100 Subject: [PATCH 2/3] Add custom data example Signed-off-by: Miguel A. Cabrera Minagorri --- examples/custom-data/README.md | 3 +++ examples/custom-data/process.py | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) create mode 100644 examples/custom-data/README.md create mode 100644 examples/custom-data/process.py diff --git a/examples/custom-data/README.md b/examples/custom-data/README.md new file mode 100644 index 0000000..35ccce8 --- /dev/null +++ b/examples/custom-data/README.md @@ -0,0 +1,3 @@ +# Pass custom data between stages + +Check [this guide](https://www.pipeless.ai/docs/v1/examples/custom-data) to run the example step by step. diff --git a/examples/custom-data/process.py b/examples/custom-data/process.py new file mode 100644 index 0000000..17dcd95 --- /dev/null +++ b/examples/custom-data/process.py @@ -0,0 +1,19 @@ +def hook(frame_data, _): + # Add data to the frame that you can later recover from hooks of subsequent stages. You can also recover it from subsequent hooks of the same stage. + # You can use any kind of data. + # Integers: frame_data['user_data'] = 100 + # Floats: frame_data['user_data'] = 100.5 + # Strings: frame_data['user_data'] = "Hello!" + # Heterogeneus arrays: frame_data['user_data'] = ["string", 13, 34.6] + # Heterogeneus Dictionaries (IMPORTANT: all keys must be strings): + frame_data['user_data'] = { + "key1": 0, + "key2": [1, "3"], + "key3": { "inner1": "hola" } + } + + # In a later hook you can obtain the data like: + # my_data = frame_data['user_data'] + + # To connect stages simply give the list to Pipeless when adding a stream: + # pipeless add stream --input-uri "file:///home/user/my/path.mp4" --output-uri "screen" --frame-path "stage1,stage2" From 54959a11fa3912faabb7b1803e398b76f3e291c1 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Wed, 31 Jan 2024 14:34:50 +0100 Subject: [PATCH 3/3] Bump version Signed-off-by: Miguel A. Cabrera Minagorri --- pipeless/Cargo.lock | 2 +- pipeless/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipeless/Cargo.lock b/pipeless/Cargo.lock index cd92b73..cb8cf1d 100644 --- a/pipeless/Cargo.lock +++ b/pipeless/Cargo.lock @@ -1513,7 +1513,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeless-ai" -version = "1.7.0" +version = "1.8.0" dependencies = [ "clap", "ctrlc", diff --git a/pipeless/Cargo.toml b/pipeless/Cargo.toml index c92ee92..fcfccf5 100644 --- a/pipeless/Cargo.toml +++ b/pipeless/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pipeless-ai" -version = "1.7.0" +version = "1.8.0" edition = "2021" authors = ["Miguel A. Cabrera Minagorri"] description = "An open-source computer vision framework to build and deploy applications in minutes"