From 6bb6bc57cb6551325dbda6dac0efa4845bf4e361 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Thu, 12 Oct 2023 14:03:23 +0200 Subject: [PATCH 1/2] Accept batch as input for ONNX Signed-off-by: Miguel A. Cabrera Minagorri --- core/src/pipeless_ai/lib/plugins.py | 2 + .../lib/worker/inference/runtime.py | 17 ++++-- .../pipeless_ai/lib/worker/inference/utils.py | 61 +++++++++---------- core/src/pipeless_ai/lib/worker/worker.py | 3 +- 4 files changed, 45 insertions(+), 38 deletions(-) diff --git a/core/src/pipeless_ai/lib/plugins.py b/core/src/pipeless_ai/lib/plugins.py index 29ebb61..15f63c2 100644 --- a/core/src/pipeless_ai/lib/plugins.py +++ b/core/src/pipeless_ai/lib/plugins.py @@ -8,6 +8,8 @@ def load_plugin_module(path): spec = importlib.util.spec_from_file_location('plugin', path) plugin_module = importlib.util.module_from_spec(spec) + # TODO: does the following work when using several plugins without overriding the previous ones? + sys.modules['plugin'] = plugin_module # Allows to pickle the Plugin class. spec.loader.exec_module(plugin_module) Plugin = getattr(plugin_module, 'Plugin') return Plugin() diff --git a/core/src/pipeless_ai/lib/worker/inference/runtime.py b/core/src/pipeless_ai/lib/worker/inference/runtime.py index 776fbad..db6b676 100644 --- a/core/src/pipeless_ai/lib/worker/inference/runtime.py +++ b/core/src/pipeless_ai/lib/worker/inference/runtime.py @@ -1,10 +1,11 @@ import sys +import traceback import numpy as np import onnx import onnxruntime from pipeless_ai.lib.logger import logger -from pipeless_ai.lib.worker.inference.utils import get_model_path, parse_input_shape, prepare_frame +from pipeless_ai.lib.worker.inference.utils import get_model_path, parse_input_shape, prepare_frames def load_model(file: str, alias: str, force_opset_version: int | None = None, force_ir_version: int | None = None): """ @@ -99,7 +100,7 @@ def __init__(self, inference_config): try: # Run a first testing inference that usually takes longer than the rest test_image = np.zeros((self.input_img_height, self.input_img_width, self.input_img_channels), dtype=np.uint8) - test_image = prepare_frame(test_image, self.input_dtype, self.input_image_shape_format, self.input_batch_size) + test_image = prepare_frames([test_image], self.input_dtype, self.input_image_shape_format, self.input_batch_size) self.session.run(None, {self.input_name: test_image}) except Exception as e: logger.error(f'There was an error running the testing inference: {e}') @@ -107,10 +108,10 @@ def __init__(self, inference_config): logger.info("ORT session ready!") - def run(self, inference_input_frame): + def run(self, inference_input_frames): try: - inference_input_frame = prepare_frame(inference_input_frame, self.input_dtype, self.input_image_shape_format, self.input_batch_size, target_height=self.input_img_height, target_width=self.input_img_width) - input_data = { self.input_name: inference_input_frame } + inference_input_frames = prepare_frames(inference_input_frames, self.input_dtype, self.input_image_shape_format, self.input_batch_size, target_height=self.input_img_height, target_width=self.input_img_width) + input_data = { self.input_name: inference_input_frames } # Using IO bindings we signifcantly remove overhead of copying input and outputs io_binding = self.session.io_binding() # Bind inputs @@ -129,4 +130,8 @@ def run(self, inference_input_frame): return outputs except Exception as e: logger.error(f'There was an error running inference: {e}') - return None \ No newline at end of file + traceback.print_exc() + return None + + def get_batch_size(self): + return self.input_batch_size \ No newline at end of file diff --git a/core/src/pipeless_ai/lib/worker/inference/utils.py b/core/src/pipeless_ai/lib/worker/inference/utils.py index 8ad03e1..aea6506 100644 --- a/core/src/pipeless_ai/lib/worker/inference/utils.py +++ b/core/src/pipeless_ai/lib/worker/inference/utils.py @@ -91,37 +91,36 @@ def parse_input_shape(input_shape, format, force_tuple): new_sub_shape = tuple(key_to_value[key] for key in new_order) return batch_size, *new_sub_shape -def prepare_frame(frame, input_dtype, input_shape_format, batch_size, target_height=None, target_width=None): - if target_height and target_width: - # Scale the image maintaining aspect ratio - width_ratio = target_width / frame.shape[1] - height_ratio = target_height / frame.shape[0] - # Choose the minimum scaling factor to maintain aspect ratio - scale_factor = min(width_ratio, height_ratio) - # Calculate new dimensions after resizing - new_width = int(frame.shape[1] * scale_factor) - new_height = int(frame.shape[0] * scale_factor) - # Calculate padding dimensions - pad_width = (target_width - new_width) // 2 - pad_height = (target_height - new_height) // 2 - # Create a canvas with the desired dimensions and padding - canvas = np.zeros((target_height, target_width, frame.shape[2]), dtype=np.uint8) - # Resize the image and place it on the canvas - resized_image = cv2.resize(frame, (new_width, new_height)) - canvas[pad_height:pad_height+new_height, pad_width:pad_width+new_width] = resized_image - frame = canvas - elif (target_width and not target_height) or (target_height and not target_width): - logger.error("Can't resize to a single dimmension. Please provide both tagert_width and target_height") - sys.exit(1) +def prepare_frames(frames, input_dtype, input_shape_format, batch_size, target_height=None, target_width=None): + out_frames = [] + for frame in frames: + if target_height and target_width: + # Scale the image maintaining aspect ratio + width_ratio = target_width / frame.shape[1] + height_ratio = target_height / frame.shape[0] + # Choose the minimum scaling factor to maintain aspect ratio + scale_factor = min(width_ratio, height_ratio) + # Calculate new dimensions after resizing + new_width = int(frame.shape[1] * scale_factor) + new_height = int(frame.shape[0] * scale_factor) + # Calculate padding dimensions + pad_width = (target_width - new_width) // 2 + pad_height = (target_height - new_height) // 2 + # Create a canvas with the desired dimensions and padding + canvas = np.zeros((target_height, target_width, frame.shape[2]), dtype=np.uint8) + # Resize the image and place it on the canvas + resized_image = cv2.resize(frame, (new_width, new_height)) + canvas[pad_height:pad_height+new_height, pad_width:pad_width+new_width] = resized_image + frame = canvas + elif (target_width and not target_height) or (target_height and not target_width): + logger.error("Can't resize to a single dimmension. Please provide both tagert_width and target_height") + sys.exit(1) - if input_dtype == 'tensor(float)': - frame = frame.astype(np.float32) + if input_dtype == 'tensor(float)': + frame = frame.astype(np.float32) - if input_shape_format: - frame = transpose_image(frame, input_shape_format) # [h,w,channels] to the specified shape + if input_shape_format: + frame = transpose_image(frame, input_shape_format) # [h,w,channels] to the specified shape - if batch_size: - # Since this is a single frame just expand the dims - frame = np.expand_dims(frame, axis=0) - - return frame \ No newline at end of file + out_frames.append(frame) + return np.array(out_frames) \ No newline at end of file diff --git a/core/src/pipeless_ai/lib/worker/worker.py b/core/src/pipeless_ai/lib/worker/worker.py index 7c5d7cc..ab898a1 100644 --- a/core/src/pipeless_ai/lib/worker/worker.py +++ b/core/src/pipeless_ai/lib/worker/worker.py @@ -49,7 +49,7 @@ def fetch_and_process(user_app, inference_session): if inference_session: # TODO: we could run inference in batches inference_input = exec_hook_with_plugins(user_app, 'pre_process', updated_ndframe) - inference_result = inference_session.run(inference_input) + inference_result = inference_session.run([inference_input]) user_app.inference.results = inference_result # Embed the inference results into the user application else: updated_ndframe = exec_hook_with_plugins(user_app, 'pre_process', updated_ndframe) @@ -83,6 +83,7 @@ def load_user_module(path): sys.path.append(path) # Add to the Python path to allow imports spec = importlib.util.spec_from_file_location('user_app', f'{path}/app.py') user_app_module = importlib.util.module_from_spec(spec) + sys.modules['user_app'] = user_app_module # Allows to pickle the App class try: spec.loader.exec_module(user_app_module) except Exception as e: From d31920a2fd6f3d5808f639422e102b5f255d5b34 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Thu, 12 Oct 2023 18:20:56 +0200 Subject: [PATCH 2/2] feat(worker): Add dynamic frame skip option Signed-off-by: Miguel A. Cabrera Minagorri --- core/src/pipeless_ai/lib/config.py | 3 + core/src/pipeless_ai/lib/input/input.py | 6 +- core/src/pipeless_ai/lib/messages.py | 15 +++- core/src/pipeless_ai/lib/worker/worker.py | 94 ++++++++++++++++++----- 4 files changed, 94 insertions(+), 24 deletions(-) diff --git a/core/src/pipeless_ai/lib/config.py b/core/src/pipeless_ai/lib/config.py index 8b247d0..87d71ad 100644 --- a/core/src/pipeless_ai/lib/config.py +++ b/core/src/pipeless_ai/lib/config.py @@ -168,6 +168,7 @@ def __init__(self, worker_dict): self._inference = Inference(worker_dict.get("inference") or {}) # Allow the user to enable the line profiler for his code self._enable_profiler = prioritized_config(worker_dict, 'enable_profiler', f'{ENV_PREFIX}_WORKER_ENABLE_PROFILER', convert_to=bool, default=False) + self._skip_frames = prioritized_config(worker_dict, 'skip_frames', f'{ENV_PREFIX}_WORKER_SKIP_FRAMES', convert_to=bool, default=False) def get_n_workers(self): return self._n_workers def get_recv_buffer_size(self): @@ -178,6 +179,8 @@ def get_inference(self): return self._inference def get_enable_profiler(self): return self._enable_profiler + def get_skip_frames(self): + return self._skip_frames class Plugins(): def __init__(self, plugins_dict): diff --git a/core/src/pipeless_ai/lib/input/input.py b/core/src/pipeless_ai/lib/input/input.py index 0f38ff2..ced9dac 100644 --- a/core/src/pipeless_ai/lib/input/input.py +++ b/core/src/pipeless_ai/lib/input/input.py @@ -1,5 +1,6 @@ import os import sys +import time import traceback import numpy as np @@ -33,18 +34,21 @@ def on_new_sample(sink: GstApp.AppSink) -> Gst.FlowReturn: logger.error('Getting multimedia data from the buffer did not success.') return Gst.FlowReturn.ERROR + frame_input_time = time.time() caps = sample.get_caps() width = caps.get_structure(0).get_value("width") height = caps.get_structure(0).get_value("height") dts = buffer.dts pts = buffer.pts duration = buffer.duration + framerate = caps.get_structure(0).get_fraction('framerate') + fps = float(framerate.value_numerator) / float(framerate.value_denominator) ndframe : np.ndarray = np.ndarray( shape=(height, width, 3), dtype=np.uint8, buffer=info.data ) - msg = RgbImageMsg(width, height, ndframe, dts, pts, duration) + msg = RgbImageMsg(width, height, ndframe, dts, pts, duration, fps, frame_input_time) s_msg = msg.serialize() # Pass msg to the workers diff --git a/core/src/pipeless_ai/lib/messages.py b/core/src/pipeless_ai/lib/messages.py index 3149347..636eaa4 100644 --- a/core/src/pipeless_ai/lib/messages.py +++ b/core/src/pipeless_ai/lib/messages.py @@ -66,14 +66,16 @@ class RgbImageMsg(Msg): """ Raw RGB image data and basic information """ - def __init__(self, width, height, raw_data, dts, pts, duration): + def __init__(self, width, height, raw_data, dts, pts, duration, fps, input_time): self._type = MsgType.RGB_IMAGE + self._input_time = input_time self._dts = dts self._pts = pts self._duration = duration self._width = width self._height = height self._data = raw_data + self._fps = fps def serialize(self): s_data = self._data @@ -86,7 +88,9 @@ def serialize(self): "duration": self._duration, "width": self._width, "height": self._height, - "data": s_data + "data": s_data, + "fps": self._fps, + "input_time": self._input_time }) def update_data(self, new_raw_data): @@ -105,6 +109,10 @@ def get_pts(self): return self._pts def get_duration(self): return self._duration + def get_fps(self): + return self._fps + def get_input_time(self): + return self._input_time def deserialize(_msg): """ @@ -116,7 +124,6 @@ def deserialize(_msg): if isinstance(r_data, bytes): # Ref: https://numpy.org/doc/stable/reference/generated/numpy.ndarray.dumps.html#numpy.ndarray.dumps r_data = pickle.loads(msg["data"]) - return RgbImageMsg( msg["width"], msg["height"], @@ -124,6 +131,8 @@ def deserialize(_msg): msg["dts"], msg["pts"], msg["duration"], + msg["fps"], + msg["input_time"] ) elif msg["type"] == MsgType.CAPABILITIES: return StreamCapsMsg(msg["caps"]) diff --git a/core/src/pipeless_ai/lib/worker/worker.py b/core/src/pipeless_ai/lib/worker/worker.py index ab898a1..9b2a9f1 100644 --- a/core/src/pipeless_ai/lib/worker/worker.py +++ b/core/src/pipeless_ai/lib/worker/worker.py @@ -1,4 +1,6 @@ +from collections import deque import importlib +import math import sys import time import traceback @@ -11,7 +13,36 @@ from pipeless_ai.lib.messages import EndOfStreamMsg, RgbImageMsg, deserialize from pipeless_ai.lib.worker.inference.utils import get_inference_session -def fetch_and_process(user_app, inference_session): +class ProcessingMetrics(): + ''' + This class is used to maintain some internal metrics to control when to process + a frame depending on the stream metrics + ''' + def __init__(self): + # FIFO queue of 4 values to only take into account the most recent processing times + self.fifo_list = deque([], maxlen=4) + self.n_frames_skipped = 0 + def add_proc_time(self, proc_time): + self.fifo_list.append(proc_time) + self.n_frames_skipped = 0 + def get_avg_time(self) -> float: + fifo_len = len(self.fifo_list) + return 0 if fifo_len == 0 else sum(self.fifo_list) / fifo_len + def count_skipped_frame(self): + self.n_frames_skipped += 1 + def should_skip_frame(self, fps) -> bool: + f_interval = 1 / fps + p_space = math.ceil(self.get_avg_time() / f_interval) + should_process_frame = self.n_frames_skipped >= p_space + return not should_process_frame + + # The following is not about metrics, but adding here for ease + def set_previous_inference_results(self, results): + self.previous_inference_results = results + def get_previous_inference_results(self): + return self.previous_inference_results + +def fetch_and_process(user_app, inference_session, processing_metrics: ProcessingMetrics): """ Processes messages comming from the input Returns whether the current worker iteration should continue @@ -21,12 +52,11 @@ def fetch_and_process(user_app, inference_session): r_socket = InputPullSocket() raw_msg = r_socket.recv() if raw_msg is not None: - if config.get_worker().get_show_exec_time(): - start_time = time.time() msg = deserialize(raw_msg) if config.get_output().get_video().is_enabled(): s_socket = OutputPushSocket() if isinstance(msg, RgbImageMsg): + start_processing_time = msg.get_input_time() # TODO: we can use pynng recv_msg to get information about which pipe the message comes from, thus distinguish stream sources and route destinations # Usefull to support several input medias to the same app height = msg.get_height() @@ -36,30 +66,56 @@ def fetch_and_process(user_app, inference_session): shape=(height, width, 3), dtype=np.uint8, buffer=data ) + fps = msg.get_fps() + # We work with numpy views of the array to avoid complete copying, which is very slow. # Set original frame as non writable to raise execption if modified ndframe.flags.writeable = False user_app.original_frame = ndframe.view() # View of the original frame - # Execute frame processing - # When an inference model is provided, the process hook is not invoked because logically it doesn't have sense. - # Also, the post-process hook will receive the original frame instead of the pre-process output since the + original_ndframe = ndframe.view() # This view will be passed to the user code + + should_skip_process_hook = config.get_worker().get_skip_frames() and processing_metrics.should_skip_frame(fps) + + # Inject into the user app so the user has control on what the pre-process and post-process run when the frame is skipped + # For example, if you are mesuring the speed of an object, you may need to + # count the frame but you can save other pre-processing parts because the frame won't be processed + # TODO: should we create some kind of frame metadata that persists among different stages for multistage applications? + # after all, stages usually depend on the results of the previous ones. + user_app.skip_frame = should_skip_process_hook + + # User pre-processing + preproc_out = exec_hook_with_plugins(user_app, 'pre_process', original_ndframe) + # By default, the post-process hook will receive the original frame instead of the pre-process output since the # pre-process output is usually not an image but the inference model input. - updated_ndframe = ndframe.view() - if inference_session: - # TODO: we could run inference in batches - inference_input = exec_hook_with_plugins(user_app, 'pre_process', updated_ndframe) - inference_result = inference_session.run([inference_input]) - user_app.inference.results = inference_result # Embed the inference results into the user application + if should_skip_process_hook: + processing_metrics.count_skipped_frame() + user_app.inference.results = processing_metrics.get_previous_inference_results() + proc_out = original_ndframe else: - updated_ndframe = exec_hook_with_plugins(user_app, 'pre_process', updated_ndframe) - updated_ndframe = exec_hook_with_plugins(user_app, 'process', updated_ndframe) - updated_ndframe = exec_hook_with_plugins(user_app, 'post_process', updated_ndframe) - msg.update_data(updated_ndframe) + if inference_session: + # TODO: we could run inference in batches + inference_result = inference_session.run([preproc_out]) + processing_metrics.set_previous_inference_results(inference_result) + user_app.inference.results = inference_result # Embed the inference results into the user application + proc_out = original_ndframe + else: + proc_out = exec_hook_with_plugins(user_app, 'process', preproc_out) + + postproc_out = exec_hook_with_plugins(user_app, 'post_process', proc_out) + msg.update_data(postproc_out) if config.get_output().get_video().is_enabled(): # Forward the message to the output s_socket.send(msg.serialize()) + + if not should_skip_process_hook: + # Update the metrics with the processed frame + processing_time = time.time() - start_processing_time + processing_metrics.add_proc_time(processing_time) + if config.get_worker().get_show_exec_time(): + logger.info(f'Application took {processing_time * 1000:.3f} ms to run for the frame') + elif isinstance(msg, EndOfStreamMsg): logger.info('Worker iteration finished. Notifying output. About to reset worker') if config.get_output().get_video().is_enabled(): @@ -70,9 +126,6 @@ def fetch_and_process(user_app, inference_session): logger.error(f'Unsupported message type: {msg.type}') sys.exit(1) - if config.get_worker().get_show_exec_time(): - logger.info(f'Application took {(time.time() - start_time) * 1000:.3f} ms to run for the frame') - return True # Continue the current worker execution def load_user_module(path): @@ -134,10 +187,11 @@ def worker(config_dict, user_module_path): exec_hook_with_plugins(user_app, 'before') + processing_metrics = ProcessingMetrics() # Stream loop continue_worker = True while continue_worker: - continue_worker = fetch_and_process(user_app, inference_session) + continue_worker = fetch_and_process(user_app, inference_session, processing_metrics) exec_hook_with_plugins(user_app, 'after')