Merge pull request #9 from b97pla/download_option
Download option
b97pla authored Mar 1, 2023
2 parents 9b6fec9 + b88030f commit f51a976
Showing 10 changed files with 248 additions and 86 deletions.
3 changes: 2 additions & 1 deletion archive_verify/app.py
@@ -13,7 +13,7 @@


 def setup_routes(app):
-    app.router.add_post(app["config"]["base_url"] + "/verify", handlers.verify)
+    app.router.add_post(app["config"]["base_url"] + r"/{endpoint:(verify|download)}", handlers.verify)
     app.router.add_get(app["config"]["base_url"] + "/status/{job_id}", handlers.status)


@@ -56,6 +56,7 @@ def start():
     log.info("Starting archive-verify-ws on {}...".format(conf["port"]))
     app = web.Application()
     app['config'] = conf
+    app.cleanup_ctx.append(handlers.redis_context)
     setup_routes(app)
     web.run_app(app, port=conf["port"])

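A note on the route change above: aiohttp allows an inline regular expression in a dynamic path segment, so the single add_post() registration now serves both POST /verify and POST /download with the same handler, which reads the matched segment from request.match_info. A minimal, self-contained sketch of the pattern (the port and response body are illustrative assumptions, not part of this commit):

from aiohttp import web

async def handler(request):
    # The matched segment tells the handler which endpoint was called.
    endpoint = request.match_info["endpoint"]  # "verify" or "download"
    return web.json_response({"action": endpoint})

app = web.Application()
app.router.add_post(r"/{endpoint:(verify|download)}", handler)
web.run_app(app, port=8080)  # POST /verify and POST /download both land in handler
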
82 changes: 54 additions & 28 deletions archive_verify/handlers.py
@@ -2,13 +2,14 @@
 import os

 from aiohttp import web
-import archive_verify.redis_client as redis_client
 from rq import Queue

 from archive_verify.workers import verify_archive
+import archive_verify.redis_client as redis_client

 log = logging.getLogger(__name__)

+
 async def verify(request):
     """
     Handler accepts a POST call with JSON parameters in the body. Upon a request it will
@@ -21,6 +22,8 @@ async def verify(request):
     :return JSON containing job id and link which we can poll for current job status
     """
     body = await request.json()
+    endpoint = request.match_info["endpoint"]
+    keep_download = (endpoint == "download")
     host = body["host"]
     archive = body["archive"]
     description = body["description"]
@@ -30,28 +33,38 @@ async def verify(request):
     # use a supplied path if available, otherwise construct it from the src_root and archive
     archive_path = path or os.path.join(src_root, archive)

-    redis_conn = redis_client.get_redis_instance()
-    q = Queue(connection=redis_conn)
+    q = request.app['redis_q']

     # Enqueue the verify_archive function with the user supplied input parameters.
-    # Note that the TTL and timeout parameters are important for e.g. how long
-    # the jobs and their results will be kept in the Redis queue. By default our
-    # config e.g. setups the queue to keep the job results indefinately,
-    # therefore they we will have to remove them ourselves afterwards.
-    job = q.enqueue_call(verify_archive,
-                         args=(archive, archive_path, description, request.app["config"]),
-                         timeout=request.app["config"]["job_timeout"],
-                         result_ttl=request.app["config"]["job_result_ttl"],
-                         ttl=request.app["config"]["job_ttl"])
+    # Note that the TTL and timeout parameters are important for e.g. how long
+    # the jobs and their results will be kept in the Redis queue. By default our
+    # config sets up the queue to keep the job results indefinitely,
+    # therefore we will have to remove them ourselves afterwards.
+    job = q.enqueue_call(verify_archive,
+                         args=(
+                             archive,
+                             archive_path,
+                             description,
+                             keep_download,
+                             request.app["config"]),
+                         timeout=request.app["config"]["job_timeout"],
+                         result_ttl=request.app["config"]["job_result_ttl"],
+                         ttl=request.app["config"]["job_ttl"])

     url = request.url
     url_base = request.app["config"]["base_url"]

     status_end_point = "{0}://{1}:{2}{3}/status/{4}".format(url.scheme, url.host, url.port, url_base, job.id)
-    response = { "status": "pending", "job_id": job.id, "link": status_end_point, "path": archive_path }
+
+    response = {
+        "status": "pending",
+        "job_id": job.id,
+        "link": status_end_point,
+        "path": archive_path,
+        "action": endpoint}

     return web.json_response(response)


 async def status(request):
     """
     Handler accepts a GET call with a URL parameter which corresponds to a previously
@@ -62,34 +75,47 @@ async def status(request):
     """
     job_id = str(request.match_info['job_id'])

-    redis_conn = redis_client.get_redis_instance()
-    q = Queue(connection=redis_conn)
+    q = request.app['redis_q']
     job = q.fetch_job(job_id)

     if job:
         if job.is_started:
-            payload = {"state": "started", "msg": "Job {} is currently running.".format(job_id)}
+            payload = {
+                "state": "started",
+                "msg": f"Job {job_id} is currently running."}
             code = 200
-        elif job.is_finished:
+        elif job.is_finished or job.is_failed:
             result = job.result

-            if result and result["state"] == "done":
-                payload = {"state": "done", "msg": "Job {} has returned with result: {}".format(job_id, job.result)}
+            if result["state"] == "done":
+                payload = {
+                    "state": result["state"],
+                    "msg": f"Job {job_id} has returned with result: {result['msg']}"}
                 code = 200
             else:
-                payload = {"state": "error", "msg": "Job {} has returned with result: {}".format(job_id, job.result), "debug": job.exc_info}
+                payload = {
+                    "state": result["state"],
+                    "msg": f"Job {job_id} has returned with result: {result['msg']}",
+                    "debug": job.exc_info if job.exc_info else result}
                 code = 500

             job.delete()
-        elif job.is_failed:
-            payload = {"state": "error", "msg": "Job {} failed with error: {}".format(job_id, job.exc_info)}
-            job.delete()
-            code = 500
         else:
-            payload = {"state": "pending", "msg": "Job {} has not started yet.".format(job_id)}
+            payload = {
+                "state": job.get_status(),
+                "msg": f"Job {job_id} is {job.get_status()}"}
             code = 200
     else:
-        payload = {"state": "error", "msg": "No such job {} found!".format(job_id)}
+        payload = {
+            "state": "error",
+            "msg": f"No such job {job_id} found!"}
         code = 400

     return web.json_response(payload, status=code)


+async def redis_context(app):
+    app["redis_q"] = Queue(
+        connection=redis_client.get_redis_instance(),
+        is_async=app["config"].get("async_redis", True))
+    yield
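
The redis_context generator above follows aiohttp's cleanup context convention: the code before the yield runs once at application startup (creating a single shared Queue under app["redis_q"], registered via app.cleanup_ctx in app.py), and anything after the yield would run at shutdown. From a client's point of view the two handlers then form a submit-then-poll workflow. A hypothetical session is sketched below; the host, port and body values are invented for illustration (the handler reads host, archive and description from the JSON body, with an optional path):

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        # POSTing to /download enqueues the job with keep_download=True.
        body = {"host": "biotank1", "archive": "archive-001", "description": "descr-001"}
        async with session.post("http://localhost:8080/download", json=body) as resp:
            job = await resp.json()

        # The response carries a ready-made status link to poll.
        async with session.get(job["link"]) as resp:
            status = await resp.json()
            print(status["state"], status["msg"])

asyncio.run(main())
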
103 changes: 63 additions & 40 deletions archive_verify/pdc_client.py
@@ -9,7 +9,7 @@
 log = logging.getLogger('archive_verify.workers')


-class PdcClient():
+class PdcClient:
     """
     Base class representing a PDC client.
     Staging and production environments should instantiate PdcClient (default).
@@ -19,13 +19,15 @@ def __init__(self, archive_name, archive_pdc_path, archive_pdc_description, job_
         """
         :param archive_name: The name of the archive we shall download
         :param archive_pdc_path: The path in PDC TSM to the archive that we want to download
-        :param archive_pdc_description: The unique description that was used when uploading the archive to PDC
+        :param archive_pdc_description: The unique description that was used when uploading the
+            archive to PDC
         :param job_id: The current rq worker job id
         :param config: A dict containing the app's configuration
         """
         self.dest_root = config["verify_root_dir"]
         self.dsmc_log_dir = config["dsmc_log_dir"]
         self.whitelisted_warnings = config["whitelisted_warnings"]
+        self.dsmc_extra_args = config.get("dsmc_extra_args", {})
         self.archive_name = archive_name
         self.archive_pdc_path = archive_pdc_path
         self.archive_pdc_description = archive_pdc_description
@@ -35,27 +37,50 @@ def dest(self):
         """
         :returns The unique path where the archive will be downloaded.
         """
-        return "{}_{}".format(os.path.join(self.dest_root, self.archive_name), self.job_id)
+        return f"{os.path.join(self.dest_root, self.archive_name)}_{self.job_id}"

+    def dsmc_args(self):
+        """
+        Build the arguments that will be passed to the dsmc command line. If there are extra
+        arguments specified in the config, with "dsmc_extra_args", these are included as well.
+        If an argument in dsmc_extra_args has the same key as a default argument, the default
+        will be overridden.
+        :return: a string with arguments that should be appended to the dsmc command line
+        """
+        key_values = {
+            "subdir": "yes",
+            "description": self.archive_pdc_description
+        }
+        key_values.update(self.dsmc_extra_args)
+        args = [f"-{k}='{v}'" for k, v in key_values.items() if v is not None]
+        args.extend([f"-{k}" for k, v in key_values.items() if v is None])
+        return " ".join(args)
+
     def download(self):
         """
         Downloads the specified archive from PDC to a unique location.
         :returns True if no errors or only whitelisted warnings were encountered, False otherwise
         """
-        log.info("Download_from_pdc started for {}".format(self.archive_pdc_path))
-        cmd = "export DSM_LOG={} && dsmc retr {}/ {}/ -subdir=yes -description='{}'".format(self.dsmc_log_dir,
-                                                                                            self.archive_pdc_path,
-                                                                                            self.dest(),
-                                                                                            self.archive_pdc_description)
-        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        log.info(f"Download_from_pdc started for {self.archive_pdc_path}")
+        cmd = f"export DSM_LOG={self.dsmc_log_dir} && " \
+              f"dsmc retr {self.archive_pdc_path}/ {self.dest()}/ {self.dsmc_args()}"
+
+        p = subprocess.Popen(
+            cmd,
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            text=True)
+
         dsmc_output, _ = p.communicate()
         dsmc_exit_code = p.returncode

         if dsmc_exit_code != 0:
-            return PdcClient._parse_dsmc_return_code(dsmc_exit_code, dsmc_output, self.whitelisted_warnings)
+            return PdcClient._parse_dsmc_return_code(
+                dsmc_exit_code, dsmc_output, self.whitelisted_warnings)

-        log.info("Download_from_pdc completed successfully for {}".format(self.archive_pdc_path))
+        log.info(f"Download_from_pdc completed successfully for {self.archive_pdc_path}")
         return True

     def downloaded_archive_path(self):
@@ -67,46 +92,42 @@ def cleanup(self):
     @staticmethod
     def _parse_dsmc_return_code(exit_code, output, whitelist):
         """
-        Parses the dsmc output when we've encountered a non-zero exit code. For some certain exit codes,
-        warnings and errors we still want to return successfully.
+        Parses the dsmc output when we've encountered a non-zero exit code. For certain exit
+        codes, warnings and errors we still want to return successfully.
         :param exit_code: The exit code received from the failing dsmc process
         :param output: The text output from the dsmc process
         :param whitelist: A list of whitelisted warnings
         :returns True if only whitelisted warnings were encountered in the output, otherwise False
         """
-        log.info("DSMC process returned an error!")
-
         # DSMC sets return code to 8 when a warning was encountered.
-        if exit_code == 8:
-            log.info("DSMC process actually returned a warning.")
+        log_fn = log.warning if exit_code == 8 else log.error
+        log_fn(f"DSMC process returned a{' warning' if exit_code == 8 else 'n error'}!")

-            output = output.splitlines()
+        # parse the DSMC output and extract error/warning codes and messages
+        codes = []
+        for line in output.splitlines():
+            if line.startswith("ANS"):
+                log_fn(line)

-            # Search through the DSMC log and see if we only have
-            # whitelisted warnings. If that is the case, change the
-            # return code to 0 instead. Otherwise keep the error state.
-            warnings = []
+                matches = re.findall(r'ANS[0-9]+[EW]', line)
+                for match in matches:
+                    codes.append(match)

-            for line in output:
-                matches = re.findall(r'ANS[0-9]+W', line)
+        unique_codes = set(sorted(codes))
+        if unique_codes:
+            log_fn(f"ANS codes found in DSMC output: {', '.join(unique_codes)}")

-                for match in matches:
-                    warnings.append(match)
+        # if we only have whitelisted warnings, change the return code to 0 instead
+        if unique_codes.issubset(set(whitelist)):
+            log.info("Only whitelisted DSMC ANS code(s) were encountered. Everything is OK.")
+            return True

-            log.info("Warnings found in DSMC output: {}".format(set(warnings)))
-
-            for warning in warnings:
-                if warning not in whitelist:
-                    log.error("A non-whitelisted DSMC warning was encountered. Reporting it as an error! ('{}')".format(
-                        warning))
-                    return False
-
-            log.info("Only whitelisted DSMC warnings were encountered. Everything is OK.")
-            return True
-        else:
-            log.error("An uncaught DSMC error code was encountered!")
-            return False
+        log.error(
+            f"Non-whitelisted DSMC ANS code(s) encountered: "
+            f"{', '.join(unique_codes.difference(set(whitelist)))}")
+        return False


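The rewritten _parse_dsmc_return_code above collects both warning (W) and error (E) ANS codes and reduces the whitelist decision to a set comparison. The same extraction logic in isolation, with invented dsmc output and an invented whitelist:

import re

output = """ANS1898I ***** Processed 1,000 files *****
ANS2250W Object skipped
ANS1809W Session disconnected"""        # invented dsmc output
whitelist = ["ANS1809W", "ANS2250W"]    # invented whitelist

codes = []
for line in output.splitlines():
    if line.startswith("ANS"):
        # informational ANS...I lines do not match the [EW] suffix
        codes.extend(re.findall(r'ANS[0-9]+[EW]', line))

print(set(codes).issubset(set(whitelist)))  # True: only whitelisted codes
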
 class MockPdcClient(PdcClient):
@@ -138,8 +159,10 @@ def dest(self):

     def download(self):
         if not self.predownloaded_archive_path:
-            log.error(f"No archive containing the name {self.archive_name} found in {self.dest_root}")
+            log.error(
+                f"No archive containing the name {self.archive_name} found in {self.dest_root}")
             return False
         else:
-            log.info(f"Found pre-downloaded archive at {self.predownloaded_archive_path}")
+            log.info(
+                f"Found pre-downloaded archive at {self.predownloaded_archive_path}")
             return True
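
The new dsmc_args() helper changes how the retrieve command is assembled: dsmc_extra_args entries from the config override the built-in defaults key by key, and a value of None produces a bare flag without a value. A standalone replica of that construction, with made-up config values:

def dsmc_args(description, dsmc_extra_args):
    key_values = {
        "subdir": "yes",
        "description": description,
    }
    key_values.update(dsmc_extra_args)  # extras override the defaults
    args = [f"-{k}='{v}'" for k, v in key_values.items() if v is not None]
    args.extend(f"-{k}" for k, v in key_values.items() if v is None)
    return " ".join(args)

print(dsmc_args("descr-001", {}))
# -subdir='yes' -description='descr-001'
print(dsmc_args("descr-001", {"subdir": "no", "quiet": None}))
# -subdir='no' -description='descr-001' -quiet
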
37 changes: 31 additions & 6 deletions archive_verify/workers.py
@@ -51,14 +51,21 @@ def configure_log(dsmc_log_dir, archive_pdc_description):
     log.addHandler(fh)


-def verify_archive(archive_name, archive_pdc_path, archive_pdc_description, config):
+def verify_archive(
+        archive_name,
+        archive_pdc_path,
+        archive_pdc_description,
+        keep_downloaded_archive,
+        config):
     """
     Our main worker function. This will be put into the RQ/Redis queue when the /verify endpoint gets called.
     Downloads the specified archive from PDC and then verifies the MD5 sums.
     :param archive_name: The name of the archive we shall download
     :param archive_pdc_path: The path in PDC TSM to the archive that we want to download
     :param archive_pdc_description: The unique description that was used when uploading the archive to PDC
+    :param keep_downloaded_archive: If True, the downloaded archive will not be removed from local
+        storage
     :param config: A dict containing the app's configuration
     :returns A JSON with the result that will be kept in the Redis queue
     """
@@ -70,13 +77,22 @@ def verify_archive(archive_name, archive_pdc_path, archive_pdc_description, conf
     log.debug(f"Using PDC Client of type: {pdc_class.__name__}")

     job_id = rq.get_current_job().id
-    pdc_client = pdc_class(archive_name, archive_pdc_path, archive_pdc_description, job_id, config)
+    pdc_client = pdc_class(
+        archive_name,
+        archive_pdc_path,
+        archive_pdc_description,
+        job_id,
+        config)
     dest = pdc_client.dest()
     download_ok = pdc_client.download()

     if not download_ok:
         log.debug("Download of {} failed.".format(archive_name))
-        return {"state": "error", "msg": "failed to properly download archive from pdc", "path": dest}
+        return {
+            "state": "error",
+            "msg": "failed to properly download archive from pdc",
+            "path": dest
+        }
     else:
         log.debug("Verifying {}...".format(archive_name))
         archive = pdc_client.downloaded_archive_path()
@@ -85,8 +101,17 @@ def verify_archive(archive_name, archive_pdc_path, archive_pdc_description, conf

         if verified_ok:
             log.info("Verify of {} succeeded.".format(archive))
-            pdc_client.cleanup()
-            return {"state": "done", "path": output_file, "msg": "Successfully verified archive md5sums."}
+            if not keep_downloaded_archive:
+                pdc_client.cleanup()
+            return {
+                "state": "done",
+                "path": output_file,
+                "msg": "Successfully verified archive md5sums."
+            }
         else:
             log.info("Verify of {} failed.".format(archive))
-            return {"state": "error", "path": output_file, "msg": "Failed to verify archive md5sums."}
+            return {
+                "state": "error",
+                "path": output_file,
+                "msg": "Failed to verify archive md5sums."
+            }