Skip to content

Commit

Permalink
Merge pull request #130 from pipeless-ai/user_data
Browse files Browse the repository at this point in the history
feat(core): Allow to pass custom user data through stages
  • Loading branch information
miguelaeh authored Jan 31, 2024
2 parents 5f8254f + 54959a1 commit ef81b30
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 6 deletions.
3 changes: 3 additions & 0 deletions examples/custom-data/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Pass custom data between stages

Check [this guide](https://www.pipeless.ai/docs/v1/examples/custom-data) to run the example step by step.
19 changes: 19 additions & 0 deletions examples/custom-data/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
def hook(frame_data, _):
# Add data to the frame that you can later recover from hooks of subsequent stages. You can also recover it from subsequent hooks of the same stage.
# You can use any kind of data.
# Integers: frame_data['user_data'] = 100
# Floats: frame_data['user_data'] = 100.5
# Strings: frame_data['user_data'] = "Hello!"
# Heterogeneus arrays: frame_data['user_data'] = ["string", 13, 34.6]
# Heterogeneus Dictionaries (IMPORTANT: all keys must be strings):
frame_data['user_data'] = {
"key1": 0,
"key2": [1, "3"],
"key3": { "inner1": "hola" }
}

# In a later hook you can obtain the data like:
# my_data = frame_data['user_data']

# To connect stages simply give the list to Pipeless when adding a stream:
# pipeless add stream --input-uri "file:///home/user/my/path.mp4" --output-uri "screen" --frame-path "stage1,stage2"
2 changes: 1 addition & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.7.0"
version = "1.8.0"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down
20 changes: 19 additions & 1 deletion pipeless/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@ use ndarray;
use uuid;
use gstreamer as gst;

/// Custom data that the user can add to the frame in a stage
/// allowing to pass data to subsequent stages
pub enum UserData {
Empty,
Integer(i32),
Float(f64),
String(String),
Array(Vec<UserData>),
Dictionary(Vec<(String, UserData)>),
}

pub struct RgbFrame {
uuid: uuid::Uuid,
original: ndarray::Array3<u8>,
Expand All @@ -17,6 +28,7 @@ pub struct RgbFrame {
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: uuid::Uuid,
user_data: UserData,
}
impl RgbFrame {
pub fn new(
Expand All @@ -36,6 +48,7 @@ impl RgbFrame {
inference_input: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])),
inference_output: ndarray::ArrayBase::zeros(ndarray::IxDyn(&[0])),
pipeline_id,
user_data: UserData::Empty,
}
}

Expand All @@ -49,6 +62,7 @@ impl RgbFrame {
inference_input: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
inference_output: ndarray::ArrayBase<ndarray::OwnedRepr<f32>, ndarray::Dim<ndarray::IxDynImpl>>,
pipeline_id: &str,
user_data: UserData,
) -> Self {
RgbFrame {
uuid: uuid::Uuid::from_str(uuid).unwrap(),
Expand All @@ -60,6 +74,7 @@ impl RgbFrame {
fps, input_ts,
inference_input, inference_output,
pipeline_id: uuid::Uuid::from_str(pipeline_id).unwrap(),
user_data: user_data
}
}

Expand Down Expand Up @@ -119,6 +134,9 @@ impl RgbFrame {
pub fn set_pipeline_id(&mut self, pipeline_id: &str) {
self.pipeline_id = uuid::Uuid::from_str(pipeline_id).unwrap();
}
pub fn get_user_data(&self) -> &UserData {
&self.user_data
}
}

pub enum Frame {
Expand Down Expand Up @@ -170,4 +188,4 @@ impl Frame {
Frame::RgbFrame(frame) => { frame.set_inference_output(output_data); },
}
}
}
}
73 changes: 70 additions & 3 deletions pipeless/src/stages/languages/python.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use log::{error, warn};
use pyo3::prelude::*;
use pyo3::{PyObject, prelude::*};
use numpy::{self, ToPyArray};

use crate::{data::{RgbFrame, Frame}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};
use crate::{data::{Frame, RgbFrame, UserData}, stages::{hook::{HookTrait, HookType}, stage::ContextTrait}, stages::stage::Context, kvs::store};

/// Allows a Frame to be converted from Rust to Python
impl IntoPy<Py<PyAny>> for Frame {
Expand Down Expand Up @@ -41,6 +41,7 @@ impl IntoPy<Py<PyAny>> for RgbFrame {
dict.set_item("inference_input", self.get_inference_input().to_pyarray(py)).unwrap();
dict.set_item("inference_output", self.get_inference_output().to_pyarray(py)).unwrap();
dict.set_item("pipeline_id", self.get_pipeline_id().to_string()).unwrap();
dict.set_item("user_data", self.get_user_data()).unwrap();
dict.into()
}
}
Expand Down Expand Up @@ -82,18 +83,84 @@ impl<'source> FromPyObject<'source> for RgbFrame {
let inference_input = inference_input_ndarray;
let inference_output =inference_output_ndarray;
let pipeline_id = ob.get_item("pipeline_id").unwrap().extract()?;
let user_data = ob.get_item("user_data").unwrap().extract()?;

let frame = RgbFrame::from_values(
uuid, original, modified, width, height,
pts, dts, duration, fps, input_ts,
inference_input, inference_output,
pipeline_id,
pipeline_id, user_data
);

Ok(frame)
}
}

/// Allows to pass the user data to python and back
impl ToPyObject for UserData {
fn to_object(&self, py: Python<'_>) -> PyObject {
match self {
UserData::Empty => py.None(),
UserData::Integer(i) => i.into_py(py),
UserData::Float(f) => f.into_py(py),
UserData::String(s) => s.into_py(py),
UserData::Array(arr) => {
let list = pyo3::types::PyList::empty(py);
for item in arr {
list.append(item.to_object(py)).unwrap();
}
list.into_py(py)
}
UserData::Dictionary(dict) => {
let py_dict = pyo3::types::PyDict::new(py);
for (key, value) in dict {
py_dict.set_item(key, value.to_object(py)).unwrap();
}
py_dict.into_py(py)
}
}
}
}

/// Allows to pass the user data to python and back
impl<'source> FromPyObject<'source> for UserData {
fn extract(obj: &'source PyAny) -> PyResult<Self> {
if let Ok(integer) = obj.extract::<i32>() {
Ok(UserData::Integer(integer))
} else if let Ok(float) = obj.extract::<f64>() {
Ok(UserData::Float(float))
} else if let Ok(string) = obj.extract::<String>() {
Ok(UserData::String(string))
} else if obj.is_instance_of::<pyo3::types::PyList>() {
let array = obj.downcast::<pyo3::types::PyList>()?;
let array_data = array.into_iter()
.map(|elem| UserData::extract(elem))
.collect::<PyResult<Vec<UserData>>>()?;
Ok(UserData::Array(array_data))
} else if obj.is_instance_of::<pyo3::types::PyDict>() {
let dict = obj.downcast::<pyo3::types::PyDict>()?;
let dict_keys = dict.keys();
let mut dict_items = Vec::new();
for key in dict_keys {
let key_str = key.extract::<String>()?;
let value = dict.get_item(key)?;
match value {
Some(v) => {
let value_data = UserData::extract(v)?;
dict_items.push((key_str, value_data));
},
None => { dict_items.push((key_str, UserData::Empty)); },
}
}
Ok(UserData::Dictionary(dict_items))
} else if obj.is_none() {
Ok(UserData::Empty)
} else {
Err(pyo3::exceptions::PyTypeError::new_err("Unsupported data type assigned to 'user_data'. Please check in the Pipeless the supported types."))
}
}
}

/// Python context to maintain within a stage
pub struct PythonStageContext {
context: Py<pyo3::types::PyDict>,
Expand Down

0 comments on commit ef81b30

Please sign in to comment.