From 3dd8ab30d67c9f7ec43f62c7a0dd15b72944aebb Mon Sep 17 00:00:00 2001 From: nsheff Date: Wed, 23 Aug 2023 22:03:14 -0400 Subject: [PATCH] cleanup --- seqcol/__init__.py | 4 ---- seqcol/const.py | 1 - seqcol/seqcol.py | 46 +++++++++++++++------------------------------ seqcol/utilities.py | 2 +- 4 files changed, 16 insertions(+), 37 deletions(-) diff --git a/seqcol/__init__.py b/seqcol/__init__.py index e09ba0d..c67de6c 100644 --- a/seqcol/__init__.py +++ b/seqcol/__init__.py @@ -1,7 +1,3 @@ -# Project configuration, particularly for logging. - -import logging - from .const import * from .seqcol import * from .seqcol_client import * diff --git a/seqcol/const.py b/seqcol/const.py index 987b644..086c129 100644 --- a/seqcol/const.py +++ b/seqcol/const.py @@ -1,6 +1,5 @@ import os - def _schema_path(name): return os.path.join(SCHEMA_FILEPATH, name) diff --git a/seqcol/seqcol.py b/seqcol/seqcol.py index a46168d..4c0ed28 100644 --- a/seqcol/seqcol.py +++ b/seqcol/seqcol.py @@ -1,13 +1,7 @@ import henge -import jsonschema import logging -import os import pyfaidx -import refget -from copy import copy -from functools import reduce -from itertools import compress from typing import Callable from .utilities import * @@ -37,7 +31,6 @@ def parse_fasta(fa_file) -> pyfaidx.Fasta: # gunzip the file into a temporary one and read it in not to interfere # with the original one. from gzip import open as gzopen - from shutil import copyfileobj from tempfile import NamedTemporaryFile with gzopen(fa_file, "rt") as f_in, NamedTemporaryFile(mode="w+t", suffix=".fa") as f_out: @@ -55,7 +48,7 @@ def fasta_to_seqcol(fa_file_path: str) -> dict: def fasta_obj_to_seqcol( fa_object: pyfaidx.Fasta, verbose: bool = True, - digest_function: Callable[[str], str] = henge.md5, + digest_function: Callable[[str], str] = trunc512_digest, ) -> dict: """ Given a fasta object, return a CSC (Canonical Sequence Collection object) @@ -76,7 +69,7 @@ def fasta_obj_to_seqcol( seq_name = fa_object[k].name seq_digest = digest_function(seq.upper()) snlp = {"length": seq_length, "name": seq_name} # sorted_name_length_pairs - snlp_digest = digest_function(henge.canonical_str(snlp)) + snlp_digest = digest_function(canonical_str(snlp)) CSC["lengths"].append(seq_length) CSC["names"].append(seq_name) CSC["sorted_name_length_pairs"].append(snlp_digest) @@ -93,7 +86,7 @@ def build_sorted_name_length_pairs(obj: dict, digest_function): sorted_name_length_pairs.append({"length": obj["lengths"][i], "name": obj["names"][i]}) nl_digests = [] for i in range(len(sorted_name_length_pairs)): - nl_digests.append(digest_function(henge.canonical_str(sorted_name_length_pairs[i]))) + nl_digests.append(digest_function(canonical_str(sorted_name_length_pairs[i]))) nl_digests.sort() return nl_digests @@ -130,15 +123,15 @@ def compare_seqcols(A: SeqCol, B: SeqCol): return_obj["arrays"]["a-only"].append(k) else: return_obj["arrays"]["a-and-b"].append(k) - res = compare_elements(A[k], B[k]) + res = _compare_elements(A[k], B[k]) return_obj["elements"]["a-and-b"][k] = res["a-and-b"] return_obj["elements"]["a-and-b-same-order"][k] = res["a-and-b-same-order"] return return_obj -def compare_elements(A: list, B: list): +def _compare_elements(A: list, B: list): """ - Compare elements between two arrays + Compare elements between two arrays. Helper function for individual elements used by workhorse compare_seqcols function """ A_filtered = list(filter(lambda x: x in B, A)) @@ -158,7 +151,7 @@ def compare_elements(A: list, B: list): return {"a-and-b": overlap, "a-and-b-same-order": order} -def seqcol_digest(seqcol_obj: dict, schema: dict = None) -> str: +def seqcol_digest(seqcol_obj: SeqCol, schema: dict = None) -> str: """ Given a canonical sequence collection, compute its digest. @@ -166,27 +159,19 @@ def seqcol_digest(seqcol_obj: dict, schema: dict = None) -> str: :param dict schema: Schema defining the inherent attributes to digest :return str: The sequence collection digest """ - validate_seqcol(seqcol_obj) - seqcol_obj2 = {} - for attribute in seqcol_obj: - seqcol_obj2[attribute] = canonical_str(seqcol_obj[attribute]) + validate_seqcol(seqcol_obj) # Step 1a: Remove any non-inherent attributes, # so that only the inherent attributes contribute to the digest. + seqcol_obj2 = {} if schema: - seqcol_obj2_filtered = {} for k in schema["inherent"]: - seqcol_obj2_filtered[k] = seqcol_obj2[k] - else: - seqcol_obj2_filtered = seqcol_obj2 - - # Step 2: Apply RFC-8785 to canonicalize the value - # associated with each attribute individually. - seqcol_obj2 = {} - for attribute in seqcol_obj: - seqcol_obj2[attribute] = canonical_str(seqcol_obj[attribute]) - # seqcol_obj2 # visualize the result - + # Step 2: Apply RFC-8785 to canonicalize the value + # associated with each attribute individually. + seqcol_obj2[k] = canonical_str(seqcol_obj[k]) + else: # no schema provided, so assume all attributes are inherent + for k in seqcol_obj: + seqcol_obj2[k] = canonical_str(seqcol_obj[k]) # Step 3: Digest each canonicalized attribute value # using the GA4GH digest algorithm. @@ -199,7 +184,6 @@ def seqcol_digest(seqcol_obj: dict, schema: dict = None) -> str: # of new seqcol object representation. seqcol_obj4 = canonical_str(seqcol_obj3) - # seqcol_obj4 # visualize the result # Step 5: Digest the final canonical representation again. seqcol_digest = trunc512_digest(seqcol_obj4) diff --git a/seqcol/utilities.py b/seqcol/utilities.py index 2162c11..d8d5c97 100644 --- a/seqcol/utilities.py +++ b/seqcol/utilities.py @@ -41,7 +41,7 @@ def validate_seqcol_bool(seqcol_obj: SeqCol, schema=None) -> bool: return validator.is_valid(seqcol_obj) -# Get errors if invalid (use this one) +# Get errors if invalid (use this one) # Get the errors with exception.errors def validate_seqcol(seqcol_obj: SeqCol, schema=None) -> Optional[dict]: schema_path = os.path.join(os.path.dirname(__file__), "schemas", "seqcol.yaml")