This repository has been archived by the owner on Aug 11, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
128 lines (110 loc) · 3.53 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Script written by Arnold DECHAMPS for the RIPE LABS article :
"""
import io, re
import dns.resolver
import dns.rrset
import asyncio
from dns.asyncresolver import Resolver
from typing import Tuple
def filemanager():
    """
    Read the database dump and return its lines.

    Opens ``ripe.db.domain`` relative to the current working directory,
    decodes it as latin-1 and prints a progress message once reading is done.

    :returns: list of all lines of the file, newline characters stripped
    """
    # The context manager closes the handle even when reading fails; the
    # previous version left the file open for the lifetime of the process.
    with open("ripe.db.domain", "r", encoding="latin-1") as dbobjects:
        rawlines = [line.strip("\n") for line in dbobjects]
    print("File reading done.")
    return rawlines
def sorter(rawlines):
    """
    Group the raw database lines into one record per ``domain:`` object.

    Every ``domain:`` line starts a new record; the ``nserver:`` and
    ``ds-rdata:`` lines that follow are attached to it. All other lines are
    ignored. Records that end up without any name server are left out of the
    result. Prints a progress message when done.

    :param rawlines: iterable of text lines from the database dump
    :return: list of dicts with the keys "domain" (str), "nameserver"
        (list of str) and "dnssec" (list of str)
    """
    # One pattern for the three attributes of interest; the value is whatever
    # follows the first colon, so column padding never ends up in the data.
    attribute = re.compile(r'^\s*(domain|nserver|ds-rdata):(.*)$')

    objects = []
    domain = None
    nameservers = []
    dnssec = []

    for line in rawlines:
        found = attribute.match(line)
        if found is None:
            continue
        key = found.group(1)
        value = found.group(2).strip()

        if key == "domain":
            # Close the previous record before starting a new one.
            if domain is not None and nameservers:
                objects.append({"domain": domain, "nameserver": nameservers, "dnssec": dnssec})
            # Always start from a clean slate so that nothing collected for
            # a skipped record leaks into the next one.
            domain = value
            nameservers = []
            dnssec = []
        elif key == "nserver":
            nameservers.append(value)
        else:
            dnssec.append(value)

    # The last record has no following "domain:" line to trigger its flush.
    if domain is not None and nameservers:
        objects.append({"domain": domain, "nameserver": nameservers, "dnssec": dnssec})

    print("File sorted.")
    return objects
def statmaker(sorteddata):
    """
    Count the objects that have DNSSEC enabled.

    An object counts as DNSSEC-enabled when its "dnssec" list holds at least
    one entry. Prints a progress message when done.

    :param sorteddata: iterable of dicts, each with a "dnssec" list
    :return: number of objects whose "dnssec" list is not empty
    """
    dnssecenabled = sum(1 for entry in sorteddata if entry["dnssec"])
    print("DNSSEC stats done.")
    return dnssecenabled
async def dns_query(domain: str, rtype: str = 'A', **kwargs) -> dns.rrset.RRset:
    """
    Resolve one record set for ``domain`` with a newly created async resolver.

    No exception is caught here, so any error raised by the resolver
    propagates to the caller.

    :param domain: name to look up
    :param rtype: record type, handed to the resolver as ``rdtype``
    :param kwargs: forwarded unchanged to the resolver's ``resolve`` call
    :return: the ``rrset`` attribute of the resolver's answer
    """
    resolver = Resolver()
    answer: dns.resolver.Answer = await resolver.resolve(domain, rdtype=rtype, **kwargs)
    return answer.rrset
async def dns_bulk(*queries: Tuple[str, str], **kwargs):
    """
    Run several DNS lookups concurrently and collect their outcomes.

    :param queries: ``(domain, record type)`` pairs, one per lookup
    :param kwargs: forwarded to every ``dns_query`` call; the optional
        ``return_exceptions`` entry (default True) is removed first and
        handed to ``asyncio.gather`` instead
    :return: whatever ``asyncio.gather`` yields for the lookups
    """
    collect_errors = kwargs.pop('return_exceptions', True)
    pending = []
    for name, record_type in queries:
        pending.append(dns_query(name, record_type, **kwargs))
    return await asyncio.gather(*pending, return_exceptions=collect_errors)
async def dnstest(queries):
    """
    Run a batch of lookups and count those that did not end in an exception.

    :param queries: iterable of ``(domain, record type)`` pairs
    :return: number of outcomes that are not ``Exception`` instances
    """
    outcomes = await dns_bulk(*queries)
    return sum(1 for outcome in outcomes if not isinstance(outcome, Exception))
def dnstester(sorteddata, batch_size=1000):
    """
    Query the SOA record of every object's domain and count the successes.

    The lookups are issued in batches of at most ``batch_size`` queries, each
    batch in its own ``asyncio.run`` call; after every batch a progress line
    is printed.

    :param sorteddata: iterable of dicts, each with a "domain" entry
    :param batch_size: maximum number of lookups per batch (default 1000)
    :return: total of the success counts reported by ``dnstest``
    :raises ValueError: if ``batch_size`` is smaller than 1
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    queries = [(entry["domain"], "SOA") for entry in sorteddata]
    total = len(queries)
    working = 0

    # Slicing gives every batch the same upper bound and handles the final
    # partial batch without a separate end-of-data branch.
    for start in range(0, total, batch_size):
        batch = queries[start:start + batch_size]
        working += asyncio.run(dnstest(batch))
        print("status : " + str(start + len(batch)) + "/" + str(total))
    return working
def main():
    """
    Run the whole pipeline and print the summary.

    Loads and groups the database dump, counts the DNSSEC-enabled objects,
    tests the domains over DNS and prints the three totals.
    """
    data = sorter(filemanager())
    dnssec = statmaker(data)
    operational = dnstester(data)

    summary = (
        ("Total amount of objects : ", len(data)),
        ("Total amount of working ones : ", operational),
        ("Total with DNSSEC enabled : ", dnssec),
    )
    print("These are the test results :")
    for label, value in summary:
        print(label + str(value))


# Only run the pipeline when executed as a script, not when imported.
if __name__ == '__main__':
    main()