Skip to content

Commit

Permalink
Merge pull request #97 from backend-developers-ltd/mulit-executor-man…
Browse files Browse the repository at this point in the history
…ager

Support for multiple executors and executor classes in BaseExecutorManager
  • Loading branch information
mzukowski-reef authored May 24, 2024
2 parents 74ec229 + 1361e61 commit e8dd2f7
Show file tree
Hide file tree
Showing 29 changed files with 831 additions and 391 deletions.
17 changes: 17 additions & 0 deletions compute_horde/compute_horde/mv_protocol/miner_requests.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import enum

import pydantic

from ..base_requests import BaseRequest, JobMixin
from ..utils import MachineSpecs


class RequestType(enum.Enum):
V0AcceptJobRequest = "V0AcceptJobRequest"
V0DeclineJobRequest = "V0DeclineJobRequest"
V0ExecutorManifestRequest = "V0ExecutorManifestRequest"
V0ExecutorReadyRequest = "V0ExecutorReadyRequest"
V0ExecutorFailedRequest = "V0ExecutorFailedRequest"
V0JobFailedRequest = "V0JobFailedRequest"
Expand All @@ -16,6 +19,15 @@ class RequestType(enum.Enum):
UnauthorizedError = "UnauthorizedError"


class ExecutorClassManifest(pydantic.BaseModel):
    """Declares how many executors a miner supports for one executor class."""

    # Numeric id of the executor class (0 is the default class).
    executor_class: int
    # Number of executors of this class the miner can run concurrently.
    count: int


class ExecutorManifest(pydantic.BaseModel):
    """Full capacity declaration: one entry per supported executor class."""

    executor_classes: list[ExecutorClassManifest]


class BaseMinerRequest(BaseRequest):
    """Base class for miner protocol messages; `message_type` discriminates the concrete request."""

    message_type: RequestType

Expand Down Expand Up @@ -54,6 +66,11 @@ class V0MachineSpecsRequest(BaseMinerRequest, JobMixin):
specs: MachineSpecs


class V0ExecutorManifestRequest(BaseMinerRequest):
    """Miner message advertising its executor capacity per executor class."""

    message_type: RequestType = RequestType.V0ExecutorManifestRequest
    manifest: ExecutorManifest


class GenericError(BaseMinerRequest):
    """Catch-all error message; `details` optionally carries a human-readable description."""

    message_type: RequestType = RequestType.GenericError
    details: str | None = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import pytest

logger = logging.getLogger(__name__)

from ..mv_protocol.validator_requests import Volume, VolumeType

logger = logging.getLogger(__name__)


@pytest.mark.parametrize(
"url, expected",
Expand Down
39 changes: 25 additions & 14 deletions miner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,32 +26,43 @@ Miners are encouraged to optimize their setup by implementing their own executor

3. To use your custom executor, set the `EXECUTOR_MANAGER_CLASS_PATH` variable in the `.env` file of the miner runner.

This feature is currently in early access stage and is only available on `v0-preprod` images. See the section about preprod images to set up your environment.

To create a custom executor manager, follow these steps:

1. Create a directory for your code, e.g., `/home/ubuntu/custom_executor`.

2. Inside the directory, create a file named `my_executor.py` and add the following code:

```python
from compute_horde_miner.miner.executor_manager.base import BaseExecutorManager
from compute_horde_miner.miner.executor_manager.base import BaseExecutorManager

class MyExecutor(BaseExecutorManager):
async def start_new_executor(self, token, executor_class, timeout):
"""Start spinning up an executor with `token` for given executor_class or raise ExecutorUnavailable if at capacity
        `timeout` is provided so manager does not need to rely on pool cleanup to stop expired executor"""
pass

async def kill_executor(self, executor):
"""Kill running executor. It might be platform specific, so leave it to Manager implementation"""
pass

async def wait_for_executor(self, executor, timeout):
"""Wait for executor to finish the job for till timeout.
class MyExecutor(BaseExecutorManager):
async def _reserve_executor(self, token):
"""Start spinning up an executor with `token` or raise ExecutorUnavailable if at capacity"""
pass
Have to return not None status if executor finished. If returned status is None it means that
executor is still running.
"""
pass

async def _kill_executor(self, executor):
"""Kill running executor."""
pass
async def get_manifest(self) -> dict[int, int]:
"""Return executors manifest
async def _wait_for_executor(self, executor, timeout):
"""Wait for executor to finish the job until timeout"""
pass
Keys are executor class ids and values are number of supported executors for given executor class.
"""
pass
```

You need to implement all three methods (`_reserve_executor`, `_kill_executor`, and `_wait_for_executor`) to make the executor work. For reference, you can check the implementation in `compute_horde_miner.miner.executor_manager.docker`.
You need to implement all 4 methods (`start_new_executor`, `kill_executor`, `wait_for_executor` and `get_manifest`) to make the executor work. For reference, you can check the implementation in `compute_horde_miner.miner.executor_manager.docker`.

3. Update your `.env` file with the following variables:

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import abc
import asyncio
import datetime as dt
import logging

logger = logging.getLogger(__name__)


# this leaves around 1 min for synthetic job to complete
MAX_EXECUTOR_TIMEOUT = dt.timedelta(minutes=4).total_seconds()  # hard cap for any reservation, in seconds
DEFAULT_EXECUTOR_CLASS = 0  # executor class used by the deprecated single-class API
DEFAULT_EXECUTOR_TIMEOUT = dt.timedelta(minutes=4).total_seconds()  # default reservation timeout, in seconds


class ExecutorUnavailable(Exception):
    """Raised by manager implementations when no executor can be started (at capacity)."""

    pass


class ReservedExecutor:
    """Bookkeeping record for one reserved executor and its reservation deadline."""

    def __init__(self, executor, timeout):
        # Opaque executor handle as returned by the manager implementation.
        self.executor = executor
        # Requested reservation timeout in seconds (capped by MAX_EXECUTOR_TIMEOUT).
        self.timeout = timeout
        # Moment the reservation was made; reference point for expiry checks.
        self.start_time = dt.datetime.now()

    def is_expired(self):
        """Return True once the executor has outlived its allowed lifetime."""
        elapsed = (dt.datetime.now() - self.start_time).total_seconds()
        allowed = min(MAX_EXECUTOR_TIMEOUT, self.timeout)
        return elapsed > allowed


class ExecutorClassPool:
    """Bounded pool of executors of a single executor class.

    Tracks currently reserved executors, enforces the capacity limit and
    periodically reaps finished or expired executors in a background task.
    """

    def __init__(self, manager, executor_class: int, executor_count: int):
        # The BaseExecutorManager that knows how to start/kill/await executors.
        self.manager = manager
        self.executor_class = executor_class
        # Maximum number of concurrently reserved executors for this class.
        self._count = executor_count
        # Currently reserved (possibly still running) ReservedExecutor objects.
        self._executors = []
        # Serializes reservations so capacity checks do not race each other.
        self._reservation_lock = asyncio.Lock()
        # NOTE(review): requires a running event loop at construction time.
        self._heartbeat_task = asyncio.create_task(self._pool_cleanup_task())

    async def reserve_executor(self, token, timeout):
        """Start and register a new executor; polls until a capacity slot frees up.

        NOTE(review): the wait loop holds `_reservation_lock`, so other reservations
        for this class queue behind it, and there is no upper bound on the wait.
        """
        async with self._reservation_lock:
            while True:
                if self.get_availability() == 0:
                    # Full: wait for the cleanup task to reap an executor.
                    await asyncio.sleep(1)
                else:
                    executor = await self.manager.start_new_executor(
                        token, self.executor_class, timeout
                    )
                    self._executors.append(ReservedExecutor(executor, timeout))
                    return executor

    def set_count(self, executor_count):
        # Update capacity (e.g. after a manifest change); applies on the next availability check.
        self._count = executor_count

    def get_availability(self):
        # Number of free slots; never negative even if capacity was lowered below current usage.
        return max(0, self._count - len(self._executors))

    async def _pool_cleanup_task(self):
        # Background loop: reap executors every 10s; errors are logged, never fatal.
        # TODO: this is a basic working logic - pool cleanup should be more robust
        while True:
            try:
                await self._pool_cleanup()
            except Exception as exc:
                logger.error("Error occurred", exc_info=exc)
            await asyncio.sleep(10)

    async def _pool_cleanup(self):
        """Drop executors that have finished, and kill + drop those past their timeout."""
        executors_to_drop = set()
        for reserved_executor in self._executors:
            # Short probe (1s): a non-None status means the executor finished.
            status = await self.manager.wait_for_executor(reserved_executor.executor, 1)
            if status is not None:
                executors_to_drop.add(reserved_executor)
            else:
                if reserved_executor.is_expired():
                    await self.manager.kill_executor(reserved_executor.executor)
                    executors_to_drop.add(reserved_executor)
        still_running_executors = []
        for reserved_executor in self._executors:
            if reserved_executor not in executors_to_drop:
                still_running_executors.append(reserved_executor)
        self._executors = still_running_executors


class BaseExecutorManager(metaclass=abc.ABCMeta):
    """Manages pools of executors, one ExecutorClassPool per executor class.

    Concrete subclasses provide the platform-specific logic (start, kill,
    wait) plus a manifest describing how many executors of each class
    they support.
    """

    def __init__(self):
        # Maps executor class id -> ExecutorClassPool, created lazily from the manifest.
        self._executor_class_pools = {}

    @abc.abstractmethod
    async def start_new_executor(self, token, executor_class, timeout):
        """Start spinning up an executor with `token` for given executor_class or raise ExecutorUnavailable if at capacity

        `timeout` is provided so manager does not need to rely on pool cleanup to stop expired executor"""

    @abc.abstractmethod
    async def kill_executor(self, executor):
        """Kill running executor. It might be platform specific, so leave it to Manager implementation"""

    @abc.abstractmethod
    async def wait_for_executor(self, executor, timeout):
        """Wait for executor to finish the job until timeout.

        Have to return not None status if executor finished. If returned status is None it means that
        executor is still running.
        """

    @abc.abstractmethod
    async def get_manifest(self) -> dict[int, int]:
        """Return executors manifest

        Keys are executor class ids and values are number of supported executors for given executor class.
        """

    async def get_executor_class_pool(self, executor_class):
        """Return the pool for `executor_class`, syncing all pool sizes with the manifest first.

        Raises KeyError if `executor_class` is not present in the manifest.
        """
        manifest = await self.get_manifest()
        # BUGFIX: the loop variable must not be named `executor_class` — it shadowed
        # the argument, making the method return the pool of the *last* manifest
        # entry instead of the requested class.
        for manifest_class, executor_count in manifest.items():
            pool = self._executor_class_pools.get(manifest_class)
            if pool is None:
                pool = ExecutorClassPool(self, manifest_class, executor_count)
                self._executor_class_pools[manifest_class] = pool
            else:
                # Capacity may have changed between manifest reads.
                pool.set_count(executor_count)
        return self._executor_class_pools[executor_class]

    async def reserve_executor(self, token):
        # TODO: deprecated - new code should use `reserve_executor_class`
        return await self.reserve_executor_class(
            token, DEFAULT_EXECUTOR_CLASS, DEFAULT_EXECUTOR_TIMEOUT
        )

    async def reserve_executor_class(self, token, executor_class, timeout):
        """Reserve (start) an executor of `executor_class`; blocks until a slot is free."""
        pool = await self.get_executor_class_pool(executor_class)
        return await pool.reserve_executor(token, timeout)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import pathlib
import subprocess
import sys

from django.conf import settings

from compute_horde_miner.miner.executor_manager._internal.base import (
DEFAULT_EXECUTOR_CLASS,
BaseExecutorManager,
)

this_dir = pathlib.Path(__file__).parent
executor_dir = this_dir / ".." / ".." / ".." / ".." / ".." / ".." / ".." / "executor"


class DevExecutorManager(BaseExecutorManager):
    """Development-only manager: runs the executor as a local subprocess from the repo checkout."""

    async def start_new_executor(self, token, executor_class, timeout):
        # `executor_class` and `timeout` are ignored: the dev setup supports a single
        # default-class executor and relies on pool cleanup for expiry.
        return subprocess.Popen(
            [sys.executable, "app/src/manage.py", "run_executor"],
            env={
                "MINER_ADDRESS": f"ws://{settings.ADDRESS_FOR_EXECUTORS}:{settings.PORT_FOR_EXECUTORS}",
                "EXECUTOR_TOKEN": token,
                "PATH": os.environ["PATH"],
            },
            cwd=executor_dir,
        )

    async def kill_executor(self, executor):
        # Best-effort kill; the process may have already exited.
        try:
            executor.kill()
        except OSError:
            pass

    async def wait_for_executor(self, executor, timeout):
        # Returns the process return code if it finished within `timeout`, else None.
        # NOTE(review): Popen.wait blocks the event loop for up to `timeout` seconds —
        # acceptable for dev use only.
        try:
            return executor.wait(timeout)
        except subprocess.TimeoutExpired:
            pass

    async def get_manifest(self):
        # Exactly one executor of the default class.
        return {DEFAULT_EXECUTOR_CLASS: 1}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import asyncio
import logging
import subprocess

from django.conf import settings

from compute_horde_miner.miner.executor_manager._internal.base import (
DEFAULT_EXECUTOR_CLASS,
BaseExecutorManager,
ExecutorUnavailable,
)

PULLING_TIMEOUT = 300
DOCKER_STOP_TIMEOUT = 5

logger = logging.getLogger(__name__)


class DockerExecutor:
    """Handle for a dockerized executor: the `docker run` process plus its token (container name)."""

    def __init__(self, process_executor, token):
        # asyncio subprocess running `docker run`.
        self.process_executor = process_executor
        # Job token; also used as the container name.
        self.token = token


class DockerExecutorManager(BaseExecutorManager):
    """Runs each executor inside a docker container on the local host."""

    async def start_new_executor(self, token, executor_class, timeout):
        # `executor_class` and `timeout` are ignored: this manager supports a single
        # default-class executor and relies on pool cleanup for expiry.
        if settings.ADDRESS_FOR_EXECUTORS:
            address = settings.ADDRESS_FOR_EXECUTORS
        else:
            # Fall back to the miner app container's IP on the docker network.
            # NOTE(review): subprocess.check_output blocks the event loop briefly.
            address = (
                subprocess.check_output(
                    [
                        "docker",
                        "inspect",
                        "-f",
                        "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}",
                        "root_app_1",
                    ]
                )
                .decode()
                .strip()
            )
        if not settings.DEBUG_SKIP_PULLING_EXECUTOR_IMAGE:
            # Ensure the executor image is present before running it.
            process = await asyncio.create_subprocess_exec(
                "docker", "pull", settings.EXECUTOR_IMAGE
            )
            try:
                await asyncio.wait_for(process.communicate(), timeout=PULLING_TIMEOUT)
                if process.returncode:
                    logger.error(
                        f"Pulling executor container failed with returncode={process.returncode}"
                    )
                    raise ExecutorUnavailable("Failed to pull executor image")
            except TimeoutError:
                # NOTE(review): builtin TimeoutError matches asyncio.wait_for only on
                # Python >= 3.11 (where asyncio.TimeoutError is an alias) — confirm runtime.
                process.kill()
                logger.error(
                    "Pulling executor container timed out, pulling it from shell might provide more details"
                )
                raise ExecutorUnavailable("Failed to pull executor image")
        process_executor = await asyncio.create_subprocess_exec(  # noqa: S607
            "docker",
            "run",
            "--rm",
            "-e",
            f"MINER_ADDRESS=ws://{address}:{settings.PORT_FOR_EXECUTORS}",
            "-e",
            f"EXECUTOR_TOKEN={token}",
            "--name",
            token,
            # the executor must be able to spawn images on host
            "-v",
            "/var/run/docker.sock:/var/run/docker.sock",
            "-v",
            "/tmp:/tmp",
            settings.EXECUTOR_IMAGE,
            "python",
            "manage.py",
            "run_executor",
        )
        return DockerExecutor(process_executor, token)

    async def kill_executor(self, executor):
        # kill executor container first so it would not be able to report anything - job simply timeouts
        process = await asyncio.create_subprocess_exec("docker", "stop", executor.token)
        try:
            await asyncio.wait_for(process.wait(), timeout=DOCKER_STOP_TIMEOUT)
        except TimeoutError:
            pass

        # Also stop the job container the executor may have spawned (named "<token>-job").
        process = await asyncio.create_subprocess_exec("docker", "stop", f"{executor.token}-job")
        try:
            await asyncio.wait_for(process.wait(), timeout=DOCKER_STOP_TIMEOUT)
        except TimeoutError:
            pass

        # Finally kill the local `docker run` process itself; it may have exited already.
        try:
            executor.process_executor.kill()
        except OSError:
            pass

    async def wait_for_executor(self, executor, timeout):
        # Returns the `docker run` return code if it finished within `timeout`, else None.
        try:
            return await asyncio.wait_for(executor.process_executor.wait(), timeout=timeout)
        except TimeoutError:
            pass

    async def get_manifest(self):
        # Exactly one executor of the default class.
        return {DEFAULT_EXECUTOR_CLASS: 1}
Loading

0 comments on commit e8dd2f7

Please sign in to comment.