diff --git a/core/pyproject.toml b/core/pyproject.toml index ab2e4e7..5418ad1 100644 --- a/core/pyproject.toml +++ b/core/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pipeless-ai" -version = "0.1.4" +version = "0.1.5" description = "A framework to build and deploy multimodal perception apps in minutes without worrying about multimedia pipelines" authors = ["Miguel Angel Cabrera Minagorri "] license = "Apache-2.0" diff --git a/core/src/pipeless_ai/lib/connection.py b/core/src/pipeless_ai/lib/connection.py index e57e570..5c3fb41 100644 --- a/core/src/pipeless_ai/lib/connection.py +++ b/core/src/pipeless_ai/lib/connection.py @@ -64,6 +64,25 @@ def recv_handler(*args, **kwargs): return recv_handler +def wait_socket_dial(socket, addr): + ''' + Waits until the socket connects to the provided addr + ''' + connected = False + while not connected: + try: + socket.dial(addr, block=True) + connected = True + except ConnectionRefused: + logger.warning(f'[orange3]Connection to {addr} failed. Connection Refushed. Retrying...[/orange3]') + time.sleep(1) + except TryAgain: + logger.warning(f'[orange3]Connection to {addr} failed. Try Again. Retrying...[/orange3]') + time.sleep(1) + except Exception as e: + logger.error(f'[red]Failed to connect to {addr}. {e}') + sys.exit(1) + class InputOutputSocket(metaclass=Singleton): """ nng socket to send messages from the input to the output @@ -74,7 +93,7 @@ def __init__(self, mode, send_timeout=1000, read_timeout=1000): - mode: 'w' for the input (write). 'r' for the output (read) """ config = Config(None) # Get the already existing config instance - address = config.get_input().get_address() + address = config.get_output().get_address() # Make this connection to run on the provided port+1. # The provided port is for other type of connection port = str(address.get_port() + 1) @@ -83,15 +102,8 @@ def __init__(self, mode, send_timeout=1000, read_timeout=1000): self._socket = Pair0() self._socket.send_timeout = send_timeout self._name = 'InputOutputSocket-Write' - - connected = False - while not connected: - try: - self._socket.dial(self._addr, block=True) - connected = True - except ConnectionRefused: - logger.warning(f'[orange3]Connection to {self._addr} failed. Retrying...[/orange3]') - time.sleep(1) + + wait_socket_dial(self._socket, self._addr) elif mode == 'r': self._socket = Pair0(listen=self._addr) self._socket.recv_timeout = read_timeout @@ -160,14 +172,7 @@ def __init__(self, timeout=500): self._socket.send_buffer_size = 180 # 3 seconds of 60 pfs video self._name = 'OutputPushSocket' - connected = False - while not connected: - try: - self._socket.dial(self._addr, block=True) - connected = True - except ConnectionRefused: - logger.warning(f'[orange3]Connection to {self._addr} failed. Retrying...[/orange3]') - time.sleep(1) + wait_socket_dial(self._socket, self._addr) @send_error_handler def send(self, msg): @@ -195,14 +200,7 @@ def __init__(self, timeout=500): self._socket.recv_buffer_size = 180 # 3 seconds of 60 pfs video self._name = 'InputPullSocket' - connected = False - while not connected: - try: - self._socket.dial(self._addr, block=True) - connected = True - except ConnectionRefused: - logger.warning(f'[orange3]Connection to {self._addr} failed. Retrying...[/orange3]') - time.sleep(1) + wait_socket_dial(self._socket, self._addr) @recv_error_handler def recv(self): @@ -240,4 +238,50 @@ def close(self): self._socket.close() def get_socket_name(self): - return self._name \ No newline at end of file + return self._name + +class WorkerReadySocket(metaclass=Singleton): + """ + Allows the input to wait for the first worker before starting to send data + When the worker needs to install user packages it takes longer to start + and the data send by the input is lost if we don't wait for at least the first worker + """ + def __init__(self, mode): + """ + Parameters: + - mode: 'input' for the input. 'worker' for the worker + """ + config = Config(None) # Get the already existing config instance + address = config.get_input().get_address() + # Make this connection to run on the provided port+2. + # The provided port is for other type of connection + port = str(address.get_port() + 2) + self._addr = f'tcp://{address.get_host()}:{port}' + if mode == 'worker': + self._socket = Pair0() + self._name = 'WorkerReadySocket-Worker' + + wait_socket_dial(self._socket, self._addr) + elif mode == 'input': + self._socket = Pair0(listen=self._addr) + self._name = 'WorkerReadySocket-Input' + else: + raise 'Wrong mode for WorkerReadySocket' + + @send_error_handler + def send(self, msg): + # Blocking send call. We always want to ensure the messages + # from input to output arrive because they change the pipelines + self._socket.send(msg) + + @recv_error_handler + def recv(self): + # Only the input will receive, and once the first worker send, we won't receive again + # so we create a blocking call + return self._socket.recv() + + def close(self): + self._socket.close() + + def get_socket_name(self): + return self._name diff --git a/core/src/pipeless_ai/lib/input/input.py b/core/src/pipeless_ai/lib/input/input.py index c2d6e13..1428fe1 100644 --- a/core/src/pipeless_ai/lib/input/input.py +++ b/core/src/pipeless_ai/lib/input/input.py @@ -10,7 +10,7 @@ from gi.repository import Gst, GstApp, GLib from pipeless_ai.lib.logger import logger, update_logger_component, update_logger_level -from pipeless_ai.lib.connection import InputOutputSocket, InputPushSocket +from pipeless_ai.lib.connection import InputOutputSocket, InputPushSocket, WorkerReadySocket from pipeless_ai.lib.config import Config from pipeless_ai.lib.messages import EndOfStreamMsg, RgbImageMsg, StreamCapsMsg, StreamTagsMsg @@ -250,16 +250,20 @@ def input(config_dict): bus.add_signal_watch() bus.connect("message", on_bus_message, loop) - logger.info('Starting pipeline') - ret = pipeline.set_state(Gst.State.PLAYING) - if ret == Gst.StateChangeReturn.FAILURE: - logger.error("[red]Unable to set the pipeline to the playing state.[/red]") - sys.exit(1) - try: # Start socket to wait all components connections s_push = InputPushSocket() # Listener + w_socket = WorkerReadySocket('input') + logger.info('Waiting first worker to be available') + w_socket.recv() # Wait for the first worker to appear m_socket = InputOutputSocket('w') # Waits for output + logger.info('First worker ready') + + logger.info('Starting pipeline') + ret = pipeline.set_state(Gst.State.PLAYING) + if ret == Gst.StateChangeReturn.FAILURE: + logger.error("[red]Unable to set the pipeline to the playing state.[/red]") + sys.exit(1) loop.run() except KeyboardInterrupt: diff --git a/core/src/pipeless_ai/lib/worker/worker.py b/core/src/pipeless_ai/lib/worker/worker.py index cdbac22..2c3f033 100644 --- a/core/src/pipeless_ai/lib/worker/worker.py +++ b/core/src/pipeless_ai/lib/worker/worker.py @@ -4,7 +4,7 @@ import numpy as np from pipeless_ai.lib.config import Config -from pipeless_ai.lib.connection import InputPullSocket, OutputPushSocket +from pipeless_ai.lib.connection import InputPullSocket, OutputPushSocket, WorkerReadySocket from pipeless_ai.lib.logger import logger, update_logger_component, update_logger_level from pipeless_ai.lib.messages import EndOfStreamMsg, RgbImageMsg, deserialize @@ -71,6 +71,10 @@ def worker(config_dict, user_module_path): logger.error('Missing app .py file path') sys.exit(1) + logger.info('Notifying worker ready to input') + w_socket = WorkerReadySocket('worker') + w_socket.send(b'ready') # Notify the input that a worker is available + try: while True: # Infinite worker loop diff --git a/package/Dockerfile b/package/Dockerfile index 668d21f..2569873 100644 --- a/package/Dockerfile +++ b/package/Dockerfile @@ -1,4 +1,6 @@ -FROM bitnami/python:3.10 +FROM bitnami/python:3.10.12 + +# Install gstreamer RUN install_packages libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \ libgstreamer-plugins-bad1.0-dev gstreamer1.0-plugins-base \ gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \ @@ -6,18 +8,20 @@ RUN install_packages libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \ gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \ gstreamer1.0-qt5 gstreamer1.0-pulseaudio -# Hack to allow mp4 files without errors. Note this library provides hardware acceleration, so we need an alternative fix. -#RUN apt-get remove gstreamer1.0-vaapi - +# Install required dependencies RUN install_packages libcairo2-dev libgirepository1.0-dev -# Allow python to install packages as nonroot -RUN mkdir /.local && chmod g+w /.local +COPY scripts / + +RUN /python-nonroot.sh + USER 1001 -RUN pip install pipeless-ai pipeless-ai-cli + # Allow to execute commands installed with pip ENV PATH="${PATH}:/.local/bin" +RUN pip install pipeless-ai pipeless-ai-cli + WORKDIR /app -ENTRYPOINT ["pipeless"] +ENTRYPOINT ["/entrypoint.sh"] diff --git a/package/README.md b/package/README.md index 4985715..d3ff841 100644 --- a/package/README.md +++ b/package/README.md @@ -1,3 +1,60 @@ -# Working environment +# Pipeless container library -Please note the Dockerfile provided **IS NOT** ready for its usage \ No newline at end of file +This directory contains the source files to build the Pipeless container images. + +The container images provide a way to run Pipeless out-of-the box without having to deal with dependencies. + +## Image Usage + +Print help command: + +```console +docker run --rm miguelaeh/pipeless --help +``` + +Create a new project locally: + +```console +docker run --rm -v /your/app/dir:/app miguelaeh/pipeless create project my_app_name +``` + +Run all components: + +```console +docker run --rm -v /your/app/dir:/app miguelaeh/pipeless run all +``` + +Run input only: + +```console +docker run --rm miguelaeh/pipeless run input +``` + +### Install Custom Python Packages + +Sometimes, your app may require python packages that are not installed by default into the pipeless container. You can use the `PIPELESS_USER_PYTHON_PACKAGES` variable to automatically install them on start. You can specify them as a list separated by commas (`,`), semicolons (`;`) or spaces (` `). For example: + +```console +docker run --rm -e "PIPELESS_USER_PYTHON_PACKAGES=opencv-python;some_other_package" miguelaeh/pipeless run worker +``` + +### Important Notes + +If you want to store the processed media to a file, it must be done in a path under `/app`. For example, setting `PIPELESS_OUTPUT_VIDEO_URI=file:///app/my_video.mp4`. +Futhermore, the directory mounted at `/app` (i.e. `/your/app/dir` on the above examples) must have group `root` with write permissions. + +## Docker compose usage + +The `docker-compose.yaml` file allows you to automatically deploy your application locally as if it would be deployed to the cloud. + +Start docker compose: + +```console +APP_DIR=/your/app/dir docker compose up +``` + +Stop the docker compose: + +```console +APP_DIR=/your/app/dir docker compose down -v +``` diff --git a/package/docker-compose.yaml b/package/docker-compose.yaml new file mode 100644 index 0000000..38aaaf8 --- /dev/null +++ b/package/docker-compose.yaml @@ -0,0 +1,42 @@ +version: "2" + +services: + input: + image: miguelaeh/pipeless:latest + command: ['run', 'input'] + volumes: + - '${APP_DIR}:/app' + environment: + - PIPELESS_INPUT_ADDRESS_HOST=input + - PIPELESS_INPUT_ADDRESS_PORT=1234 + - PIPELESS_OUTPUT_ADDRESS_HOST=output + - PIPELESS_OUTPUT_ADDRESS_PORT=1237 + - PIPELESS_INPUT_VIDEO_URI=https://github.com/miguelaeh/pipeless/raw/main/examples/cats/cats.mp4 + - PIPELESS_OUTPUT_VIDEO_URI=file:///app/output.mp4 + + output: + image: miguelaeh/pipeless:latest + command: ['run', 'output'] + volumes: + - '${APP_DIR}:/app' + environment: + - PIPELESS_INPUT_ADDRESS_HOST=input + - PIPELESS_INPUT_ADDRESS_PORT=1234 + - PIPELESS_OUTPUT_ADDRESS_HOST=output + - PIPELESS_OUTPUT_ADDRESS_PORT=1237 + - PIPELESS_INPUT_VIDEO_URI=https://github.com/miguelaeh/pipeless/raw/main/examples/cats/cats.mp4 + - PIPELESS_OUTPUT_VIDEO_URI=file:///app/output.mp4 + + worker: + image: miguelaeh/pipeless:latest + command: ['run', 'worker'] + volumes: + - '${APP_DIR}:/app' + environment: + - PIPELESS_INPUT_ADDRESS_HOST=input + - PIPELESS_INPUT_ADDRESS_PORT=1234 + - PIPELESS_OUTPUT_ADDRESS_HOST=output + - PIPELESS_OUTPUT_ADDRESS_PORT=1237 + - PIPELESS_INPUT_VIDEO_URI=https://github.com/miguelaeh/pipeless/raw/main/examples/cats/cats.mp4 + - PIPELESS_OUTPUT_VIDEO_URI=file:///app/output.mp4 + - PIPELESS_USER_PYTHON_PACKAGES=opencv-python diff --git a/package/scripts/entrypoint.sh b/package/scripts/entrypoint.sh new file mode 100755 index 0000000..766208d --- /dev/null +++ b/package/scripts/entrypoint.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +set -o errexit +set -o nounset +set -o pipefail +# set -o xtrace # Uncomment this line for debugging purposes + +. /libpipeless.sh + +if [[ "$1" = "run" && ( "$2" = "worker" || "$2" = "all" ) ]]; then + pipeless_install_user_python_deps +fi + +if [[ "$1" = "pipeless" ]]; then + exec "$@" +else + exec pipeless "$@" +fi diff --git a/package/scripts/libpipeless.sh b/package/scripts/libpipeless.sh new file mode 100755 index 0000000..a715673 --- /dev/null +++ b/package/scripts/libpipeless.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +set -o errexit +set -o nounset +set -o pipefail +# set -o xtrace # Uncomment this line for debugging purposes + +######################## +# Install Python dependencies required by the user code to run the worker +# Globals: +# PIPELESS_* +# Arguments: +# None +# Returns: +# None +######################### +pipeless_install_user_python_deps() { + [[ -z "$PIPELESS_USER_PYTHON_PACKAGES" ]] && return + + local -a package_list + read -r -a package_list <<< "$(tr ',;' ' ' <<< "${PIPELESS_USER_PYTHON_PACKAGES}")" + if [[ "${#package_list[@]}" -le 0 ]]; then + echo "No python packages specified by the user" + return + fi + + for package in "${package_list[@]}"; do + pip install "$package" + done +} + +######################## +# Install system packages required by the user code to run the worker +# These need to be installed at buildtime +# Globals: +# PIPELESS_* +# Arguments: +# None +# Returns: +# None +######################### +pipeless_install_user_system_deps() { + [[ -z "$PIPELESS_USER_SYSTEM_PACKAGES" ]] && return + + local -a package_list + read -r -a package_list <<< "$(tr ',;' ' ' <<< "${PIPELESS_USER_SYSTEM_PACKAGES}")" + if [[ "${#package_list[@]}" -le 0 ]]; then + echo "No system packages specified by the user" + return + fi + + for package in "${package_list[@]}"; do + apt-get install "$package" + done +} diff --git a/package/scripts/python-nonroot.sh b/package/scripts/python-nonroot.sh new file mode 100755 index 0000000..264f930 --- /dev/null +++ b/package/scripts/python-nonroot.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +set -o errexit +set -o nounset +set -o pipefail +# set -o xtrace # Uncomment this line for debugging purposes + +dirs=( + # Allow python to install packages as nonroot + "/.local" + "/.cache" +) +for dir in "${dirs[@]}"; do + mkdir -p "$dir" + chmod -R g+w "$dir" +done +