diff --git a/pipeless/src/data.rs b/pipeless/src/data.rs index 45daf63..cb15824 100644 --- a/pipeless/src/data.rs +++ b/pipeless/src/data.rs @@ -3,6 +3,17 @@ use ndarray; use uuid; use gstreamer as gst; +/// Custom data that the user can add to the frame in a stage +/// allowing to pass data to subsequent stages +pub enum UserData { + Empty, + Integer(i32), + Float(f64), + String(String), + Array(Vec), + Dictionary(Vec<(String, UserData)>), +} + pub struct RgbFrame { uuid: uuid::Uuid, original: ndarray::Array3, @@ -17,6 +28,7 @@ pub struct RgbFrame { inference_input: ndarray::ArrayBase, ndarray::Dim>, inference_output: ndarray::ArrayBase, ndarray::Dim>, pipeline_id: uuid::Uuid, + user_data: UserData, } impl RgbFrame { pub fn new( @@ -36,6 +48,7 @@ impl RgbFrame { inference_input: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])), inference_output: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])), pipeline_id, + user_data: UserData::Empty, } } @@ -49,6 +62,7 @@ impl RgbFrame { inference_input: ndarray::ArrayBase, ndarray::Dim>, inference_output: ndarray::ArrayBase, ndarray::Dim>, pipeline_id: &str, + user_data: UserData, ) -> Self { RgbFrame { uuid: uuid::Uuid::from_str(uuid).unwrap(), @@ -60,6 +74,7 @@ impl RgbFrame { fps, input_ts, inference_input, inference_output, pipeline_id: uuid::Uuid::from_str(pipeline_id).unwrap(), + user_data: user_data } } @@ -119,6 +134,9 @@ impl RgbFrame { pub fn set_pipeline_id(&mut self, pipeline_id: &str) { self.pipeline_id = uuid::Uuid::from_str(pipeline_id).unwrap(); } + pub fn get_user_data(&self) -> &UserData { + &self.user_data + } } pub enum Frame { @@ -170,4 +188,4 @@ impl Frame { Frame::RgbFrame(frame) => { frame.set_inference_output(output_data); }, } } -} \ No newline at end of file +} diff --git a/pipeless/src/stages/languages/python.rs b/pipeless/src/stages/languages/python.rs index 4fe5e04..ad365b7 100644 --- a/pipeless/src/stages/languages/python.rs +++ b/pipeless/src/stages/languages/python.rs @@ -1,8 +1,8 @@ use log::{error, warn}; -use pyo3::prelude::*; +use pyo3::{PyObject, prelude::*}; use numpy::{self, ToPyArray}; -use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store}; +use crate::{data::{Frame, RgbFrame, UserData}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store}; /// Allows a Frame to be converted from Rust to Python impl IntoPy> for Frame { @@ -41,6 +41,7 @@ impl IntoPy> for RgbFrame { dict.set_item("inference_input", self.get_inference_input().to_pyarray(py)).unwrap(); dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap(); dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap(); + dict.set_item("user_data", self.get_user_data()).unwrap(); dict.into() } } @@ -82,18 +83,84 @@ impl<'source> FromPyObject<'source> for RgbFrame { let inference_input = inference_input_ndarray; let inference_output =inference_output_ndarray; let pipeline_id = ob.get_item("pipeline_id").unwrap().extract()?; + let user_data = ob.get_item("user_data").unwrap().extract()?; let frame = RgbFrame::from_values( uuid, original, modified, width, height, pts, dts, duration, fps, input_ts, inference_input, inference_output, - pipeline_id, + pipeline_id, user_data ); Ok(frame) } } +/// Allows to pass the user data to python and back +impl ToPyObject for UserData { + fn to_object(&self, py: Python<'_>) -> PyObject { + match self { + UserData::Empty => py.None(), + UserData::Integer(i) => i.into_py(py), + UserData::Float(f) => f.into_py(py), + UserData::String(s) => s.into_py(py), + UserData::Array(arr) => { + let list = pyo3::types::PyList::empty(py); + for item in arr { + list.append(item.to_object(py)).unwrap(); + } + list.into_py(py) + } + UserData::Dictionary(dict) => { + let py_dict = pyo3::types::PyDict::new(py); + for (key, value) in dict { + py_dict.set_item(key, value.to_object(py)).unwrap(); + } + py_dict.into_py(py) + } + } + } +} + +/// Allows to pass the user data to python and back +impl<'source> FromPyObject<'source> for UserData { + fn extract(obj: &'source PyAny) -> PyResult { + if let Ok(integer) = obj.extract::() { + Ok(UserData::Integer(integer)) + } else if let Ok(float) = obj.extract::() { + Ok(UserData::Float(float)) + } else if let Ok(string) = obj.extract::() { + Ok(UserData::String(string)) + } else if obj.is_instance_of::() { + let array = obj.downcast::()?; + let array_data = array.into_iter() + .map(|elem| UserData::extract(elem)) + .collect::>>()?; + Ok(UserData::Array(array_data)) + } else if obj.is_instance_of::() { + let dict = obj.downcast::()?; + let dict_keys = dict.keys(); + let mut dict_items = Vec::new(); + for key in dict_keys { + let key_str = key.extract::()?; + let value = dict.get_item(key)?; + match value { + Some(v) => { + let value_data = UserData::extract(v)?; + dict_items.push((key_str, value_data)); + }, + None => { dict_items.push((key_str, UserData::Empty)); }, + } + } + Ok(UserData::Dictionary(dict_items)) + } else if obj.is_none() { + Ok(UserData::Empty) + } else { + Err(pyo3::exceptions::PyTypeError::new_err("Unsupported data type assigned to 'user_data'. Please check in the Pipeless the supported types.")) + } + } +} + /// Python context to maintain within a stage pub struct PythonStageContext { context: Py,