-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfasta_validator.py
102 lines (67 loc) · 2.75 KB
/
fasta_validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python
from Bio import SeqIO
from Bio.Seq import Seq
import argparse
import sys
import json
from jakomics import colors, utilities
import jak_utils
jak_utils.header()
# OPTIONS #####################################################################
parser = argparse.ArgumentParser(
description='Filters fasta files, writing to a new file appended with ".ff"')
parser.add_argument('-f', '--file',
help="Paths to individual fasta files",
required=True)
parser.add_argument('-m', '--mode',
help="id or description mode",
required=False,
default="description")
parser.add_argument('--remove_identical_sequences',
action='store_true',
help='Remove stop codon asterisks')
args = parser.parse_args()
if args.mode not in ["description", "id"]:
print(f"{colors.bcolors.RED}ERROR: --mode (-m) must be either 'description' or 'id' {colors.bcolors.END}", file=sys.stderr)
sys.exit()
# FUNCTIONS
def get_header(seq):
# chosen based off the --mode selection
if args.mode == 'description':
header = seq.description
elif args.mode == "id":
header = seq.id
return(header)
def validate_records(seqs):
deduped_records = {}
for seq in seqs:
header = get_header(seq)
if header not in deduped_records:
deduped_records[header] = [str(seq.seq)]
else:
deduped_records[header].append(str(seq.seq))
good_records = 0
bad_records = {'same_sequence': {},
'diff_sequence': {}}
for header, records in deduped_records.items():
if len(records) == 1:
good_records += 1
if len(records) > 1:
# check if the sequences are identical
if all(element == records[0] for element in records):
bad_records['same_sequence'][header] = records
else:
bad_records['diff_sequence'][header] = records
print(f"There are {len(seqs)} total records")
print(f"Of these, {good_records} look OK")
print(f"{len(bad_records['same_sequence'])} records are found more than once, but each record has the same sequence")
print(f"{len(bad_records['diff_sequence'])} records are found more than once, and there is more than one sequence")
with open("out.json", 'w') as outfile:
json.dump(bad_records, outfile, indent=2)
# MAIN ########################################################################
#pbar = tqdm(total=len(fasta_files), desc="Finished", unit=" fasta files")
if __name__ == "__main__":
seqs = []
for seq_record in SeqIO.parse(args.file, "fasta"):
seqs.append(seq_record)
validate_records(seqs)