From dd22153ac1859cfffdcda7e88f39c6359a255bf0 Mon Sep 17 00:00:00 2001 From: nsheff Date: Wed, 23 Aug 2023 21:43:06 -0400 Subject: [PATCH] refactor, validate, and utility functions --- seqcol/__init__.py | 14 +- seqcol/const.py | 3 + seqcol/exceptions.py | 10 ++ seqcol/schemas/seqcol.yaml | 1 + seqcol/seqcol.py | 330 +++++++++++++------------------------ seqcol/seqcol_client.py | 172 +++++++++++++++++++ seqcol/utilities.py | 72 +++----- tests/test_seqcol.py | 46 +++++- 8 files changed, 365 insertions(+), 283 deletions(-) create mode 100644 seqcol/exceptions.py create mode 100644 seqcol/seqcol_client.py diff --git a/seqcol/__init__.py b/seqcol/__init__.py index 8d7c732..e09ba0d 100644 --- a/seqcol/__init__.py +++ b/seqcol/__init__.py @@ -2,18 +2,12 @@ import logging -from ._version import __version__ -from .seqcol import SeqColClient -from .seqcol import ( - parse_fasta, - explain_flag, - fasta_to_seqcol, - build_sorted_name_length_pairs, -) - from .const import * +from .seqcol import * +from .seqcol_client import * from .utilities import * +from ._version import __version__ __classes__ = ["SeqColClient"] -__all__ = __classes__ + ["build_sorted_name_length_pairs"] +__all__ = (__classes__ + ["build_sorted_name_length_pairs", "compare", "validate_seqcol"],) diff --git a/seqcol/const.py b/seqcol/const.py index e77d692..987b644 100644 --- a/seqcol/const.py +++ b/seqcol/const.py @@ -46,3 +46,6 @@ def _schema_path(name): SCHEMA_NAMES = [ASL_NAME + ".yaml"] SCHEMA_FILEPATH = os.path.join(os.path.dirname(__file__), "schemas") INTERNAL_SCHEMAS = [_schema_path(s) for s in SCHEMA_NAMES] + +# Alias dict to make typehinting clearer +SeqCol = dict diff --git a/seqcol/exceptions.py b/seqcol/exceptions.py new file mode 100644 index 0000000..fd7bde0 --- /dev/null +++ b/seqcol/exceptions.py @@ -0,0 +1,10 @@ +class InvalidSeqColError(Exception): + """Object was not validated successfully according to schema.""" + + def __init__(self, message, errors): + super().__init__(message) + self.message = message + self.errors = errors + + def __str__(self): + return f"InvalidSeqColError ({self.message}): {self.errors}" diff --git a/seqcol/schemas/seqcol.yaml b/seqcol/schemas/seqcol.yaml index 46f6b5d..42a722b 100644 --- a/seqcol/schemas/seqcol.yaml +++ b/seqcol/schemas/seqcol.yaml @@ -21,6 +21,7 @@ properties: description: "Digests of sequences computed using the GA4GH digest algorithm (sha512t24u)." required: - lengths + - names inherent: - lengths - names diff --git a/seqcol/seqcol.py b/seqcol/seqcol.py index 10883a7..a46168d 100644 --- a/seqcol/seqcol.py +++ b/seqcol/seqcol.py @@ -1,4 +1,5 @@ import henge +import jsonschema import logging import os import pyfaidx @@ -7,8 +8,9 @@ from copy import copy from functools import reduce from itertools import compress +from typing import Callable -from .utilities import trunc512_digest +from .utilities import * from .const import * @@ -16,222 +18,6 @@ henge.ITEM_TYPE = "_item_type" -class SeqColClient(refget.RefGetClient): - """ - Extension of henge that accommodates collections of sequences. - """ - - def __init__( - self, - api_url_base=None, - database={}, - schemas=None, - henges=None, - checksum_function=trunc512_digest, - ): - """ - A user interface to insert and retrieve decomposable recursive unique - identifiers (DRUIDs). - - :param dict database: Dict-like lookup database with sequences - and hashes - :param dict schemas: One or more jsonschema schemas describing the - data types stored by this Henge - :param function(str) -> str checksum_function: Default function to - handle the digest of the - serialized items stored in this henge. - """ - super(SeqColClient, self).__init__( - api_url_base=api_url_base, - database=database, - schemas=schemas or INTERNAL_SCHEMAS, - henges=henges, - checksum_function=checksum_function, - ) - _LOGGER.info("Initializing SeqColClient") - - def load_fasta(self, fa_file, skip_seq=False, topology_default="linear"): - """ - Load a sequence collection into the database - - :param str fa_file: path to the FASTA file to parse and load - :param bool skip_seq: whether to disregard the actual sequences, - load just the names and lengths and topology - :param bool skip_seq: whether to disregard the actual sequences, - load just the names and lengths and topology - :param str topology_default: the default topology assigned to - every sequence - """ - # TODO: any systematic way infer topology from a FASTA file? - if topology_default not in KNOWN_TOPOS: - raise ValueError( - f"Invalid topology ({topology_default}). " f"Choose from: {','.join(KNOWN_TOPOS)}" - ) - fa_object = parse_fasta(fa_file) - aslist = [] - for k in fa_object.keys(): - seq = str(fa_object[k]) - aslist.append( - { - NAME_KEY: k, - LEN_KEY: len(seq), - TOPO_KEY: topology_default, - SEQ_KEY: {"" if skip_seq else SEQ_KEY: seq}, - } - ) - collection_checksum = self.insert(aslist, ASL_NAME) - _LOGGER.debug(f"Loaded {ASL_NAME}: {aslist}") - return collection_checksum, aslist - - def load_fasta2(self, fa_file, skip_seq=False, topology_default="linear"): - """ - Load a sequence collection into the database - - :param str fa_file: path to the FASTA file to parse and load - :param bool skip_seq: whether to disregard the actual sequences, - load just the names and lengths and topology - :param bool skip_seq: whether to disregard the actual sequences, - load just the names and lengths and topology - :param str topology_default: the default topology assigned to - every sequence - """ - # TODO: any systematic way infer topology from a FASTA file? - _LOGGER.info("Loading fasta file...") - fa_object = parse_fasta(fa_file) - aslist = [] - for k in fa_object.keys(): - seq = str(fa_object[k]) - _LOGGER.info("Loading key: {k} / Length: {l}...".format(k=k, l=len(seq))) - aslist.append( - { - NAME_KEY: k, - LEN_KEY: len(seq), - TOPO_KEY: topology_default, - SEQ_KEY: "" if skip_seq else seq, - } - ) - _LOGGER.info("Inserting into database...") - collection_checksum = self.insert(aslist, "RawSeqCol") - _LOGGER.debug(f"Loaded {ASL_NAME}: {aslist}") - return collection_checksum, aslist - - def compare_digests(self, digestA, digestB): - A = self.retrieve(digestA, reclimit=1) - B = self.retrieve(digestB, reclimit=1) - # _LOGGER.info(A) - # _LOGGER.info(B) - return self.compare(A, B) - - @staticmethod - def compare_elements(A, B): - """ - Compare two arrays between two arrays - """ - - A_filtered = list(filter(lambda x: x in B, A)) - B_filtered = list(filter(lambda x: x in A, B)) - A_count = len(A_filtered) - B_count = len(B_filtered) - overlap = min(len(A_filtered), len(B_filtered)) ## counts duplicates - - if A_count + B_count < 1: - # order match requires at least 2 matching elements - order = None - elif not (A_count == B_count == overlap): - # duplicated matches means order match is undefined - order = None - else: - order = A_filtered == B_filtered - return {"a-and-b": overlap, "a-and-b-same-order": order} - - @staticmethod - def compare(A, B): - """ - Workhorse comparison function - - @param A Sequence collection A - @param B Sequence collection B - @return dict Following formal seqcol specification comparison function return value - """ - all_keys = list(A.keys()) + list(set(B.keys()) - set(list(A.keys()))) - result = {} - return_obj = { - "arrays": {"a-only": [], "b-only": [], "a-and-b": []}, - "elements": { - "total": {"a": len(A["lengths"]), "b": len(B["lengths"])}, - "a-and-b": {}, - "a-and-b-same-order": {}, - }, - } - - for k in all_keys: - _LOGGER.info(k) - if k not in A: - result[k] = {"flag": -1} - return_obj["arrays"]["b-only"].append(k) - elif k not in B: - return_obj["arrays"]["a-only"].append(k) - else: - return_obj["arrays"]["a-and-b"].append(k) - res = SeqColClient.compare_elements(A[k], B[k]) - return_obj["elements"]["a-and-b"][k] = res["a-and-b"] - return_obj["elements"]["a-and-b-same-order"][k] = res["a-and-b-same-order"] - return return_obj - - def retrieve(self, druid, reclimit=None, raw=False): - try: - return super(SeqColClient, self).retrieve(druid, reclimit, raw) - except henge.NotFoundException as e: - _LOGGER.debug(e) - try: - return self.refget(druid) - except Exception as e: - _LOGGER.debug(e) - raise e - return henge.NotFoundException( - "{} not found in database, or in refget.".format(druid) - ) - - def load_fasta_from_refgenie(self, rgc, refgenie_key): - """ - @param rgc RefGenConf object - @param refgenie_key key of genome to load - @param scc SeqColClient object to load into - """ - filepath = rgc.seek(refgenie_key, "fasta") - return self.load_fasta_from_filepath(filepath) - - def load_fasta_from_filepath(self, filepath): - """ - @param filepath Path to fasta file - @param sc - """ - fa_object = parse_fasta(filepath) - SCAS = fasta_obj_to_seqcol(fa_object, digest_function=self.checksum_function) - digest = self.insert(SCAS, "SeqColArraySet", reclimit=1) - return { - "fa_file": filepath, - "fa_object": fa_object, - "SCAS": SCAS, - "digest": digest, - } - - def load_multiple_fastas(self, fasta_dict): - """ - Wrapper for load_fasta_from_filepath - - @param fasta_list - """ - results = {} - for name, path in fasta_dict.items(): - print(f"Processing fasta '{name}'' at path '{path}'...") - results[name] = self.load_fasta_from_filepath(path) - return results - - -# Static functions below (these don't require a database) - - def explain_flag(flag): """Explains a compare flag""" print(f"Flag: {flag}\nBinary: {bin(flag)}\n") @@ -260,9 +46,6 @@ def parse_fasta(fa_file) -> pyfaidx.Fasta: return pyfaidx.Fasta(f_out.name) -from typing import Callable - - def fasta_to_seqcol(fa_file_path: str) -> dict: """Given a fasta, return a canonical seqcol object""" fa_obj = parse_fasta(fa_file_path) @@ -314,3 +97,110 @@ def build_sorted_name_length_pairs(obj: dict, digest_function): nl_digests.sort() return nl_digests + + +def compare_seqcols(A: SeqCol, B: SeqCol): + """ + Workhorse comparison function + + @param A Sequence collection A + @param B Sequence collection B + @return dict Following formal seqcol specification comparison function return value + """ + validate_seqcol(A) # First ensure these are the right structure + validate_seqcol(B) + + all_keys = list(A.keys()) + list(set(B.keys()) - set(list(A.keys()))) + result = {} + return_obj = { + "arrays": {"a-only": [], "b-only": [], "a-and-b": []}, + "elements": { + "total": {"a": len(A["lengths"]), "b": len(B["lengths"])}, + "a-and-b": {}, + "a-and-b-same-order": {}, + }, + } + + for k in all_keys: + _LOGGER.info(k) + if k not in A: + result[k] = {"flag": -1} + return_obj["arrays"]["b-only"].append(k) + elif k not in B: + return_obj["arrays"]["a-only"].append(k) + else: + return_obj["arrays"]["a-and-b"].append(k) + res = compare_elements(A[k], B[k]) + return_obj["elements"]["a-and-b"][k] = res["a-and-b"] + return_obj["elements"]["a-and-b-same-order"][k] = res["a-and-b-same-order"] + return return_obj + + +def compare_elements(A: list, B: list): + """ + Compare elements between two arrays + """ + + A_filtered = list(filter(lambda x: x in B, A)) + B_filtered = list(filter(lambda x: x in A, B)) + A_count = len(A_filtered) + B_count = len(B_filtered) + overlap = min(len(A_filtered), len(B_filtered)) # counts duplicates + + if A_count + B_count < 1: + # order match requires at least 2 matching elements + order = None + elif not (A_count == B_count == overlap): + # duplicated matches means order match is undefined + order = None + else: + order = A_filtered == B_filtered + return {"a-and-b": overlap, "a-and-b-same-order": order} + + +def seqcol_digest(seqcol_obj: dict, schema: dict = None) -> str: + """ + Given a canonical sequence collection, compute its digest. + + :param dict seqcol_obj: Dictionary representation of a canonical sequence collection object + :param dict schema: Schema defining the inherent attributes to digest + :return str: The sequence collection digest + """ + validate_seqcol(seqcol_obj) + seqcol_obj2 = {} + for attribute in seqcol_obj: + seqcol_obj2[attribute] = canonical_str(seqcol_obj[attribute]) + + # Step 1a: Remove any non-inherent attributes, + # so that only the inherent attributes contribute to the digest. + if schema: + seqcol_obj2_filtered = {} + for k in schema["inherent"]: + seqcol_obj2_filtered[k] = seqcol_obj2[k] + else: + seqcol_obj2_filtered = seqcol_obj2 + + # Step 2: Apply RFC-8785 to canonicalize the value + # associated with each attribute individually. + seqcol_obj2 = {} + for attribute in seqcol_obj: + seqcol_obj2[attribute] = canonical_str(seqcol_obj[attribute]) + # seqcol_obj2 # visualize the result + + # Step 3: Digest each canonicalized attribute value + # using the GA4GH digest algorithm. + + seqcol_obj3 = {} + for attribute in seqcol_obj2: + seqcol_obj3[attribute] = trunc512_digest(seqcol_obj2[attribute]) + # print(json.dumps(seqcol_obj3, indent=2)) # visualize the result + + # Step 4: Apply RFC-8785 again to canonicalize the JSON + # of new seqcol object representation. + + seqcol_obj4 = canonical_str(seqcol_obj3) + # seqcol_obj4 # visualize the result + + # Step 5: Digest the final canonical representation again. + seqcol_digest = trunc512_digest(seqcol_obj4) + return seqcol_digest diff --git a/seqcol/seqcol_client.py b/seqcol/seqcol_client.py new file mode 100644 index 0000000..adeaf1f --- /dev/null +++ b/seqcol/seqcol_client.py @@ -0,0 +1,172 @@ +import henge +import logging +import refget + +from copy import copy +from functools import reduce +from itertools import compress + +from .const import * +from .seqcol import * +from .utilities import trunc512_digest + + +_LOGGER = logging.getLogger(__name__) +henge.ITEM_TYPE = "_item_type" + + +class SeqColClient(refget.RefGetClient): + """ + Extension of henge that accommodates collections of sequences. + """ + + def __init__( + self, + api_url_base=None, + database={}, + schemas=None, + henges=None, + checksum_function=trunc512_digest, + ): + """ + A user interface to insert and retrieve decomposable recursive unique + identifiers (DRUIDs). + + :param dict database: Dict-like lookup database with sequences + and hashes + :param dict schemas: One or more jsonschema schemas describing the + data types stored by this Henge + :param function(str) -> str checksum_function: Default function to + handle the digest of the + serialized items stored in this henge. + """ + super(SeqColClient, self).__init__( + api_url_base=api_url_base, + database=database, + schemas=schemas or INTERNAL_SCHEMAS, + henges=henges, + checksum_function=checksum_function, + ) + _LOGGER.info("Initializing SeqColClient") + + def load_fasta(self, fa_file, skip_seq=False, topology_default="linear"): + """ + Load a sequence collection into the database + + :param str fa_file: path to the FASTA file to parse and load + :param bool skip_seq: whether to disregard the actual sequences, + load just the names and lengths and topology + :param bool skip_seq: whether to disregard the actual sequences, + load just the names and lengths and topology + :param str topology_default: the default topology assigned to + every sequence + """ + # TODO: any systematic way infer topology from a FASTA file? + if topology_default not in KNOWN_TOPOS: + raise ValueError( + f"Invalid topology ({topology_default}). " f"Choose from: {','.join(KNOWN_TOPOS)}" + ) + fa_object = parse_fasta(fa_file) + aslist = [] + for k in fa_object.keys(): + seq = str(fa_object[k]) + aslist.append( + { + NAME_KEY: k, + LEN_KEY: len(seq), + TOPO_KEY: topology_default, + SEQ_KEY: {"" if skip_seq else SEQ_KEY: seq}, + } + ) + collection_checksum = self.insert(aslist, ASL_NAME) + _LOGGER.debug(f"Loaded {ASL_NAME}: {aslist}") + return collection_checksum, aslist + + def load_fasta2(self, fa_file, skip_seq=False, topology_default="linear"): + """ + Load a sequence collection into the database + + :param str fa_file: path to the FASTA file to parse and load + :param bool skip_seq: whether to disregard the actual sequences, + load just the names and lengths and topology + :param bool skip_seq: whether to disregard the actual sequences, + load just the names and lengths and topology + :param str topology_default: the default topology assigned to + every sequence + """ + # TODO: any systematic way infer topology from a FASTA file? + _LOGGER.info("Loading fasta file...") + fa_object = parse_fasta(fa_file) + aslist = [] + for k in fa_object.keys(): + seq = str(fa_object[k]) + _LOGGER.info("Loading key: {k} / Length: {l}...".format(k=k, l=len(seq))) + aslist.append( + { + NAME_KEY: k, + LEN_KEY: len(seq), + TOPO_KEY: topology_default, + SEQ_KEY: "" if skip_seq else seq, + } + ) + _LOGGER.info("Inserting into database...") + collection_checksum = self.insert(aslist, "RawSeqCol") + _LOGGER.debug(f"Loaded {ASL_NAME}: {aslist}") + return collection_checksum, aslist + + def compare_digests(self, digestA, digestB): + A = self.retrieve(digestA, reclimit=1) + B = self.retrieve(digestB, reclimit=1) + # _LOGGER.info(A) + # _LOGGER.info(B) + return compare(A, B) + + def retrieve(self, druid, reclimit=None, raw=False): + try: + return super(SeqColClient, self).retrieve(druid, reclimit, raw) + except henge.NotFoundException as e: + _LOGGER.debug(e) + try: + return self.refget(druid) + except Exception as e: + _LOGGER.debug(e) + raise e + return henge.NotFoundException( + "{} not found in database, or in refget.".format(druid) + ) + + def load_fasta_from_refgenie(self, rgc, refgenie_key): + """ + @param rgc RefGenConf object + @param refgenie_key key of genome to load + @param scc SeqColClient object to load into + """ + filepath = rgc.seek(refgenie_key, "fasta") + return self.load_fasta_from_filepath(filepath) + + def load_fasta_from_filepath(self, filepath): + """ + @param filepath Path to fasta file + @param sc + """ + fa_object = parse_fasta(filepath) + SCAS = fasta_obj_to_seqcol(fa_object, digest_function=self.checksum_function) + digest = self.insert(SCAS, "SeqColArraySet", reclimit=1) + return { + "fa_file": filepath, + "fa_object": fa_object, + "SCAS": SCAS, + "digest": digest, + } + + def load_multiple_fastas(self, fasta_dict): + """ + Wrapper for load_fasta_from_filepath + + @param fasta_list + """ + results = {} + for name, path in fasta_dict.items(): + print(f"Processing fasta '{name}'' at path '{path}'...") + results[name] = self.load_fasta_from_filepath(path) + return results diff --git a/seqcol/utilities.py b/seqcol/utilities.py index 09baabf..2162c11 100644 --- a/seqcol/utilities.py +++ b/seqcol/utilities.py @@ -1,6 +1,14 @@ import binascii import hashlib import json +import os + +from jsonschema import Draft7Validator +from typing import Optional +from yacman import load_yaml + +from .const import SeqCol +from .exceptions import * # Refget digests from published seqcol v1.0 protocol @@ -25,49 +33,21 @@ def print_csc(csc: dict) -> str: return print(json.dumps(csc, indent=2)) -def seqcol_digest(seqcol_obj: dict, schema: dict = None) -> str: - """ - Given a canonical sequence collection, compute its digest. - - :param dict seqcol_obj: Dictionary representation of a canonical sequence collection object - :param dict schema: Schema defining the inherent attributes to digest - :return str: The sequence collection digest - """ - - seqcol_obj2 = {} - for attribute in seqcol_obj: - seqcol_obj2[attribute] = canonical_str(seqcol_obj[attribute]) - - # Step 1a: Remove any non-inherent attributes, - # so that only the inherent attributes contribute to the digest. - if schema: - seqcol_obj2_filtered = {} - for k in schema["inherent"]: - seqcol_obj2_filtered[k] = seqcol_obj2[k] - else: - seqcol_obj2_filtered = seqcol_obj2 - - # Step 2: Apply RFC-8785 to canonicalize the value - # associated with each attribute individually. - seqcol_obj2 = {} - for attribute in seqcol_obj: - seqcol_obj2[attribute] = canonical_str(seqcol_obj[attribute]) - # seqcol_obj2 # visualize the result - - # Step 3: Digest each canonicalized attribute value - # using the GA4GH digest algorithm. - - seqcol_obj3 = {} - for attribute in seqcol_obj2: - seqcol_obj3[attribute] = trunc512_digest(seqcol_obj2[attribute]) - # print(json.dumps(seqcol_obj3, indent=2)) # visualize the result - - # Step 4: Apply RFC-8785 again to canonicalize the JSON - # of new seqcol object representation. - - seqcol_obj4 = canonical_str(seqcol_obj3) - # seqcol_obj4 # visualize the result - - # Step 5: Digest the final canonical representation again. - seqcol_digest = trunc512_digest(seqcol_obj4) - return seqcol_digest +# Simple true/false validation +def validate_seqcol_bool(seqcol_obj: SeqCol, schema=None) -> bool: + schema_path = os.path.join(os.path.dirname(__file__), "schemas", "seqcol.yaml") + schema = load_yaml(schema_path) + validator = Draft7Validator(schema) + return validator.is_valid(seqcol_obj) + + +# Get errors if invalid (use this one) +# Get the errors with exception.errors +def validate_seqcol(seqcol_obj: SeqCol, schema=None) -> Optional[dict]: + schema_path = os.path.join(os.path.dirname(__file__), "schemas", "seqcol.yaml") + schema = load_yaml(schema_path) + validator = Draft7Validator(schema) + if not validator.is_valid(seqcol_obj): + errors = sorted(validator.iter_errors(seqcol_obj), key=lambda e: e.path) + raise InvalidSeqColError("Validation failed", errors) + return True diff --git a/tests/test_seqcol.py b/tests/test_seqcol.py index 545f54c..cf45652 100644 --- a/tests/test_seqcol.py +++ b/tests/test_seqcol.py @@ -1,7 +1,10 @@ import json +import os import pytest -from seqcol import SeqColClient -from seqcol.const import * +import seqcol + +# from seqcol import SeqColClient, validate_seqcol, compare +# from seqcol.const import * DEMO_FILES = [ "demo0.fa", @@ -29,13 +32,13 @@ def test_no_schemas_required(self): In contrast to the generic Henge object, SeqColClient does not require schemas as input, they are predefined in the constructor """ - assert isinstance(SeqColClient(database={}), SeqColClient) + assert isinstance(seqcol.SeqColClient(database={}), seqcol.SeqColClient) class TestFastaInserting: @pytest.mark.parametrize("fasta_name", DEMO_FILES) def test_fasta_loading_works(self, fasta_name, fa_root): - scc = SeqColClient(database={}) + scc = seqcol.SeqColClient(database={}) f = os.path.join(fa_root, fasta_name) print("Fasta file to be loaded: {}".format(f)) res = scc.load_fasta(f) @@ -45,7 +48,7 @@ def test_fasta_loading_works(self, fasta_name, fa_root): class TestRetrieval: @pytest.mark.parametrize("fasta_name", DEMO_FILES) def test_retrieval_works(self, fasta_name, fa_root): - scc = SeqColClient(database={}) + scc = seqcol.SeqColClient(database={}) f = os.path.join(fa_root, fasta_name) print("Fasta file to be loaded: {}".format(f)) d, asds = scc.load_fasta(f) @@ -57,12 +60,12 @@ def test_retrieval_works(self, fasta_name, fa_root): def check_comparison(fasta1, fasta2, expected_comparison): print(f"Comparison: Fasta1: {fasta1} vs Fasta2: {fasta2}. Expected: {expected_comparison}") - scc = SeqColClient(database={}) + scc = seqcol.SeqColClient(database={}) d = scc.load_fasta_from_filepath(fasta1) d2 = scc.load_fasta_from_filepath(fasta2) with open(expected_comparison) as fp: correct_compare_response = json.load(fp) - proposed_compare_response = scc.compare(d["SCAS"], d2["SCAS"]) + proposed_compare_response = seqcol.compare_seqcols(d["SCAS"], d2["SCAS"]) print( json.dumps( proposed_compare_response, @@ -90,3 +93,32 @@ def test_fasta_compare(self, fasta1, fasta2, answer_file, fa_root): def test_names_lengths_order(self, fasta1, fasta2, answer_file, fa_root): """Does the names_lengths array correctly identify order variants""" check_comparison(os.path.join(fa_root, fasta1), os.path.join(fa_root, fasta2), answer_file) + + +seqcol_obj = { + "lengths": [248956422, 133797422, 135086622], + "names": ["chr1", "chr2", "chr3"], + "sequences": [ + "2648ae1bacce4ec4b6cf337dcae37816", + "907112d17fcb73bcab1ed1c72b97ce68", + "1511375dc2dd1b633af8cf439ae90cec", + ], +} + +bad_seqcol = {"bogus": True} + + +class TestValidate: + """ + Test validation + """ + + @pytest.mark.parametrize(["seqcol_obj"], [[seqcol_obj]]) + def test_validate(self, seqcol_obj): + is_valid = seqcol.validate_seqcol(seqcol_obj) + assert is_valid + + @pytest.mark.parametrize(["seqcol_obj"], [[bad_seqcol]]) + def test_failure(self, seqcol_obj): + with pytest.raises(Exception): + seqcol.validate_seqcol(seqcol_obj)