diff --git a/examples/docker/emloop_example/Dockerfile b/examples/docker/emloop_example/Dockerfile
deleted file mode 100644
index c7a3b15..0000000
--- a/examples/docker/emloop_example/Dockerfile
+++ /dev/null
@@ -1,22 +0,0 @@
-FROM iterait/emloop
-
-RUN pacman -Syu --noconfirm --needed python-pyzmq openssh python-numpy
-
-ADD ssh /root/.ssh
-RUN chmod 600 -R /root/.ssh
-RUN ssh-keyscan -H github.com >> /root/.ssh/known_hosts
-
-RUN pip install git+ssh://git@github.com/iterait/shepherd.git
-
-ENV LD_LIBRARY_PATH /usr/local/lib
-
-EXPOSE 9999
-
-WORKDIR /project
-
-ADD . .
-
-# Replace this with your model of choice
-ADD emloop-test /project/config.yaml
-
-CMD shepherd-runner /project
diff --git a/examples/docker/emloop_example/README.md b/examples/docker/emloop_example/README.md
deleted file mode 100644
index 4127682..0000000
--- a/examples/docker/emloop_example/README.md
+++ /dev/null
@@ -1,12 +0,0 @@
-# Shepherd runner docker example
-This directory contains simple config, dataset, model and Dockerfile with emloop docker runner.
-
-A container of this image may be utilized with **shepherd** for predicting new examples.
-
-To build and test it
-
-- create a `ssh` dir with SSH keys having access to **shepherd** GitHub
-- start a local docker registry with the provided docker compose sandbox (`../docker-compose-sandbox.yml`)
-- build, tag and push the image to the registry with `./build.sh`
-
-With that, you should be able to configure the runner to pull this image and create a task for it.
diff --git a/examples/docker/emloop_example/build.sh b/examples/docker/emloop_example/build.sh
deleted file mode 100755
index 369bfe8..0000000
--- a/examples/docker/emloop_example/build.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/sh
-docker build --no-cache . -t 0.0.0.0:6000/emloop-test:latest
-docker push 0.0.0.0:6000/emloop-test:latest
diff --git a/examples/docker/emloop_example/dummy/__init__.py b/examples/docker/emloop_example/dummy/__init__.py
deleted file mode 100644
index a7394ac..0000000
--- a/examples/docker/emloop_example/dummy/__init__.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from .dummy_dataset import DummyDataset
-from .dummy_model import DummyModel
-from .post_process_dataset import PostProcessDataset
-
diff --git a/examples/docker/emloop_example/dummy/dummy_dataset.py b/examples/docker/emloop_example/dummy/dummy_dataset.py
deleted file mode 100644
index c3dfa12..0000000
--- a/examples/docker/emloop_example/dummy/dummy_dataset.py
+++ /dev/null
@@ -1,14 +0,0 @@
-import emloop as el
-
-
-class DummyDataset(el.BaseDataset):
-
-    def _configure_dataset(self, post_process_factor: int=1, **kwargs):
-        self._post_process_factor = post_process_factor
-
-    def predict_stream(self, payload):
-        yield payload
-
-    def production_stream(self, payload):
-        for b in self.predict_stream(payload):
-            yield b
diff --git a/examples/docker/emloop_example/dummy/dummy_model.py b/examples/docker/emloop_example/dummy/dummy_model.py
deleted file mode 100644
index 2e4b69e..0000000
--- a/examples/docker/emloop_example/dummy/dummy_model.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import emloop as el
-
-
-class DummyModel(el.AbstractModel):
-
-    def __init__(self, factor: int=2, **kwargs):
-        super().__init__(**kwargs)
-        self._factor = factor
-        pass
-
-    def input_names(self):
-        pass
-
-    def output_names(self):
-        pass
-
-    def save(self, name_suffix: str):
-        pass
-
-    def run(self, batch: el.Batch, train: bool, stream):
-        batch['output'] = [batch['key'][0]*self._factor]
-        return batch
-
-    @property
-    def restore_fallback(self):
-        return None
diff --git a/examples/docker/emloop_example/dummy/post_process_dataset.py b/examples/docker/emloop_example/dummy/post_process_dataset.py
deleted file mode 100644
index 0972a33..0000000
--- a/examples/docker/emloop_example/dummy/post_process_dataset.py
+++ /dev/null
@@ -1,8 +0,0 @@
-from .dummy_dataset import DummyDataset
-
-
-class PostProcessDataset(DummyDataset):
-
-    def postprocess_batch(self, input_batch, output_batch):
-        output_batch['output'][0] *= self._post_process_factor
-        return output_batch
diff --git a/examples/docker/emloop_example/emloop-test/latest/config.yaml b/examples/docker/emloop_example/emloop-test/latest/config.yaml
deleted file mode 100644
index 933fc03..0000000
--- a/examples/docker/emloop_example/emloop-test/latest/config.yaml
+++ /dev/null
@@ -1,11 +0,0 @@
-dataset:
-  class: examples.docker.emloop_example.dummy.DummyDataset
-
-
-model:
-  name: Dummy
-  class: examples.docker.emloop_example.dummy.DummyModel
-
-eval:
-  predict:
-    hooks: [LogProfile]
diff --git a/examples/docker/emloop_example/emloop-test/latest/docker-config.yaml b/examples/docker/emloop_example/emloop-test/latest/docker-config.yaml
deleted file mode 100644
index 1300d05..0000000
--- a/examples/docker/emloop_example/emloop-test/latest/docker-config.yaml
+++ /dev/null
@@ -1,7 +0,0 @@
-dataset:
-  class: dummy.DummyDataset
-
-
-model:
-  name: Dummy
-  class: dummy.DummyModel
diff --git a/examples/docker/emloop_example/emloop-test/production/config.yaml b/examples/docker/emloop_example/emloop-test/production/config.yaml
deleted file mode 100644
index d218b39..0000000
--- a/examples/docker/emloop_example/emloop-test/production/config.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
-dataset:
-  class: examples.docker.emloop_example.dummy.PostProcessDataset
-
-
-model:
-  name: Dummy
-  class: examples.docker.emloop_example.dummy.DummyModel
-
-eval:
-  production:
-    model:
-      factor: 10
-    dataset:
-      post_process_factor: 2
diff --git a/examples/docker/emloop_example/emloop-test/test/config.yaml b/examples/docker/emloop_example/emloop-test/test/config.yaml
deleted file mode 100644
index fb0674f..0000000
--- a/examples/docker/emloop_example/emloop-test/test/config.yaml
+++ /dev/null
@@ -1,7 +0,0 @@
-dataset:
-  class: examples.docker.emloop_example.dummy.DummyDataset
-
-
-model:
-  name: Dummy
-  class: examples.docker.emloop_example.dummy.DummyModel
diff --git a/examples/docker/emloop_example/emloop-test/test/runner.yaml b/examples/docker/emloop_example/emloop-test/test/runner.yaml
deleted file mode 100644
index 5713cd8..0000000
--- a/examples/docker/emloop_example/emloop-test/test/runner.yaml
+++ /dev/null
@@ -1,2 +0,0 @@
-runner:
-  class: missing.Runner  # this should cause an exception of the shepherd-runner
diff --git a/examples/docker/emloop_example/emloop-test/test2/config.yaml b/examples/docker/emloop_example/emloop-test/test2/config.yaml
deleted file mode 100644
index 1300d05..0000000
--- a/examples/docker/emloop_example/emloop-test/test2/config.yaml
+++ /dev/null
@@ -1,7 +0,0 @@
-dataset:
-  class: dummy.DummyDataset
-
-
-model:
-  name: Dummy
-  class: dummy.DummyModel
diff --git a/setup.py b/setup.py
index 2e5e4a8..1b92d36 100644
--- a/setup.py
+++ b/setup.py
@@ -42,7 +42,6 @@
         'schematics==2.1.1',
         'aiohttp==3.7.4',
         'aiohttp-cors==0.7.0',
-        'emloop>=0.2',
         'apistrap==0.9.11',
         'minio==5.0.6',
         'urllib3==1.24.2'
diff --git a/shepherd/config.py b/shepherd/config.py
index 6713f0a..a799e42 100644
--- a/shepherd/config.py
+++ b/shepherd/config.py
@@ -1,11 +1,11 @@
 import logging
-import re
 import os
-import ruamel.yaml
+import re
 from typing import Optional, Dict, Any
 
+import ruamel.yaml
 from schematics import Model
-from schematics.types import ModelType, DictType, StringType, URLType, BaseType
+from schematics.types import ModelType, DictType, StringType, BaseType
 
 
 def strip_url_scheme(url):
diff --git a/shepherd/manage.py b/shepherd/manage.py
index 232e162..bfe72af 100644
--- a/shepherd/manage.py
+++ b/shepherd/manage.py
@@ -1,7 +1,6 @@
 import logging
 
 import click
-import emloop as el
 from aiohttp import web
 import aiohttp_cors
 
@@ -32,8 +31,7 @@ def run(host, port, config_file) -> None:
 
     # set-up logging
     logging.basicConfig(level=config.logging.log_level,
-                        format=el.constants.EL_LOG_FORMAT,
-                        datefmt=el.constants.EL_LOG_DATE_FORMAT)
+                        format="%(asctime)s-%(levelname)s-%(name)s::%(module)s|%(lineno)s:: %(message)s")
     logging.getLogger("urllib3").setLevel(logging.WARNING)
 
     welcome()
diff --git a/shepherd/runner/__init__.py b/shepherd/runner/__init__.py
index 6b5f173..5ad2c4d 100644
--- a/shepherd/runner/__init__.py
+++ b/shepherd/runner/__init__.py
@@ -1,4 +1,3 @@
 from .base_runner import BaseRunner, n_available_gpus
-from .json_runner import JSONRunner, to_json_serializable, run
 
-__all__ = ['BaseRunner', 'JSONRunner', 'to_json_serializable', 'run', 'n_available_gpus']
+__all__ = ['BaseRunner', 'n_available_gpus']
diff --git a/shepherd/runner/base_runner.py b/shepherd/runner/base_runner.py
index 8095810..3d909ec 100644
--- a/shepherd/runner/base_runner.py
+++ b/shepherd/runner/base_runner.py
@@ -1,17 +1,13 @@
-import re
-import os
 import logging
-import traceback
+import os
 import os.path as path
+import re
+import traceback
 
 from abc import abstractmethod
-from typing import Optional, Any, Dict
 
 import zmq
 import zmq.asyncio
-import emloop as el
-from emloop.cli.util import validate_config, find_config
-from emloop.utils import load_config
 
 from shepherd.comm import *
 from shepherd.constants import INPUT_DIR, OUTPUT_DIR
@@ -45,7 +41,7 @@ class BaseRunner:
 
     def __init__(self, config_path: str, port: int, stream_name: str):
         """Create new :py:class:`Runner`."""
-        logging.info('Creating emloop runner from `%s` listening on port %s', config_path, port)
+        logging.info('Creating runner from `%s` listening on port %s', config_path, port)
 
         # bind to the socket
        self._port = port
@@ -53,51 +49,6 @@ def __init__(self, config_path: str, port: int, stream_name: str):
 
         self._config_path: str = config_path
         self._stream_name: str = stream_name
-        self._config: Dict[str, Any] = None
-        self._dataset: Optional[el.AbstractDataset] = None
-        self._model: Optional[el.AbstractModel] = None
-
-    def _load_config(self) -> None:
-        """
-        Maybe load the **emloop** configuration from previously specified file and apply updates
-        from ``eval.`` section.
-        """
-        if self._config is None:
-            logging.debug('Loading config from `%s', self._config_path)
-            # load config
-            self._config = load_config(config_file=find_config(self._config_path))
-            if 'eval' in self._config and self._stream_name in self._config['eval']:
-                logging.debug('Applying eval config updates for stream `%s`', self._stream_name)
-                update_section = self._config['eval'][self._stream_name]
-                for subsection in ['dataset', 'model', 'main_loop']:
-                    if subsection in update_section:
-                        self._config[subsection].update(update_section[subsection])
-                if 'hooks' in update_section:
-                    self._config['hooks'] = update_section['hooks']
-                else:
-                    logging.warning('Config does not contain `eval.%s.hooks` section. '
-                                    'No hook will be employed during the evaluation.', self._stream_name)
-                    self._config['hooks'] = []
-            self._config["model"]["n_gpus"] = n_available_gpus()
-            validate_config(self._config)
-            logging.debug('Loaded config: %s', self._config)
-
-    def _load_dataset(self) -> None:
-        """Maybe load dataset."""
-        if self._dataset is None:
-            self._load_config()
-            logging.info('Creating dataset')
-            self._dataset = el.create_dataset(self._config, None)
-
-    def _load_model(self) -> None:
-        """Maybe load model."""
-        if self._model is None:
-            self._load_config()
-            logging.info('Creating model')
-            restore_from = self._config_path
-            if not path.isdir(restore_from):
-                restore_from = path.dirname(restore_from)
-            self._model = el.create_model(self._config, None, self._dataset, restore_from)
 
     @abstractmethod
     def _process_job(self, input_path: str, output_path: str) -> None:
diff --git a/shepherd/runner/json_runner.py b/shepherd/runner/json_runner.py
deleted file mode 100644
index 98543a0..0000000
--- a/shepherd/runner/json_runner.py
+++ /dev/null
@@ -1,85 +0,0 @@
-import json
-import logging
-import os.path as path
-from typing import Any
-from collections import defaultdict
-
-import emloop as el
-import numpy as np
-
-from ..constants import DEFAULT_PAYLOAD_FILE, DEFAULT_OUTPUT_FILE
-from .base_runner import BaseRunner
-
-
-def to_json_serializable(data):
-    """Make an object containing numpy arrays/scalars JSON serializable."""
-
-    if data is None:
-        return None
-    if isinstance(data, dict):
-        return {key: to_json_serializable(value) for key, value in data.items()}
-    elif isinstance(data, list) or isinstance(data, tuple):
-        return [to_json_serializable(v) for v in data]
-    elif isinstance(data, np.ndarray):
-        return data.tolist()
-    elif np.isscalar(data):
-        return data
-    else:
-        raise ValueError('Unsupported JSON type `{}` (key `{}`)'.format(type(data), data))
-
-
-def run(model: el.AbstractModel, dataset: el.AbstractDataset, stream_name: str, payload: Any) -> el.Batch:
-    """
-    Get the specified data stream from the given dataset, apply the given model on its batches and return the results.
-
-    The components have to be **emloop** compatible with:
-    - dataset having method named ``[stream_name]_stream`` taking the payload and returning the stream
-    - (optional) dataset having method named ``postprocess_batch`` taking both the input and output batches and
-      returning the post-processed batch
-
-    :param model: emloop model to be run
-    :param dataset: emloop dataset to get the stream from
-    :param stream_name: stream name
-    :param payload: payload passed to the method creating the stream
-    :return: result batch (if the stream produces multiple batches its the concatenation of all the results)
-    """
-    result = defaultdict(list)
-    for input_batch in getattr(dataset, stream_name + '_stream')(payload):
-        logging.info('Another batch (%s)', list(input_batch.keys()))
-        output_batch = model.run(input_batch, train=False, stream=None)
-        if hasattr(dataset, 'postprocess_batch'):
-            logging.info('\tPostprocessing')
-            result_batch = dataset.postprocess_batch(input_batch=input_batch, output_batch=output_batch)
-            logging.info('\tdone')
-        else:
-            logging.info('Skipping postprocessing')
-            result_batch = output_batch
-
-        for source, value in result_batch.items():
-            result[source] += list(value)
-    return result
-
-
-class JSONRunner(BaseRunner):
-    """
-    Fully functional emloop runner which loads a JSON from ``input_path``/``input.json``, passes the loaded object
-    to the desired dataset stream, runs the model and saves the output batch to ``output_path``/``output.json``.
-    """
-
-    def _process_job(self, input_path: str, output_path: str) -> None:
-        """
-        Process a JSON job
-        - load ``input_path``/``input``
-        - create dataset stream with the loaded JSON
-        - run the model
-        - save the output to ``output_path``/``output``
-
-        :param input_path: input data directory
-        :param output_path: output data directory
-        """
-        self._load_dataset()
-        self._load_model()
-        payload = json.load(open(path.join(input_path, DEFAULT_PAYLOAD_FILE)))
-        result = run(self._model, self._dataset, self._stream_name, payload)
-        result_json = to_json_serializable(result)
-        json.dump(result_json, open(path.join(output_path, DEFAULT_OUTPUT_FILE), 'w'))
diff --git a/shepherd/runner/runner_entry_point.py b/shepherd/runner/runner_entry_point.py
index f0224f5..ef8ff77 100644
--- a/shepherd/runner/runner_entry_point.py
+++ b/shepherd/runner/runner_entry_point.py
@@ -1,14 +1,14 @@
 import asyncio
+import importlib
+import logging
 import os
 import sys
-import logging
-import os.path as path
 from argparse import ArgumentParser
 
-import emloop as el
-
-
 __all__ = ['main']
+
+from typing import Tuple, Optional
+
+from shepherd.runner import BaseRunner
 
 
 def create_argparser():
@@ -17,7 +17,7 @@ def create_argparser():
     parser.add_argument('-p', '--port', dest="port", default=9999, type=int, help='Socket port to bind to')
     parser.add_argument('-s', '--stream', default='predict', help='Dataset stream name')
    parser.add_argument('-r', '--runner', default='shepherd.runner.JSONRunner', help='Fully qualified runner class')
-    parser.add_argument('config_path', help='emloop configuration file path')
+    parser.add_argument('config_path', help='configuration file path')
 
     return parser
@@ -31,25 +31,15 @@ def main() -> None:
     # parse args
     sys.path.insert(0, os.getcwd())
     logging.basicConfig(level=logging.DEBUG,
-                        format=el.constants.EL_LOG_FORMAT,
-                        datefmt=el.constants.EL_LOG_DATE_FORMAT)
+                        format="%(asctime)s-%(levelname)s-%(name)s::%(module)s|%(lineno)s:: %(message)s")
 
     args = create_argparser().parse_args()
 
-    runner_fqn = args.runner
-    config_dir = args.config_path
-    if path.isfile(args.config_path):
-        config_dir = path.dirname(args.config_path)
-
-    runner_config_file = path.join(config_dir, 'runner.yaml')
-    if path.exists(runner_config_file):
-        logging.info('Using custom runner configuration file')
-        runner_config = el.utils.load_config(path.join(runner_config_file))
-        runner_fqn = runner_config['runner']['class']
+    module_name = "model.isletnet_runner"
+    class_name = "IsletnetRunner"
 
-    # create runner
-    module, class_ = el.utils.parse_fully_qualified_name(runner_fqn)
-    runner = el.utils.create_object(module, class_, args=(args.config_path, args.port, args.stream))
+    _module = importlib.import_module(module_name)
+    runner = getattr(_module, class_name)(args.config_path, args.port, args.stream)
 
     # listen for input messages
     asyncio.run(runner.process_all())
diff --git a/shepherd/sheep/bare_sheep.py b/shepherd/sheep/bare_sheep.py
index 3eba7d4..dc15a49 100644
--- a/shepherd/sheep/bare_sheep.py
+++ b/shepherd/sheep/bare_sheep.py
@@ -1,10 +1,8 @@
 import os
 import shlex
 import subprocess
-import os.path as path
 from typing import Dict, Any, Optional
 
-import emloop as el
 from schematics.types import StringType
 
 from .base_sheep import BaseSheep
@@ -36,22 +34,6 @@ def __init__(self, config: Dict[str, Any], **kwargs):
         self._runner: Optional[subprocess.Popen] = None
         self._runner_config_path: Optional[str] = None
 
-    def _load_model(self, model_name: str, model_version: str) -> None:
-        """
-        Set up runner config path to ``working_directory`` / ``model_name`` / ``model_version`` / ``config.yaml``.
-
-        :param model_name: model name
-        :param model_version: model version
-        :raise SheepConfigurationError: if the runner config path does not exist
-        """
-        emloop_config_path = path.join(self._config.working_directory, model_name, model_version,
-                                       el.constants.EL_CONFIG_FILE)
-        if not path.exists(emloop_config_path):
-            raise SheepConfigurationError('Cannot load model `{}:{}`, file `{}` does not exist.'
-                                          .format(model_name, model_version, emloop_config_path))
-        super()._load_model(model_name, model_version)
-        self._runner_config_path = path.relpath(emloop_config_path, self._config.working_directory)
-
     def start(self, model_name: str, model_version: str) -> None:
         """
         Start a subprocess with the sheep runner.
diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py
index f435bbf..29f620e 100644
--- a/tests/runner/test_runner.py
+++ b/tests/runner/test_runner.py
@@ -1,55 +1,14 @@
-import asyncio
 import json
-import pytest
 import os
-import re
 import os.path as path
-
+import re
 import subprocess
-from threading import Thread
 
+import pytest
+
+from shepherd.comm import *
 from shepherd.constants import OUTPUT_DIR, DEFAULT_OUTPUT_FILE
 from shepherd.runner import *
-from shepherd.runner.runner_entry_point import main
-from shepherd.comm import *
-
-
-def test_to_json_serializable(json_data):
-    original, serializable = json_data
-    assert serializable == to_json_serializable(original)
-    with pytest.raises(ValueError):
-        to_json_serializable(asyncio)
-
-
-async def test_json_runner(job, feeding_socket, runner_setup, loop):
-    socket, port = feeding_socket
-    job_id, job_dir = job
-
-    version, stream, expected = runner_setup
-    config_path = path.join('examples', 'docker', 'emloop_example', 'emloop-test', version)
-    runner = JSONRunner(config_path, port, stream)
-    task = asyncio.create_task(runner.process_all())
-    await Messenger.send(socket, InputMessage(dict(job_id=job_id, io_data_root=job_dir)))
-    message: DoneMessage = await Messenger.recv(socket, [DoneMessage])
-    task.cancel()
-    output = json.load(open(path.join(job_dir, job_id, OUTPUT_DIR, DEFAULT_OUTPUT_FILE)))
-
-    assert output == {'key': [42], 'output': [expected]}
-    assert message.job_id == job_id
-
-
-async def test_json_runner_exception(job, feeding_socket):
-    socket, port = feeding_socket
-    job_id, job_dir = job
-
-    config_path = path.join('examples', 'docker', 'emloop_example', 'emloop-test', 'latest')
-    runner = JSONRunner(config_path, port, 'does_not_exist')
-    task = asyncio.create_task(runner.process_all())
-    await Messenger.send(socket, InputMessage(dict(job_id=job_id, io_data_root=job_dir)))
-    error = await Messenger.recv(socket, [ErrorMessage])
-    task.cancel()
-
-    assert error.message == 'AttributeError: \'DummyDataset\' object has no attribute \'does_not_exist_stream\''
 
 
 def start_cli(command, mocker):
@@ -77,13 +36,6 @@ async def test_runner(job, feeding_socket, runner_setup, mocker, start):  # for
     assert output['output'] == [expected]
 
 
-def test_runner_configuration(mocker):
-    config_path = path.join('examples', 'docker', 'emloop_example', 'emloop-test', 'test')
-    mocker.patch('sys.argv', ['shepherd-runner', '-p', '8888', config_path])
-    with pytest.raises(ModuleNotFoundError):
-        main()  # runner is configured to a non-existent module; thus, we expect a failure
-
-
 def test_n_gpus(mocker):
     n_system_gpus = len([s for s in os.listdir("/dev") if re.search(r'nvidia[0-9]+', s) is not None])
     assert n_available_gpus() == n_system_gpus
diff --git a/tests/sheep/test_sheep.py b/tests/sheep/test_sheep.py
index 710cb8c..30a5636 100644
--- a/tests/sheep/test_sheep.py
+++ b/tests/sheep/test_sheep.py
@@ -20,21 +20,6 @@ def test_extract_gpu_number():
     assert extract_gpu_number('/dev/nvidia3') == '3'
 
 
-def test_bare_sheep_start_stop(bare_sheep: BareSheep):
-    bare_sheep.slaughter()
-    bare_sheep.start('emloop-test', 'latest')
-    assert bare_sheep.running
-    bare_sheep.slaughter()
-    assert not bare_sheep.running
-    bare_sheep.start('emloop-test', 'latest')
-
-
-def test_bare_configuration_error(bare_sheep: BareSheep):
-
-    with pytest.raises(SheepConfigurationError):  # model version does not exist
-        bare_sheep.start('emloop-test', 'i-do-not-exist')
-
-
 @pytest.fixture()
 def image_valid2() -> Tuple[str, str]:
     yield 'library/alpine', 'edge'
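Note on the entry-point change above: with the emloop-based runner discovery removed, `shepherd-runner` now imports `model.isletnet_runner.IsletnetRunner` unconditionally, so the model image has to ship a module under that name exposing a `BaseRunner` subclass. The sketch below is hypothetical and not part of this patch; it relies only on the `BaseRunner` interface and the `shepherd.constants` payload/output file names that the patch keeps, and the inference body is a placeholder.

```python
# model/isletnet_runner.py -- hypothetical sketch, not included in this patch
import json
import os.path as path

from shepherd.constants import DEFAULT_PAYLOAD_FILE, DEFAULT_OUTPUT_FILE
from shepherd.runner import BaseRunner


class IsletnetRunner(BaseRunner):
    """Load the job's input JSON, run model-specific code and write the output JSON."""

    def _process_job(self, input_path: str, output_path: str) -> None:
        # read the payload shepherd placed into the job's input directory
        with open(path.join(input_path, DEFAULT_PAYLOAD_FILE)) as f:
            payload = json.load(f)

        # model-specific inference would go here; echo the payload as a placeholder
        result = {'payload': payload}

        # write the result where shepherd expects the job output
        with open(path.join(output_path, DEFAULT_OUTPUT_FILE), 'w') as f:
            json.dump(result, f)
```

The entry point instantiates the class as `IsletnetRunner(args.config_path, args.port, args.stream)`, matching the `BaseRunner.__init__` signature shown above, and `_process_job` is the abstract method every runner must implement.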