Merge pull request #57 from pipeless-ai/frame_wkip
feat(worker): Add dynamic frame skip
miguelaeh authored Oct 12, 2023
2 parents bd75554 + d31920a commit 33eead8
Showing 7 changed files with 138 additions and 61 deletions.
3 changes: 3 additions & 0 deletions core/src/pipeless_ai/lib/config.py
@@ -168,6 +168,7 @@ def __init__(self, worker_dict):
self._inference = Inference(worker_dict.get("inference") or {})
# Allow the user to enable the line profiler for his code
self._enable_profiler = prioritized_config(worker_dict, 'enable_profiler', f'{ENV_PREFIX}_WORKER_ENABLE_PROFILER', convert_to=bool, default=False)
self._skip_frames = prioritized_config(worker_dict, 'skip_frames', f'{ENV_PREFIX}_WORKER_SKIP_FRAMES', convert_to=bool, default=False)
def get_n_workers(self):
return self._n_workers
def get_recv_buffer_size(self):
@@ -178,6 +179,8 @@ def get_inference(self):
return self._inference
def get_enable_profiler(self):
return self._enable_profiler
def get_skip_frames(self):
return self._skip_frames

class Plugins():
def __init__(self, plugins_dict):
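For context, the new skip_frames flag is looked up like the other worker options above. A minimal enabling sketch, assuming prioritized_config prefers the config value over the environment variable and that ENV_PREFIX resolves to PIPELESS (neither is shown in this diff):

    # Hypothetical usage: set the worker config key...
    worker_dict = {"skip_frames": True}
    # ...or export the environment variable before starting the worker:
    #   export PIPELESS_WORKER_SKIP_FRAMES=true
    # With neither present, the flag defaults to False.
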
6 changes: 5 additions & 1 deletion core/src/pipeless_ai/lib/input/input.py
@@ -1,5 +1,6 @@
import os
import sys
import time
import traceback
import numpy as np

@@ -33,18 +34,21 @@ def on_new_sample(sink: GstApp.AppSink) -> Gst.FlowReturn:
logger.error('Getting multimedia data from the buffer did not succeed.')
return Gst.FlowReturn.ERROR

frame_input_time = time.time()
caps = sample.get_caps()
width = caps.get_structure(0).get_value("width")
height = caps.get_structure(0).get_value("height")
dts = buffer.dts
pts = buffer.pts
duration = buffer.duration
framerate = caps.get_structure(0).get_fraction('framerate')
fps = float(framerate.value_numerator) / float(framerate.value_denominator)
ndframe : np.ndarray = np.ndarray(
shape=(height, width, 3),
dtype=np.uint8, buffer=info.data
)

msg = RgbImageMsg(width, height, ndframe, dts, pts, duration)
msg = RgbImageMsg(width, height, ndframe, dts, pts, duration, fps, frame_input_time)
s_msg = msg.serialize()

# Pass msg to the workers
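As a worked example of the fps computation above, an NTSC-style caps framerate of 30000/1001 gives:

    fps = float(30000) / float(1001)  # ≈ 29.97 frames per second
    frame_interval = 1 / fps          # ≈ 0.0334 s, the interval the worker's skip heuristic compares against
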
15 changes: 12 additions & 3 deletions core/src/pipeless_ai/lib/messages.py
@@ -66,14 +66,16 @@ class RgbImageMsg(Msg):
"""
Raw RGB image data and basic information
"""
def __init__(self, width, height, raw_data, dts, pts, duration):
def __init__(self, width, height, raw_data, dts, pts, duration, fps, input_time):
self._type = MsgType.RGB_IMAGE
self._input_time = input_time
self._dts = dts
self._pts = pts
self._duration = duration
self._width = width
self._height = height
self._data = raw_data
self._fps = fps

def serialize(self):
s_data = self._data
@@ -86,7 +88,9 @@ def serialize(self):
"duration": self._duration,
"width": self._width,
"height": self._height,
"data": s_data
"data": s_data,
"fps": self._fps,
"input_time": self._input_time
})

def update_data(self, new_raw_data):
@@ -105,6 +109,10 @@ def get_pts(self):
return self._pts
def get_duration(self):
return self._duration
def get_fps(self):
return self._fps
def get_input_time(self):
return self._input_time

def deserialize(_msg):
"""
@@ -116,14 +124,15 @@ def deserialize(_msg):
if isinstance(r_data, bytes):
# Ref: https://numpy.org/doc/stable/reference/generated/numpy.ndarray.dumps.html#numpy.ndarray.dumps
r_data = pickle.loads(msg["data"])

return RgbImageMsg(
msg["width"],
msg["height"],
r_data,
msg["dts"],
msg["pts"],
msg["duration"],
msg["fps"],
msg["input_time"]
)
elif msg["type"] == MsgType.CAPABILITIES:
return StreamCapsMsg(msg["caps"])
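A minimal round-trip sketch of the extended message; the frame and timestamp values are illustrative, since in practice they come from the GStreamer buffer in input.py:

    import time
    import numpy as np

    frame = np.zeros((480, 640, 3), dtype=np.uint8)
    msg = RgbImageMsg(640, 480, frame, 0, 0, 33_366_666, 29.97, time.time())
    restored = deserialize(msg.serialize())
    assert restored.get_fps() == 29.97
    assert restored.get_input_time() == msg.get_input_time()
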
2 changes: 2 additions & 0 deletions core/src/pipeless_ai/lib/plugins.py
@@ -8,6 +8,8 @@
def load_plugin_module(path):
spec = importlib.util.spec_from_file_location('plugin', path)
plugin_module = importlib.util.module_from_spec(spec)
# TODO: does the following work when using several plugins without overriding the previous ones?
sys.modules['plugin'] = plugin_module # Allows pickling the Plugin class.
spec.loader.exec_module(plugin_module)
Plugin = getattr(plugin_module, 'Plugin')
return Plugin()
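For background on the new sys.modules line: pickle stores an instance's class by module path and re-imports that module when dumping and loading, so a module built with importlib from a file path must be registered under its name first. A standalone illustration with a hypothetical plugin path:

    import importlib.util
    import pickle
    import sys

    spec = importlib.util.spec_from_file_location('plugin', '/tmp/my_plugin.py')
    plugin_module = importlib.util.module_from_spec(spec)
    sys.modules['plugin'] = plugin_module  # without this, pickle.dumps cannot import the dynamic 'plugin' module
    spec.loader.exec_module(plugin_module)
    instance = plugin_module.Plugin()
    data = pickle.dumps(instance)  # now resolvable as plugin.Plugin
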
17 changes: 11 additions & 6 deletions core/src/pipeless_ai/lib/worker/inference/runtime.py
@@ -1,10 +1,11 @@
import sys
import traceback
import numpy as np
import onnx
import onnxruntime

from pipeless_ai.lib.logger import logger
from pipeless_ai.lib.worker.inference.utils import get_model_path, parse_input_shape, prepare_frame
from pipeless_ai.lib.worker.inference.utils import get_model_path, parse_input_shape, prepare_frames

def load_model(file: str, alias: str, force_opset_version: int | None = None, force_ir_version: int | None = None):
"""
@@ -99,18 +100,18 @@ def __init__(self, inference_config):
try:
# Run a first testing inference that usually takes longer than the rest
test_image = np.zeros((self.input_img_height, self.input_img_width, self.input_img_channels), dtype=np.uint8)
test_image = prepare_frame(test_image, self.input_dtype, self.input_image_shape_format, self.input_batch_size)
test_image = prepare_frames([test_image], self.input_dtype, self.input_image_shape_format, self.input_batch_size)
self.session.run(None, {self.input_name: test_image})
except Exception as e:
logger.error(f'There was an error running the testing inference: {e}')
sys.exit(1)

logger.info("ORT session ready!")

def run(self, inference_input_frame):
def run(self, inference_input_frames):
try:
inference_input_frame = prepare_frame(inference_input_frame, self.input_dtype, self.input_image_shape_format, self.input_batch_size, target_height=self.input_img_height, target_width=self.input_img_width)
input_data = { self.input_name: inference_input_frame }
inference_input_frames = prepare_frames(inference_input_frames, self.input_dtype, self.input_image_shape_format, self.input_batch_size, target_height=self.input_img_height, target_width=self.input_img_width)
input_data = { self.input_name: inference_input_frames }
# Using IO bindings we significantly reduce the overhead of copying inputs and outputs
io_binding = self.session.io_binding()
# Bind inputs
@@ -129,4 +130,8 @@ def run(self, inference_input_frame):
return outputs
except Exception as e:
logger.error(f'There was an error running inference: {e}')
return None
traceback.print_exc()
return None

def get_batch_size(self):
return self.input_batch_size
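
A usage sketch of the new list-based run interface; the frames and the already-built session are assumptions, mirroring the warm-up call in __init__ above, which now also wraps its test image in a list:

    import numpy as np

    frames = [np.zeros((480, 640, 3), dtype=np.uint8) for _ in range(2)]
    outputs = session.run(frames)  # prepare_frames resizes each one and stacks them into a single batch
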
61 changes: 30 additions & 31 deletions core/src/pipeless_ai/lib/worker/inference/utils.py
@@ -91,37 +91,36 @@ def parse_input_shape(input_shape, format, force_tuple):
new_sub_shape = tuple(key_to_value[key] for key in new_order)
return batch_size, *new_sub_shape

def prepare_frame(frame, input_dtype, input_shape_format, batch_size, target_height=None, target_width=None):
if target_height and target_width:
# Scale the image maintaining aspect ratio
width_ratio = target_width / frame.shape[1]
height_ratio = target_height / frame.shape[0]
# Choose the minimum scaling factor to maintain aspect ratio
scale_factor = min(width_ratio, height_ratio)
# Calculate new dimensions after resizing
new_width = int(frame.shape[1] * scale_factor)
new_height = int(frame.shape[0] * scale_factor)
# Calculate padding dimensions
pad_width = (target_width - new_width) // 2
pad_height = (target_height - new_height) // 2
# Create a canvas with the desired dimensions and padding
canvas = np.zeros((target_height, target_width, frame.shape[2]), dtype=np.uint8)
# Resize the image and place it on the canvas
resized_image = cv2.resize(frame, (new_width, new_height))
canvas[pad_height:pad_height+new_height, pad_width:pad_width+new_width] = resized_image
frame = canvas
elif (target_width and not target_height) or (target_height and not target_width):
logger.error("Can't resize to a single dimmension. Please provide both tagert_width and target_height")
sys.exit(1)
def prepare_frames(frames, input_dtype, input_shape_format, batch_size, target_height=None, target_width=None):
out_frames = []
for frame in frames:
if target_height and target_width:
# Scale the image maintaining aspect ratio
width_ratio = target_width / frame.shape[1]
height_ratio = target_height / frame.shape[0]
# Choose the minimum scaling factor to maintain aspect ratio
scale_factor = min(width_ratio, height_ratio)
# Calculate new dimensions after resizing
new_width = int(frame.shape[1] * scale_factor)
new_height = int(frame.shape[0] * scale_factor)
# Calculate padding dimensions
pad_width = (target_width - new_width) // 2
pad_height = (target_height - new_height) // 2
# Create a canvas with the desired dimensions and padding
canvas = np.zeros((target_height, target_width, frame.shape[2]), dtype=np.uint8)
# Resize the image and place it on the canvas
resized_image = cv2.resize(frame, (new_width, new_height))
canvas[pad_height:pad_height+new_height, pad_width:pad_width+new_width] = resized_image
frame = canvas
elif (target_width and not target_height) or (target_height and not target_width):
logger.error("Can't resize to a single dimmension. Please provide both tagert_width and target_height")
sys.exit(1)

if input_dtype == 'tensor(float)':
frame = frame.astype(np.float32)
if input_dtype == 'tensor(float)':
frame = frame.astype(np.float32)

if input_shape_format:
frame = transpose_image(frame, input_shape_format) # [h,w,channels] to the specified shape
if input_shape_format:
frame = transpose_image(frame, input_shape_format) # [h,w,channels] to the specified shape

if batch_size:
# Since this is a single frame just expand the dims
frame = np.expand_dims(frame, axis=0)

return frame
out_frames.append(frame)
return np.array(out_frames)
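
A worked example of the letterbox math above, fitting a 1920x1080 frame into a 640x640 model input:

    width_ratio = 640 / 1920                       # 0.3333
    height_ratio = 640 / 1080                      # 0.5926
    scale_factor = min(width_ratio, height_ratio)  # 0.3333 keeps the aspect ratio
    new_width = int(1920 * scale_factor)           # 640
    new_height = int(1080 * scale_factor)          # 360
    pad_height = (640 - 360) // 2                  # 140 rows of zero padding above and below
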
95 changes: 75 additions & 20 deletions core/src/pipeless_ai/lib/worker/worker.py
@@ -1,4 +1,6 @@
from collections import deque
import importlib
import math
import sys
import time
import traceback
@@ -11,7 +13,36 @@
from pipeless_ai.lib.messages import EndOfStreamMsg, RgbImageMsg, deserialize
from pipeless_ai.lib.worker.inference.utils import get_inference_session

def fetch_and_process(user_app, inference_session):
class ProcessingMetrics():
'''
Maintains internal metrics used to decide whether to process or skip a frame,
based on the stream frame rate and the most recent processing times
'''
def __init__(self):
# FIFO queue of 4 values to only take into account the most recent processing times
self.fifo_list = deque([], maxlen=4)
self.n_frames_skipped = 0
def add_proc_time(self, proc_time):
self.fifo_list.append(proc_time)
self.n_frames_skipped = 0
def get_avg_time(self) -> float:
fifo_len = len(self.fifo_list)
return 0 if fifo_len == 0 else sum(self.fifo_list) / fifo_len
def count_skipped_frame(self):
self.n_frames_skipped += 1
def should_skip_frame(self, fps) -> bool:
f_interval = 1 / fps
p_space = math.ceil(self.get_avg_time() / f_interval)
should_process_frame = self.n_frames_skipped >= p_space
return not should_process_frame

# The following is not about metrics, but it lives here for convenience
def set_previous_inference_results(self, results):
self.previous_inference_results = results
def get_previous_inference_results(self):
return self.previous_inference_results

def fetch_and_process(user_app, inference_session, processing_metrics: ProcessingMetrics):
"""
Processes messages coming from the input
Returns whether the current worker iteration should continue
@@ -21,12 +52,11 @@ def fetch_and_process(user_app, inference_session):
r_socket = InputPullSocket()
raw_msg = r_socket.recv()
if raw_msg is not None:
if config.get_worker().get_show_exec_time():
start_time = time.time()
msg = deserialize(raw_msg)
if config.get_output().get_video().is_enabled():
s_socket = OutputPushSocket()
if isinstance(msg, RgbImageMsg):
start_processing_time = msg.get_input_time()
# TODO: we can use pynng recv_msg to get information about which pipe the message comes from, thus distinguishing stream sources and routing destinations
# Useful to support several input media streams in the same app
height = msg.get_height()
@@ -36,30 +66,56 @@
shape=(height, width, 3),
dtype=np.uint8, buffer=data
)
fps = msg.get_fps()

# We work with numpy views of the array to avoid complete copying, which is very slow.
# Set the original frame as non-writable to raise an exception if modified
ndframe.flags.writeable = False
user_app.original_frame = ndframe.view() # View of the original frame

# Execute frame processing
# When an inference model is provided, the process hook is not invoked because logically it doesn't make sense.
# Also, the post-process hook will receive the original frame instead of the pre-process output since the
original_ndframe = ndframe.view() # This view will be passed to the user code

should_skip_process_hook = config.get_worker().get_skip_frames() and processing_metrics.should_skip_frame(fps)

# Inject into the user app so the user has control over what the pre-process and post-process hooks run when the frame is skipped
# For example, if you are measuring the speed of an object, you may need to
# count the frame but you can skip other pre-processing work because the frame won't be processed
# TODO: should we create some kind of frame metadata that persists among different stages for multistage applications?
# after all, stages usually depend on the results of the previous ones.
user_app.skip_frame = should_skip_process_hook

# User pre-processing
preproc_out = exec_hook_with_plugins(user_app, 'pre_process', original_ndframe)
# By default, the post-process hook will receive the original frame instead of the pre-process output since the
# pre-process output is usually not an image but the inference model input.
updated_ndframe = ndframe.view()
if inference_session:
# TODO: we could run inference in batches
inference_input = exec_hook_with_plugins(user_app, 'pre_process', updated_ndframe)
inference_result = inference_session.run(inference_input)
user_app.inference.results = inference_result # Embed the inference results into the user application
if should_skip_process_hook:
processing_metrics.count_skipped_frame()
user_app.inference.results = processing_metrics.get_previous_inference_results()
proc_out = original_ndframe
else:
updated_ndframe = exec_hook_with_plugins(user_app, 'pre_process', updated_ndframe)
updated_ndframe = exec_hook_with_plugins(user_app, 'process', updated_ndframe)
updated_ndframe = exec_hook_with_plugins(user_app, 'post_process', updated_ndframe)
msg.update_data(updated_ndframe)
if inference_session:
# TODO: we could run inference in batches
inference_result = inference_session.run([preproc_out])
processing_metrics.set_previous_inference_results(inference_result)
user_app.inference.results = inference_result # Embed the inference results into the user application
proc_out = original_ndframe
else:
proc_out = exec_hook_with_plugins(user_app, 'process', preproc_out)

postproc_out = exec_hook_with_plugins(user_app, 'post_process', proc_out)
msg.update_data(postproc_out)

if config.get_output().get_video().is_enabled():
# Forward the message to the output
s_socket.send(msg.serialize())

if not should_skip_process_hook:
# Update the metrics with the processed frame
processing_time = time.time() - start_processing_time
processing_metrics.add_proc_time(processing_time)
if config.get_worker().get_show_exec_time():
logger.info(f'Application took {processing_time * 1000:.3f} ms to run for the frame')

elif isinstance(msg, EndOfStreamMsg):
logger.info('Worker iteration finished. Notifying output. About to reset worker')
if config.get_output().get_video().is_enabled():
@@ -70,9 +126,6 @@
logger.error(f'Unsupported message type: {msg.type}')
sys.exit(1)

if config.get_worker().get_show_exec_time():
logger.info(f'Application took {(time.time() - start_time) * 1000:.3f} ms to run for the frame')

return True # Continue the current worker execution

def load_user_module(path):
@@ -83,6 +136,7 @@ def load_user_module(path):
sys.path.append(path) # Add to the Python path to allow imports
spec = importlib.util.spec_from_file_location('user_app', f'{path}/app.py')
user_app_module = importlib.util.module_from_spec(spec)
sys.modules['user_app'] = user_app_module # Allows pickling the App class
try:
spec.loader.exec_module(user_app_module)
except Exception as e:
@@ -133,10 +187,11 @@ def worker(config_dict, user_module_path):

exec_hook_with_plugins(user_app, 'before')

processing_metrics = ProcessingMetrics()
# Stream loop
continue_worker = True
while continue_worker:
continue_worker = fetch_and_process(user_app, inference_session)
continue_worker = fetch_and_process(user_app, inference_session, processing_metrics)

exec_hook_with_plugins(user_app, 'after')

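A worked example of the skip heuristic in should_skip_frame: at 30 fps the frame interval is 1/30 ≈ 0.033 s, so with a recent average processing time of 0.095 s the worker processes roughly one frame out of every four (values are illustrative):

    import math

    fps = 30
    avg_proc_time = 0.095                            # average of the last 4 recorded processing times
    f_interval = 1 / fps                             # ≈ 0.0333 s between frames
    p_space = math.ceil(avg_proc_time / f_interval)  # 3
    # A frame is processed only once 3 consecutive frames have been skipped;
    # add_proc_time() then resets the skip counter and the cycle restarts.
    for n_frames_skipped in range(4):
        skip = not (n_frames_skipped >= p_space)
        print(n_frames_skipped, skip)                # skips at 0, 1, 2; processes at 3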
