Merge pull request #105 from backend-developers-ltd/gpu-specs
Scrape hardware stats from the executor
adal-chiriliuc-reef authored May 23, 2024
2 parents f98cd67 + cb3af4e commit a93e2b1
Showing 3 changed files with 133 additions and 9 deletions.
@@ -1,11 +1,13 @@
import asyncio
import base64
import csv
import io
import json
import logging
import pathlib
import re
import shlex
import shutil
import subprocess
import tempfile
import time
import zipfile
@@ -184,6 +186,95 @@ def truncate(v: str) -> str:
return v


def run_cmd(cmd):
proc = subprocess.run(cmd, shell=True, capture_output=True, check=False, text=True)
if proc.returncode != 0:
raise RuntimeError(
f"run_cmd error {cmd=!r} {proc.returncode=} {proc.stdout=!r} {proc.stderr=!r}"
)
return proc.stdout


def get_machine_specs() -> MachineSpecs:
data = {}

data["gpu"] = {"count": 0, "details": []}
try:
nvidia_cmd = run_cmd(
"docker run --rm --runtime=nvidia --gpus all ubuntu "
"nvidia-smi --query-gpu=name,driver_version,name,memory.total,compute_cap,power.limit,clocks.gr,clocks.mem --format=csv"
)
csv_data = csv.reader(nvidia_cmd.splitlines())
header = [x.strip() for x in next(csv_data)]
for row in csv_data:
row = [x.strip() for x in row]
gpu_data = dict(zip(header, row))
data["gpu"]["details"].append(
{
"name": gpu_data["name"],
"driver": gpu_data["driver_version"],
"capacity": gpu_data["memory.total [MiB]"].split(" ")[0],
"cuda": gpu_data["compute_cap"],
"power_limit": gpu_data["power.limit [W]"].split(" ")[0],
"graphics_speed": gpu_data["clocks.current.graphics [MHz]"].split(" ")[0],
"memory_speed": gpu_data["clocks.current.memory [MHz]"].split(" ")[0],
}
)
data["gpu"]["count"] = len(data["gpu"]["details"])
except Exception as exc:
# print(f'Error processing scraped gpu specs: {exc}', flush=True)
data["gpu_scrape_error"] = repr(exc)

data["cpu"] = {"count": 0, "model": "", "clocks": []}
try:
lscpu_output = run_cmd("lscpu")
data["cpu"]["model"] = re.search(r"Model name:\s*(.*)$", lscpu_output, re.M).group(1)
data["cpu"]["count"] = int(re.search(r"CPU\(s\):\s*(.*)", lscpu_output).group(1))

cpu_data = run_cmd('lscpu --parse=MHZ | grep -Po "^[0-9,.]*$"').splitlines()
data["cpu"]["clocks"] = [float(x) for x in cpu_data]
except Exception as exc:
# print(f'Error getting cpu specs: {exc}', flush=True)
data["cpu_scrape_error"] = repr(exc)

data["ram"] = {}
try:
with open("/proc/meminfo") as f:
meminfo = f.read()

for name, key in [
("MemAvailable", "available"),
("MemFree", "free"),
("MemTotal", "total"),
]:
data["ram"][key] = int(re.search(rf"^{name}:\s*(\d+)\s+kB$", meminfo, re.M).group(1))
data["ram"]["used"] = data["ram"]["total"] - data["ram"]["free"]
except Exception as exc:
# print(f"Error reading /proc/meminfo; Exc: {exc}", file=sys.stderr)
data["ram_scrape_error"] = repr(exc)

data["hard_disk"] = {}
try:
disk_usage = shutil.disk_usage(".")
data["hard_disk"] = {
"total": disk_usage.total // 1024, # in kiB
"used": disk_usage.used // 1024,
"free": disk_usage.free // 1024,
}
except Exception as exc:
# print(f"Error getting disk_usage from shutil: {exc}", file=sys.stderr)
data["hard_disk_scrape_error"] = repr(exc)

data["os"] = ""
try:
data["os"] = run_cmd('lsb_release -d | grep -Po "Description:\\s*\\K.*"').strip()
except Exception as exc:
# print(f'Error getting os specs: {exc}', flush=True)
data["os_scrape_error"] = repr(exc)

return MachineSpecs(specs=data)


class JobError(Exception):
def __init__(self, description: str):
self.description = description
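For context, the GPU scrape above runs nvidia-smi inside an ubuntu container with the NVIDIA runtime and parses its CSV output; the queried fields clocks.gr and clocks.mem are expected to come back under the headers clocks.current.graphics [MHz] and clocks.current.memory [MHz], and memory/power values keep their unit suffix, hence the split(" ")[0] calls. A minimal standalone sketch of just the parsing step, using made-up sample output (real header names and values depend on the installed driver):

import csv

# Hypothetical nvidia-smi CSV output for illustration only.
sample_output = (
    "name, driver_version, name, memory.total [MiB], compute_cap, power.limit [W], "
    "clocks.current.graphics [MHz], clocks.current.memory [MHz]\n"
    "NVIDIA A100-SXM4-80GB, 535.104.05, NVIDIA A100-SXM4-80GB, 81920 MiB, 8.0, 400.00 W, 1410 MHz, 1593 MHz"
)

reader = csv.reader(sample_output.splitlines())
header = [x.strip() for x in next(reader)]
for row in reader:
    gpu = dict(zip(header, [x.strip() for x in row]))
    # "81920 MiB" -> "81920": strip the unit, as in the diff above.
    print(gpu["name"], gpu["memory.total [MiB]"].split(" ")[0], gpu["compute_cap"])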
@@ -301,12 +392,6 @@ async def run_job(self, job_request: V0JobRequest):
exit_status = process.returncode
timeout = False

# fetch machine specs if synthetic job generated
specs = None
if (self.specs_volume_mount_dir / "specs.json").exists():
with open(self.specs_volume_mount_dir / "specs.json") as f:
specs = MachineSpecs(specs=json.load(f))

# Save the streams in output volume and truncate them in response.
with open(self.output_volume_mount_dir / "stdout.txt", "w") as f:
f.write(stdout)
@@ -349,7 +434,6 @@ async def run_job(self, job_request: V0JobRequest):
timeout=timeout,
stdout=stdout,
stderr=stderr,
specs=specs,
)

async def clean(self):
@@ -492,14 +576,16 @@ async def _executor_loop(self):
await self.miner_client.send_failed_to_prepare()
return

logger.debug(f"Prepared for job {initial_message.job_uuid}")
logger.debug(f"Scraping hardware specs for job {initial_message.job_uuid}")
specs = get_machine_specs()

await self.miner_client.send_ready()
logger.debug(f"Informed miner that I'm ready for job {initial_message.job_uuid}")

job_request = await self.miner_client.full_payload
logger.debug(f"Running job {initial_message.job_uuid}")
result = await job_runner.run_job(job_request)
result.specs = specs

if result.success:
await self.miner_client.send_finished(result)
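Net effect of the changes in this file: the executor no longer reads specs back from a specs.json file in the specs volume after the job runs; it scrapes them itself between preparing for the job and telling the miner it is ready, then attaches them to the job result. A rough, illustrative sketch of the resulting ordering, reusing method names from the diff (everything else is assumed for the example and not part of the commit):

# Illustrative only - the real loop has error handling and more steps around it.
async def executor_loop_sketch(miner_client, job_runner):
    specs = get_machine_specs()            # scrape hardware before signalling readiness
    await miner_client.send_ready()        # miner may now send the full job payload
    job_request = await miner_client.full_payload
    result = await job_runner.run_job(job_request)
    result.specs = specs                   # attach the scraped specs to the job result
    await miner_client.send_finished(result)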
@@ -104,6 +104,11 @@ def test_main_loop():
"message_type": "V0ReadyRequest",
"job_uuid": job_uuid,
},
{
"message_type": "V0MachineSpecsRequest",
"specs": mock.ANY,
"job_uuid": job_uuid,
},
{
"message_type": "V0FinishedRequest",
"docker_process_stdout": payload,
@@ -151,6 +156,11 @@ def test_zip_url_volume(httpx_mock: HTTPXMock):
"message_type": "V0ReadyRequest",
"job_uuid": job_uuid,
},
{
"message_type": "V0MachineSpecsRequest",
"specs": mock.ANY,
"job_uuid": job_uuid,
},
{
"message_type": "V0FinishedRequest",
"docker_process_stdout": payload,
@@ -259,6 +269,11 @@ def response_callback(request: httpx.Request) -> httpx.Response:
"message_type": "V0ReadyRequest",
"job_uuid": job_uuid,
},
{
"message_type": "V0MachineSpecsRequest",
"specs": mock.ANY,
"job_uuid": job_uuid,
},
{
"message_type": "V0FinishedRequest",
"docker_process_stdout": payload,
@@ -378,6 +393,11 @@ def test_zip_and_http_post_output_uploader(httpx_mock: HTTPXMock, tmp_path):
"message_type": "V0ReadyRequest",
"job_uuid": job_uuid,
},
{
"message_type": "V0MachineSpecsRequest",
"specs": mock.ANY,
"job_uuid": job_uuid,
},
{
"message_type": "V0FinishedRequest",
"docker_process_stdout": payload,
@@ -439,6 +459,11 @@ def test_zip_and_http_put_output_uploader(httpx_mock: HTTPXMock, tmp_path):
"message_type": "V0ReadyRequest",
"job_uuid": job_uuid,
},
{
"message_type": "V0MachineSpecsRequest",
"specs": mock.ANY,
"job_uuid": job_uuid,
},
{
"message_type": "V0FinishedRequest",
"docker_process_stdout": payload,
@@ -546,6 +571,11 @@ def test_raw_script_job():
"message_type": "V0ReadyRequest",
"job_uuid": job_uuid,
},
{
"message_type": "V0MachineSpecsRequest",
"specs": mock.ANY,
"job_uuid": job_uuid,
},
{
"message_type": "V0FinishedRequest",
"docker_process_stdout": f"{payload}\n",
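The updated expectations insert a V0MachineSpecsRequest between V0ReadyRequest and V0FinishedRequest, with mock.ANY standing in for the scraped specs, since their exact contents depend on the machine running the tests. For reference, mock.ANY compares equal to any value, so the assertion ignores the specs payload entirely (values below are made up for illustration):

from unittest import mock

expected = {"message_type": "V0MachineSpecsRequest", "specs": mock.ANY, "job_uuid": "some-uuid"}
received = {"message_type": "V0MachineSpecsRequest", "specs": {"gpu": {"count": 0, "details": []}}, "job_uuid": "some-uuid"}
assert received == expected  # passes regardless of what the specs dict contains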
8 changes: 8 additions & 0 deletions tests/integration_tests/test_miner_on_dev_executor_manager.py
@@ -148,6 +148,14 @@ async def test_echo_image(self):
response = json.loads(
await asyncio.wait_for(ws.recv(), timeout=WEBSOCKET_TIMEOUT)
)
assert response == {
"message_type": "V0MachineSpecsRequest",
"job_uuid": job_uuid,
"specs": mock.ANY,
}
response = json.loads(
await asyncio.wait_for(ws.recv(), timeout=WEBSOCKET_TIMEOUT)
)
assert response == {
"message_type": "V0JobFinishedRequest",
"job_uuid": job_uuid,
