-
Notifications
You must be signed in to change notification settings - Fork 39
/
ntds_parser.py
87 lines (64 loc) · 3.39 KB
/
ntds_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import sys, re, itertools, time
from binascii import hexlify
from threading import Thread, Event
from impacket.examples.secretsdump import LocalOperations, RemoteOperations, NTDSHashes
from impacket.smbconnection import SMBConnection, SessionError
from socket import error as socket_error
def process_remote(username, password, target, historic):
    """Extract NTLM hashes from a remote host over SMB.

    Logs in to ``target`` with the supplied credentials, then runs impacket's
    ``NTDSHashes`` in remote mode (``isRemote=True``, ``useVSSMethod=False``)
    while a background thread prints a progress spinner.

    Args:
        username: Account used for the SMB login.
        password: Password for ``username``.
        target: Host name or address; used as both remote name and remote host.
        historic: Passed to ``NTDSHashes`` as ``history``.

    Returns:
        A ``(domain, hashes)`` tuple, where ``hashes`` is the list of dicts
        produced by ``__process_hash`` and ``domain`` comes from
        ``__get_domain``.

    Raises:
        Exception: If the connection fails, the credentials are rejected, or
            the dump completes without yielding any hashes.
        SessionError: Any other SMB session error is re-raised unchanged.
    """
    # 0xC000006D == 3221225581, the NTSTATUS code STATUS_LOGON_FAILURE.
    status_logon_failure = 0xC000006D

    hashes = []
    print("Attempting to connect to {}...".format(target))
    try:
        connection = SMBConnection(target, target)
        connection.login(username, password, "", "", "")
        ops = RemoteOperations(connection, False, None)
        ops.setExecMethod("smbexec")
        # NOTE(review): neither `ops` nor `connection` is explicitly released
        # here; confirm whether impacket expects finish()/logoff() calls.
        stopper = Event()
        spinner = Thread(target=__update, args=(stopper, hashes))
        spinner.start()
        try:
            NTDSHashes(None, None, isRemote=True, remoteOps=ops, noLMHash=True, useVSSMethod=False,
                       justNTLM=True, printUserStatus=True, history=historic, lastLogon=True, pwdLastSet=True,
                       perSecretCallback=lambda secret_type, secret: hashes.append(__process_hash(secret))).dump()
        finally:
            # Always stop the spinner; otherwise a failed dump leaves a
            # non-daemon thread running and the process never exits.
            stopper.set()
            spinner.join()
    except socket_error:
        raise Exception("Failed to connect to {}".format(target))
    except SessionError as e:
        if e.error == status_logon_failure:
            raise Exception("Username or password incorrect - please try again.")
        # Do not swallow other session errors (previously this fell through
        # and implicitly returned None).
        raise

    if not hashes:
        raise Exception("Extraction seemingly finished successfully but I didn't find any hashes...")
    return __get_domain(hashes), hashes
def process_local(system, ntds, historic):
    """Extract NTLM hashes from local SYSTEM hive and NTDS files.

    Reads the boot key from the ``system`` hive, then runs impacket's
    ``NTDSHashes`` against ``ntds`` (``useVSSMethod=True``) while a background
    thread prints a progress spinner.

    Args:
        system: SYSTEM hive handed to ``LocalOperations``.
        ntds: NTDS file handed to ``NTDSHashes``.
        historic: Passed to ``NTDSHashes`` as ``history``.

    Returns:
        A ``(domain, hashes)`` tuple, where ``hashes`` is the list of dicts
        produced by ``__process_hash`` and ``domain`` comes from
        ``__get_domain``.

    Raises:
        Exception: If the boot key cannot be retrieved from the hive.
    """
    hashes = []
    print("Attempting to grab decryption key...")
    ops = LocalOperations(system)
    try:
        bootKey = ops.getBootKey()
    except Exception:
        # `except Exception` (not a bare except) so Ctrl-C / SystemExit still
        # propagate instead of being reported as a bad hive.
        raise Exception("Failed to retrieve decryption key. Ensure your SYSTEM hive is correct.")

    # hexlify() returns bytes on Python 3; decode so the key is not printed
    # as "0xb'...'".
    print("Found key: 0x{0}.".format(hexlify(bootKey).decode("ascii")))

    stopper = Event()
    spinner = Thread(target=__update, args=(stopper, hashes))
    spinner.start()
    try:
        NTDSHashes(ntds, bootKey, noLMHash=ops.checkNoLMHashPolicy(), useVSSMethod=True, justNTLM=True,
                   printUserStatus=True, history=historic, lastLogon=True, pwdLastSet=True,
                   perSecretCallback=lambda secret_type, secret: hashes.append(__process_hash(secret))).dump()
    finally:
        # Always stop the spinner; otherwise a failed dump leaves a
        # non-daemon thread running and the process never exits.
        stopper.set()
        spinner.join()
    return __get_domain(hashes), hashes
# One hash line: "user:rid:lmhash:nthash:::" optionally followed by
# " (pwdLastSet=...) (status=...) (lastLogon=...)".
_HASH_LINE_RE = re.compile(
    r"(?P<user>.*):(?P<rid>.*):(?P<lmhash>.*):(?P<ntlmhash>.*):::"
    r"(?:(?: \(pwdLastSet=(?P<pwdLastSet>.*)\))"
    r"(?: \(status=(?P<enabled>.*)\))"
    r"(?: \(lastLogon=(?P<lastLogon>.*)\)))?"
)
# A user name ending in "_history<N>" marks a password-history entry.
_HISTORY_SUFFIX_RE = re.compile(r"(?P<user>.*)(_history\d+$)")


def __process_hash(hash):
    """Parse one hash line into a dict.

    Args:
        hash: A line of the form ``user:rid:lmhash:nthash:::`` with an
            optional `` (pwdLastSet=..) (status=..) (lastLogon=..)`` suffix.

    Returns:
        For history entries (user name ends in ``_history<N>``):
        ``{"username", "ntlmhash", "historic": True}`` with the suffix removed
        from the user name. Otherwise:
        ``{"username", "ntlmhash", "enabled", "passwordLastSet", "lastLogon"}``;
        the last two are empty strings and ``enabled`` is ``False`` when the
        optional suffix is absent.

    Raises:
        ValueError: If the line does not match the expected format.
    """
    parsed = _HASH_LINE_RE.search(hash)
    if parsed is None:
        raise ValueError("Unrecognised hash line: {!r}".format(hash))

    # default="" mirrors re.findall, which yields "" for groups that did not
    # participate in the match.
    user, _rid, _lmhash, nthash, pwd_last_set, status, last_logon = parsed.groups(default="")

    history_match = _HISTORY_SUFFIX_RE.match(user)
    if history_match:
        return {"username": history_match.group("user").strip(), "ntlmhash": nthash, "historic": True}

    return {
        "username": user.strip(),
        "ntlmhash": nthash,
        "enabled": status == "Enabled",
        "passwordLastSet": pwd_last_set,
        "lastLogon": last_logon,
    }
def __get_domain(hashes):
    """Return the domain of the first domain-qualified user name.

    Args:
        hashes: Iterable of dicts with a ``"username"`` key, as produced by
            ``__process_hash``.

    Returns:
        The text before the first backslash of the first user name that
        contains one (``DOMAIN\\user`` -> ``DOMAIN``).

    Raises:
        IndexError: If no user name contains a backslash.
    """
    # Stop at the first hit instead of building a list of every domain.
    for entry in hashes:
        domain, separator, _account = entry["username"].partition("\\")
        if separator:
            return domain
    # Same exception type as the former `[...][0]`, but with a usable message.
    raise IndexError("No domain-qualified (DOMAIN\\user) account found among the extracted hashes.")
def __update(stopper, hashes):
    """Print a progress spinner with the running hash count until stopped.

    Intended to run in a background thread: rewrites one status line on
    stdout (terminated by a carriage return) roughly every 0.2 seconds.

    Args:
        stopper: ``threading.Event``; the loop exits once it is set.
        hashes: Collection being filled elsewhere; only its length is read.
    """
    spinner = itertools.cycle(['-', '/', '|', '\\'])
    while not stopper.is_set():
        # next(spinner) works on Python 2 and 3; spinner.next() is Python 2 only.
        sys.stdout.write("[" + next(spinner) + "] (" + str(len(hashes)) + ") Finding and extracting hashes - this might take a few minutes... \r")
        sys.stdout.flush()
        # Wait on the event rather than sleeping, so the thread returns as
        # soon as the stop is requested.
        stopper.wait(0.2)