Merge branch 'master' into llama-syn
mzukowski-reef authored Sep 30, 2024
2 parents f12f260 + ad0dad3 commit fd8e5d3
Showing 17 changed files with 282 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -2,7 +2,7 @@
 *.sqlite3
 *~
 *.egg-info/
-/.idea/
+.idea/
 .env
 venv
 .hypothesis
5 changes: 3 additions & 2 deletions compute_horde/compute_horde/executor_class.py
@@ -68,8 +68,9 @@ class ExecutorClassSpec:
 }
 
 
-# this leaves around 1 min for the synthetic job to complete
-MAX_EXECUTOR_TIMEOUT = timedelta(minutes=4).total_seconds()
+# we split the 144 min (2 tempos) window between 24 validators - this is the total time after reservation;
+# a validator may wait the executor class's spin_up time to synchronize running a synthetic batch
+MAX_EXECUTOR_TIMEOUT = timedelta(minutes=6).total_seconds()
 
 DEFAULT_EXECUTOR_CLASS = ExecutorClass.spin_up_4min__gpu_24gb
 DEFAULT_EXECUTOR_TIMEOUT = EXECUTOR_CLASS[DEFAULT_EXECUTOR_CLASS].spin_up_time
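
A quick sanity check on the arithmetic behind the new value (a minimal sketch, not part of the commit): two tempos span roughly 144 minutes, and splitting that window across 24 validators gives each one a 6-minute reservation slot.

```python
from datetime import timedelta

# 2 tempos ~= 144 minutes, split across 24 validators
window = timedelta(minutes=144)
per_validator = window / 24

assert per_validator == timedelta(minutes=6)
assert per_validator.total_seconds() == 360.0  # the new MAX_EXECUTOR_TIMEOUT
```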
2 changes: 1 addition & 1 deletion compute_horde/compute_horde/test_base/__init__.py
@@ -105,7 +105,7 @@ def start_process(cls, args, additional_env: dict[str, str]):

     @classmethod
     def wait_for_process_start(cls, process_name, probe_function, process: subprocess.Popen):
-        for i in range(300):
+        for _ in range(300):
             if probe_function():
                 return
             if process.poll() is not None:
5 changes: 3 additions & 2 deletions compute_horde/pyproject.toml
@@ -55,17 +55,18 @@ line-length = 100

 [tool.ruff.lint]
 # TODO add D
-select = ["E", "F", "I", "UP"]
+select = ["E", "F", "I", "UP", "B"]
 # TODO: remove E501 once docstrings are formatted
 ignore = [
     "D100", "D105", "D107", "D200", "D202", "D203", "D205", "D212", "D400", "D401", "D415",
     "D101", "D102","D103", "D104", # TODO remove once we have docstring for all public methods
     "E501", # TODO: remove E501 once docstrings are formatted
+    "B027", "B904", "B905",
 ]
 
 [tool.ruff.lint.per-file-ignores]
 "__init__.py" = ["F401"]
-"test/**" = ["D", "F403", "F405"]
+"**/tests/**" = ["D", "F403", "F405", "B018"]
 
 [tool.codespell]
 skip = 'pdm.lock'
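
For context, the added `B` selector enables ruff's flake8-bugbear checks; the newly ignored codes cover patterns the codebase still uses (`B027`: empty non-abstract method on an ABC; `B904`: `raise` inside `except` without `from`; `B905`: `zip()` without `strict=`; `B018`: useless expression statements, hence the tests-only ignore). A hedged sketch of the two most common ones (my illustration, not from the commit):

```python
# B904: re-raising inside `except` without `from` loses the original cause;
# bugbear suggests `raise ... from err` (or `from None`)
def parse(value: str) -> int:
    try:
        return int(value)
    except ValueError:
        raise RuntimeError(f"bad input: {value!r}")  # flagged by B904

# B905: zip() without an explicit strict= silently truncates on length mismatch
pairs = list(zip([1, 2, 3], ["a", "b"]))  # flagged by B905; pairs == [(1, 'a'), (2, 'b')]
```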
2 changes: 1 addition & 1 deletion executor/bin/rotate-local-backups.py
@@ -18,7 +18,7 @@ def rotate_backups(path, file_count):
     files = files[:-file_count]
     if files:
         print(f"Removing {len(files)} old files")
-        for mtime, f in files:
+        for _mtime, f in files:
             f.unlink()
     else:
         print("No old files to remove")
5 changes: 3 additions & 2 deletions executor/pyproject.toml
@@ -69,17 +69,18 @@ line-length = 100

 [tool.ruff.lint]
 # TODO add D
-select = ["E", "F", "I", "UP"]
+select = ["E", "F", "I", "UP", "B"]
 # TODO: remove E501 once docstrings are formatted
 ignore = [
     "D100", "D105", "D107", "D200", "D202", "D203", "D205", "D212", "D400", "D401", "D415",
     "D101", "D102","D103", "D104", # TODO remove once we have docstring for all public methods
     "E501", # TODO: remove E501 once docstrings are formatted
+    "B027", "B904", "B905",
 ]
 
 [tool.ruff.lint.per-file-ignores]
 "__init__.py" = ["F401"]
-"test/**" = ["D", "F403", "F405"]
+"**/tests/**" = ["D", "F403", "F405", "B018"]
 
 [tool.codespell]
 skip = '*.min.js,*.lock,*/monitoring_certs/*'
miner/app/src/compute_horde_miner/miner/executor_manager/_internal/base.py
@@ -5,6 +5,7 @@
 import time
 
 from compute_horde.executor_class import (
+    EXECUTOR_CLASS,
     MAX_EXECUTOR_TIMEOUT,
     ExecutorClass,
 )
@@ -29,6 +30,9 @@ def is_expired(self):


 class ExecutorClassPool:
+    RESERVATION_TIMEOUT = MAX_EXECUTOR_TIMEOUT
+    POOL_CLEANUP_PERIOD = 10
+
     def __init__(self, manager, executor_class: ExecutorClass, executor_count: int):
         self.manager = manager
         self.executor_class = executor_class
@@ -42,7 +46,7 @@ async def reserve_executor(self, token, timeout):
         async with self._reservation_lock:
             while True:
                 if self.get_availability() == 0:
-                    if time.time() - start < MAX_EXECUTOR_TIMEOUT:
+                    if time.time() - start < self.RESERVATION_TIMEOUT:
                         await asyncio.sleep(1)
                     else:
                         logger.warning("Error unavailable after timeout")
@@ -71,26 +75,36 @@ async def _pool_cleanup_loop(self):
                 await self._pool_cleanup()
             except Exception as exc:
                 logger.error("Error occurred", exc_info=exc)
-            await asyncio.sleep(10)
+            await asyncio.sleep(self.POOL_CLEANUP_PERIOD)
 
     async def _pool_cleanup(self):
-        executors_to_drop = set()
-        for reserved_executor in self._executors:
+        async def check_executor(reserved_executor):
             status = await self.manager.wait_for_executor(reserved_executor.executor, 1)
             if status is not None:
-                executors_to_drop.add(reserved_executor)
-            else:
-                if reserved_executor.is_expired():
-                    await self.manager.kill_executor(reserved_executor.executor)
-                    executors_to_drop.add(reserved_executor)
-        still_running_executors = []
-        for reserved_executor in self._executors:
-            if reserved_executor not in executors_to_drop:
-                still_running_executors.append(reserved_executor)
-        self._executors = still_running_executors
+                return reserved_executor, True
+            elif reserved_executor.is_expired():
+                await self.manager.kill_executor(reserved_executor.executor)
+                return reserved_executor, True
+            return reserved_executor, False
+
+        results = await asyncio.gather(
+            *[check_executor(reserved_executor) for reserved_executor in self._executors]
+        )
+
+        executors_to_drop = set(
+            reserved_executor for reserved_executor, should_drop in results if should_drop
+        )
+
+        self._executors = [
+            reserved_executor
+            for reserved_executor in self._executors
+            if reserved_executor not in executors_to_drop
+        ]
 
 
 class BaseExecutorManager(metaclass=abc.ABCMeta):
+    EXECUTOR_TIMEOUT_LEEWAY = dt.timedelta(seconds=30).total_seconds()
+
     def __init__(self):
         self._executor_class_pools = {}
 
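The rewritten `_pool_cleanup` above replaces a sequential scan with concurrent checks. A standalone sketch of the same gather-and-filter pattern, with a hypothetical `check` coroutine standing in for the wait/expiry logic (not part of the commit):

```python
import asyncio

async def check(item: int) -> tuple[int, bool]:
    # stand-in for wait_for_executor / is_expired; flags even items for removal
    await asyncio.sleep(0)
    return item, item % 2 == 0

async def cleanup(items: list[int]) -> list[int]:
    # check every item concurrently, then filter in one pass
    results = await asyncio.gather(*[check(i) for i in items])
    to_drop = {item for item, should_drop in results if should_drop}
    return [i for i in items if i not in to_drop]

print(asyncio.run(cleanup([1, 2, 3, 4])))  # [1, 3]
```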
@@ -119,7 +133,7 @@ async def get_manifest(self) -> dict[ExecutorClass, int]:
         Keys are executor class ids and values are number of supported executors for given executor class.
         """
 
-    async def get_executor_class_pool(self, executor_class):
+    async def _sync_pools_with_manifest(self):
         manifest = await self.get_manifest()
         for executor_class, executor_count in manifest.items():
             pool = self._executor_class_pools.get(executor_class)
@@ -128,8 +142,18 @@ async def get_executor_class_pool(self, executor_class):
                 self._executor_class_pools[executor_class] = pool
             else:
                 pool.set_count(executor_count)
+
+    async def get_executor_class_pool(self, executor_class):
+        await self._sync_pools_with_manifest()
+        return self._executor_class_pools[executor_class]
 
     async def reserve_executor_class(self, token, executor_class, timeout):
         pool = await self.get_executor_class_pool(executor_class)
-        await pool.reserve_executor(token, timeout)
+        await pool.reserve_executor(token, self.get_total_timeout(executor_class, timeout))
+
+    def get_total_timeout(self, executor_class, job_timeout):
+        spec = EXECUTOR_CLASS.get(executor_class)
+        spin_up_time = 0
+        if spec is not None:
+            spin_up_time = spec.spin_up_time
+        return spin_up_time + job_timeout + self.EXECUTOR_TIMEOUT_LEEWAY
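
The effect of `get_total_timeout` is that a reservation now covers the whole window an executor may legitimately need: class spin-up plus the job timeout plus the 30-second leeway. A worked example, assuming the 4-minute class spins up in 240 s (inferred from its name, not stated in the diff) and the caller passes a 60 s job timeout:

```python
spin_up_time = 240   # assumed: spin_up_4min__gpu_24gb spins up in 4 minutes
job_timeout = 60     # example job timeout passed by the caller
leeway = 30          # EXECUTOR_TIMEOUT_LEEWAY from the diff above

total = spin_up_time + job_timeout + leeway
print(total)  # 330 seconds before the pool considers the executor expired
```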
208 changes: 208 additions & 0 deletions miner/app/src/compute_horde_miner/miner/tests/test_executor_manager.py
@@ -0,0 +1,208 @@
import asyncio
from contextlib import asynccontextmanager
from unittest.mock import patch

import pytest
import pytest_asyncio
from compute_horde.executor_class import ExecutorClass

from compute_horde_miner.miner.executor_manager._internal.base import (
    BaseExecutorManager,
    ExecutorUnavailable,
)


class DummyExecutor:
    def __init__(self, execution_time):
        self.execution_time = execution_time
        self.task = None

    async def run(self):
        try:
            await asyncio.sleep(self.execution_time)
            return "Completed"
        except asyncio.CancelledError:
            return "Cancelled"


class DummyExecutorManager(BaseExecutorManager):
    def __init__(self, manifest, runtime_offset=0):
        super().__init__()
        self.manifest = manifest
        self.executors = []
        self.runtime_offset = runtime_offset
        self.EXECUTOR_TIMEOUT_LEEWAY = 0

    @asynccontextmanager
    async def set_runtime_offset(self, offset):
        old_offset = self.runtime_offset
        self.runtime_offset = offset
        try:
            yield
        finally:
            self.runtime_offset = old_offset

    async def start_new_executor(self, token, executor_class, timeout):
        executor = DummyExecutor(timeout + self.runtime_offset)
        executor.task = asyncio.create_task(executor.run())
        self.executors.append(executor)
        return executor

    async def kill_executor(self, executor):
        if executor.task and not executor.task.done():
            executor.task.cancel()
        if executor in self.executors:
            self.executors.remove(executor)

    async def wait_for_executor(self, executor, timeout):
        try:
            return await asyncio.wait_for(asyncio.shield(executor.task), timeout)
        except TimeoutError:
            return None

    async def get_manifest(self):
        return self.manifest


@pytest_asyncio.fixture
async def dummy_manager():
    manifest = {
        ExecutorClass.always_on__gpu_24gb: 2,
    }
    manager = DummyExecutorManager(manifest, runtime_offset=-2)
    yield manager
    for pool in manager._executor_class_pools.values():
        pool._pool_cleanup_task.cancel()
        try:
            await pool._pool_cleanup_task
        except asyncio.CancelledError:
            pass


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_executor_class_pool(dummy_manager):
    # Test reserving executors
    pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb)

    executor1 = await pool.reserve_executor("token1", 2)
    assert isinstance(executor1, DummyExecutor)
    assert pool.get_availability() == 1

    executor2 = await pool.reserve_executor("token2", 7)
    assert isinstance(executor2, DummyExecutor)
    assert pool.get_availability() == 0

    # Test ExecutorUnavailable exception
    with pytest.raises(ExecutorUnavailable):
        await pool.reserve_executor("token3", 20)

    # Test executor completion
    status = await dummy_manager.wait_for_executor(executor1, 1)
    assert status == "Completed"
    status = await dummy_manager.wait_for_executor(executor2, 1)
    assert status is None

    # Allow time for pool cleanup
    await asyncio.sleep(1)
    assert pool.get_availability() == 1

    # Test executor completion
    status = await dummy_manager.wait_for_executor(executor2, 7)
    assert status == "Completed"

    # Allow time for pool cleanup
    await asyncio.sleep(1)
    assert pool.get_availability() == 2

    # Test long-running executor
    async with dummy_manager.set_runtime_offset(5):
        long_running_executor = await pool.reserve_executor("token4", 5)

        # Wait a bit, but not long enough for the executor to complete
        status = await dummy_manager.wait_for_executor(long_running_executor, 2)
        assert status is None

        # Wait for the executor to be killed by the cleanup process
        status = await dummy_manager.wait_for_executor(long_running_executor, 10)
        assert status == "Cancelled"

    # Allow time for pool cleanup
    await asyncio.sleep(1)
    assert pool.get_availability() == 2


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_manager_reserve_executor_class(dummy_manager):
    await dummy_manager.reserve_executor_class("token1", ExecutorClass.always_on__gpu_24gb, 10)
    assert (
        await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb)
    ).get_availability() == 1

    await dummy_manager.reserve_executor_class("token2", ExecutorClass.always_on__gpu_24gb, 10)
    assert (
        await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb)
    ).get_availability() == 0

    with pytest.raises(ExecutorUnavailable):
        await dummy_manager.reserve_executor_class("token3", ExecutorClass.always_on__gpu_24gb, 10)


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_manifest_update(dummy_manager):
    pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb)
    assert pool._count == 2

    # Update manifest
    dummy_manager.manifest = {ExecutorClass.always_on__gpu_24gb: 3}

    # Get pool again to trigger update
    pool = await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb)
    assert pool._count == 3


@pytest.mark.asyncio
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.RESERVATION_TIMEOUT",
    0,
)
@patch(
    "compute_horde_miner.miner.executor_manager._internal.base.ExecutorClassPool.POOL_CLEANUP_PERIOD",
    0.1,
)
async def test_concurrent_reservations(dummy_manager):
    async def reserve(i):
        try:
            await dummy_manager.reserve_executor_class(
                f"token{i}", ExecutorClass.always_on__gpu_24gb, 5
            )
            return True
        except ExecutorUnavailable:
            return False

    results = await asyncio.gather(*[reserve(i) for i in range(5)])
    assert results.count(True) == 2
    assert results.count(False) == 3