Skip to content

Commit

Permalink
Merge pull request #361 from backend-developers-ltd/debug-llm-failures
Browse files Browse the repository at this point in the history
Add system events for LLM failures and system events limits
  • Loading branch information
emnoor-reef authored Jan 10, 2025
2 parents 3ee6b0c + 6edb53b commit b593d98
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 7 deletions.
5 changes: 5 additions & 0 deletions validator/app/src/compute_horde_validator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ def wrapped(*args, **kwargs):
"Maximum retries for organic jobs",
int,
),
"DYNAMIC_SYSTEM_EVENT_LIMITS": (
"MINER_SYNTHETIC_JOB_FAILURE,LLM_PROMPT_ANSWERS_MISSING,10",
"Limits of system events produced for each type-subtype pairs in a synthetic job run. Format: TYPE1,SUBTYPE1,100;TYPE2,SUBTYPE2,200",
str,
),
}
DYNAMIC_CONFIG_CACHE_TIMEOUT = 300

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
import time
from collections.abc import Callable
from contextlib import suppress
from typing import Any
from typing import Any, TypeAlias

import constance.utils
from asgiref.sync import sync_to_async
from compute_horde.executor_class import ExecutorClass
from constance import config
from django.conf import settings

from compute_horde_validator.validator.models import SystemEvent


class DynamicConfigHolder:
CACHE_TIMEOUT = 300
Expand Down Expand Up @@ -82,3 +84,22 @@ def get_executor_class_weights() -> dict[ExecutorClass, float]:
return executor_class_value_map_parser(
config.DYNAMIC_EXECUTOR_CLASS_WEIGHTS, value_parser=float
)


LimitsDict: TypeAlias = dict[tuple[SystemEvent.EventType, SystemEvent.EventSubType], int]


def parse_system_event_limits(raw_limits: str) -> LimitsDict:
limits = {}
for limit_item in raw_limits.split(";"):
with suppress(ValueError):
type_str, subtype_str, limit_str = limit_item.split(",")
type_ = SystemEvent.EventType(type_str)
subtype = SystemEvent.EventSubType(subtype_str)
limits[(type_, subtype)] = int(limit_str)
return limits


async def get_system_event_limits() -> LimitsDict:
raw_limits: str = await aget_config("DYNAMIC_SYSTEM_EVENT_LIMITS")
return parse_system_event_limits(raw_limits)
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@
from django.db.models import BooleanField, Count, ExpressionWrapper, Q
from pydantic import BaseModel, JsonValue

from compute_horde_validator.validator.dynamic_config import get_miner_max_executors_per_class
from compute_horde_validator.validator.dynamic_config import (
LimitsDict,
get_miner_max_executors_per_class,
get_system_event_limits,
)
from compute_horde_validator.validator.models import (
Miner,
MinerManifest,
Expand Down Expand Up @@ -456,6 +460,11 @@ class BatchContext:
events: list[SystemEvent]
event_count: int

# limit system events by type-subtype pairs configured in dynamic config
event_limits: LimitsDict
# events count per type-subtype, this is needed for enforcing limits
event_limits_usage: LimitsDict

stage_start_time: dict[str, datetime]
average_job_send_time: timedelta | None = None

Expand All @@ -476,6 +485,14 @@ def system_event(
func: str | None = None,
append: bool = True,
) -> SystemEvent | None:
if (type, subtype) in self.event_limits:
if self.event_limits_usage[(type, subtype)] >= self.event_limits[(type, subtype)]:
logger.warning(
f"Discarding system event for exceeding limit {type=} {subtype=} {description=}"
)
return None
self.event_limits_usage[(type, subtype)] += 1

if data is None:
data = {}

Expand Down Expand Up @@ -743,7 +760,7 @@ class _MinerClientFactoryProtocol(Protocol):
def __call__(self, ctx: BatchContext, miner_hotkey: str) -> MinerClient: ...


def _init_context(
async def _init_context(
axons: dict[str, bittensor.AxonInfo],
serving_miners: list[Miner],
batch_id: int | None = None,
Expand All @@ -752,6 +769,7 @@ def _init_context(
own_wallet = settings.BITTENSOR_WALLET()
own_keypair = own_wallet.get_hotkey()
create_miner_client = create_miner_client or MinerClient
event_limits = await get_system_event_limits()

ctx = BatchContext(
batch_id=batch_id,
Expand All @@ -772,6 +790,8 @@ def _init_context(
jobs={},
events=[],
event_count=0,
event_limits=event_limits,
event_limits_usage=defaultdict(int),
stage_start_time={},
_loop=asyncio.get_running_loop(),
)
Expand Down Expand Up @@ -1533,6 +1553,25 @@ async def _score_job(ctx: BatchContext, job: Job) -> None:
),
)

# NOTE: We generally want data to be dict[str, str].
# Since this code block is here only for debugging purpose,
# we are passing non-conforming data here with 'type: ignore'.
if isinstance(job.job_generator, LlmPromptsSyntheticJobGenerator):
job.system_event(
type=SystemEvent.EventType.MINER_SYNTHETIC_JOB_FAILURE,
subtype=SystemEvent.EventSubType.LLM_PROMPT_ANSWERS_MISSING,
description="failed synthetic llm job details",
data={
"prompts_url": job.job_generator.s3_url,
"answers_url": job.job_generator.url_for_download(),
"seed": job.job_generator.seed, # type: ignore
"known_answers": { # type: ignore
p.content: p.answer for p in job.job_generator.expected_prompts
},
"job_response": job.job_response.model_dump(mode="json"), # type: ignore
},
)

logger.info(
"%s finished with %s in %.2f seconds with score %.6g: %s",
job.name,
Expand Down Expand Up @@ -1894,7 +1933,7 @@ async def execute_synthetic_batch_run(
# randomize the order of miners each batch to avoid systemic bias
random.shuffle(serving_miners)

ctx = _init_context(axons, serving_miners, batch_id, create_miner_client)
ctx = await _init_context(axons, serving_miners, batch_id, create_miner_client)
await ctx.checkpoint_system_event("BATCH_BEGIN", dt=start_time)

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def _url_for_upload(self) -> str:
prefix=self.s3_output_prefix,
)

def _url_for_download(self) -> str:
def url_for_download(self) -> str:
return get_public_url(
key=self.s3_output_key,
bucket_name=self.s3_output_bucket,
Expand Down Expand Up @@ -92,7 +92,7 @@ async def output_upload(self) -> OutputUpload | None:
)

async def download_answers(self):
response = await download_file_content(self._url_for_download())
response = await download_file_content(self.url_for_download())
self.prompt_answers = pydantic.TypeAdapter(dict[str, str]).validate_json(response)

def verify(self, msg: V0JobFinishedRequest, time_took: float) -> tuple[bool, str, float]:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from compute_horde_validator.validator.dynamic_config import parse_system_event_limits
from compute_horde_validator.validator.models import SystemEvent


def test_parse_system_event_limits_empty():
assert parse_system_event_limits("") == {}


def test_parse_system_event_limits_single():
type = SystemEvent.EventType.LLM_PROMPT_SAMPLING
subtype = SystemEvent.EventSubType.SUCCESS
count = 42
assert parse_system_event_limits(f"{type.value},{subtype.value},{count}") == {
(type, subtype): count,
}


def test_parse_system_event_limits_multiple():
type1 = SystemEvent.EventType.LLM_PROMPT_SAMPLING
subtype1 = SystemEvent.EventSubType.SUCCESS
count1 = 42
type2 = SystemEvent.EventType.LLM_PROMPT_SAMPLING
subtype2 = SystemEvent.EventSubType.SUCCESS
count2 = 72
assert parse_system_event_limits(
f"{type1.value},{subtype1.value},{count1};{type2.value},{subtype2.value},{count2}"
) == {
(type1, subtype1): count1,
(type2, subtype2): count2,
}


def test_parse_system_event_limits_malformed_skipped():
assert parse_system_event_limits("malformed") == {}


def test_parse_system_event_limits_partial_malformed_skipped():
type = SystemEvent.EventType.LLM_PROMPT_SAMPLING
subtype = SystemEvent.EventSubType.SUCCESS
count = 42
assert parse_system_event_limits(f"malformed;{type.value},{subtype.value},{count}") == {
(type, subtype): count,
}


def test_parse_system_event_limits_missing_type_skipped():
type = "missing"
subtype = SystemEvent.EventSubType.SUCCESS
count = 42
assert parse_system_event_limits(f"malformed;{type},{subtype.value},{count}") == {}


def test_parse_system_event_limits_missing_subtype_skipped():
type = SystemEvent.EventType.LLM_PROMPT_SAMPLING
subtype = "missing"
count = 42
assert parse_system_event_limits(f"malformed;{type.value},{subtype},{count}") == {}


def test_parse_system_event_limits_invalid_count_skipped():
type = SystemEvent.EventType.LLM_PROMPT_SAMPLING
subtype = SystemEvent.EventSubType.SUCCESS
count = "wat?"
assert parse_system_event_limits(f"malformed;{type.value},{subtype.value},{count}") == {}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pytest
from freezegun import freeze_time

from compute_horde_validator.validator.models import SystemEvent
from compute_horde_validator.validator.synthetic_jobs.batch_run import (
_init_context,
_not_enough_prompts_system_event,
Expand All @@ -13,7 +14,7 @@
@pytest.mark.django_db
async def test_not_enough_prompts_system_event_is_not_repeated():
initial_date = datetime(2020, 1, 1)
ctx = _init_context(axons={}, serving_miners=[])
ctx = await _init_context(axons={}, serving_miners=[])

with freeze_time(initial_date) as frozen_datetime:
await _not_enough_prompts_system_event(ctx)
Expand All @@ -30,3 +31,52 @@ async def test_not_enough_prompts_system_event_is_not_repeated():
# after 24h cached window, should add event
await _not_enough_prompts_system_event(ctx)
assert len(ctx.events) == 2


@pytest.mark.asyncio
@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
# @pytest.mark.override_config(
# DYNAMIC_SYSTEM_EVENT_LIMITS="MINER_SYNTHETIC_JOB_FAILURE,LLM_PROMPT_ANSWERS_MISSING,2",
# )
async def test_system_event_limits():
# TODO: Fix constance patching. The override_config marker or the patch_constance decorator
# does not patch constance.utils.get_values. So values used from the cache holder
# does not get patched.
# This test is currently dependent on the default value of the config.

ctx = await _init_context(axons={}, serving_miners=[])

for _ in range(20):
# limited type
ctx.system_event(
type=SystemEvent.EventType.MINER_SYNTHETIC_JOB_FAILURE,
subtype=SystemEvent.EventSubType.LLM_PROMPT_ANSWERS_MISSING,
description="",
)
# not limited type
ctx.system_event(
type=SystemEvent.EventType.MINER_SYNTHETIC_JOB_SUCCESS,
subtype=SystemEvent.EventSubType.SUCCESS,
description="",
)

# limited type should be limited
assert (
sum(
1
for e in ctx.events
if e.type == SystemEvent.EventType.MINER_SYNTHETIC_JOB_FAILURE
and e.subtype == SystemEvent.EventSubType.LLM_PROMPT_ANSWERS_MISSING
)
== 10
)
# not limited type should not be limited
assert (
sum(
1
for e in ctx.events
if e.type == SystemEvent.EventType.MINER_SYNTHETIC_JOB_SUCCESS
and e.subtype == SystemEvent.EventSubType.SUCCESS
)
== 20
)

0 comments on commit b593d98

Please sign in to comment.