-
Notifications
You must be signed in to change notification settings - Fork 0
/
sip.py
70 lines (53 loc) · 2.15 KB
/
sip.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import random
import argparse
from twisted.python import log
from twisted.internet import reactor, protocol, task
from twisted.protocols.sip import Base, Request, Response
# Absolute directory holding this script; main() places its log file here so
# the log location does not depend on the current working directory.
script_dir = os.path.split(os.path.abspath(__file__))[0]
class SimpleSIPProtocol(Base):
    """SIP honeypot protocol: log each request's headers, then send a canned reply.

    Every request handed to ``handle_request`` has its headers written to the
    Twisted log, and a fake response carrying a fixed ``Server`` header is sent
    back to the sender after a short random delay.
    """

    # Status code sent back for each SIP method; unlisted methods get the default.
    RESPONSE_CODES = {"INVITE": 100, "REGISTER": 200}
    DEFAULT_RESPONSE_CODE = 404

    def handle_request(self, request, addr):
        """Log the request's headers and schedule the fake response.

        Args:
            request: Parsed SIP request; ``request.headers`` maps each header
                name to a list of values.
            addr: Address of the sender, passed through unchanged to the reply.
        """
        # One log entry per request: upper-cased header name -> joined values.
        logs = {
            name.upper(): ", ".join(values)
            for name, values in request.headers.items()
            if values is not None
        }
        log.msg(logs)
        self.simulateResponse(request, addr)

    def simulateResponse(self, request, addr):
        """Send the fake response after a random delay of 0.1 to 1.0 seconds."""
        delay = random.uniform(0.1, 1.0)
        pending = task.deferLater(reactor, delay, self.sendFakeResponse, request, addr)
        # Record any failure raised while building or sending the reply instead
        # of leaving it as an unhandled error in the Deferred.
        pending.addErrback(log.err)

    def sendFakeResponse(self, request, addr):
        """Build the canned response for ``request`` and write it to ``addr``.

        INVITE is answered with 100, REGISTER with 200, anything else with 404.
        """
        # The method may arrive as text or as bytes depending on the parser;
        # comparing text against bytes literals never matches on Python 3 and
        # would send 404 for every request, so normalise to text first.
        method = request.method
        if isinstance(method, bytes):
            method = method.decode("ascii", "replace")
        code = self.RESPONSE_CODES.get(method, self.DEFAULT_RESPONSE_CODE)

        # responseFromRequest copies Via/To/From/Call-ID/CSeq from the request;
        # a response without them cannot be matched to its transaction.
        response = self.responseFromRequest(code, request)
        # Header names and values are text, like the headers the parser
        # produces, so that serialisation in toString() works.
        response.addHeader("Server", "FRITZ!OS 1.0")

        payload = response.toString()
        # The datagram transport needs bytes; encode when toString() gave text.
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        self.transport.write(payload, addr)

    def get_destination_info(self):
        """Return the ``(host, port)`` reported by the transport's ``getHost()``."""
        local = self.transport.getHost()
        return local.host, local.port
class SimpleSIPFactory(protocol.Factory):
    """Factory whose ``protocol`` attribute is ``SimpleSIPProtocol``.

    ``main()`` in this file does not use it: it passes a protocol instance
    directly to ``reactor.listenUDP``.
    """

    # Inside the class body this name is the factory's protocol class, not the
    # ``twisted.internet.protocol`` module used in the base-class expression.
    protocol = SimpleSIPProtocol
def main():
    """Parse command-line options, start file logging, and run the SIP honeypot.

    Options:
        --host: interface to bind to (default ``0.0.0.0``).
        --port: UDP port to bind to (default ``5060``).

    Log output is appended to ``sip_honeypot.log`` next to this script. The
    call blocks in ``reactor.run()`` until the reactor stops.
    """
    parser = argparse.ArgumentParser(description="Run a simple SIP honeypot server.")
    parser.add_argument(
        "--host", type=str, default="0.0.0.0", help="Host to bind the SIP server to."
    )
    parser.add_argument(
        "--port", type=int, default=5060, help="Port to bind the SIP server to."
    )
    args = parser.parse_args()

    log_file_path = os.path.join(script_dir, "sip_honeypot.log")
    print(f"SIP HONEYPOT ACTIVE ON HOST: {args.host}, PORT: {args.port}")
    print(f"ALL SIP requests will be logged in: {log_file_path}")

    # Keep the log file open for exactly as long as the reactor runs, and close
    # it on every exit path (including a failed bind). The explicit encoding
    # keeps non-ASCII header values loggable regardless of the locale.
    with open(log_file_path, "a", encoding="utf-8") as log_file:
        log_observer = log.FileLogObserver(log_file)
        log.startLoggingWithObserver(log_observer.emit, setStdout=False)

        sip_protocol = SimpleSIPProtocol()
        reactor.listenUDP(args.port, sip_protocol, interface=args.host)
        reactor.run()
# Start the honeypot only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()