Skip to content

Commit

Permalink
Merge pull request #241 from backend-developers-ltd/fix-executor-manager
Browse files Browse the repository at this point in the history
Fix executor manager timeout handling
  • Loading branch information
mzukowski-reef authored Sep 30, 2024
2 parents dec5045 + 3d76bcd commit ad0dad3
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 17 deletions.
5 changes: 3 additions & 2 deletions compute_horde/compute_horde/executor_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ class ExecutorClassSpec:
}


# this leaves around 1 min for synthetic job to complete
MAX_EXECUTOR_TIMEOUT = timedelta(minutes=4).total_seconds()
# we split 144min 2 tempos window to 24 validators - this is total time after reservation,
# validator may wait spin_up time of executor class to synchronize running synthetic batch
MAX_EXECUTOR_TIMEOUT = timedelta(minutes=6).total_seconds()

DEFAULT_EXECUTOR_CLASS = ExecutorClass.spin_up_4min__gpu_24gb
DEFAULT_EXECUTOR_TIMEOUT = EXECUTOR_CLASS[DEFAULT_EXECUTOR_CLASS].spin_up_time
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time

from compute_horde.executor_class import (
EXECUTOR_CLASS,
MAX_EXECUTOR_TIMEOUT,
ExecutorClass,
)
Expand All @@ -29,6 +30,9 @@ def is_expired(self):


class ExecutorClassPool:
RESERVATION_TIMEOUT = MAX_EXECUTOR_TIMEOUT
POOL_CLEANUP_PERIOD = 10

def __init__(self, manager, executor_class: ExecutorClass, executor_count: int):
self.manager = manager
self.executor_class = executor_class
Expand All @@ -42,7 +46,7 @@ async def reserve_executor(self, token, timeout):
async with self._reservation_lock:
while True:
if self.get_availability() == 0:
if time.time() - start < MAX_EXECUTOR_TIMEOUT:
if time.time() - start < self.RESERVATION_TIMEOUT:
await asyncio.sleep(1)
else:
logger.warning("Error unavailable after timeout")
Expand Down Expand Up @@ -71,26 +75,36 @@ async def _pool_cleanup_loop(self):
await self._pool_cleanup()
except Exception as exc:
logger.error("Error occurred", exc_info=exc)
await asyncio.sleep(10)
await asyncio.sleep(self.POOL_CLEANUP_PERIOD)

async def _pool_cleanup(self):
    """Drop finished executors from the pool and kill + drop expired ones.

    All executors are probed concurrently. An executor is removed when it
    has already finished (wait_for_executor returned a status), or when it
    is past its deadline and has been killed.
    """

    async def check_executor(reserved_executor):
        # A non-None status means the executor already finished on its own.
        status = await self.manager.wait_for_executor(reserved_executor.executor, 1)
        if status is not None:
            return reserved_executor, True
        elif reserved_executor.is_expired():
            # Still running past its allotted time - force-kill it.
            await self.manager.kill_executor(reserved_executor.executor)
            return reserved_executor, True
        return reserved_executor, False

    results = await asyncio.gather(
        *[check_executor(reserved_executor) for reserved_executor in self._executors]
    )

    executors_to_drop = set(
        reserved_executor for reserved_executor, should_drop in results if should_drop
    )

    # Keep only the executors that are still legitimately running.
    self._executors = [
        reserved_executor
        for reserved_executor in self._executors
        if reserved_executor not in executors_to_drop
    ]


class BaseExecutorManager(metaclass=abc.ABCMeta):
EXECUTOR_TIMEOUT_LEEWAY = dt.timedelta(seconds=30).total_seconds()

def __init__(self):
self._executor_class_pools = {}

Expand Down Expand Up @@ -135,4 +149,11 @@ async def get_executor_class_pool(self, executor_class):

async def reserve_executor_class(self, token, executor_class, timeout):
    """Reserve an executor of *executor_class* under *token*.

    The pool is given the total timeout (job timeout + spin-up time +
    leeway), not the raw job timeout, so slow-starting executor classes
    are not reaped prematurely.
    """
    pool = await self.get_executor_class_pool(executor_class)
    await pool.reserve_executor(token, self.get_total_timeout(executor_class, timeout))

def get_total_timeout(self, executor_class, job_timeout):
    """Return the overall timeout for a job of *job_timeout* seconds.

    The total is the executor class's spin-up time (0 when the class is
    not present in EXECUTOR_CLASS) plus the job timeout plus a fixed
    leeway margin.
    """
    spec = EXECUTOR_CLASS.get(executor_class)
    spin_up_time = spec.spin_up_time if spec is not None else 0
    return spin_up_time + job_timeout + self.EXECUTOR_TIMEOUT_LEEWAY
208 changes: 208 additions & 0 deletions miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import asyncio
from contextlib import asynccontextmanager
from unittest.mock import patch

import pytest
import pytest_asyncio
from compute_horde.executor_class import ExecutorClass

from compute_horde_miner.miner.executor_manager._internal.base import (
BaseExecutorManager,
ExecutorUnavailable,
)


class DummyExecutor:
    """Fake executor whose whole job is sleeping for a fixed duration."""

    def __init__(self, execution_time):
        # Seconds run() sleeps before reporting completion.
        self.execution_time = execution_time
        # asyncio task handle; attached by the manager after creation.
        self.task = None

    async def run(self):
        """Sleep for the configured time; report how the run ended."""
        try:
            await asyncio.sleep(self.execution_time)
        except asyncio.CancelledError:
            return "Cancelled"
        return "Completed"


class DummyExecutorManager(BaseExecutorManager):
    """In-memory executor manager for tests.

    Executors are DummyExecutor instances backed by asyncio tasks; the
    manifest is a plain dict supplied by the test.
    """

    def __init__(self, manifest, runtime_offset=0):
        super().__init__()
        self.manifest = manifest
        self.executors = []
        # Added to every requested timeout to simulate jobs finishing
        # earlier (negative) or later (positive) than their allotted time.
        self.runtime_offset = runtime_offset
        # Zero leeway so tests can reason about exact timeouts.
        self.EXECUTOR_TIMEOUT_LEEWAY = 0

    @asynccontextmanager
    async def set_runtime_offset(self, offset):
        """Temporarily override runtime_offset inside an `async with` block."""
        old_offset = self.runtime_offset
        self.runtime_offset = offset
        try:
            yield
        finally:
            self.runtime_offset = old_offset

    async def start_new_executor(self, token, executor_class, timeout):
        executor = DummyExecutor(timeout + self.runtime_offset)
        executor.task = asyncio.create_task(executor.run())
        self.executors.append(executor)
        return executor

    async def kill_executor(self, executor):
        if executor.task and not executor.task.done():
            executor.task.cancel()
        if executor in self.executors:
            self.executors.remove(executor)

    async def wait_for_executor(self, executor, timeout):
        """Return the executor's result within *timeout* seconds, else None."""
        try:
            # shield() keeps the timeout here from cancelling the executor task.
            return await asyncio.wait_for(asyncio.shield(executor.task), timeout)
        except asyncio.TimeoutError:
            # Bug fix: on Python < 3.11 asyncio.wait_for raises
            # asyncio.TimeoutError, which is NOT the builtin TimeoutError the
            # original caught. Catching the asyncio name works on all
            # versions (it aliases the builtin from 3.11 on).
            return None

    async def get_manifest(self):
        return self.manifest


@pytest_asyncio.fixture
async def dummy_manager():
    # Manager with two "always on" GPU executors; the negative runtime
    # offset makes dummy jobs finish slightly before their requested timeout.
    manifest = {
        ExecutorClass.always_on__gpu_24gb: 2,
    }
    manager = DummyExecutorManager(manifest, runtime_offset=-2)
    yield manager
    # Teardown: cancel each pool's background cleanup loop and wait for
    # it to actually exit, so no stray tasks leak between tests.
    for pool in manager._executor_class_pools.values():
        pool._pool_cleanup_task.cancel()
        try:
            await pool._pool_cleanup_task
        except asyncio.CancelledError:
            pass


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_executor_class_pool(dummy_manager):
    """End-to-end pool behavior: reservation, exhaustion, cleanup, expiry kill."""
    # Test reserving executors
    pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb)

    executor1 = await pool.reserve_executor("token1", 2)
    assert isinstance(executor1, DummyExecutor)
    assert pool.get_availability() == 1

    executor2 = await pool.reserve_executor("token2", 7)
    assert isinstance(executor2, DummyExecutor)
    assert pool.get_availability() == 0

    # Test ExecutorUnavailable exception
    # RESERVATION_TIMEOUT is patched to 0, so a full pool fails immediately.
    with pytest.raises(ExecutorUnavailable):
        await pool.reserve_executor("token3", 20)

    # Test executor completion
    # runtime_offset=-2: executor1 (2s timeout) runs ~0s and is already done.
    status = await dummy_manager.wait_for_executor(executor1, 1)
    assert status == "Completed"
    status = await dummy_manager.wait_for_executor(executor2, 1)
    assert status is None

    # Allow time for pool cleanup
    await asyncio.sleep(1)
    assert pool.get_availability() == 1

    # Test executor completion
    status = await dummy_manager.wait_for_executor(executor2, 7)
    assert status == "Completed"

    # Allow time for pool cleanup
    await asyncio.sleep(1)
    assert pool.get_availability() == 2

    # Test long-running executor
    # A positive offset makes the dummy job outlive its timeout, so the
    # cleanup loop should eventually kill it as expired.
    async with dummy_manager.set_runtime_offset(5):
        long_running_executor = await pool.reserve_executor("token4", 5)

        # Wait a bit, but not long enough for the executor to complete
        status = await dummy_manager.wait_for_executor(long_running_executor, 2)
        assert status is None

        # Wait for the executor to be killed by the cleanup process
        status = await dummy_manager.wait_for_executor(long_running_executor, 10)
        assert status == "Cancelled"

        # Allow time for pool cleanup
        await asyncio.sleep(1)
        assert pool.get_availability() == 2


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_manager_reserve_executor_class(dummy_manager):
    """Reserving through the manager consumes pool capacity and errors when full."""
    executor_class = ExecutorClass.always_on__gpu_24gb

    async def availability():
        pool = await dummy_manager.get_executor_class_pool(executor_class)
        return pool.get_availability()

    await dummy_manager.reserve_executor_class("token1", executor_class, 10)
    assert await availability() == 1

    await dummy_manager.reserve_executor_class("token2", executor_class, 10)
    assert await availability() == 0

    # Pool is exhausted - a third reservation must fail immediately
    # (RESERVATION_TIMEOUT is patched to 0).
    with pytest.raises(ExecutorUnavailable):
        await dummy_manager.reserve_executor_class("token3", executor_class, 10)


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_manifest_update(dummy_manager):
    """Pool size follows the manager's manifest on subsequent lookups."""
    executor_class = ExecutorClass.always_on__gpu_24gb

    initial_pool = await dummy_manager.get_executor_class_pool(executor_class)
    assert initial_pool._count == 2

    # Bump the manifest; the next lookup should pick the new size up.
    dummy_manager.manifest = {executor_class: 3}

    updated_pool = await dummy_manager.get_executor_class_pool(executor_class)
    assert updated_pool._count == 3


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_concurrent_reservations(dummy_manager):
    """Only as many concurrent reservations succeed as there are executors."""

    async def try_reserve(i):
        try:
            await dummy_manager.reserve_executor_class(
                f"token{i}", ExecutorClass.always_on__gpu_24gb, 5
            )
        except ExecutorUnavailable:
            return False
        return True

    outcomes = await asyncio.gather(*(try_reserve(i) for i in range(5)))
    # The manifest provides 2 executors, so exactly 2 of the 5 attempts win.
    assert outcomes.count(True) == 2
    assert outcomes.count(False) == 3

0 comments on commit ad0dad3

Please sign in to comment.