Skip to content

Commit

Permalink
debug_run_mock_streaming_job_to_miner send terminate
Browse files Browse the repository at this point in the history
  • Loading branch information
andreea-popescu-reef committed Jan 7, 2025
1 parent f2765ed commit f603555
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,6 @@ async def wait_for_job(self, job_request: V0JobRequest) -> JobResult:
)
stdout = result[0].decode()
stderr = result[1].decode()

# TODO: handle streaming jobs multiple requests
except TimeoutError:
# If the process did not finish in time, kill it
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,40 @@


class StreamingJobHandler(BaseHTTPRequestHandler):
def _set_response(self, code: int, msg: bytes = b""):
self.send_response(code)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(msg)

def do_GET(self):
if self.path == "/execute-job":
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
# Mock job finish after endpoint was hit
self._set_response(200, b"OK")
elif self.path == "/health":
self._set_response(200, b"OK")
elif self.path == "/terminate":
self._set_response(200, b"OK")
time.sleep(2)
os._exit(0)
elif self.path == "/health":
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
else:
self.send_response(404)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Not Found")
self._set_response(404, b"Not Found")


class AutoStartStreamingJobHandler(BaseHTTPRequestHandler):
def _set_response(self, code: int, msg: bytes = b""):
self.send_response(code)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(msg)

def do_GET(self):
if self.path == "/health":
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
self._set_response(200, b"OK")
# Mock job finishing right after docker ready
time.sleep(2)
os._exit(0)
else:
self.send_response(404)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Not Found")
self._set_response(404, b"Not Found")


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,17 +150,30 @@ async def run_streaming_job(options, wait_timeout: int = 300):
executor_cert_path = dir_path / "ssl" / "executor_certificate.pem"
executor_cert_path.write_text(streaming_job_ready_response.public_key)

base_url = f"https://{streaming_job_ready_response.ip}:{streaming_job_ready_response.port}"

# Check you can connect to the job container and trigger job execution
url = f"https://{streaming_job_ready_response.ip}:{streaming_job_ready_response.port}/execute-job"
logger.info(f"Making request to job container: {url}")
url = f"{base_url}/execute-job"
logger.info(f"Triggering streaming job execution on: {url}")
response = requests.get(
url,
verify=str(executor_cert_path),
cert=cert,
headers={"Host": streaming_job_ready_response.ip},
)
logger.info(f"Response {response.status_code}: {response.text}")
assert response.text == "OK", "Failed to trigger job execution"

url = f"{base_url}/terminate"
logger.info(f"Terminating streaming job on: {url}")
response = requests.get(
url,
verify=str(executor_cert_path),
cert=cert,
headers={"Host": streaming_job_ready_response.ip},
)
logger.info(f"Response {response.status_code}: {response.text}")
assert response.text == "OK"
assert response.text == "OK", "Failed to terminate job"

try:
final_response = await asyncio.wait_for(
Expand Down

0 comments on commit f603555

Please sign in to comment.