Skip to content

Commit

Permalink
Merge pull request #139 from backend-developers-ltd/manifest
Browse files Browse the repository at this point in the history
reward manifest change
  • Loading branch information
emnoor-reef authored Jun 28, 2024
2 parents c959b4e + dc5c9e9 commit 0ca825d
Show file tree
Hide file tree
Showing 13 changed files with 284 additions and 45 deletions.
4 changes: 4 additions & 0 deletions compute_horde/compute_horde/mv_protocol/miner_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class ExecutorClassManifest(pydantic.BaseModel):
class ExecutorManifest(pydantic.BaseModel):
    """Manifest made up of one ``ExecutorClassManifest`` entry per executor class."""

    executor_classes: list[ExecutorClassManifest]

    @property
    def total_count(self) -> int:
        """Return the sum of ``count`` over every entry in ``executor_classes``."""
        return sum(entry.count for entry in self.executor_classes)


class BaseMinerRequest(BaseRequest):
message_type: RequestType
Expand Down
6 changes: 3 additions & 3 deletions validator/app/src/compute_horde_validator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
from celery.schedules import crontab
from compute_horde import base # noqa

# from celery.schedules import crontab


root = environ.Path(__file__) - 2

env = environ.Env(DEBUG=(bool, False))
Expand Down Expand Up @@ -364,6 +361,9 @@ def wrapped(*args, **kwargs):

DEBUG_OVERRIDE_WEIGHTS_VERSION = env.int("DEBUG_OVERRIDE_WEIGHTS_VERSION", default=None)

# Factor a synthetic job score is multiplied by when the manifest incentive is granted.
MANIFEST_SCORE_MULTIPLIER = 1.05
# A miner's executor count must grow by strictly more than this value, compared with
# each of its two previous manifests, to count as a significant increase.
EXECUTOR_COUNT_INCREASE_THRESHOLD = 3


def BITTENSOR_WALLET() -> bittensor.wallet:
if not BITTENSOR_WALLET_NAME or not BITTENSOR_WALLET_HOTKEY_NAME:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ async def miner_driver(self, job_request: JobRequest):
miner_hotkey=job_request.miner_hotkey,
my_hotkey=self.my_hotkey(),
job_uuid=job_request.uuid,
batch_id=None,
keypair=self.keypair,
)
await execute_organic_job(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def handle(self, *args, **options):
miner_hotkey=miner_hotkey,
my_hotkey=key.ss58_address,
job_uuid=None,
batch_id=None,
keypair=key,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import datetime as dt
import logging
import time

import bittensor
from asgiref.sync import sync_to_async
Expand All @@ -11,6 +12,32 @@
logger = logging.getLogger(__name__)


class WeightVersionHolder:
    """Cache the subnet ``weights_version`` hyperparameter for a limited time.

    The value is fetched from the subtensor on first use and re-fetched once it
    is older than ``CACHE_TTL_SECONDS``. When
    ``settings.DEBUG_OVERRIDE_WEIGHTS_VERSION`` is set, it is returned directly
    and the network is never queried.
    """

    # Seconds a fetched value is reused before the subnet is queried again.
    CACHE_TTL_SECONDS = 300

    def __init__(self):
        # Monotonic timestamp of the last successful fetch; None until the first one.
        self._time_set = None
        self.value = None

    def get(self):
        """Return the weights version, refreshing the cached value when stale.

        Raises:
            RuntimeError: if the subtensor returns no hyperparameters.
        """
        if settings.DEBUG_OVERRIDE_WEIGHTS_VERSION is not None:
            return settings.DEBUG_OVERRIDE_WEIGHTS_VERSION

        # A monotonic clock keeps the age check immune to wall-clock adjustments
        # (NTP steps, manual changes), which could otherwise expire the cache
        # early or keep a stale value alive indefinitely.
        never_fetched = self._time_set is None
        if never_fetched or time.monotonic() - self._time_set > self.CACHE_TTL_SECONDS:
            subtensor = bittensor.subtensor(network=settings.BITTENSOR_NETWORK)
            hyperparameters = subtensor.get_subnet_hyperparameters(netuid=settings.BITTENSOR_NETUID)
            if hyperparameters is None:
                raise RuntimeError("Network hyperparameters are None")
            self.value = hyperparameters.weights_version
            # Stamp only after a successful fetch so a failure is retried on the next call.
            self._time_set = time.monotonic()
        return self.value


# Module-level holder instance used by get_weights_version().
weights_version_holder = WeightVersionHolder()


def get_weights_version():
    """Return the weights version reported by the module-level holder."""
    holder = weights_version_holder
    return holder.get()


class AsyncMetagraphClient:
def __init__(self, cache_time=dt.timedelta(minutes=5)):
self.cache_time = cache_time
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Generated by Django 4.2.13 on 2024-06-27 11:12

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    # Adds the MinerManifest table plus a uniqueness constraint on (miner, batch).

    # Must run after migration 0018 of the "validator" app.
    dependencies = [
        ("validator", "0018_alter_systemevent_subtype_alter_systemevent_type"),
    ]

    operations = [
        # Create the table: auto id, creation timestamp, executor count and
        # foreign keys to the synthetic job batch and the miner.
        migrations.CreateModel(
            name="MinerManifest",
            fields=[
                (
                    "id",
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name="ID",
                    ),
                ),
                ("created_at", models.DateTimeField(auto_now_add=True)),
                ("executor_count", models.IntegerField(default=0)),
                (
                    "batch",
                    # CASCADE: manifest rows are deleted together with their batch.
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="validator.syntheticjobbatch",
                    ),
                ),
                (
                    "miner",
                    # CASCADE: manifest rows are deleted together with their miner.
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="validator.miner",
                    ),
                ),
            ],
        ),
        # At most one manifest row per (miner, batch) pair.
        migrations.AddConstraint(
            model_name="minermanifest",
            constraint=models.UniqueConstraint(
                fields=("miner", "batch"), name="unique_miner_manifest"
            ),
        ),
    ]
18 changes: 12 additions & 6 deletions validator/app/src/compute_horde_validator/validator/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,25 @@ class Meta:
def __str__(self):
return f"hotkey: {self.miner.hotkey}"

def set_manifest(self, manifest):
    """Store ``manifest`` on this instance under the private ``_manifest`` attribute."""
    setattr(self, "_manifest", manifest)

def get_manifest(self):
    """Return the value held in ``_manifest``, or ``None`` when it was never set."""
    try:
        return self._manifest
    except AttributeError:
        return None


class SyntheticJobBatch(models.Model):
    """Database record for one batch of synthetic jobs."""

    # Filled in automatically when the row is first created (auto_now_add).
    started_at = models.DateTimeField(auto_now_add=True)
    # NOTE(review): presumably the deadline after which results for this batch
    # are no longer accepted - confirm against the code that reads it.
    accepting_results_until = models.DateTimeField()
    # Defaults to False; presumably flipped once the batch has been scored - confirm.
    scored = models.BooleanField(default=False)


class MinerManifest(models.Model):
    """Executor count recorded for a miner within a synthetic job batch."""

    # Both relations cascade: deleting the miner or the batch deletes the manifest row.
    miner = models.ForeignKey(Miner, on_delete=models.CASCADE)
    batch = models.ForeignKey(SyntheticJobBatch, on_delete=models.CASCADE)
    # Filled in automatically when the row is first created (auto_now_add).
    created_at = models.DateTimeField(auto_now_add=True)
    # Number of executors for this miner in this batch; 0 when not provided.
    executor_count = models.IntegerField(default=0)

    class Meta:
        constraints = [
            # A miner may have at most one manifest row per batch.
            UniqueConstraint(fields=["miner", "batch"], name="unique_miner_manifest"),
        ]


class JobBase(models.Model):
class Meta:
abstract = True
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import time

import bittensor
from asgiref.sync import sync_to_async
from compute_horde.mv_protocol.miner_requests import V0JobFinishedRequest
from django.conf import settings

from compute_horde_validator.validator.metagraph_client import get_weights_version
from compute_horde_validator.validator.synthetic_jobs.generator.base import (
AbstractSyntheticJobGenerator,
)
Expand All @@ -19,32 +16,10 @@
MAX_SCORE = 2


class WeightVersionHolder:
    """Cache the subnet ``weights_version`` hyperparameter for a limited time.

    The value is fetched from the subtensor on first use and re-fetched once it
    is older than ``CACHE_TTL_SECONDS``. When
    ``settings.DEBUG_OVERRIDE_WEIGHTS_VERSION`` is set, it is returned directly
    and the network is never queried.
    """

    # Seconds a fetched value is reused before the subnet is queried again.
    CACHE_TTL_SECONDS = 300

    def __init__(self):
        # Monotonic timestamp of the last successful fetch; None until the first one.
        self._time_set = None
        self.value = None

    def get(self):
        """Return the weights version, refreshing the cached value when stale.

        Raises:
            RuntimeError: if the subtensor returns no hyperparameters.
        """
        if settings.DEBUG_OVERRIDE_WEIGHTS_VERSION is not None:
            return settings.DEBUG_OVERRIDE_WEIGHTS_VERSION

        # A monotonic clock keeps the age check immune to wall-clock adjustments
        # (NTP steps, manual changes), which could otherwise expire the cache
        # early or keep a stale value alive indefinitely.
        never_fetched = self._time_set is None
        if never_fetched or time.monotonic() - self._time_set > self.CACHE_TTL_SECONDS:
            subtensor = bittensor.subtensor(network=settings.BITTENSOR_NETWORK)
            hyperparameters = subtensor.get_subnet_hyperparameters(netuid=settings.BITTENSOR_NETUID)
            if hyperparameters is None:
                raise RuntimeError("Network hyperparameters are None")
            self.value = hyperparameters.weights_version
            # Stamp only after a successful fetch so a failure is retried on the next call.
            self._time_set = time.monotonic()
        return self.value


weights_version_holder = WeightVersionHolder()


class GPUHashcatSyntheticJobGenerator(AbstractSyntheticJobGenerator):
def __init__(self):
# set synthetic_jobs based on subnet weights_version
self.weights_version = weights_version_holder.get()
self.weights_version = get_weights_version()
self.hash_job = None
self.expected_answer = None

Expand All @@ -59,7 +34,7 @@ def _get_hash_job(self):
hash_job = V0SyntheticJob.generate(
algorithm, HASHJOB_PARAMS[self.weights_version][algorithm]
)
elif self.weights_version == 1:
elif self.weights_version in [1, 2]:
algorithms = Algorithm.get_all_algorithms()
params = [HASHJOB_PARAMS[self.weights_version][algorithm] for algorithm in algorithms]
hash_job = V1SyntheticJob.generate(algorithms, params)
Expand Down Expand Up @@ -94,7 +69,7 @@ def volume_contents(self) -> str:
def score(self, time_took: float) -> float:
    """Convert a job's execution time into a score for the active weights version.

    Version 0 starts from ``MAX_SCORE`` and subtracts a share proportional to
    ``time_took`` relative to twice ``self.timeout_seconds()``. Versions 1 and
    2 both use the reciprocal of ``time_took``.

    Raises:
        RuntimeError: if no score function exists for ``self.weights_version``.
    """
    if self.weights_version == 0:
        return MAX_SCORE * (1 - (time_took / (2 * self.timeout_seconds())))
    # The stale, body-less ``== 1`` branch is dropped; version 1 is covered here.
    elif self.weights_version in [1, 2]:
        return 1 / time_took
    else:
        raise RuntimeError(f"No score function for weights_version: {self.weights_version}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@
from django.conf import settings
from django.utils.timezone import now

from compute_horde_validator.validator.metagraph_client import get_weights_version
from compute_horde_validator.validator.models import (
JobBase,
Miner,
MinerManifest,
SyntheticJob,
SyntheticJobBatch,
SystemEvent,
Expand Down Expand Up @@ -84,7 +86,8 @@ def __init__(
my_hotkey: str,
miner_hotkey: str,
miner_port: int,
job_uuid: str,
job_uuid: None | str,
batch_id: None | int,
keypair: bittensor.Keypair,
):
super().__init__(loop, f"{miner_hotkey}({miner_address}:{miner_port})")
Expand All @@ -95,6 +98,7 @@ def __init__(
self.job_states = {}
if job_uuid is not None:
self.add_job(job_uuid)
self.batch_id = batch_id
self.keypair = keypair
self._barrier = None
self.miner_manifest = asyncio.Future()
Expand Down Expand Up @@ -132,6 +136,13 @@ async def handle_message(self, msg: BaseRequest):
return
if isinstance(msg, V0ExecutorManifestRequest):
self.miner_manifest.set_result(msg.manifest)
miner = await Miner.objects.aget(hotkey=self.miner_hotkey)
if self.batch_id:
await MinerManifest.objects.acreate(
miner=miner,
batch_id=self.batch_id,
executor_count=msg.manifest.total_count,
)
return
job_state = self.get_job_state(msg.job_uuid)
if job_state is None:
Expand Down Expand Up @@ -202,7 +213,17 @@ def create_and_run_sythethic_job_batch(netuid, network):
)
}
else:
metagraph = bittensor.metagraph(netuid, network=network)
try:
metagraph = bittensor.metagraph(netuid, network=network)
except Exception as e:
msg = f"Failed to get metagraph - will not run synthetic jobs: {e}"
logger.warning(msg)
SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).create(
type_=SystemEvent.EventType.MINER_SYNTHETIC_JOB_FAILURE,
subtype=SystemEvent.EventSubType.SUBTENSOR_CONNECTIVITY_ERROR,
long_description=msg,
)
return
axons_by_key = {n.hotkey: n.axon_info for n in metagraph.neurons}
miners = get_miners(metagraph)
miners = [(miner.id, miner.hotkey) for miner in miners]
Expand Down Expand Up @@ -261,6 +282,7 @@ async def execute_miner_synthetic_jobs(batch_id, miner_id, miner_hotkey, axon_in
miner_hotkey=miner_hotkey,
my_hotkey=key.ss58_address,
job_uuid=None,
batch_id=batch_id,
keypair=key,
)
data = {"miner_hotkey": miner_client.miner_hotkey}
Expand Down Expand Up @@ -325,6 +347,55 @@ async def execute_miner_synthetic_jobs(batch_id, miner_id, miner_hotkey, axon_in
)


async def apply_manifest_incentive(miner_hotkey: str, batch_id: int, score: float) -> float:
    """Return ``score``, boosted when the miner qualifies for the manifest incentive.

    Weights versions 0 and 1 return ``score`` unchanged. For version 2 the
    miner's manifests from the three most recent batches (up to and including
    ``batch_id``) are inspected, and ``score`` is multiplied by
    ``settings.MANIFEST_SCORE_MULTIPLIER`` when any of these holds:

    * the newest manifest reports 3 or fewer executors;
    * all three manifests exist and the newest executor count exceeds each of
      the two older ones by more than
      ``settings.EXECUTOR_COUNT_INCREASE_THRESHOLD``;
    * fewer than three manifests exist for those batches.

    Otherwise ``score`` is returned unchanged. A hotkey with no stored
    manifests (including one that matches no miner) falls under the last rule.

    Raises:
        RuntimeError: if the weights version is not 0, 1 or 2.
    """
    # NOTE(review): get_weights_version() is a synchronous call made from a
    # coroutine; if it performs network I/O it will block the event loop.
    weights_version = get_weights_version()
    if weights_version in [0, 1]:
        return score
    if weights_version != 2:
        raise RuntimeError(f"Scoring undefined for {weights_version=}")

    # Ids of the current batch and the two before it, newest first.
    recent_batch_ids = [
        pk
        async for pk in SyntheticJobBatch.objects.filter(id__lte=batch_id)
        .order_by("-id")
        .values_list("id", flat=True)[:3]
    ]
    # Filtering through the relation avoids a separate Miner query for every
    # scored job, and only the executor counts are loaded, newest batch first.
    executor_counts = [
        count
        async for count in MinerManifest.objects.filter(
            miner__hotkey=miner_hotkey, batch_id__in=recent_batch_ids
        )
        .order_by("-batch_id")
        .values_list("executor_count", flat=True)
    ]

    increase_threshold = settings.EXECUTOR_COUNT_INCREASE_THRESHOLD
    # NOTE(review): the small-miner limit (3) is hard-coded and independent of
    # EXECUTOR_COUNT_INCREASE_THRESHOLD; the log message below repeats it.
    if executor_counts and executor_counts[0] <= 3:
        logger.debug(
            f"Applied manifest incentive for {miner_hotkey} - last manifest has 3 or less executors"
        )
    elif len(executor_counts) == 3 and all(
        executor_counts[0] - previous > increase_threshold for previous in executor_counts[1:]
    ):
        logger.debug(
            f"Applied manifest incentive for {miner_hotkey} - miner has increased number of executors significantly"
        )
    elif len(executor_counts) < 3:
        logger.debug(
            f"Applied manifest incentive for {miner_hotkey} - validator has missed one of the previous 2 synthetic jobs windows"
        )
    else:
        # do not apply manifest incentive - return original score
        return score

    # apply manifest incentive
    return score * settings.MANIFEST_SCORE_MULTIPLIER


async def _execute_synthetic_job(miner_client: MinerClient, job: SyntheticJob):
data = {"job_uuid": str(job.job_uuid), "miner_hotkey": job.miner.hotkey}
save_event = partial(save_job_execution_event, data=data)
Expand Down Expand Up @@ -474,13 +545,15 @@ async def _execute_synthetic_job(miner_client: MinerClient, job: SyntheticJob):
)

# if job passed, save synthetic job score
job.score = score
job.score = await apply_manifest_incentive(
miner_client.miner_hotkey, job.batch_id, score
)
await job.asave()

# Send receipt to miner
try:
receipt_message = miner_client.generate_receipt_message(
job, full_job_sent, time_took, score
job, full_job_sent, time_took, job.score
)
await miner_client.send_model(receipt_message)
logger.info("Receipt message sent")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ async def run_admin_job_request(job_request_id: int, callback=None):
miner_hotkey=miner.hotkey,
my_hotkey=keypair.ss58_address,
job_uuid=job.job_uuid,
batch_id=None,
keypair=keypair,
)

Expand Down
Loading

0 comments on commit 0ca825d

Please sign in to comment.