Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
kkowalski-reef committed Jan 22, 2025
1 parent e4afae9 commit 6794022
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@
logger = logging.getLogger(__name__)


class ExecutorUnavailable(Exception):
class NoExecutorUnavailable(Exception):
"""
Thrown when no executor is available to handle a request.
"""
pass


class ExecutorFailedToStart(Exception):
class ExecutorFailed(Exception):
pass


Expand Down Expand Up @@ -48,15 +51,15 @@ async def reserve_executor(self, token, timeout):
async with self._reservation_lock:
if self.get_availability() == 0:
logger.warning("No executor available")
raise ExecutorUnavailable()
raise NoExecutorUnavailable()

try:
executor = await self.manager.start_new_executor(
token, self.executor_class, timeout
)
except Exception as exc:
logger.error("Error occurred", exc_info=exc)
raise ExecutorFailedToStart()
raise ExecutorFailed()

self._executors.append(ReservedExecutor(executor, timeout))
return executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

from django.conf import settings

from compute_horde_miner.miner.executor_manager._internal.base import BaseExecutorManager, ExecutorUnavailable
from compute_horde_miner.miner.executor_manager._internal.base import (
BaseExecutorManager,
)
from compute_horde_miner.miner.executor_manager.executor_port_dispenser import (
executor_port_dispenser,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

from compute_horde_miner.miner.executor_manager._internal.base import (
BaseExecutorManager,
ExecutorUnavailable,
ExecutorFailedToStart,
ExecutorFailed,
)
from compute_horde_miner.miner.executor_manager._internal.selector import (
HistoricalRandomMinerSelector,
Expand Down Expand Up @@ -61,13 +60,13 @@ async def start_new_executor(self, token, executor_class, timeout):
logger.error(
f"Pulling executor container failed with returncode={process.returncode}"
)
raise ExecutorFailedToStart("Failed to pull executor image")
raise ExecutorFailed("Failed to pull executor image")
except TimeoutError:
process.kill()
logger.error(
"Pulling executor container timed out, pulling it from shell might provide more details"
)
raise ExecutorFailedToStart("Failed to pull executor image")
raise ExecutorFailed("Failed to pull executor image")
hf_args = (
[]
if settings.HF_ACCESS_TOKEN is None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
DockerExecutor = v1.DockerExecutor
PULLING_TIMEOUT = v1.PULLING_TIMEOUT
DOCKER_STOP_TIMEOUT = v1.DOCKER_STOP_TIMEOUT
ExecutorUnavailable = v1.ExecutorUnavailable
ExecutorFailedToStart = v1.ExecutorFailedToStart
ExecutorUnavailable = v1.NoExecutorUnavailable
ExecutorFailedToStart = v1.ExecutorFailed


class BaseExecutorManager(v1.BaseExecutorManager):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from compute_horde_miner.miner.executor_manager._internal.base import (
BaseExecutorManager,
ExecutorClassPool,
ExecutorFailedToStart,
ExecutorUnavailable,
ExecutorFailed,
NoExecutorUnavailable,
ReservedExecutor,
)
from compute_horde_miner.miner.executor_manager._internal.dev import DevExecutorManager
Expand All @@ -25,8 +25,8 @@
"MAX_EXECUTOR_TIMEOUT",
"BaseExecutorManager",
"ExecutorClassPool",
"ExecutorUnavailable",
"ExecutorFailedToStart",
"NoExecutorUnavailable",
"ExecutorFailed",
"ReservedExecutor",
"DOCKER_STOP_TIMEOUT",
"PULLING_TIMEOUT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,16 +371,16 @@ async def handle_initial_job_request(
output_field=DateTimeField(),
),
)
# .filter(
# is_organic=True,
# executor_class=msg.executor_class,
# timestamp__lte=now,
# valid_until__gte=now,
# miner_signature__isnull=False, # miner signature is needed to build a valid Receipt
# )
# .exclude(
# job_uuid=msg.job_uuid, # UUIDField doesn't support "__ne=..."
# )
.filter(
is_organic=True,
executor_class=msg.executor_class,
timestamp__lte=now,
valid_until__gte=now,
miner_signature__isnull=False, # miner signature is needed to build a valid Receipt
)
.exclude(
job_uuid=msg.job_uuid, # UUIDField doesn't support "__ne=..."
)
)
logger.info(f"Declining job {msg.job_uuid}: executor unavailable")
await self.send(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from compute_horde_miner.miner.executor_manager._internal.base import (
BaseExecutorManager,
ExecutorUnavailable,
NoExecutorUnavailable,
)
from compute_horde_miner.miner.executor_manager.base import (
BaseExecutorManager as BaseBaseExecutorManager,
Expand Down Expand Up @@ -112,7 +112,7 @@ async def test_executor_class_pool(dummy_manager):
assert pool.get_availability() == 0

# Test ExecutorUnavailable exception
with pytest.raises(ExecutorUnavailable):
with pytest.raises(NoExecutorUnavailable):
await pool.reserve_executor("token3", 20)

# Test executor completion
Expand Down Expand Up @@ -166,7 +166,7 @@ async def test_manager_reserve_executor_class(dummy_manager):
await dummy_manager.get_executor_class_pool(ExecutorClass.always_on__gpu_24gb)
).get_availability() == 0

with pytest.raises(ExecutorUnavailable):
with pytest.raises(NoExecutorUnavailable):
await dummy_manager.reserve_executor_class("token3", ExecutorClass.always_on__gpu_24gb, 10)


Expand Down Expand Up @@ -199,7 +199,7 @@ async def reserve(i):
f"token{i}", ExecutorClass.always_on__gpu_24gb, 5
)
return True
except ExecutorUnavailable:
except NoExecutorUnavailable:
return False

results = await asyncio.gather(*[reserve(i) for i in range(5)])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ class Job:
time_took: timedelta | None = None
correct: bool | None = None # returned correct answer (even if outside time limit)
success: bool = False # returned correct answer within time limit
excused: bool = False # declined but provided valid excuse
excused: bool = False # declined (reason=busy) but provided valid excuse
comment: str = "failed"
score: float = 0
# dancing bonus
Expand Down Expand Up @@ -480,8 +480,7 @@ def get_valid_decline_excuse_receipts(self) -> list[Receipt]:
network=settings.BITTENSOR_NETWORK,
)
allowed_validators: set[str] = {
# Vali should probably trust itself in any case.
self.ctx.own_keypair.ss58_address,
self.ctx.own_keypair.ss58_address, # Vali should probably trust itself in any case.
*(n.hotkey for n in validator_neurons),
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def test_execute_miner_synthetic_jobs_job_declined(

await check_synthetic_job(job_uuid, miner.pk, SyntheticJob.Status.FAILED, NOT_SCORED)
await sync_to_async(check_system_events)(
SystemEvent.EventType.MINER_SYNTHETIC_JOB_FAILURE, SystemEvent.EventSubType.JOB_NOT_STARTED
SystemEvent.EventType.MINER_SYNTHETIC_JOB_FAILURE, SystemEvent.EventSubType.JOB_REJECTED
)


Expand Down

0 comments on commit 6794022

Please sign in to comment.