-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfasta_filter.py
executable file
·92 lines (62 loc) · 2.36 KB
/
fasta_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
from Bio import SeqIO
from Bio.Seq import Seq
import argparse
import re
import os
from tqdm import tqdm
from jakomics import colors, utilities
import jak_utils
jak_utils.header()

# OPTIONS #####################################################################

parser = argparse.ArgumentParser(
    description='Filters fasta files, writing to a new file appended with ".ff"'
)

# Input selection: a directory and/or explicit file paths may be given.
parser.add_argument(
    '--in_dir',
    default="",
    required=False,
    help="Directory with fasta files",
)
parser.add_argument(
    '-f', '--files',
    nargs='*',
    default=[],
    required=False,
    help="Paths to individual fasta files",
)

# Filtering options applied to every sequence of every input file.
parser.add_argument(
    '--min_length',
    type=int,
    default=1,
    help="Length",
)
parser.add_argument(
    '--remove_trailing_asterisks',
    action='store_true',
    help='Remove stop codon asterisks',
)

# Parsed once at import time; the functions below read this global.
args = parser.parse_args()
# FUNCTIONS ###################################################################
def remove_trailing_asterisks(seqs):
    """Strip one trailing stop-codon asterisk from each record's sequence.

    Records are modified in place: when ``record.seq`` ends with ``"*"`` it
    is replaced by the same sequence without its last character. Slicing is
    used instead of a regex round trip through ``str`` so the sequence
    object keeps its own type.

    Args:
        seqs: Iterable of records exposing a ``seq`` attribute that supports
            ``endswith`` and slicing (such as the records yielded by
            ``SeqIO.parse``).

    Returns:
        A new list holding every input record, in the original order.
    """
    asterisks_removed = []
    for record in seqs:
        if record.seq.endswith("*"):
            # Only the final character is dropped; interior asterisks and
            # any further trailing ones are left untouched.
            record.seq = record.seq[:-1]
        asterisks_removed.append(record)
    return asterisks_removed
def length_check(seqs, min_length=None):
    """Keep only the records whose sequence is at least ``min_length`` long.

    Args:
        seqs: Iterable of records exposing a ``seq`` attribute that has a
            length.
        min_length: Minimum sequence length to keep (inclusive). When left
            as ``None``, the parsed ``--min_length`` command-line option
            (``args.min_length``) is used, which matches the behaviour of
            the original single-argument call.

    Returns:
        A new list of the records that pass, in their original order.
    """
    if min_length is None:
        # Fall back to the module-level parsed options so existing callers
        # that pass only ``seqs`` behave exactly as before.
        min_length = args.min_length
    return [record for record in seqs if len(record.seq) >= min_length]
def write_fasta(seqs, ff):
    """Write ``seqs`` in FASTA format alongside the source file.

    The output path is the directory of ``ff.file_path`` joined with
    ``ff.new_name``.
    """
    out_path = os.path.join(os.path.dirname(ff.file_path), ff.new_name)
    SeqIO.write(seqs, out_path, "fasta")
# MAIN ########################################################################
# Gather the input files from --files and --in_dir; the extension list is
# passed to the project helper (presumably as an allow-list; confirm against
# jakomics.utilities.get_files).
fasta_files = utilities.get_files(args.files, args.in_dir, ['faa', 'fa', 'ffn', 'fasta'])

# Use the progress bar as a context manager so it is always closed, even if
# one of the files fails to parse or write.
with tqdm(total=len(fasta_files), desc="Finished", unit=" fasta files") as pbar:
    for fasta_file in fasta_files:
        # Output name is "<short_name>.ff<suffix>"; write_fasta places it in
        # the same directory as the input file.
        fasta_file.new_name = f'{fasta_file.short_name}.ff{fasta_file.suffix}'

        # Load every record up front; the filters below work on lists.
        seqs = list(SeqIO.parse(fasta_file.file_path, "fasta"))

        if args.remove_trailing_asterisks:
            seqs = remove_trailing_asterisks(seqs)

        # The length filter runs after asterisk removal, so a stripped stop
        # codon does not count toward the sequence length.
        seqs = length_check(seqs)

        write_fasta(seqs, fasta_file)
        pbar.update(1)