enable streaming synthetic jobs flow
andreea-popescu-reef committed Jan 7, 2025
1 parent 49bec89 commit 5acb2a1
Showing 5 changed files with 202 additions and 22 deletions.
10 changes: 10 additions & 0 deletions validator/app/src/compute_horde_validator/settings.py
@@ -289,6 +289,16 @@ def wrapped(*args, **kwargs):
"Maximum retries for organic jobs",
int,
),
"DYNAMIC_SYNTHETIC_STREAMING_JOB_EXECUTOR_CLASSES": (
"always_on.llm.a6000",
"Comma separated list of classes to run streaming jobs on during synthetic jobs batch runs",
str,
),
"DYNAMIC_SYNTHETIC_STREAMING_JOB_READY_TIMEOUT": (
300,
"Timeout for waiting for a streaming job to be ready to accept connections from the user",
int,
),
}
DYNAMIC_CONFIG_CACHE_TIMEOUT = 300
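For orientation (not part of this diff): each entry above follows the map's existing (default, description, type) convention, and values are read back through the async dynamic-config getter used later in this commit. A minimal consumer sketch, assuming Django settings are configured; the wrapper function is hypothetical:

    from compute_horde_validator.validator.dynamic_config import aget_config

    async def load_streaming_settings() -> tuple[str, int]:
        # Returns the defaults declared above until an operator overrides them.
        classes_csv = await aget_config("DYNAMIC_SYNTHETIC_STREAMING_JOB_EXECUTOR_CLASSES")
        ready_timeout = await aget_config("DYNAMIC_SYNTHETIC_STREAMING_JOB_READY_TIMEOUT")
        return classes_csv, ready_timeout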

validator/app/src/compute_horde_validator/validator/dynamic_config.py

@@ -66,6 +66,14 @@ def executor_class_value_map_parser(
return result


def executor_class_array_parser(value_map_str: str) -> set[ExecutorClass]:
result = set()
for executor_class_str in value_map_str.split(","):
with suppress(ValueError):
result.add(ExecutorClass(executor_class_str))
return result
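Unknown names are skipped via suppress(ValueError) rather than failing the whole parse. An illustrative call (the second entry is deliberately invalid; "always_on.llm.a6000" is the default from settings.py):

    classes = executor_class_array_parser("always_on.llm.a6000,no.such.class")
    # the bogus entry is silently dropped:
    assert classes == {ExecutorClass("always_on.llm.a6000")}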


async def get_miner_max_executors_per_class() -> dict[ExecutorClass, int]:
miner_max_executors_per_class: str = await aget_config("DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS")
result = {
@@ -82,3 +90,8 @@ def get_executor_class_weights() -> dict[ExecutorClass, float]:
return executor_class_value_map_parser(
config.DYNAMIC_EXECUTOR_CLASS_WEIGHTS, value_parser=float
)


async def get_streaming_job_executor_classes() -> set[ExecutorClass]:
value = await aget_config("DYNAMIC_SYNTHETIC_STREAMING_JOB_EXECUTOR_CLASSES")
return executor_class_array_parser(value)
validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py

@@ -7,6 +7,7 @@
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any, Protocol

import bittensor
@@ -16,6 +17,7 @@
from compute_horde.base.output_upload import OutputUpload
from compute_horde.base.volume import Volume
from compute_horde.base_requests import BaseRequest
from compute_horde.certificate import generate_certificate_at
from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS, EXECUTOR_CLASS, ExecutorClass
from compute_horde.miner_client.base import (
AbstractMinerClient,
@@ -35,6 +37,8 @@
V0JobFailedRequest,
V0JobFinishedRequest,
V0MachineSpecsRequest,
V0StreamingJobNotReadyRequest,
V0StreamingJobReadyRequest,
)
from compute_horde.mv_protocol.validator_requests import (
AuthenticationPayload,
@@ -43,6 +47,7 @@
V0JobAcceptedReceiptRequest,
V0JobFinishedReceiptRequest,
V0JobRequest,
V1InitialJobRequest,
)
from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt
from compute_horde.receipts.schemas import (
@@ -58,7 +63,11 @@
from django.db.models import BooleanField, Count, ExpressionWrapper, Q
from pydantic import BaseModel, JsonValue

-from compute_horde_validator.validator.dynamic_config import get_miner_max_executors_per_class
+from compute_horde_validator.validator.dynamic_config import (
+    aget_config,
+    get_miner_max_executors_per_class,
+    get_streaming_job_executor_classes,
+)
from compute_horde_validator.validator.models import (
Miner,
MinerManifest,
@@ -73,6 +82,7 @@
BaseSyntheticJobGenerator,
)
from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import (
LlmPromptsJobGenerator,
LlmPromptsSyntheticJobGenerator,
)
from compute_horde_validator.validator.synthetic_jobs.scoring import get_manifest_multiplier
@@ -277,6 +287,13 @@ class Job:
executor_response_time: datetime | None = None
executor_response_event: asyncio.Event = field(default_factory=asyncio.Event)

# streaming job support
streaming_job_ready_response: (
V0StreamingJobReadyRequest | V0StreamingJobNotReadyRequest | None
) = None
streaming_job_ready_response_time: datetime | None = None
streaming_job_ready_response_event: asyncio.Event = field(default_factory=asyncio.Event)

job_barrier_time: datetime | None = None
job_before_sent_time: datetime | None = None
job_after_sent_time: datetime | None = None
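The response/timestamp/event triplet added above mirrors the pattern the Job class already uses for accept_response and executor_response: handle_message() (next hunk) records the first matching payload and sets the event, and the sending side awaits it. A standalone sketch of the consuming side, with an assumed timeout (asyncio.timeout needs Python 3.11+):

    import asyncio

    async def wait_for_streaming_ready(job, timeout: float):
        # handle_message() sets this event on the first
        # V0StreamingJobReadyRequest / V0StreamingJobNotReadyRequest.
        async with asyncio.timeout(timeout):
            await job.streaming_job_ready_response_event.wait()
        return job.streaming_job_ready_response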
@@ -331,6 +348,14 @@ def handle_message(self, msg: BaseRequest) -> None:
else:
duplicate = True

case V0StreamingJobReadyRequest() | V0StreamingJobNotReadyRequest():
if self.streaming_job_ready_response is None:
self.streaming_job_ready_response = msg
self.streaming_job_ready_response_time = datetime.now(tz=UTC)
self.streaming_job_ready_response_event.set()
else:
duplicate = True

case V0JobFinishedRequest() | V0JobFailedRequest():
if self.job_response is None:
self.job_response = msg
@@ -386,6 +411,7 @@ def emit_telemetry_event(self) -> SystemEvent | None:
accept_response=_model_dump(self.accept_response),
accept_response_time=_datetime_dump(self.accept_response_time),
executor_response=_model_dump(self.executor_response),
streaming_job_ready_response=_model_dump(self.streaming_job_ready_response),
executor_response_time=_datetime_dump(self.executor_response_time),
job_barrier_time=_datetime_dump(self.job_barrier_time),
job_before_sent_time=_datetime_dump(self.job_before_sent_time),
@@ -423,6 +449,11 @@ class BatchContext:
uuid: str
own_keypair: bittensor.Keypair

# validator creds for streaming jobs
own_public_key: str
own_certs: tuple[str, str]
certs_basepath: Path

# randomized, but order preserving list of miner.hotkeys
# used to go from indices returned by asyncio.gather() back to miner.hotkey
hotkeys: list[str]
@@ -431,7 +462,7 @@ class BatchContext:
axons: dict[str, bittensor.AxonInfo]
# full name for easier debugging: "{miner_hotkey}({ip}:{port})"
names: dict[str, str]
-    miners: dict[str, Miner]
+    miners: dict[str, Miner]  # hotkey -> Miner
clients: dict[str, MinerClient]
executors: dict[str, defaultdict[ExecutorClass, int]]
job_generators: dict[str, dict[ExecutorClass, list[BaseSyntheticJobGenerator]]]
@@ -546,6 +577,7 @@ def emit_telemetry_event(self) -> SystemEvent | None:
for msg in (
job.accept_response,
job.executor_response,
job.streaming_job_ready_response,
job.job_response,
job.machine_specs,
):
@@ -753,10 +785,17 @@ def _init_context(
own_keypair = own_wallet.get_hotkey()
create_miner_client = create_miner_client or MinerClient

# TODO move somewhere else - gen a certificate per batch or not?
# Generate validator certificate
dir_path, public_key, certs = generate_certificate_at()

ctx = BatchContext(
batch_id=batch_id,
uuid=str(uuid.uuid4()),
own_keypair=own_keypair,
certs_basepath=dir_path,
own_public_key=public_key,
own_certs=certs,
hotkeys=[],
axons={},
names={},
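From its use here and below, generate_certificate_at() (compute_horde.certificate) returns a working directory, the validator certificate in PEM form, and a (cert, key) file pair later passed to httpx as a client certificate; anything beyond that is an assumption. A rough self-signed equivalent using the cryptography package, for illustration only:

    import datetime
    from pathlib import Path
    from cryptography import x509
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import ec
    from cryptography.x509.oid import NameOID

    def generate_self_signed(dir_path: Path) -> tuple[Path, str, tuple[str, str]]:
        key = ec.generate_private_key(ec.SECP256R1())
        name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "validator")])
        now = datetime.datetime.now(datetime.UTC)
        cert = (
            x509.CertificateBuilder()
            .subject_name(name)
            .issuer_name(name)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=1))
            .sign(key, hashes.SHA256())
        )
        cert_file, key_file = dir_path / "certificate.pem", dir_path / "private_key.pem"
        cert_file.write_bytes(cert.public_bytes(serialization.Encoding.PEM))
        key_file.write_bytes(
            key.private_bytes(
                serialization.Encoding.PEM,
                serialization.PrivateFormat.PKCS8,
                serialization.NoEncryption(),
            )
        )
        # (workdir, PEM public part, (cert, key) pair for httpx's cert=...)
        return dir_path, cert_file.read_text(), (str(cert_file), str(key_file))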
@@ -1015,6 +1054,7 @@ async def get_llm_prompt_samples(ctx: BatchContext) -> list[PromptSample] | None


async def _generate_jobs(ctx: BatchContext) -> None:
streaming_classes = await get_streaming_job_executor_classes()
start_time = time.time()
generated_job_count = 0

@@ -1045,6 +1085,9 @@ async def _generate_jobs(ctx: BatchContext) -> None:
"s3_url": prompt_sample.series.s3_url,
"seed": prompt_sample.workload.seed,
}
# enable streaming for LLM jobs on specific executor classes
if executor_class in streaming_classes:
kwargs["streaming"] = True

job_generator = await current.synthetic_job_generator_factory.create(
executor_class, **kwargs
@@ -1079,6 +1122,7 @@ async def _send_initial_job_request(
) -> None:
job: Job | None = None
try:
streaming_classes = await get_streaming_job_executor_classes()
await start_barrier.wait()
barrier_time = datetime.now(tz=UTC)

Expand All @@ -1093,14 +1137,27 @@ async def _send_initial_job_request(
stagger_wait_interval = max_spin_up_time - job.get_spin_up_time()
assert stagger_wait_interval >= 0

-        request = V0InitialJobRequest(
-            job_uuid=job.uuid,
-            executor_class=job.executor_class,
-            base_docker_image_name=job.job_generator.base_docker_image_name(),
-            timeout_seconds=job.job_generator.timeout_seconds(),
-            volume=job.volume if job.job_generator.volume_in_initial_req() else None,
-            job_started_receipt_payload=job.job_started_receipt_payload,
-            job_started_receipt_signature=job.job_started_receipt_signature,
+        request = (
+            V1InitialJobRequest(
+                job_uuid=job.uuid,
+                executor_class=job.executor_class,
+                base_docker_image_name=job.job_generator.base_docker_image_name(),
+                timeout_seconds=job.job_generator.timeout_seconds(),
+                volume=job.volume if job.job_generator.volume_in_initial_req() else None,
+                job_started_receipt_payload=job.job_started_receipt_payload,
+                job_started_receipt_signature=job.job_started_receipt_signature,
+                public_key=ctx.own_public_key,
+            )
+            if job.executor_class in streaming_classes
+            else V0InitialJobRequest(
+                job_uuid=job.uuid,
+                executor_class=job.executor_class,
+                base_docker_image_name=job.job_generator.base_docker_image_name(),
+                timeout_seconds=job.job_generator.timeout_seconds(),
+                volume=job.volume if job.job_generator.volume_in_initial_req() else None,
+                job_started_receipt_payload=job.job_started_receipt_payload,
+                job_started_receipt_signature=job.job_started_receipt_signature,
+            )
        )
request_json = request.model_dump_json()
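The two branches above repeat the same seven keyword arguments and differ only in the request class and the extra public_key. A possible equivalent without the duplication (an illustrative refactor, not what the commit ships):

    common_kwargs = dict(
        job_uuid=job.uuid,
        executor_class=job.executor_class,
        base_docker_image_name=job.job_generator.base_docker_image_name(),
        timeout_seconds=job.job_generator.timeout_seconds(),
        volume=job.volume if job.job_generator.volume_in_initial_req() else None,
        job_started_receipt_payload=job.job_started_receipt_payload,
        job_started_receipt_signature=job.job_started_receipt_signature,
    )
    request = (
        V1InitialJobRequest(**common_kwargs, public_key=ctx.own_public_key)
        if job.executor_class in streaming_classes
        else V0InitialJobRequest(**common_kwargs)
    )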

@@ -1144,7 +1201,11 @@ async def _send_initial_job_request(


async def _send_job_request(
-    ctx: BatchContext, start_barrier: asyncio.Barrier, job_uuid: str
+    ctx: BatchContext,
+    start_barrier: asyncio.Barrier,
+    streaming_start_barrier: asyncio.Barrier,
+    job_uuid: str,
+    is_streaming: bool,
) -> None:
await start_barrier.wait()
barrier_time = datetime.now(tz=UTC)
@@ -1173,6 +1234,9 @@
await client.send_check(request_json)
job.job_after_sent_time = datetime.now(tz=UTC)

if is_streaming:
await _trigger_streaming_job(ctx, streaming_start_barrier, job_uuid)

await job.job_response_event.wait()


@@ -1211,6 +1275,13 @@ def _emit_decline_or_failure_events(ctx: BatchContext) -> None:
subtype=SystemEvent.EventSubType.JOB_NOT_STARTED,
description="refused",
)
if isinstance(job.streaming_job_ready_response, V0StreamingJobNotReadyRequest):
logger.warning("%s failed to start streaming", job.name)
job.system_event(
type=SystemEvent.EventType.MINER_SYNTHETIC_JOB_FAILURE,
subtype=SystemEvent.EventSubType.JOB_NOT_STARTED,
description="failed to start streaming",
)
if isinstance(job.job_response, V0JobFailedRequest):
returncode = job.job_response.docker_process_exit_status
text = f"failed: {returncode=}"
@@ -1382,32 +1453,82 @@ async def _multi_send_initial_job_request(ctx: BatchContext) -> None:
_handle_exceptions(ctx, exceptions)


async def _trigger_streaming_job(
ctx: BatchContext, streaming_start_barrier: asyncio.Barrier, job_uuid: str
) -> None:
job = ctx.jobs[job_uuid]
timeout = await aget_config("DYNAMIC_SYNTHETIC_STREAMING_JOB_READY_TIMEOUT")
async with asyncio.timeout(timeout):
await job.streaming_job_ready_response_event.wait()
logger.debug(f"Received streaming job ready response for {job_uuid}")

response = job.streaming_job_ready_response
if isinstance(response, V0StreamingJobReadyRequest):
# Save job certificate received from executor
executor_cert_path = ctx.certs_basepath / "ssl" / f"executor_certificate_{job_uuid}.pem"
executor_cert_path.write_text(response.public_key)

# provide the synthetic job prompt seed to the streaming job
if isinstance(job.job_generator, LlmPromptsJobGenerator):
seed = job.job_generator.seed
else:
logger.error(f"Bad streaming job generator type: {job.job_generator}")
return

# wait to trigger all streaming jobs at the same time
await streaming_start_barrier.wait()

url = f"https://{response.ip}:{response.port}/execute-job"
async with httpx.AsyncClient(
verify=str(executor_cert_path), cert=ctx.own_certs
) as client:
try:
r = await client.post(url, json={"seed": seed}, headers={"Host": response.ip})
if r.status_code == 200:
logger.debug(
f"Successfully triggered job execution for {job_uuid} on {url}"
)
else:
raise Exception(
f"Bad response triggering streaming job {job_uuid} execution {url}: {r.status_code}, {r.text}"
)
except Exception as e:
raise Exception(
f"Error triggering streaming job {job_uuid} execution {url}: {e}"
)
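The POST above rides on mutual TLS: verify= pins the executor certificate that was just written to disk, while cert= presents the validator pair generated in _init_context. The executor side is outside this diff; a hypothetical minimal counterpart (FastAPI) only to show the contract the validator expects from /execute-job:

    # Hypothetical executor-side sketch; NOT part of this commit.
    from fastapi import FastAPI
    from pydantic import BaseModel

    app = FastAPI()

    class ExecuteJobRequest(BaseModel):
        seed: int

    @app.post("/execute-job")
    async def execute_job(body: ExecuteJobRequest) -> dict[str, str]:
        # A real executor would start the streaming LLM job seeded
        # with body.seed and begin serving output to the client.
        return {"status": "ok"}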


async def _multi_send_job_request(ctx: BatchContext) -> None:
-    executor_ready_job_uuids = [
-        job.uuid
+    streaming_classes = await get_streaming_job_executor_classes()
+
+    executor_ready_jobs = [
+        (job.uuid, job.executor_class in streaming_classes)
for job in ctx.jobs.values()
if isinstance(job.executor_response, V0ExecutorReadyRequest)
# occasionally we can get a job response (V0JobFailedRequest | V0JobFinishedRequest)
# before sending the actual job request (V0JobRequest), for example because
# the executor decided to abort the job before the details were sent
and job.job_response is None
]
-    logger.info("Sending job requests for %d ready jobs", len(executor_ready_job_uuids))
-    start_barrier = asyncio.Barrier(len(executor_ready_job_uuids))
+    logger.info("Sending job requests for %d ready jobs", len(executor_ready_jobs))
+    start_barrier = asyncio.Barrier(len(executor_ready_jobs))
+
+    streaming_start_barrier = asyncio.Barrier(len([x for x in executor_ready_jobs if x[1]]))

tasks = [
asyncio.create_task(
-            _send_job_request(ctx, start_barrier, job_uuid),
+            _send_job_request(ctx, start_barrier, streaming_start_barrier, job_uuid, is_streaming),
name=f"{job_uuid}._send_job_request",
)
-        for job_uuid in executor_ready_job_uuids
+        for job_uuid, is_streaming in executor_ready_jobs
]

results = await asyncio.gather(*tasks, return_exceptions=True)

exceptions: list[ExceptionInfo] = []
for i, result in enumerate(results):
if isinstance(result, BaseException):
-            job_uuid = executor_ready_job_uuids[i]
+            job_uuid = executor_ready_jobs[i][0]
job = ctx.jobs[job_uuid]
job.exception = result
job.exception_time = datetime.now(tz=UTC)
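All ready jobs rendezvous on start_barrier before V0JobRequest goes out, and the streaming subset rendezvous again on streaming_start_barrier before the HTTPS trigger, so each timed phase starts simultaneously across miners. A toy standalone demonstration of the two-barrier pattern (asyncio.Barrier needs Python 3.11+):

    import asyncio

    async def worker(name: str, start: asyncio.Barrier, stream: asyncio.Barrier | None):
        await start.wait()       # all three workers release together
        print(f"{name}: job request sent")
        if stream is not None:
            await stream.wait()  # the two streaming workers release together
            print(f"{name}: streaming triggered")

    async def main():
        start, stream = asyncio.Barrier(3), asyncio.Barrier(2)
        await asyncio.gather(
            worker("a", start, stream),
            worker("b", start, stream),
            worker("c", start, None),
        )

    asyncio.run(main())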
(diffs for the remaining 2 files were not loaded on this page)