Skip to content

Commit

Permalink
Merge pull request #16 from miguelaeh/container_image
Browse files Browse the repository at this point in the history
Add pipeless container image
  • Loading branch information
miguelaeh authored Aug 25, 2023
2 parents 6e74918 + b2301c8 commit 57626a7
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 46 deletions.
2 changes: 1 addition & 1 deletion core/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pipeless-ai"
version = "0.1.4"
version = "0.1.5"
description = "A framework to build and deploy multimodal perception apps in minutes without worrying about multimedia pipelines"
authors = ["Miguel Angel Cabrera Minagorri <devgorri@gmail.com>"]
license = "Apache-2.0"
Expand Down
98 changes: 71 additions & 27 deletions core/src/pipeless_ai/lib/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ def recv_handler(*args, **kwargs):

return recv_handler

def wait_socket_dial(socket, addr):
'''
Waits until the socket connects to the provided addr
'''
connected = False
while not connected:
try:
socket.dial(addr, block=True)
connected = True
except ConnectionRefused:
logger.warning(f'[orange3]Connection to {addr} failed. Connection Refushed. Retrying...[/orange3]')
time.sleep(1)
except TryAgain:
logger.warning(f'[orange3]Connection to {addr} failed. Try Again. Retrying...[/orange3]')
time.sleep(1)
except Exception as e:
logger.error(f'[red]Failed to connect to {addr}. {e}')
sys.exit(1)

class InputOutputSocket(metaclass=Singleton):
"""
nng socket to send messages from the input to the output
Expand All @@ -74,7 +93,7 @@ def __init__(self, mode, send_timeout=1000, read_timeout=1000):
- mode: 'w' for the input (write). 'r' for the output (read)
"""
config = Config(None) # Get the already existing config instance
address = config.get_input().get_address()
address = config.get_output().get_address()
# Make this connection to run on the provided port+1.
# The provided port is for other type of connection
port = str(address.get_port() + 1)
Expand All @@ -83,15 +102,8 @@ def __init__(self, mode, send_timeout=1000, read_timeout=1000):
self._socket = Pair0()
self._socket.send_timeout = send_timeout
self._name = 'InputOutputSocket-Write'

connected = False
while not connected:
try:
self._socket.dial(self._addr, block=True)
connected = True
except ConnectionRefused:
logger.warning(f'[orange3]Connection to {self._addr} failed. Retrying...[/orange3]')
time.sleep(1)

wait_socket_dial(self._socket, self._addr)
elif mode == 'r':
self._socket = Pair0(listen=self._addr)
self._socket.recv_timeout = read_timeout
Expand Down Expand Up @@ -160,14 +172,7 @@ def __init__(self, timeout=500):
self._socket.send_buffer_size = 180 # 3 seconds of 60 pfs video
self._name = 'OutputPushSocket'

connected = False
while not connected:
try:
self._socket.dial(self._addr, block=True)
connected = True
except ConnectionRefused:
logger.warning(f'[orange3]Connection to {self._addr} failed. Retrying...[/orange3]')
time.sleep(1)
wait_socket_dial(self._socket, self._addr)

@send_error_handler
def send(self, msg):
Expand Down Expand Up @@ -195,14 +200,7 @@ def __init__(self, timeout=500):
self._socket.recv_buffer_size = 180 # 3 seconds of 60 pfs video
self._name = 'InputPullSocket'

connected = False
while not connected:
try:
self._socket.dial(self._addr, block=True)
connected = True
except ConnectionRefused:
logger.warning(f'[orange3]Connection to {self._addr} failed. Retrying...[/orange3]')
time.sleep(1)
wait_socket_dial(self._socket, self._addr)

@recv_error_handler
def recv(self):
Expand Down Expand Up @@ -240,4 +238,50 @@ def close(self):
self._socket.close()

def get_socket_name(self):
return self._name
return self._name

class WorkerReadySocket(metaclass=Singleton):
"""
Allows the input to wait for the first worker before starting to send data
When the worker needs to install user packages it takes longer to start
and the data send by the input is lost if we don't wait for at least the first worker
"""
def __init__(self, mode):
"""
Parameters:
- mode: 'input' for the input. 'worker' for the worker
"""
config = Config(None) # Get the already existing config instance
address = config.get_input().get_address()
# Make this connection to run on the provided port+2.
# The provided port is for other type of connection
port = str(address.get_port() + 2)
self._addr = f'tcp://{address.get_host()}:{port}'
if mode == 'worker':
self._socket = Pair0()
self._name = 'WorkerReadySocket-Worker'

wait_socket_dial(self._socket, self._addr)
elif mode == 'input':
self._socket = Pair0(listen=self._addr)
self._name = 'WorkerReadySocket-Input'
else:
raise 'Wrong mode for WorkerReadySocket'

@send_error_handler
def send(self, msg):
# Blocking send call. We always want to ensure the messages
# from input to output arrive because they change the pipelines
self._socket.send(msg)

@recv_error_handler
def recv(self):
# Only the input will receive, and once the first worker send, we won't receive again
# so we create a blocking call
return self._socket.recv()

def close(self):
self._socket.close()

def get_socket_name(self):
return self._name
18 changes: 11 additions & 7 deletions core/src/pipeless_ai/lib/input/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from gi.repository import Gst, GstApp, GLib

from pipeless_ai.lib.logger import logger, update_logger_component, update_logger_level
from pipeless_ai.lib.connection import InputOutputSocket, InputPushSocket
from pipeless_ai.lib.connection import InputOutputSocket, InputPushSocket, WorkerReadySocket
from pipeless_ai.lib.config import Config
from pipeless_ai.lib.messages import EndOfStreamMsg, RgbImageMsg, StreamCapsMsg, StreamTagsMsg

Expand Down Expand Up @@ -250,16 +250,20 @@ def input(config_dict):
bus.add_signal_watch()
bus.connect("message", on_bus_message, loop)

logger.info('Starting pipeline')
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
logger.error("[red]Unable to set the pipeline to the playing state.[/red]")
sys.exit(1)

try:
# Start socket to wait all components connections
s_push = InputPushSocket() # Listener
w_socket = WorkerReadySocket('input')
logger.info('Waiting first worker to be available')
w_socket.recv() # Wait for the first worker to appear
m_socket = InputOutputSocket('w') # Waits for output
logger.info('First worker ready')

logger.info('Starting pipeline')
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
logger.error("[red]Unable to set the pipeline to the playing state.[/red]")
sys.exit(1)

loop.run()
except KeyboardInterrupt:
Expand Down
6 changes: 5 additions & 1 deletion core/src/pipeless_ai/lib/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import numpy as np

from pipeless_ai.lib.config import Config
from pipeless_ai.lib.connection import InputPullSocket, OutputPushSocket
from pipeless_ai.lib.connection import InputPullSocket, OutputPushSocket, WorkerReadySocket
from pipeless_ai.lib.logger import logger, update_logger_component, update_logger_level
from pipeless_ai.lib.messages import EndOfStreamMsg, RgbImageMsg, deserialize

Expand Down Expand Up @@ -71,6 +71,10 @@ def worker(config_dict, user_module_path):
logger.error('Missing app .py file path')
sys.exit(1)

logger.info('Notifying worker ready to input')
w_socket = WorkerReadySocket('worker')
w_socket.send(b'ready') # Notify the input that a worker is available

try:
while True:
# Infinite worker loop
Expand Down
20 changes: 12 additions & 8 deletions package/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
FROM bitnami/python:3.10
FROM bitnami/python:3.10.12

# Install gstreamer
RUN install_packages libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \
libgstreamer-plugins-bad1.0-dev gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio

# Hack to allow mp4 files without errors. Note this library provides hardware acceleration, so we need an alternative fix.
#RUN apt-get remove gstreamer1.0-vaapi

# Install required dependencies
RUN install_packages libcairo2-dev libgirepository1.0-dev

# Allow python to install packages as nonroot
RUN mkdir /.local && chmod g+w /.local
COPY scripts /

RUN /python-nonroot.sh

USER 1001
RUN pip install pipeless-ai pipeless-ai-cli

# Allow to execute commands installed with pip
ENV PATH="${PATH}:/.local/bin"

RUN pip install pipeless-ai pipeless-ai-cli

WORKDIR /app

ENTRYPOINT ["pipeless"]
ENTRYPOINT ["/entrypoint.sh"]
61 changes: 59 additions & 2 deletions package/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,60 @@
# Working environment
# Pipeless container library

Please note the Dockerfile provided **IS NOT** ready for its usage
This directory contains the source files to build the Pipeless container images.

The container images provide a way to run Pipeless out-of-the box without having to deal with dependencies.

## Image Usage

Print help command:

```console
docker run --rm miguelaeh/pipeless --help
```

Create a new project locally:

```console
docker run --rm -v /your/app/dir:/app miguelaeh/pipeless create project my_app_name
```

Run all components:

```console
docker run --rm -v /your/app/dir:/app miguelaeh/pipeless run all
```

Run input only:

```console
docker run --rm miguelaeh/pipeless run input
```

### Install Custom Python Packages

Sometimes, your app may require python packages that are not installed by default into the pipeless container. You can use the `PIPELESS_USER_PYTHON_PACKAGES` variable to automatically install them on start. You can specify them as a list separated by commas (`,`), semicolons (`;`) or spaces (` `). For example:

```console
docker run --rm -e "PIPELESS_USER_PYTHON_PACKAGES=opencv-python;some_other_package" miguelaeh/pipeless run worker
```

### Important Notes

If you want to store the processed media to a file, it must be done in a path under `/app`. For example, setting `PIPELESS_OUTPUT_VIDEO_URI=file:///app/my_video.mp4`.
Futhermore, the directory mounted at `/app` (i.e. `/your/app/dir` on the above examples) must have group `root` with write permissions.

## Docker compose usage

The `docker-compose.yaml` file allows you to automatically deploy your application locally as if it would be deployed to the cloud.

Start docker compose:

```console
APP_DIR=/your/app/dir docker compose up
```

Stop the docker compose:

```console
APP_DIR=/your/app/dir docker compose down -v
```
42 changes: 42 additions & 0 deletions package/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: "2"

services:
input:
image: miguelaeh/pipeless:latest
command: ['run', 'input']
volumes:
- '${APP_DIR}:/app'
environment:
- PIPELESS_INPUT_ADDRESS_HOST=input
- PIPELESS_INPUT_ADDRESS_PORT=1234
- PIPELESS_OUTPUT_ADDRESS_HOST=output
- PIPELESS_OUTPUT_ADDRESS_PORT=1237
- PIPELESS_INPUT_VIDEO_URI=https://github.com/miguelaeh/pipeless/raw/main/examples/cats/cats.mp4
- PIPELESS_OUTPUT_VIDEO_URI=file:///app/output.mp4

output:
image: miguelaeh/pipeless:latest
command: ['run', 'output']
volumes:
- '${APP_DIR}:/app'
environment:
- PIPELESS_INPUT_ADDRESS_HOST=input
- PIPELESS_INPUT_ADDRESS_PORT=1234
- PIPELESS_OUTPUT_ADDRESS_HOST=output
- PIPELESS_OUTPUT_ADDRESS_PORT=1237
- PIPELESS_INPUT_VIDEO_URI=https://github.com/miguelaeh/pipeless/raw/main/examples/cats/cats.mp4
- PIPELESS_OUTPUT_VIDEO_URI=file:///app/output.mp4

worker:
image: miguelaeh/pipeless:latest
command: ['run', 'worker']
volumes:
- '${APP_DIR}:/app'
environment:
- PIPELESS_INPUT_ADDRESS_HOST=input
- PIPELESS_INPUT_ADDRESS_PORT=1234
- PIPELESS_OUTPUT_ADDRESS_HOST=output
- PIPELESS_OUTPUT_ADDRESS_PORT=1237
- PIPELESS_INPUT_VIDEO_URI=https://github.com/miguelaeh/pipeless/raw/main/examples/cats/cats.mp4
- PIPELESS_OUTPUT_VIDEO_URI=file:///app/output.mp4
- PIPELESS_USER_PYTHON_PACKAGES=opencv-python
18 changes: 18 additions & 0 deletions package/scripts/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash

set -o errexit
set -o nounset
set -o pipefail
# set -o xtrace # Uncomment this line for debugging purposes

. /libpipeless.sh

if [[ "$1" = "run" && ( "$2" = "worker" || "$2" = "all" ) ]]; then
pipeless_install_user_python_deps
fi

if [[ "$1" = "pipeless" ]]; then
exec "$@"
else
exec pipeless "$@"
fi
Loading

0 comments on commit 57626a7

Please sign in to comment.