Skip to content

Commit

Permalink
Fix variant plugin's offset parameter to actually work
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzoic committed Apr 24, 2024
1 parent c6bae2c commit 3ad823c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 42 deletions.
12 changes: 7 additions & 5 deletions countess/plugins/variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ class VariantPlugin(PandasTransformDictToDictPlugin):
parameters = {
"column": ColumnChoiceParam("Input Column", "sequence"),
"reference": ColumnOrStringParam("Reference Sequence"),
"offset": IntegerParam("Reference Offset", 0),
"output": StringParam("Output Column", "variant"),
"max_mutations": IntegerParam("Max Mutations", 10),
"protein": StringParam("Protein Column", ""),
"offset": IntegerParam("Protein Offset", 0),
"protein": StringParam("Protein Column", "protein"),
"max_protein": IntegerParam("Max Protein Variations", 10),
"drop": BooleanParam("Drop unidentified variants", False),
"drop_columns": BooleanParam("Drop Input Column(s)", False),
Expand All @@ -36,24 +36,26 @@ def process_dict(self, data, logger: Logger) -> dict:
return {}

reference = self.parameters["reference"].get_value(data)
offset = self.parameters["offset"].value

r = {}

if self.parameters["output"].value:
try:
max_mutations = self.parameters["max_mutations"].value
r[self.parameters["output"].value] = find_variant_string("g.", reference, sequence, max_mutations, 0)
r[self.parameters["output"].value] = find_variant_string(
"g.", reference, sequence, max_mutations, offset=offset
)
except ValueError:
pass
except (TypeError, KeyError, IndexError) as exc:
logger.exception(exc)

if self.parameters["protein"].value:
try:
offset = self.parameters["offset"].value
max_protein = self.parameters["max_protein"].value
r[self.parameters["protein"].value] = find_variant_string(
"p.", reference, sequence, max_protein, offset
"p.", reference, sequence, max_protein, offset=offset
)
except ValueError:
pass
Expand Down
72 changes: 35 additions & 37 deletions countess/utils/variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ def find_variant_dna(ref_seq: str, var_seq: str, offset: Optional[int] = 0) -> I
"""

ref_seq = ref_seq.strip().upper()[offset:]
var_seq = var_seq.strip().upper()[offset:]
ref_seq = ref_seq.strip().upper()
var_seq = var_seq.strip().upper()

if not re.match("[AGTCN]+$", ref_seq):
raise ValueError("Invalid reference sequence")
Expand Down Expand Up @@ -260,32 +260,32 @@ def find_variant_dna(ref_seq: str, var_seq: str, offset: Optional[int] = 0) -> I
op2.tag = "equal"

for opcode in opcodes:
src_start, src_end = opcode.src_start, opcode.src_end
src_seq = ref_seq[src_start:src_end]
src_seq = ref_seq[opcode.src_start : opcode.src_end]
dest_seq = var_seq[opcode.dest_start : opcode.dest_end]
start, end = opcode.src_start + offset, opcode.src_end + offset

if opcode.tag == "delete":
assert dest_seq == ""
# 'delete' opcode maps to HGVS 'del' operation
if len(src_seq) == 1:
yield f"{src_start+1}del"
yield f"{start+1}del"
else:
yield f"{src_start+1}_{src_end}del"
yield f"{start+1}_{end}del"

elif opcode.tag == "insert":
assert src_seq == ""
# 'insert' opcode maps to either an HGVS 'dup' or 'ins' operation

if ref_seq[src_start - len(dest_seq) : src_start] == dest_seq:
if ref_seq[opcode.src_start - len(dest_seq) : opcode.src_start] == dest_seq:
# This is a duplication of one or more symbols immediately
# preceding this point.
if len(dest_seq) == 1:
yield f"{src_start}dup"
yield f"{start}dup"
else:
yield f"{src_start - len(dest_seq) + 1}_{src_start}dup"
yield f"{start - len(dest_seq) + 1}_{start}dup"
else:
inserted_sequence = search_for_sequence(ref_seq, dest_seq)
yield f"{src_start}_{src_start+1}ins{inserted_sequence}"
yield f"{start}_{start+1}ins{inserted_sequence}"

elif opcode.tag == "replace":
# 'replace' opcode maps to either an HGVS '>' (single substitution) or
Expand All @@ -296,12 +296,12 @@ def find_variant_dna(ref_seq: str, var_seq: str, offset: Optional[int] = 0) -> I
# as this code has no concept of amino acid alignment.

if len(src_seq) == 1 and len(dest_seq) == 1:
yield f"{src_start+1}{src_seq}>{dest_seq}"
yield f"{start+1}{src_seq}>{dest_seq}"
elif len(src_seq) == len(dest_seq) and dest_seq == reverse_complement(src_seq):
yield f"{src_start+1}_{src_end}inv"
yield f"{start+1}_{end}inv"
else:
inserted_sequence = search_for_sequence(ref_seq, dest_seq)
yield f"{src_start+1}_{src_end}delins{inserted_sequence}"
yield f"{start+1}_{end}delins{inserted_sequence}"


def find_variant_protein(ref_seq: str, var_seq: str, offset: Optional[int] = 0):
Expand Down Expand Up @@ -356,8 +356,6 @@ def find_variant_protein(ref_seq: str, var_seq: str, offset: Optional[int] = 0):
>>> list(find_variant_protein("ATGGTTTGGTAG", "ATGGTTTAGTAG"))
['Trp3Ter']
Offset lets you set the frame offset (0, 1 or 2, practically)
"""

ref_seq = ref_seq.strip().upper()
Expand All @@ -369,58 +367,59 @@ def find_variant_protein(ref_seq: str, var_seq: str, offset: Optional[int] = 0):
if not re.match("[AGTCN]+$", var_seq):
raise ValueError("Invalid variant sequence")

ref_pro = translate_dna(ref_seq[offset:])[0]
var_pro = translate_dna(var_seq[offset:])[0]
frame = (3 - offset) % 3
ref_pro = translate_dna(ref_seq[frame:])[0]
var_pro = translate_dna(var_seq[frame:])[0]
offset = (offset + 2) // 3

# cut protein translations off at first '*' (terminator)
if "*" in ref_pro:
ref_pro = ref_pro[: ref_pro.find("*") + 1]
if "*" in var_pro:
var_pro = var_pro[: var_pro.find("*") + 1]

def _ref(ref_offset):
return f"{AA_CODES[ref_pro[ref_offset]]}{ref_offset+1}"
def _ref(pos):
return f"{AA_CODES[ref_pro[pos]]}{pos+1+offset}"

opcodes = list(levenshtein_opcodes(ref_pro, var_pro))

for opcode in opcodes:
src_start, src_end = opcode.src_start, opcode.src_end
src_pro = ref_pro[src_start:src_end]
start, end = opcode.src_start, opcode.src_end
dest_pro = var_pro[opcode.dest_start : opcode.dest_end]

if opcode.tag == "delete":
assert dest_pro == ""
if len(ref_pro) > src_end and ref_pro[src_end] == "*":
if len(ref_pro) > end and ref_pro[end] == "*":
# if the codon just after this deletion is a terminator,
# consider this an early termination.
yield f"{_ref(src_start)}Ter"
yield f"{_ref(start)}Ter"
return
if len(src_pro) == 1:
yield f"{_ref(src_start)}del"
if end - start == 1:
yield f"{_ref(start)}del"
else:
yield f"{_ref(src_start)}_{_ref(src_end-1)}del"
yield f"{_ref(start)}_{_ref(end-1)}del"

elif opcode.tag == "insert":
assert src_pro == ""
assert start == end

if ref_pro[src_start - len(dest_pro) : src_start] == dest_pro:
if ref_pro[start - len(dest_pro) : start] == dest_pro:
if len(dest_pro) == 1:
yield f"{_ref(src_start-1)}dup"
yield f"{_ref(start-1)}dup"
else:
yield f"{_ref(src_start-len(dest_pro))}_{_ref(src_start)}dup"
elif src_start == len(ref_pro):
yield f"{_ref(start-len(dest_pro))}_{_ref(start)}dup"
elif start == len(ref_pro):
# 'extension', not quite standards compliant
yield f"{_ref(src_start-1)}ext{translate_aa(dest_pro)}"
yield f"{_ref(start-1)}ext{translate_aa(dest_pro)}"
else:
yield f"{_ref(src_start-1)}_{_ref(src_end)}ins{translate_aa(dest_pro)}"
yield f"{_ref(start-1)}_{_ref(end)}ins{translate_aa(dest_pro)}"

elif opcode.tag == "replace":
# XXX handle extension if src_pro[-1] == '*'

if len(src_pro) == 1 and len(dest_pro) == 1:
yield f"{_ref(src_start)}{translate_aa(dest_pro)}"
if end - start == 1 and len(dest_pro) == 1:
yield f"{_ref(start)}{translate_aa(dest_pro)}"
else:
yield f"{_ref(src_start)}_{_ref(src_end-1)}delins{translate_aa(dest_pro)}"
yield f"{_ref(start)}_{_ref(end-1)}delins{translate_aa(dest_pro)}"

# If the variant protein terminated, stop translating now:
if dest_pro[-1] == "*":
Expand Down Expand Up @@ -490,7 +489,6 @@ def find_variant_string(
Traceback (most recent call last):
...
ValueError: Too many variations (2) in GATTACA
"""

if prefix.endswith("g.") and not prefix.endswith("n."):
Expand Down

0 comments on commit 3ad823c

Please sign in to comment.