From 3ad823c32b79d02d2e1b3358a302a61a3c5f21bb Mon Sep 17 00:00:00 2001 From: Nick Moore Date: Wed, 24 Apr 2024 15:23:06 +1000 Subject: [PATCH] Fix variant plugin's offset parameter to actually work --- countess/plugins/variant.py | 12 ++++--- countess/utils/variant.py | 72 ++++++++++++++++++------------------- 2 files changed, 42 insertions(+), 42 deletions(-) diff --git a/countess/plugins/variant.py b/countess/plugins/variant.py index 0306cdf..c2bb555 100644 --- a/countess/plugins/variant.py +++ b/countess/plugins/variant.py @@ -20,10 +20,10 @@ class VariantPlugin(PandasTransformDictToDictPlugin): parameters = { "column": ColumnChoiceParam("Input Column", "sequence"), "reference": ColumnOrStringParam("Reference Sequence"), + "offset": IntegerParam("Reference Offset", 0), "output": StringParam("Output Column", "variant"), "max_mutations": IntegerParam("Max Mutations", 10), - "protein": StringParam("Protein Column", ""), - "offset": IntegerParam("Protein Offset", 0), + "protein": StringParam("Protein Column", "protein"), "max_protein": IntegerParam("Max Protein Variations", 10), "drop": BooleanParam("Drop unidentified variants", False), "drop_columns": BooleanParam("Drop Input Column(s)", False), @@ -36,13 +36,16 @@ def process_dict(self, data, logger: Logger) -> dict: return {} reference = self.parameters["reference"].get_value(data) + offset = self.parameters["offset"].value r = {} if self.parameters["output"].value: try: max_mutations = self.parameters["max_mutations"].value - r[self.parameters["output"].value] = find_variant_string("g.", reference, sequence, max_mutations, 0) + r[self.parameters["output"].value] = find_variant_string( + "g.", reference, sequence, max_mutations, offset=offset + ) except ValueError: pass except (TypeError, KeyError, IndexError) as exc: @@ -50,10 +53,9 @@ def process_dict(self, data, logger: Logger) -> dict: if self.parameters["protein"].value: try: - offset = self.parameters["offset"].value max_protein = self.parameters["max_protein"].value r[self.parameters["protein"].value] = find_variant_string( - "p.", reference, sequence, max_protein, offset + "p.", reference, sequence, max_protein, offset=offset ) except ValueError: pass diff --git a/countess/utils/variant.py b/countess/utils/variant.py index e9b5ada..ca3e469 100644 --- a/countess/utils/variant.py +++ b/countess/utils/variant.py @@ -202,8 +202,8 @@ def find_variant_dna(ref_seq: str, var_seq: str, offset: Optional[int] = 0) -> I """ - ref_seq = ref_seq.strip().upper()[offset:] - var_seq = var_seq.strip().upper()[offset:] + ref_seq = ref_seq.strip().upper() + var_seq = var_seq.strip().upper() if not re.match("[AGTCN]+$", ref_seq): raise ValueError("Invalid reference sequence") @@ -260,32 +260,32 @@ def find_variant_dna(ref_seq: str, var_seq: str, offset: Optional[int] = 0) -> I op2.tag = "equal" for opcode in opcodes: - src_start, src_end = opcode.src_start, opcode.src_end - src_seq = ref_seq[src_start:src_end] + src_seq = ref_seq[opcode.src_start : opcode.src_end] dest_seq = var_seq[opcode.dest_start : opcode.dest_end] + start, end = opcode.src_start + offset, opcode.src_end + offset if opcode.tag == "delete": assert dest_seq == "" # 'delete' opcode maps to HGVS 'del' operation if len(src_seq) == 1: - yield f"{src_start+1}del" + yield f"{start+1}del" else: - yield f"{src_start+1}_{src_end}del" + yield f"{start+1}_{end}del" elif opcode.tag == "insert": assert src_seq == "" # 'insert' opcode maps to either an HGVS 'dup' or 'ins' operation - if ref_seq[src_start - len(dest_seq) : src_start] == dest_seq: + if ref_seq[opcode.src_start - len(dest_seq) : opcode.src_start] == dest_seq: # This is a duplication of one or more symbols immediately # preceding this point. if len(dest_seq) == 1: - yield f"{src_start}dup" + yield f"{start}dup" else: - yield f"{src_start - len(dest_seq) + 1}_{src_start}dup" + yield f"{start - len(dest_seq) + 1}_{start}dup" else: inserted_sequence = search_for_sequence(ref_seq, dest_seq) - yield f"{src_start}_{src_start+1}ins{inserted_sequence}" + yield f"{start}_{start+1}ins{inserted_sequence}" elif opcode.tag == "replace": # 'replace' opcode maps to either an HGVS '>' (single substitution) or @@ -296,12 +296,12 @@ def find_variant_dna(ref_seq: str, var_seq: str, offset: Optional[int] = 0) -> I # as this code has no concept of amino acid alignment. if len(src_seq) == 1 and len(dest_seq) == 1: - yield f"{src_start+1}{src_seq}>{dest_seq}" + yield f"{start+1}{src_seq}>{dest_seq}" elif len(src_seq) == len(dest_seq) and dest_seq == reverse_complement(src_seq): - yield f"{src_start+1}_{src_end}inv" + yield f"{start+1}_{end}inv" else: inserted_sequence = search_for_sequence(ref_seq, dest_seq) - yield f"{src_start+1}_{src_end}delins{inserted_sequence}" + yield f"{start+1}_{end}delins{inserted_sequence}" def find_variant_protein(ref_seq: str, var_seq: str, offset: Optional[int] = 0): @@ -356,8 +356,6 @@ def find_variant_protein(ref_seq: str, var_seq: str, offset: Optional[int] = 0): >>> list(find_variant_protein("ATGGTTTGGTAG", "ATGGTTTAGTAG")) ['Trp3Ter'] - Offset lets you set the frame offset (0, 1 or 2, practically) - """ ref_seq = ref_seq.strip().upper() @@ -369,8 +367,10 @@ def find_variant_protein(ref_seq: str, var_seq: str, offset: Optional[int] = 0): if not re.match("[AGTCN]+$", var_seq): raise ValueError("Invalid variant sequence") - ref_pro = translate_dna(ref_seq[offset:])[0] - var_pro = translate_dna(var_seq[offset:])[0] + frame = (3 - offset) % 3 + ref_pro = translate_dna(ref_seq[frame:])[0] + var_pro = translate_dna(var_seq[frame:])[0] + offset = (offset + 2) // 3 # cut protein translations off at first '*' (terminator) if "*" in ref_pro: @@ -378,49 +378,48 @@ def find_variant_protein(ref_seq: str, var_seq: str, offset: Optional[int] = 0): if "*" in var_pro: var_pro = var_pro[: var_pro.find("*") + 1] - def _ref(ref_offset): - return f"{AA_CODES[ref_pro[ref_offset]]}{ref_offset+1}" + def _ref(pos): + return f"{AA_CODES[ref_pro[pos]]}{pos+1+offset}" opcodes = list(levenshtein_opcodes(ref_pro, var_pro)) for opcode in opcodes: - src_start, src_end = opcode.src_start, opcode.src_end - src_pro = ref_pro[src_start:src_end] + start, end = opcode.src_start, opcode.src_end dest_pro = var_pro[opcode.dest_start : opcode.dest_end] if opcode.tag == "delete": assert dest_pro == "" - if len(ref_pro) > src_end and ref_pro[src_end] == "*": + if len(ref_pro) > end and ref_pro[end] == "*": # if the codon just after this deletion is a terminator, # consider this an early termination. - yield f"{_ref(src_start)}Ter" + yield f"{_ref(start)}Ter" return - if len(src_pro) == 1: - yield f"{_ref(src_start)}del" + if end - start == 1: + yield f"{_ref(start)}del" else: - yield f"{_ref(src_start)}_{_ref(src_end-1)}del" + yield f"{_ref(start)}_{_ref(end-1)}del" elif opcode.tag == "insert": - assert src_pro == "" + assert start == end - if ref_pro[src_start - len(dest_pro) : src_start] == dest_pro: + if ref_pro[start - len(dest_pro) : start] == dest_pro: if len(dest_pro) == 1: - yield f"{_ref(src_start-1)}dup" + yield f"{_ref(start-1)}dup" else: - yield f"{_ref(src_start-len(dest_pro))}_{_ref(src_start)}dup" - elif src_start == len(ref_pro): + yield f"{_ref(start-len(dest_pro))}_{_ref(start)}dup" + elif start == len(ref_pro): # 'extension', not quite standards compliant - yield f"{_ref(src_start-1)}ext{translate_aa(dest_pro)}" + yield f"{_ref(start-1)}ext{translate_aa(dest_pro)}" else: - yield f"{_ref(src_start-1)}_{_ref(src_end)}ins{translate_aa(dest_pro)}" + yield f"{_ref(start-1)}_{_ref(end)}ins{translate_aa(dest_pro)}" elif opcode.tag == "replace": # XXX handle extension if src_pro[-1] == '*' - if len(src_pro) == 1 and len(dest_pro) == 1: - yield f"{_ref(src_start)}{translate_aa(dest_pro)}" + if end - start == 1 and len(dest_pro) == 1: + yield f"{_ref(start)}{translate_aa(dest_pro)}" else: - yield f"{_ref(src_start)}_{_ref(src_end-1)}delins{translate_aa(dest_pro)}" + yield f"{_ref(start)}_{_ref(end-1)}delins{translate_aa(dest_pro)}" # If the variant protein terminated, stop translating now: if dest_pro[-1] == "*": @@ -490,7 +489,6 @@ def find_variant_string( Traceback (most recent call last): ... ValueError: Too many variations (2) in GATTACA - """ if prefix.endswith("g.") and not prefix.endswith("n."):