Merge pull request #328 from backend-developers-ltd/streamflow
job streaming support
andreea-popescu-reef authored Jan 10, 2025
2 parents cfcc724 + 7d26524 commit 3f554ee
Showing 28 changed files with 1,104 additions and 116 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/executor_ci.yml
@@ -87,5 +87,8 @@ jobs:
- name: Build backenddevelopersltd/compute-horde-job-echo:v0-latest image
run: docker build . -t backenddevelopersltd/compute-horde-job-echo:v0-latest
working-directory: ./executor/app/src/compute_horde_executor/executor/tests/integration/docker_image_for_tests
- name: Build backenddevelopersltd/compute-horde-streaming-job-test:v0-latest image
run: docker build . -t backenddevelopersltd/compute-horde-streaming-job-test:v0-latest
working-directory: ./executor/app/src/compute_horde_executor/executor/tests/integration/docker_image_for_streaming_job_tests
- name: Run unit tests
run: nox -vs test
123 changes: 118 additions & 5 deletions compute_horde/compute_horde/certificate.py
@@ -1,14 +1,107 @@
import asyncio
import ipaddress
import logging
import tempfile
from datetime import UTC, datetime, timedelta
from pathlib import Path

import aiohttp
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.x509 import Certificate
from cryptography.x509.oid import NameOID

logger = logging.getLogger(__name__)


async def get_docker_container_ip(container_name: str, bridge_network: bool = False) -> str:
query = (
"'{{.NetworkSettings.Networks.bridge.IPAddress}}'"
if bridge_network
else "'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'"
)

process = await asyncio.create_subprocess_exec(
"docker",
"inspect",
"-f",
query,
container_name,
stdout=asyncio.subprocess.PIPE,
)
stdout, _ = await process.communicate()
await process.wait()
if stdout:
return stdout.decode().strip()[1:-1]
else:
raise Exception(f"Failed to get IP of {container_name}")


async def check_endpoint(url: str, timeout: int) -> bool:
    """
    Ping the endpoint every second until it returns HTTP 200 or the timeout (in seconds) is reached.
    """
async with aiohttp.ClientSession() as session:
for _ in range(timeout):
try:
response = await session.get(url)
if response.status == 200:
return True
except aiohttp.ClientError as e:
logger.debug(f"Failed to ping {url}: {e}")
await asyncio.sleep(1)
return False


async def start_nginx(
nginx_conf: str,
port: int,
dir_path: Path,
job_network: str,
container_name: str = "job-nginx",
timeout: int = 10,
):
nginx_conf_file = dir_path / "nginx.conf"
nginx_conf_file.write_text(nginx_conf)

cmd = [
"docker",
"run",
"--detach",
"--rm",
"--name",
container_name,
"--network",
"bridge", # primary external network
"-p",
f"{port}:443", # expose nginx port
"-v",
f"{dir_path}:/etc/nginx/",
"nginx:1.26-alpine",
]
    # capture output so that a failed start can be reported below
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    _stdout, _stderr = await process.communicate()

    # make sure to get the IP while the nginx container is still attached to a single network
ip = await get_docker_container_ip(container_name)

# connect to internal network for job communication
process = await asyncio.create_subprocess_exec(
"docker", "network", "connect", job_network, container_name
)
await process.wait()

# wait for nginx to start
url = f"http://{ip}/ok"
nginx_started = await check_endpoint(url, timeout)
if not nginx_started:
stdout = _stdout.decode() if _stdout else ""
stderr = _stderr.decode() if _stderr else ""
raise Exception(f"Failed to ping nginx on {url} - server init {stdout=}, {stderr=}")


def generate_certificate(alternative_name: str) -> tuple[Certificate, RSAPrivateKey]:
"""Generate a certificate and private key"""
@@ -64,10 +157,30 @@ def write_private_key(private_key: RSAPrivateKey, path: Path) -> None:
path.write_bytes(serialize_private_key(private_key))


def generate_certificate_at(dir_path: Path, alternative_name: str) -> None:
"""Generates and saves a certificate and private key at `dir_path`"""
assert dir_path.is_dir()
def save_public_key(public_key: str, dir_path: Path) -> None:
certs_dir = dir_path / "ssl"
client_cert_file = certs_dir / "client.crt"
client_cert_file.write_text(public_key)


def generate_certificate_at(
dir_path: Path | None = None, alternative_name: str = "127.0.0.1"
) -> tuple[Path, str, tuple[str, str]]:
"""
Generate a certificate and private key and save them to a directory.
Returns the directory path, the public key and the paths to the public and private key files.
"""

if dir_path is None:
dir_path = Path(tempfile.mkdtemp())
certs_dir = dir_path / "ssl"
certs_dir.mkdir()

certificate, private_key = generate_certificate(alternative_name)
write_certificate(certificate, dir_path / "certificate.pem")
write_private_key(private_key, dir_path / "private_key.pem")
public_key_path = certs_dir / "certificate.pem"
private_key_path = certs_dir / "private_key.pem"
write_certificate(certificate, public_key_path)
write_private_key(private_key, private_key_path)

public_key = serialize_certificate(certificate).decode("utf-8")
return dir_path, public_key, (str(public_key_path), str(private_key_path))
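
The new helpers above are meant to be composed on the executor side: generate the executor's TLS certificate, store the client's certificate alongside it, and front the streaming job with an nginx proxy. Below is a hedged sketch of how they might fit together; the nginx config, the host port, and the job network name are illustrative assumptions, not taken from this diff.

# Hedged usage sketch of the new certificate/nginx helpers. The nginx config,
# host port and job network name are assumptions for illustration only.
from compute_horde.certificate import generate_certificate_at, save_public_key, start_nginx

NGINX_CONF = """
events {}
http {
    server {
        listen 80;
        location /ok { return 200; }  # health endpoint polled by start_nginx()
    }
    # The real config would add a `listen 443 ssl` server here that presents
    # /etc/nginx/ssl/certificate.pem, requires the client certificate saved by
    # save_public_key(), and proxies to the job container.
}
"""


async def prepare_streaming_proxy(client_public_key: str, job_network: str) -> tuple[str, int]:
    # Create the executor-side certificate; with no dir_path a temporary directory is used.
    dir_path, public_key, (_cert_path, _key_path) = generate_certificate_at()
    # Store the client's certificate next to ours so the nginx config can reference it.
    save_public_key(client_public_key, dir_path)
    port = 8443  # illustrative host port to expose the TLS proxy on
    await start_nginx(NGINX_CONF, port=port, dir_path=dir_path, job_network=job_network)
    # The caller would report `public_key` and `port` back via the streaming-ready message below.
    return public_key, port
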
13 changes: 13 additions & 0 deletions compute_horde/compute_horde/em_protocol/executor_requests.py
@@ -7,6 +7,8 @@
class RequestType(enum.Enum):
V0ReadyRequest = "V0ReadyRequest"
V0FailedToPrepare = "V0FailedToPrepare"
V0StreamingJobReadyRequest = "V0StreamingJobReadyRequest"
V0StreamingJobFailedToPrepareRequest = "V0StreamingJobFailedToPrepareRequest"
V0FinishedRequest = "V0FinishedRequest"
V0FailedRequest = "V0FailedRequest"
V0MachineSpecsRequest = "V0MachineSpecsRequest"
@@ -25,6 +27,17 @@ class V0FailedToPrepare(BaseExecutorRequest, JobMixin):
message_type: RequestType = RequestType.V0FailedToPrepare


# Signals that the streaming job is ready to accept connections from the user.
class V0StreamingJobReadyRequest(BaseExecutorRequest, JobMixin):
message_type: RequestType = RequestType.V0StreamingJobReadyRequest
public_key: str
port: int


class V0StreamingJobFailedToPrepareRequest(BaseExecutorRequest, JobMixin):
message_type: RequestType = RequestType.V0StreamingJobFailedToPrepareRequest


class V0FailedRequest(BaseExecutorRequest, JobMixin):
message_type: RequestType = RequestType.V0FailedRequest
docker_process_exit_status: int | None = None
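
A hedged sketch of how the executor might build these new messages once the proxy start has been attempted. It assumes the requests behave like pydantic models and that JobMixin contributes a job_uuid field, which is not shown in this excerpt.

# Illustrative only: assumes pydantic-style models and a job_uuid field from JobMixin.
from compute_horde.em_protocol.executor_requests import (
    V0StreamingJobFailedToPrepareRequest,
    V0StreamingJobReadyRequest,
)


def streaming_readiness_message(job_uuid: str, public_key: str, port: int, nginx_ok: bool):
    """Build the message the executor sends after trying to start the streaming proxy."""
    if nginx_ok:
        # public_key: the executor-side certificate the client must trust;
        # port: where the TLS proxy in front of the job listens.
        return V0StreamingJobReadyRequest(job_uuid=job_uuid, public_key=public_key, port=port)
    return V0StreamingJobFailedToPrepareRequest(job_uuid=job_uuid)
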
7 changes: 7 additions & 0 deletions compute_horde/compute_horde/em_protocol/miner_requests.py
@@ -11,6 +11,7 @@

class RequestType(enum.Enum):
V0PrepareJobRequest = "V0PrepareJobRequest"
V1PrepareJobRequest = "V1PrepareJobRequest"
V0RunJobRequest = "V0RunJobRequest"
GenericError = "GenericError"

@@ -33,6 +34,12 @@ def validate_volume_or_volume_type(self) -> Self:
return self


class V1InitialJobRequest(V0InitialJobRequest):
message_type: RequestType = RequestType.V1PrepareJobRequest
public_key: str
executor_ip: str


class V0JobRequest(BaseMinerRequest, JobMixin):
message_type: RequestType = RequestType.V0RunJobRequest
docker_image_name: str | None = None
15 changes: 15 additions & 0 deletions compute_horde/compute_horde/miner_client/organic.py
@@ -31,6 +31,7 @@
V0JobFailedRequest,
V0JobFinishedRequest,
V0MachineSpecsRequest,
V0StreamingJobReadyRequest,
)
from compute_horde.mv_protocol.validator_requests import (
AuthenticationPayload,
@@ -106,6 +107,11 @@ def __init__(
] = loop.create_future()
self.executor_ready_or_failed_timestamp: int = 0

self.streaming_job_ready_or_not_future: asyncio.Future[
V0StreamingJobReadyRequest | miner_requests.V0StreamingJobNotReadyRequest
] = loop.create_future()
self.streaming_job_ready_or_not_timestamp: int = 0

self.miner_finished_or_failed_future: asyncio.Future[
V0JobFailedRequest | V0JobFinishedRequest
] = loop.create_future()
@@ -198,6 +204,14 @@ async def handle_message(self, msg: BaseRequest) -> None:
self.executor_ready_or_failed_timestamp = int(time.time())
except asyncio.InvalidStateError:
logger.warning(f"Received {msg} from {self.miner_name} but future was already set")
elif isinstance(
msg, V0StreamingJobReadyRequest | miner_requests.V0StreamingJobNotReadyRequest
):
try:
self.streaming_job_ready_or_not_future.set_result(msg)
self.streaming_job_ready_or_not_timestamp = int(time.time())
except asyncio.InvalidStateError:
logger.warning(f"Received {msg} from {self.miner_name} but future was already set")
elif isinstance(msg, V0JobFailedRequest | V0JobFinishedRequest):
try:
self.miner_finished_or_failed_future.set_result(msg)
@@ -332,6 +346,7 @@ class FailureReason(enum.Enum):
FINAL_RESPONSE_TIMED_OUT = enum.auto()
JOB_DECLINED = enum.auto()
EXECUTOR_FAILED = enum.auto()
STREAMING_JOB_READY_TIMED_OUT = enum.auto()
JOB_FAILED = enum.auto()


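
One way a validator-side driver might consume the new future and failure reason; the shape of the client object and the timeout value are assumptions here, not part of this diff.

# Hedged sketch: awaiting the streaming-readiness future added above.
# `client` is assumed to be the organic miner client defined in this module;
# the 300-second timeout is an arbitrary illustration.
import asyncio

from compute_horde.miner_client.organic import FailureReason


async def wait_for_streaming_ready(client, timeout: float = 300.0):
    """Return (failure_reason, message); exactly one of the two is None."""
    try:
        msg = await asyncio.wait_for(client.streaming_job_ready_or_not_future, timeout=timeout)
    except asyncio.TimeoutError:
        # Maps onto the FailureReason member added in this PR.
        return FailureReason.STREAMING_JOB_READY_TIMED_OUT, None
    return None, msg
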
14 changes: 14 additions & 0 deletions compute_horde/compute_horde/mv_protocol/miner_requests.py
@@ -13,6 +13,8 @@ class RequestType(enum.Enum):
V0ExecutorManifestRequest = "V0ExecutorManifestRequest"
V0ExecutorReadyRequest = "V0ExecutorReadyRequest"
V0ExecutorFailedRequest = "V0ExecutorFailedRequest"
V0StreamingJobReadyRequest = "V0StreamingJobReadyRequest"
V0StreamingJobNotReadyRequest = "V0StreamingJobNotReadyRequest"
V0JobFailedRequest = "V0JobFailedRequest"
V0JobFinishedRequest = "V0JobFinishedRequest"
V0MachineSpecsRequest = "V0MachineSpecsRequest"
@@ -54,6 +56,18 @@ class V0ExecutorFailedRequest(BaseMinerRequest, JobMixin):
message_type: RequestType = RequestType.V0ExecutorFailedRequest


# TODO: sign this with the miner's wallet when sending it to the validator
class V0StreamingJobReadyRequest(BaseMinerRequest, JobMixin):
message_type: RequestType = RequestType.V0StreamingJobReadyRequest
public_key: str
ip: str
port: int


class V0StreamingJobNotReadyRequest(BaseMinerRequest, JobMixin):
message_type: RequestType = RequestType.V0StreamingJobNotReadyRequest


class V0JobFailedRequest(BaseMinerRequest, JobMixin):
message_type: RequestType = RequestType.V0JobFailedRequest
docker_process_exit_status: int | None = None
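
The miner-to-validator variant mirrors the executor-to-miner message but adds the address the user should connect to. A hedged relay sketch follows; job_uuid coming from JobMixin is an assumption.

# Hedged sketch: relaying the executor's readiness message to the validator,
# swapping in the publicly reachable address. Field names follow this diff.
from compute_horde.em_protocol import executor_requests
from compute_horde.mv_protocol import miner_requests


def relay_streaming_ready(
    executor_msg: executor_requests.V0StreamingJobReadyRequest, public_ip: str
) -> miner_requests.V0StreamingJobReadyRequest:
    return miner_requests.V0StreamingJobReadyRequest(
        job_uuid=executor_msg.job_uuid,  # assumes JobMixin exposes job_uuid
        public_key=executor_msg.public_key,
        ip=public_ip,
        port=executor_msg.port,
    )
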
6 changes: 6 additions & 0 deletions compute_horde/compute_horde/mv_protocol/validator_requests.py
@@ -24,6 +24,7 @@
class RequestType(enum.Enum):
V0AuthenticateRequest = "V0AuthenticateRequest"
V0InitialJobRequest = "V0InitialJobRequest"
V1InitialJobRequest = "V1InitialJobRequest"
V0MachineSpecsRequest = "V0MachineSpecsRequest"
V0JobRequest = "V0JobRequest"
V0JobAcceptedReceiptRequest = "V0JobAcceptedReceiptRequest"
@@ -71,6 +72,11 @@ def validate_volume_or_volume_type(self) -> Self:
return self


class V1InitialJobRequest(V0InitialJobRequest):
message_type: RequestType = RequestType.V1InitialJobRequest
public_key: str


class V0JobRequest(BaseValidatorRequest, JobMixin):
message_type: RequestType = RequestType.V0JobRequest
executor_class: ExecutorClass | None = None
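
Since V1InitialJobRequest simply extends the V0 request with the validator's public key, an already-built V0 payload could in principle be promoted like this. A sketch only, assuming pydantic v2 models; the helper name is hypothetical.

# Hedged sketch: promoting a prepared V0 request to the streaming-capable V1 variant.
# Assumes pydantic v2 (model_dump); message_type is excluded so the V1 default applies.
from compute_horde.mv_protocol.validator_requests import V0InitialJobRequest, V1InitialJobRequest


def upgrade_to_streaming(v0: V0InitialJobRequest, validator_public_key: str) -> V1InitialJobRequest:
    # The public key lets the executor-side proxy require the validator/user's
    # client certificate on the streaming connection.
    return V1InitialJobRequest(
        **v0.model_dump(exclude={"message_type"}),
        public_key=validator_public_key,
    )
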
12 changes: 1 addition & 11 deletions compute_horde/tests/conftest.py
@@ -1,5 +1,4 @@
import datetime
import subprocess
import uuid

import bittensor
@@ -103,14 +102,5 @@ def mocked_responses():


@pytest.fixture(scope="session")
def docker_name_prefix():
def container_name():
return str(uuid.uuid4())


@pytest.fixture(scope="session")
def cleanup_docker(docker_name_prefix):
output = subprocess.check_output(["docker", "ps", "--format", "{{.Names}}"])
containers = output.decode().split()
for container in containers:
if container.startswith(docker_name_prefix):
subprocess.run(["docker", "kill", container])
