-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth_log_parser.py
121 lines (101 loc) · 3.93 KB
/
auth_log_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# stdlib
from datetime import datetime, date
import argparse
import re
import pprint
# validate date from argument
def valid_date(string):
try:
return datetime.strptime(string, "%Y-%m-%d").date()
except ValueError:
msg = f"Invalid date: '{string}'."
raise argparse.ArgumentTypeError(msg)
# parse date from a line
def parse_date(line):
string_date = re.search(r'^[a-zA-Z]{3}(\s+)[\d]{1,2}', line)
return datetime.strptime(f"{date.today().year} {string_date.group(0)}", "%Y %b %d").date().strftime("%Y-%m-%d")
# parse user from a line
def parse_user(line):
return re.search(r'for (\binvalid\suser\s)?(\w+)', line).group(2)
# parse getaddr from a line
def parse_addr(line):
return re.search(r'for ([\w\.-]+)\s', line).group(1)
# parse ipv4 from a line containing failed password text
def parse_fails_ipv4(line):
return re.search(r'(\bfrom\s)((\d{1,3}\.){3}\d{1,3})', line).group(2)
# parse ipv4 from a line containing reverse mapping text
def parse_reverse_mapping_ipv4(line):
return re.search(r'\[((\d{1,3}\.){3}\d{1,3})\]', line).group(1)
def get_fails_or_addrs(args, is_reverse_mapping):
results = {}
pattern = "reverse mapping checking " if is_reverse_mapping else "Failed password for "
for line in args.file:
# filter by pattern
if pattern in line:
# parse data date, user or uri, ip address
on_date = parse_date(line)
if args.date and on_date != str(args.date):
continue
main_key = parse_addr(
line) if is_reverse_mapping else parse_user(line)
from_ip = parse_reverse_mapping_ipv4(
line) if is_reverse_mapping else parse_fails_ipv4(line)
# increase counter or add results
if on_date in results:
results_on_date = results[on_date]
if main_key in results_on_date:
user_data = results_on_date[main_key]
if 'TOTAL' in user_data:
user_data['TOTAL'] += 1
else:
user_data['TOTAL'] = 1
if 'IPLIST' in user_data:
iplist = user_data['IPLIST']
if from_ip in iplist:
iplist[from_ip] += 1
else:
iplist[from_ip] = 1
else:
iplist = {
from_ip: 1
}
user_data['IPLIST'] = iplist
else:
user_data = {
'TOTAL': 1,
'IPLIST': {
from_ip: 1
}
}
results_on_date[main_key] = user_data
results[on_date] = results_on_date
else:
results[on_date] = {
main_key: {
'TOTAL': 1,
'IPLIST': {
from_ip: 1
}
}
}
return results
def main():
pp = pprint.PrettyPrinter(indent=4)
# parse argument
parser = argparse.ArgumentParser(
description='Parsing for “Failed password” and “reverse mapping” attempts distributed by IP addresses')
parser.add_argument("-d",
"--date",
help="Specific date data - format YYYY-MM-DD",
required=False,
type=valid_date)
parser.add_argument('--file', type=argparse.FileType('r'),
help="file path")
args = parser.parse_args()
# print results
pp.pprint(get_fails_or_addrs(args, is_reverse_mapping=False))
args.file.seek(0)
pp.pprint(get_fails_or_addrs(args, is_reverse_mapping=True))
args.file.close()
if __name__ == "__main__":
main()