Commit
Merge pull request #233 from backend-developers-ltd/llama-syn
LLM prompts synthetic jobs
mzukowski-reef authored Sep 30, 2024
2 parents ad0dad3 + fd8e5d3 commit e68968a
Showing 36 changed files with 1,438 additions and 285 deletions.
6 changes: 5 additions & 1 deletion compute_horde/compute_horde/base/output_upload.py
@@ -83,6 +83,10 @@ class MultiUpload(pydantic.BaseModel):


OutputUpload = Annotated[
ZipAndHttpPostUpload | ZipAndHttpPutUpload | MultiUpload,
SingleFilePostUpload
| SingleFilePutUpload
| ZipAndHttpPostUpload
| ZipAndHttpPutUpload
| MultiUpload,
Field(discriminator="output_upload_type"),
]
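For context, pydantic resolves a union like this via the `output_upload_type` discriminator field. A minimal, self-contained sketch of the pattern, assuming pydantic v2 and using illustrative stand-in models rather than the repository's actual upload classes:

```python
# Sketch of a discriminated union like OutputUpload; models here are stand-ins.
from typing import Annotated, Literal

import pydantic


class SingleFilePutUpload(pydantic.BaseModel):
    output_upload_type: Literal["single_file_put"] = "single_file_put"  # assumed tag value
    url: str


class ZipAndHttpPutUpload(pydantic.BaseModel):
    output_upload_type: Literal["zip_and_http_put"] = "zip_and_http_put"  # assumed tag value
    url: str


OutputUpload = Annotated[
    SingleFilePutUpload | ZipAndHttpPutUpload,
    pydantic.Field(discriminator="output_upload_type"),
]


class JobOutput(pydantic.BaseModel):
    output_upload: OutputUpload


# pydantic picks the concrete model based on the discriminator value:
parsed = JobOutput.model_validate(
    {"output_upload": {"output_upload_type": "single_file_put", "url": "https://example.com/out"}}
)
assert isinstance(parsed.output_upload, SingleFilePutUpload)
```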
7 changes: 7 additions & 0 deletions compute_horde/compute_horde/executor_class.py
@@ -6,6 +6,7 @@
class ExecutorClass(StrEnum):
spin_up_4min__gpu_24gb = "spin_up-4min.gpu-24gb"
always_on__gpu_24gb = "always_on.gpu-24gb"
always_on__llm__a6000 = "always_on.llm.a6000"
# always_on__cpu_16c__ram_64gb = "always_on.cpu-16c.ram-64gb"
# always_on__gpu_80gb = "always_on.gpu-80gb"
# always_on__gpu_24gb__docker_cached_facilitator = "always_on.gpu-24gb.docker_cached-facilitator"
@@ -39,6 +40,12 @@ class ExecutorClassSpec:
gpu_vram_gb=24,
spin_up_time=0,
),
ExecutorClass.always_on__llm__a6000: ExecutorClassSpec(
description="always on, NVIDIA RTX A6000 GPU machine for LLM prompts solving",
has_gpu=True,
gpu_vram_gb=48,
spin_up_time=int(timedelta(minutes=1).total_seconds()),
),
# ExecutorClass.always_on__cpu_16c__ram_64gb: ExecutorClassSpec(
# cpu_cores=16,
# ram_gb=64,
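A small illustration of the enum member and spec values added above; the values come from this diff and the import path matches the file shown:

```python
from datetime import timedelta

from compute_horde.executor_class import ExecutorClass

# The new member is a StrEnum, so it compares equal to its wire value
# and can be looked up from that value:
assert ExecutorClass.always_on__llm__a6000 == "always_on.llm.a6000"
assert ExecutorClass("always_on.llm.a6000") is ExecutorClass.always_on__llm__a6000

# Its spec declares a 48 GB GPU and a one-minute spin-up budget:
spin_up_seconds = int(timedelta(minutes=1).total_seconds())  # 60
```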
@@ -482,26 +482,26 @@ async def run_job(self, job_request: V0JobRequest):

success = exit_status == 0

# upload the output if requested
if job_request.output_upload:
try:
output_uploader = OutputUploader.for_upload_output(job_request.output_upload)
await output_uploader.upload(self.output_volume_mount_dir)
except OutputUploadFailed as ex:
logger.warning(
f"Uploading output failed for job {self.initial_job_request.job_uuid} with error: {ex!r}"
)
success = False
stdout = ex.description
stderr = ""

time_took = time.time() - t1

if success:
# upload the output if requested and job succeeded
if job_request.output_upload:
try:
output_uploader = OutputUploader.for_upload_output(job_request.output_upload)
await output_uploader.upload(self.output_volume_mount_dir)
except OutputUploadFailed as ex:
logger.warning(
f"Uploading output failed for job {self.initial_job_request.job_uuid} with error: {ex!r}"
)
success = False
stdout = ex.description
stderr = ""

time_took = time.time() - t1
logger.info(
f'Job "{self.initial_job_request.job_uuid}" finished successfully in {time_took:0.2f} seconds'
)
else:
time_took = time.time() - t1
logger.error(
f'"{" ".join(cmd)}" (job_uuid={self.initial_job_request.job_uuid})'
f' failed after {time_took:0.2f} seconds with status={process.returncode}'
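Because removed and added lines are interleaved above, here is a condensed, runnable sketch of the control flow after this change: the output is uploaded only when the job itself succeeded. Names below are simplified stand-ins for the classes in this file, not the actual implementation:

```python
# Simplified restatement of the reordered logic in run_job.
import asyncio
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class OutputUploadFailed(Exception):
    description = "upload failed"


async def upload_output(output_upload) -> None:
    """Stand-in for OutputUploader.for_upload_output(...).upload(...)."""


async def finish_job(exit_status: int, output_upload, job_uuid: str, t1: float) -> bool:
    success = exit_status == 0

    if success:
        # upload the output if requested and the job succeeded
        if output_upload:
            try:
                await upload_output(output_upload)
            except OutputUploadFailed as ex:
                logger.warning("Uploading output failed for job %s: %r", job_uuid, ex)
                success = False

        time_took = time.time() - t1
        logger.info('Job "%s" finished successfully in %0.2f seconds', job_uuid, time_took)
    else:
        time_took = time.time() - t1
        logger.error("Job %s failed after %0.2f seconds with status=%s", job_uuid, time_took, exit_status)

    return success


# Example: a successful job with no output upload requested.
asyncio.run(finish_job(exit_status=0, output_upload=None, job_uuid="1234", t1=time.time()))
```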
4 changes: 4 additions & 0 deletions validator/app/src/compute_horde_validator/celery.py
@@ -19,6 +19,10 @@ def route_task(name, args, kwargs, options, task=None, **kw):
"compute_horde_validator.validator.tasks.fetch_receipts_from_miner",
"compute_horde_validator.validator.tasks.send_events_to_facilitator",
"compute_horde_validator.validator.tasks.fetch_dynamic_config",
# TODO: llm tasks should have dedicated workers, but just move them from default queue for now
"compute_horde_validator.validator.tasks.llm_prompt_generation",
"compute_horde_validator.validator.tasks.llm_prompt_sampling",
"compute_horde_validator.validator.tasks.llm_prompt_answering",
}
if name in worker_queue_names:
return {"queue": "worker"}
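For context, the task names added above are matched by the route_task router named in the hunk header. A minimal sketch of that name-based routing; the fallback queue name is an assumption, since it is not visible in this excerpt:

```python
# Sketch of name-based Celery routing: selected tasks go to the "worker" queue.
worker_queue_names = {
    "compute_horde_validator.validator.tasks.fetch_receipts_from_miner",
    "compute_horde_validator.validator.tasks.send_events_to_facilitator",
    "compute_horde_validator.validator.tasks.fetch_dynamic_config",
    "compute_horde_validator.validator.tasks.llm_prompt_generation",
    "compute_horde_validator.validator.tasks.llm_prompt_sampling",
    "compute_horde_validator.validator.tasks.llm_prompt_answering",
}


def route_task(name, args, kwargs, options, task=None, **kw):
    if name in worker_queue_names:
        return {"queue": "worker"}
    return {"queue": "celery"}  # assumed fallback; the real default is not shown here


print(route_task("compute_horde_validator.validator.tasks.llm_prompt_sampling", (), {}, {}))
# -> {'queue': 'worker'}
```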
90 changes: 63 additions & 27 deletions validator/app/src/compute_horde_validator/settings.py
@@ -43,6 +43,8 @@ def wrapped(*args, **kwargs):

ENV = env("ENV", default="prod")

PROMPT_GENERATION_MODEL = env("PROMPT_GENERATION_MODEL", default="phi3")

DEFAULT_ADMIN_PASSWORD = env("DEFAULT_ADMIN_PASSWORD", default=None)
DEFAULT_ADMIN_USERNAME = env("DEFAULT_ADMIN_USERNAME", default="admin")
DEFAULT_ADMIN_EMAIL = env("DEFAULT_ADMIN_EMAIL", default="admin@admin.com")
@@ -216,31 +218,61 @@ def wrapped(*args, **kwargs):
"in seconds",
int,
),
"DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_FROM_SERIES": (
10,
"how many prompts to sample and validate from a series",
# llama params
"DYNAMIC_MAX_PROMPT_SERIES": (
3500,
"Maximum number of prompt series upon which the prompt generator will not be triggered",
int,
),
"DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE": (
100,
"how many workloads are needed before running local inference",
"DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY": (
1536, # 256 * 2 * 3 - we allow 2 executors per miner and want queue for 3 synthetic job batches
"how many prompt samples to generate (should be larger than how many prompts series we use per synthetic run)",
int,
),
"DYNAMIC_MAX_PROMPT_BATCHES": (
10000,
"Maximum number of prompt batches upon which the prompt generator will not be triggered",
"DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD": (
240,
"how many prompts to answer in a single workload",
int,
),
"DYNAMIC_PROMPTS_BATCHES_IN_A_SINGLE_GO": (
5,
# prompt generation params
"DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION": (
25,
"Number of batches that prompt generator will process in a single go",
int,
),
"DYNAMIC_NUMBER_OF_PROMPTS_IN_BATCH": (
"DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES": (
240,
"Number of prompts to generate in a single batch",
"Number of prompts to generate in a single series",
int,
),
# prompts answering params
"DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES": (
1,
"how many prompts to sample and answer from a series",
int,
),
"DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS": (
"always_on.llm.a6000=2",
(
"The maximum number of executor for an executor class that miners are allowed to have. "
"Executor classes not mentioned here have no limits. "
"The format should be: 'key1=value1,key2=value2', "
"where the keys are executor class enum values, and the values are integers. "
"Setting 0 will disable an executor class."
),
str,
),
"DYNAMIC_EXECUTOR_CLASS_WEIGHTS": (
"spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1",
(
"Weights of executor classes that are used to normalize miners scores. "
"Executor classes not mentioned here are not taken into account when scoring. "
"The format should be: 'key1=value1,key2=value2', "
"where the keys are executor class enum values, and the values are floats, "
"but int values that sum up to 100 are encouraged"
),
str,
),
}
DYNAMIC_CONFIG_CACHE_TIMEOUT = 300

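The two string-valued settings above share the 'key1=value1,key2=value2' format described in their help texts. A minimal parsing sketch; the helper below is illustrative, not the validator's actual parser:

```python
# Illustrative parser for the 'key1=value1,key2=value2' format used by
# DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS and DYNAMIC_EXECUTOR_CLASS_WEIGHTS.
def parse_executor_class_map(raw: str) -> dict[str, float]:
    result: dict[str, float] = {}
    for pair in raw.split(","):
        if not pair.strip():
            continue
        key, _, value = pair.partition("=")
        result[key.strip()] = float(value)
    return result


print(parse_executor_class_map("spin_up-4min.gpu-24gb=99,always_on.llm.a6000=1"))
# -> {'spin_up-4min.gpu-24gb': 99.0, 'always_on.llm.a6000': 1.0}
```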
@@ -407,6 +439,21 @@ def wrapped(*args, **kwargs):
"schedule": timedelta(minutes=5),
"options": {},
},
"llm_prompt_generation": {
"task": "compute_horde_validator.validator.tasks.llm_prompt_generation",
"schedule": timedelta(minutes=5),
"options": {},
},
"llm_prompt_sampling": {
"task": "compute_horde_validator.validator.tasks.llm_prompt_sampling",
"schedule": timedelta(minutes=30),
"options": {},
},
"llm_prompt_answering": {
"task": "compute_horde_validator.validator.tasks.llm_prompt_answering",
"schedule": timedelta(minutes=5),
"options": {},
},
}
if env.bool("DEBUG_RUN_BEAT_VERY_OFTEN", default=False):
CELERY_BEAT_SCHEDULE["run_synthetic_jobs"]["schedule"] = crontab(minute="*")
@@ -508,17 +555,6 @@ def wrapped(*args, **kwargs):

DYNAMIC_CONFIG_ENV = env.str("DYNAMIC_CONFIG_ENV", default="prod")

# prompt gen sampling
DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES = env.int(
"DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES", default=None
)
DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_FROM_SERIES = env.int(
"DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_PROMPTS_TO_VALIDATE_IN_BATCH", default=None
)
DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE = env.int(
"DEBUG_OVERRIDE_DYNAMIC_NUMBER_OF_WORKLOADS_TO_TRIGGER_LOCAL_INFERENCE", default=None
)

# synthetic jobs are evenly distributed through the cycle, however
# we start them from some offset because scheduling takes some time
SYNTHETIC_JOBS_RUN_OFFSET = env.int("SYNTHETIC_JOBS_RUN_OFFSET", default=24)
@@ -542,9 +578,9 @@ def BITTENSOR_WALLET() -> bittensor.wallet:


# Local miner generating prompts
GENERATION_MINER_KEY = env.str("GENERATION_MINER_KEY", default="")
GENERATION_MINER_ADDRESS = env.str("GENERATION_MINER_ADDRESS", default="")
GENERATION_MINER_PORT = env.int("GENERATION_MINER_PORT", default=0)
TRUSTED_MINER_KEY = env.str("TRUSTED_MINER_KEY", default="")
TRUSTED_MINER_ADDRESS = env.str("TRUSTED_MINER_ADDRESS", default="")
TRUSTED_MINER_PORT = env.int("TRUSTED_MINER_PORT", default=0)


CHANNEL_LAYERS = {
@@ -2,6 +2,7 @@
import uuid

from compute_horde.base.volume import Volume
from compute_horde.executor_class import ExecutorClass
from compute_horde.miner_client.organic import OrganicJobDetails


@@ -28,8 +29,11 @@ def generator_version(self) -> int: ...
@abc.abstractmethod
def docker_image_name(self) -> str: ...

@abc.abstractmethod
def executor_class(self) -> ExecutorClass: ...

def docker_run_options_preset(self) -> str:
return "none"
return "nvidia_all"

def docker_run_cmd(self) -> list[str]:
return []
@@ -49,6 +53,7 @@ def output(self) -> str | None:
def get_job_details(self) -> OrganicJobDetails:
return OrganicJobDetails(
job_uuid=str(self._uuid),
executor_class=self.executor_class(),
docker_image=self.docker_image_name(),
raw_script=self.raw_script(),
docker_run_options_preset=self.docker_run_options_preset(),
@@ -1,4 +1,6 @@
from compute_horde.base.output_upload import MultiUpload, SingleFilePutUpload
from compute_horde.executor_class import ExecutorClass
from django.conf import settings

from ..base import BasePromptJobGenerator

@@ -8,15 +10,22 @@ def generator_version(self) -> int:
return 0

def timeout_seconds(self) -> int:
return 3600
return 5 * 60

def docker_image_name(self) -> str:
return "backenddevelopersltd/compute-horde-prompt-gen:v0-latest"
return f"backenddevelopersltd/compute-horde-prompt-gen-{settings.PROMPT_GENERATION_MODEL}:v0-latest"

def executor_class(self) -> ExecutorClass:
return ExecutorClass.always_on__llm__a6000

def docker_run_cmd(self) -> list[str]:
return [
"--quantize",
"--model_name",
"phi3",
settings.PROMPT_GENERATION_MODEL,
"--batch_size=250", # on A6000 we want 240 prompts generated in single file, but not all results are valid
"--num_return_sequences=1",
"--max_new_tokens=40", # 40 new tokens is enough for reasonable length prompt - 30 caused too much cut off prompts
"--number_of_prompts_per_batch",
str(self.num_prompts_per_batch),
"--uuids",
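With the default PROMPT_GENERATION_MODEL of "phi3" from settings.py, the generator above resolves to the image and argument list shown below. This is only an illustration of how the values combine; the argument list stops where the diff excerpt does:

```python
# Illustration of what the prompt job generator produces with default settings.
PROMPT_GENERATION_MODEL = "phi3"  # default from settings.py

docker_image = f"backenddevelopersltd/compute-horde-prompt-gen-{PROMPT_GENERATION_MODEL}:v0-latest"
print(docker_image)
# -> backenddevelopersltd/compute-horde-prompt-gen-phi3:v0-latest

docker_run_cmd = [
    "--quantize",
    "--model_name",
    PROMPT_GENERATION_MODEL,
    "--batch_size=250",
    "--num_return_sequences=1",
    "--max_new_tokens=40",
    # ...remaining flags (--number_of_prompts_per_batch, --uuids, ...) omitted,
    # since the diff excerpt cuts off here
]
```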
