Skip to content

Commit

Permalink
ensure that the service returns states recognized by snpseq_packs (#13)
Browse files Browse the repository at this point in the history
* ensure that the service returns states recognized by snpseq_packs

* address review comments
  • Loading branch information
b97pla authored Oct 13, 2023
1 parent 717a474 commit 3cde27e
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 39 deletions.
15 changes: 15 additions & 0 deletions archive_verify/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

from arteria.web.state import State

# mapping between the internal states of the service (keys) and the states that should be
# returned in the response object (parsed by e.g. poll_status.py in snpseq_packs)
# rq/redis job states that all mean "not started yet" — they collapse to PENDING
_PENDING_RQ_STATES = ("queued", "deferred", "scheduled")

# Translation table from rq's internal job states (keys) to the arteria
# State values that API consumers (e.g. poll_status.py in snpseq_packs) expect.
REDIS_STATES = {
    **{rq_state: State.PENDING for rq_state in _PENDING_RQ_STATES},
    "started": State.STARTED,
    "finished": State.DONE,
    "failed": State.ERROR,
    # both "stopped" and "canceled" jobs are reported as CANCELLED
    "stopped": State.CANCELLED,
    "canceled": State.CANCELLED,
}
83 changes: 49 additions & 34 deletions archive_verify/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from aiohttp import web
from rq import Queue

import archive_verify
from archive_verify.workers import verify_archive
import archive_verify.redis_client as redis_client

Expand Down Expand Up @@ -54,9 +55,16 @@ async def verify(request):
url = request.url
url_base = request.app["config"]["base_url"]

status_end_point = "{0}://{1}:{2}{3}/status/{4}".format(url.scheme, url.host, url.port, url_base, job.id)
status_end_point = "{0}://{1}:{2}{3}/status/{4}".format(
url.scheme,
url.host,
url.port,
url_base,
job.id)
response = {
"status": "pending",
"status": archive_verify.REDIS_STATES.get(
job.get_status(),
archive_verify.State.NONE),
"job_id": job.id,
"link": status_end_point,
"path": archive_path,
Expand All @@ -78,40 +86,47 @@ async def status(request):
q = request.app['redis_q']
job = q.fetch_job(job_id)

if job:
if job.is_started:
payload = {
"state": "started",
"msg": f"Job {job_id} is currently running."}
code = 200
elif job.is_finished or job.is_failed:
result = job.result

if result["state"] == "done":
payload = {
"state": result["state"],
"msg": f"Job {job_id} has returned with result: {result['msg']}"}
code = 200
else:
payload = {
"state": result["state"],
"msg": f"Job {job_id} has returned with result: {result['msg']}",
"debug": job.exc_info if job.exc_info else result}
code = 500

job.delete()
else:
payload = {
"state": job.get_status(),
"msg": f"Job {job_id} is {job.get_status()}"}
code = 200
if job is None:
return web.json_response(
{
"state": archive_verify.State.ERROR,
"msg": f"No such job {job_id} found!"
},
status=400
)

job_state = archive_verify.REDIS_STATES.get(
job.get_status(),
archive_verify.State.NONE)
payload = {
"state": job_state
}
code = 200

if job_state in [
archive_verify.State.DONE,
archive_verify.State.ERROR
]:
# this is the dict returned by the worker function
job_result = job.result
job_result_state = job_result["state"]
payload["state"] = job_result_state
payload["msg"] = f"Job {job_id} has returned with result: {job_result['msg']}"

if job_result_state == archive_verify.State.ERROR:
payload["debug"] = job.exc_info if job.exc_info else job_result
code = 500

job.delete()
elif job_state == archive_verify.State.STARTED:
payload["msg"] = f"Job {job_id} is currently running."
else:
payload = {
"state": "error",
"msg": f"No such job {job_id} found!"}
code = 400
payload["msg"] = f"Job {job_id} is {job_state}"

return web.json_response(payload, status=code)
return web.json_response(
payload,
status=code
)


async def redis_context(app):
Expand Down
7 changes: 4 additions & 3 deletions archive_verify/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import datetime

import archive_verify
from archive_verify.pdc_client import PdcClient, MockPdcClient

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -89,7 +90,7 @@ def verify_archive(
if not download_ok:
log.debug("Download of {} failed.".format(archive_name))
return {
"state": "error",
"state": archive_verify.State.ERROR,
"msg": "failed to properly download archive from pdc",
"path": dest
}
Expand All @@ -104,14 +105,14 @@ def verify_archive(
if not keep_downloaded_archive:
pdc_client.cleanup()
return {
"state": "done",
"state": archive_verify.State.DONE,
"path": output_file,
"msg": "Successfully verified archive md5sums."
}
else:
log.info("Verify of {} failed.".format(archive))
return {
"state": "error",
"state": archive_verify.State.ERROR,
"path": output_file,
"msg": "Failed to verify archive md5sums."
}
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ classifiers = [
]
dependencies = [
"aiohttp",
"arteria",
"pyyaml",
"redis",
"rq"
Expand Down
4 changes: 2 additions & 2 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ async def test_basic_verify(self):
request = await self.post_queued_request()
assert request.status == 200
resp = await request.json()
assert resp["status"] == "pending"
assert resp["status"] == "done"
assert resp["action"] == "verify"
assert resp["job_id"] != ""

async def test_basic_download(self):
    """POST a download job and check the response of a completed job.

    The mocked rq job reports the "finished" state, which the service maps
    to State.DONE ("done") via REDIS_STATES, so the response "status" field
    must be "done" (the pre-mapping value "pending" is no longer returned).
    """
    request = await self.post_queued_request(endpoint="download")
    assert request.status == 200
    resp = await request.json()
    assert resp["status"] == "done"
    assert resp["action"] == "download"
    assert resp["job_id"] != ""

Expand Down

0 comments on commit 3cde27e

Please sign in to comment.