Skip to content

Commit

Permalink
Merge pull request #4 from mariya/BIO-1506-mock-dsmc
Browse files Browse the repository at this point in the history
WIP: Bio 1506 mock dsmc
  • Loading branch information
mariya authored Oct 5, 2018
2 parents d1935e6 + 693eb8f commit 6c5379b
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 136 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ __pycache__
*.swp
*.swo
.idea/
data
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ A self contained (aiohttp) REST service that helps verify uploaded SNP&SEQ archi

The web service enqueues certain job functions in the RQ/Redis queue, where they get picked up by the separate RQ worker process.

The downloaded files are deleted on successful verification, and retained if any error occurs.

Trying it out
-------------

Expand All @@ -17,6 +19,30 @@ Try running it:
pipenv run ./archive-verify-ws -c=config/
pipenv run rq worker


Mock Downloading
----------------

If you are running this service locally and don't have IBM's dsmc client installed, you can skip the downloading step and verify an archive that is already on your machine.

To use this method:
- copy* an archive that has been pre-downloaded from PDC into the verify_root_dir set in app.yaml
- Delete or edit some files from the archive if you wish to trigger a validation error.
- in app.yml, set:


pdc_client: "MockPdcClient"

\*Note that the archive will be deleted from verify_root_dir on successful verification

#### Naming Conventions for Mock Download ####
Note that when an archive is downloaded from PDC using snpseq-archive-verify, the downloaded directory is formatted with the name of the archive plus the RQ job id, like so:

{verify_root_dir}/{archive_name}_{rq_job_id}

When mocking downloading, we search verify_root_dir for archive_name and use the first directory found, ignoring the rq_job_id.


Running tests
-------------

Expand Down
145 changes: 145 additions & 0 deletions archive_verify/pdc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import fnmatch
import logging
import os
import re
import shutil
import subprocess

# Share pre-configured workers log
log = logging.getLogger('archive_verify.workers')


class PdcClient():
"""
Base class representing a PDC client.
Staging and production environments should instantiate PdcClient (default).
Local and testing environments should instantiate MockPdcClient.
"""
def __init__(self, archive_name, archive_pdc_path, archive_pdc_description, job_id, config):
"""
:param archive_name: The name of the archive we shall download
:param archive_pdc_path: The path in PDC TSM to the archive that we want to download
:param archive_pdc_description: The unique description that was used when uploading the archive to PDC
:param job_id: The current rq worker job id
:param config: A dict containing the apps configuration
"""
self.dest_root = config["verify_root_dir"]
self.dsmc_log_dir = config["dsmc_log_dir"]
self.whitelisted_warnings = config["whitelisted_warnings"]
self.archive_name = archive_name
self.archive_pdc_path = archive_pdc_path
self.archive_pdc_description = archive_pdc_description
self.job_id = job_id

def dest(self):
"""
:returns The unique path where the archive will be downloaded.
"""
return "{}_{}".format(os.path.join(self.dest_root, self.archive_name), self.job_id)

def download(self):
"""
Downloads the specified archive from PDC to a unique location.
:returns True if no errors or only whitelisted warnings were encountered, False otherwise
"""
log.info("Download_from_pdc started for {}".format(self.archive_pdc_path))
cmd = "export DSM_LOG={} && dsmc retr {}/ {}/ -subdir=yes -description='{}'".format(self.dsmc_log_dir,
self.archive_pdc_path,
self.dest(),
self.archive_pdc_description)
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

dsmc_output, _ = p.communicate()
dsmc_exit_code = p.returncode

if dsmc_exit_code != 0:
return PdcClient._parse_dsmc_return_code(dsmc_exit_code, dsmc_output, self.whitelisted_warnings)

log.info("Download_from_pdc completed successfully for {}".format(self.archive_pdc_path))
return True

def downloaded_archive_path(self):
return os.path.join(self.dest(), self.archive_name)

def cleanup(self):
shutil.rmtree(self.dest())

@staticmethod
def _parse_dsmc_return_code(exit_code, output, whitelist):
"""
Parses the dsmc output when we've encountered a non-zero exit code. For some certain exit codes,
warnings and errors we still want to return successfully.
:param exit_code: The exit code received from the failing dsmc process
:param output: The text output from the dsmc process
:param whitelist: A list of whitelisted warnings
:returns True if only whitelisted warnings was encountered in the output, otherwise False
"""
log.info("DSMC process returned an error!")

# DSMC sets return code to 8 when a warning was encountered.
if exit_code == 8:
log.info("DSMC process actually returned a warning.")

output = output.splitlines()

# Search through the DSMC log and see if we only have
# whitelisted warnings. If that is the case, change the
# return code to 0 instead. Otherwise keep the error state.
warnings = []

for line in output:
matches = re.findall(r'ANS[0-9]+W', line)

for match in matches:
warnings.append(match)

log.info("Warnings found in DSMC output: {}".format(set(warnings)))

for warning in warnings:
if warning not in whitelist:
log.error("A non-whitelisted DSMC warning was encountered. Reporting it as an error! ('{}')".format(
warning))
return False

log.info("Only whitelisted DSMC warnings were encountered. Everything is OK.")
return True
else:
log.error("An uncaught DSMC error code was encountered!")
return False


class MockPdcClient(PdcClient):
"""
Instead of downloading the specified archive from PDC, the download method
checks verify_root_dir for a pre-downloaded archive with the specified name.
This can be used to test archive verification in environments where
dsmc cannot be easily installed, e.g. local development environments.
To use this method, copy an archive that has been pre-downloaded from PDC
into the verify_root_dir. Delete or edit some files from the archive if
you wish to trigger a validation error.
"""
def __init__(self, archive_name, archive_pdc_path, archive_pdc_description, job_id, config):
super().__init__(archive_name, archive_pdc_path, archive_pdc_description, job_id, config)

self.predownloaded_archive_path = ''

# Find a pre-downloaded archive with a matching name
for file in os.listdir(os.path.join(self.dest_root)):
if fnmatch.fnmatch(file, f'{self.archive_name}*'):
self.predownloaded_archive_path = file

def dest(self):
"""
:returns The path of the predownloaded archive.
"""
return os.path.join(self.dest_root, self.predownloaded_archive_path)

def download(self):
if not self.predownloaded_archive_path:
log.error(f"No archive containing the name {self.archive_name} found in {self.dest_root}")
return False
else:
log.info(f"Found pre-downloaded archive at {self.predownloaded_archive_path}")
return True
128 changes: 40 additions & 88 deletions archive_verify/workers.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,13 @@
import logging
import re
import rq
import subprocess
import os
import datetime

log = logging.getLogger(__name__)

def _parse_dsmc_return_code(exit_code, output, whitelist):
"""
Parses the dsmc output when we've encountered a non-zero exit code. For some certain exit codes,
warnings and errors we still want to return successfully.
:param exit_code: The exit code recieved from the failing dsmc process
:param output: The text output from the dsmc process
:param whitelist: A list of whitelisted warnings
:returns True if only whitelisted warnings was encountered in the output, otherwise False
"""
log.debug("DSMC process returned an error!")

# DSMC sets return code to 8 when a warning was encountered.
if exit_code == 8:
log.debug("DSMC process actually returned a warning.")

output = output.splitlines()

# Search through the DSMC log and see if we only have
# whitelisted warnings. If that is the case, change the
# return code to 0 instead. Otherwise keep the error state.
warnings = []

for line in output:
matches = re.findall(r'ANS[0-9]+W', line)

for match in matches:
warnings.append(match)

log.debug("Warnings found in DSMC output: {}".format(set(warnings)))

for warning in warnings:
if warning not in whitelist:
log.error("A non-whitelisted DSMC warning was encountered. Reporting it as an error! ('{}')".format(warning))
return False

log.debug("Only whitelisted DSMC warnings were encountered. Everything is OK.")
return True
else:
log.error("An uncaught DSMC error code was encountered!")
return False

def download_from_pdc(archive, description, dest, dsmc_log_dir, whitelist):
"""
Downloads the specified archive from PDC to a unique location.
:param archive: The path in PDC TSM to the archive that we want to download
:param description: The unique description used when previsouly uploaded the archive
:param dest: The unique path where we shall store the downloaded archive
:param dsmc_log_dir: Path to dir where we can save dsmc logs
:param whitelist: A list of dsmc warnings which we shall allow to be generated by dsmc but still count the download as successful
:returns True if no errors or only whitelisted warnings were encountered, False otherwise
"""
log.debug("download_from_pdc started for {}".format(archive))
cmd = "export DSM_LOG={} && dsmc retr {}/ {}/ -subdir=yes -description='{}'".format(dsmc_log_dir, archive, dest, description)
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
from archive_verify.pdc_client import PdcClient, MockPdcClient

dsmc_output, _ = p.communicate()
dsmc_exit_code = p.returncode

if dsmc_exit_code != 0:
return _parse_dsmc_return_code(dsmc_exit_code, dsmc_output, whitelist)
log = logging.getLogger(__name__)

log.debug("download_from_pdc completed successfully for {}".format(archive))
return True

def compare_md5sum(archive_dir):
"""
Expand All @@ -84,6 +20,7 @@ def compare_md5sum(archive_dir):
parent_dir = os.path.abspath(os.path.join(archive_dir, os.pardir))
md5_output = os.path.join(parent_dir, "compare_md5sum.out")
cmd = "cd {} && md5sum -c ./{} > {}".format(archive_dir, "checksums_prior_to_pdc.md5", md5_output)
log.debug(f"Executing verification: {cmd}...")
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

p.communicate()
Expand All @@ -93,48 +30,63 @@ def compare_md5sum(archive_dir):
else:
return True

def verify_archive(archive, archive_path, description, config):

def pdc_client_factory(config):
"""
Our main worker function. This will be put into the RQ/Redis queue when the /verify endpoint gets called.
Downloads the specified archive from PDC and then verifies the MD5 sums.
Determines which PDC Client should be used.
:param archive: The name of the archive we shall download
:param archive_path: The path to the archive on PDC
:param description: The unique description that was used when uploading the archive to PDC
:param config: A dict containing the apps configuration
:returns A JSON with the result that will be kept in the Redis queue
:returns A PDC Client.
"""
dest_root = config["verify_root_dir"]
dsmc_log_dir = config["dsmc_log_dir"]
whitelist = config["whitelisted_warnings"]
return MockPdcClient if config.get("pdc_client", "PdcClient") == "MockPdcClient" else PdcClient


def configure_log(dsmc_log_dir, archive_pdc_description):
now_str = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M')
log.setLevel(logging.DEBUG)
fh = logging.FileHandler(os.path.join(dsmc_log_dir, "{}-{}.log".format(description, now_str)))
fh = logging.FileHandler(os.path.join(dsmc_log_dir, "{}-{}.log".format(archive_pdc_description, now_str)))
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
log.addHandler(fh)

log.debug("verify_archive started for {}".format(archive))

job_id = rq.get_current_job().id
dest = "{}_{}".format(os.path.join(dest_root, archive), job_id)
def verify_archive(archive_name, archive_pdc_path, archive_pdc_description, config):
"""
Our main worker function. This will be put into the RQ/Redis queue when the /verify endpoint gets called.
Downloads the specified archive from PDC and then verifies the MD5 sums.
download_ok = download_from_pdc(archive_path, description, dest, dsmc_log_dir, whitelist)
:param archive_name: The name of the archive we shall download
:param archive_pdc_path: The path in PDC TSM to the archive that we want to download
:param archive_pdc_description: The unique description that was used when uploading the archive to PDC
:param config: A dict containing the apps configuration
:returns A JSON with the result that will be kept in the Redis queue
"""
dsmc_log_dir = config["dsmc_log_dir"]
configure_log(dsmc_log_dir, archive_pdc_description)
log.debug("verify_archive started for {}".format(archive_name))

pdc_class = pdc_client_factory(config)
log.debug(f"Using PDC Client of type: {pdc_class.__name__}")

job_id = rq.get_current_job().id
pdc_client = pdc_class(archive_name, archive_pdc_path, archive_pdc_description, job_id, config)
dest = pdc_client.dest()
download_ok = pdc_client.download()

if not download_ok:
log.debug("Download of {} failed.".format(archive))
log.debug("Download of {} failed.".format(archive_name))
return {"state": "error", "msg": "failed to properly download archive from pdc", "path": dest}
else:
log.debug("verifying {}".format(archive))
archive = os.path.join(dest, archive)
log.debug("Verifying {}...".format(archive_name))
archive = pdc_client.downloaded_archive_path()
verified_ok = compare_md5sum(archive)
output_file = "{}/compare_md5sum.out".format(dest)

if verified_ok:
log.debug("Verify of {} succeeded.".format(archive))
return {"state": "done", "path": output_file, "msg": "sucessfully verified archive md5sums"}
log.info("Verify of {} succeeded.".format(archive))
pdc_client.cleanup()
return {"state": "done", "path": output_file, "msg": "Successfully verified archive md5sums."}
else:
log.debug("Verify of {} failed.".format(archive))
return {"state": "error", "path": output_file, "msg": "failed to verify archive md5sums"}
log.info("Verify of {} failed.".format(archive))
return {"state": "error", "path": output_file, "msg": "Failed to verify archive md5sums."}
3 changes: 3 additions & 0 deletions config/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,6 @@ job_result_ttl: "-1" # maximum time to keep job result; -1 never expires
#
# See full list at e.g. https://www.ibm.com/support/knowledgecenter/en/SSGSG7_7.1.1/com.ibm.itsm.msgs.client.doc/msgs_client_list_intro.html
whitelisted_warnings: ["ANS1809W", "ANS2042W", "ANS2250W"]

# For local use; remove this in environments where the dsmc client is installed
pdc_client: "MockPdcClient"
Loading

0 comments on commit 6c5379b

Please sign in to comment.