Skip to content

Commit

Permalink
feat(core): Allow to pass custom user data through stages
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel A. Cabrera Minagorri <devgorri@gmail.com>
  • Loading branch information
miguelaeh committed Jan 31, 2024
1 parent 5f8254f commit 0f27539
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 4 deletions.
20 changes: 19 additions & 1 deletion pipeless/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ use ndarray;
use uuid;
use gstreamer as gst;

/// Custom data that the user can add to the frame in a stage
/// allowing to pass data to subsequent stages
pub enum UserData {
Empty,
Integer(i32),
Float(f64),
String(String),
Array(Vec<UserData>),
Dictionary(Vec<(String, UserData)>),
}

pub struct RgbFrame {
uuid: uuid::Uuid,
original: ndarray::Array3<u8>,
Expand All @@ -17,6 +28,7 @@ pub struct RgbFrame {
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: uuid::Uuid,
user_data: UserData,
}
impl RgbFrame {
pub fn new(
Expand All @@ -36,6 +48,7 @@ impl RgbFrame {
inference_input: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])),
inference_output: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])),
pipeline_id,
user_data: UserData::Empty,
}
}

Expand All @@ -49,6 +62,7 @@ impl RgbFrame {
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: &str,
user_data: UserData,
) -> Self {
RgbFrame {
uuid: uuid::Uuid::from_str(uuid).unwrap(),
Expand All @@ -60,6 +74,7 @@ impl RgbFrame {
fps, input_ts,
inference_input, inference_output,
pipeline_id: uuid::Uuid::from_str(pipeline_id).unwrap(),
user_data: user_data
}
}

Expand Down Expand Up @@ -119,6 +134,9 @@ impl RgbFrame {
pub fn set_pipeline_id(&mut self, pipeline_id: &str) {
self.pipeline_id = uuid::Uuid::from_str(pipeline_id).unwrap();
}
pub fn get_user_data(&self) -> &UserData {
&self.user_data
}
}

pub enum Frame {
Expand Down Expand Up @@ -170,4 +188,4 @@ impl Frame {
Frame::RgbFrame(frame) => { frame.set_inference_output(output_data); },
}
}
}
}
73 changes: 70 additions & 3 deletions pipeless/src/stages/languages/python.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use log::{error, warn};
use pyo3::prelude::*;
use pyo3::{PyObject, prelude::*};
use numpy::{self, ToPyArray};

use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};
use crate::{data::{Frame, RgbFrame, UserData}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};

/// Allows a Frame to be converted from Rust to Python
impl IntoPy<Py<PyAny>> for Frame {
Expand Down Expand Up @@ -41,6 +41,7 @@ impl IntoPy<Py<PyAny>> for RgbFrame {
dict.set_item("inference_input", self.get_inference_input().to_pyarray(py)).unwrap();
dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap();
dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap();
dict.set_item("user_data", self.get_user_data()).unwrap();
dict.into()
}
}
Expand Down Expand Up @@ -82,18 +83,84 @@ impl<'source> FromPyObject<'source> for RgbFrame {
let inference_input = inference_input_ndarray;
let inference_output =inference_output_ndarray;
let pipeline_id = ob.get_item("pipeline_id").unwrap().extract()?;
let user_data = ob.get_item("user_data").unwrap().extract()?;

let frame = RgbFrame::from_values(
uuid, original, modified, width, height,
pts, dts, duration, fps, input_ts,
inference_input, inference_output,
pipeline_id,
pipeline_id, user_data
);

Ok(frame)
}
}

/// Allows to pass the user data to python and back
impl ToPyObject for UserData {
fn to_object(&self, py: Python<'_>) -> PyObject {
match self {
UserData::Empty => py.None(),
UserData::Integer(i) => i.into_py(py),
UserData::Float(f) => f.into_py(py),
UserData::String(s) => s.into_py(py),
UserData::Array(arr) => {
let list = pyo3::types::PyList::empty(py);
for item in arr {
list.append(item.to_object(py)).unwrap();
}
list.into_py(py)
}
UserData::Dictionary(dict) => {
let py_dict = pyo3::types::PyDict::new(py);
for (key, value) in dict {
py_dict.set_item(key, value.to_object(py)).unwrap();
}
py_dict.into_py(py)
}
}
}
}

/// Allows to pass the user data to python and back
impl<'source> FromPyObject<'source> for UserData {
fn extract(obj: &'source PyAny) -> PyResult<Self> {
if let Ok(integer) = obj.extract::<i32>() {
Ok(UserData::Integer(integer))
} else if let Ok(float) = obj.extract::<f64>() {
Ok(UserData::Float(float))
} else if let Ok(string) = obj.extract::<String>() {
Ok(UserData::String(string))
} else if obj.is_instance_of::<pyo3::types::PyList>() {
let array = obj.downcast::<pyo3::types::PyList>()?;
let array_data = array.into_iter()
.map(|elem| UserData::extract(elem))
.collect::<PyResult<Vec<UserData>>>()?;
Ok(UserData::Array(array_data))
} else if obj.is_instance_of::<pyo3::types::PyDict>() {
let dict = obj.downcast::<pyo3::types::PyDict>()?;
let dict_keys = dict.keys();
let mut dict_items = Vec::new();
for key in dict_keys {
let key_str = key.extract::<String>()?;
let value = dict.get_item(key)?;
match value {
Some(v) => {
let value_data = UserData::extract(v)?;
dict_items.push((key_str, value_data));
},
None => { dict_items.push((key_str, UserData::Empty)); },
}
}
Ok(UserData::Dictionary(dict_items))
} else if obj.is_none() {
Ok(UserData::Empty)
} else {
Err(pyo3::exceptions::PyTypeError::new_err("Unsupported data type assigned to 'user_data'. Please check in the Pipeless the supported types."))
}
}
}

/// Python context to maintain within a stage
pub struct PythonStageContext {
context: Py<pyo3::types::PyDict>,
Expand Down

0 comments on commit 0f27539

Please sign in to comment.