# import features
# import joblib
# import collections
# import sys
# # Load the pre-trained machine learning model
# model = joblib.load('stk_model.pkl')
# def preprocess(packet_data):
# # Ensure all features needed for prediction are present
# processed_features = [
# packet_data['packet_size'],
# packet_data['src_packet_freq'],
# packet_data['dst_packet_freq'],
# packet_data['traffic_volume']
# ]
# print(f"Preprocessed features: {processed_features}") # Debugging statement
# return processed_features
# def make_prediction(packets):
# predictions = []
# packet_freqs = collections.defaultdict(int)
# total_traffic = 0
# for packet in packets:
# try:
# packet_size = len(packet)
# src_ip = packet.get('IP', {}).get('src', '')
# dst_ip = packet.get('IP', {}).get('dst', '')
# packet_freqs[src_ip] += 1
# packet_freqs[dst_ip] += 1
# total_traffic += packet_size
# packet_data = {
# "packet_size": packet_size,
# "src_packet_freq": packet_freqs[src_ip],
# "dst_packet_freq": packet_freqs[dst_ip],
# "traffic_volume": total_traffic
# }
# features_for_model = preprocess(packet_data)
# prediction = model.predict([features_for_model])
# predictions.append(prediction[0])
# print(f"Prediction for IP {src_ip} to {dst_ip}: {prediction[0]}") # Debugging statement
# except Exception as e:
# print(f"Error processing packet: {e}")
# return predictions
# def main(pcap_file):
# print("Analyzing traffic from:", pcap_file)
# try:
# packets = features.load_packets(pcap_file)
# print("Starting traffic analysis...")
# predictions = make_prediction(packets)
# print("Predictions:", predictions)
# print("Traffic analysis complete")
# except Exception as e:
# print(f"An error occurred during analysis: {e}")
# if __name__ == "__main__":
# if len(sys.argv) < 2:
# print("Usage: python analyze.py <pcap_file>")
# sys.exit(1)
# pcap_file = sys.argv[1]
# main(pcap_file)
import features
import joblib
import collections
import sys
import logging
from scapy.all import rdpcap, IP, TCP, UDP
# Setup basic configuration for logging
# NOTE: DEBUG level emits one line per packet with its feature vector (see
# preprocess, called once per IP packet); consider INFO for large captures.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Load the pre-trained machine learning model
# SECURITY: joblib.load is pickle-based, so loading a model file executes
# whatever code the file carries. The relative path resolves against the
# current working directory at import time; only run this where
# 'stk_model.pkl' is a trusted file whose integrity has been checked.
model = joblib.load('stk_model.pkl')
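def preprocess(packet_data):
    """Build the ordered feature vector for one packet.

    Args:
        packet_data: Mapping with the keys ``packet_size``,
            ``src_packet_freq``, ``dst_packet_freq`` and ``traffic_volume``.
            Extra keys are ignored.

    Returns:
        A list of the four feature values, in that fixed order.

    Raises:
        KeyError: if any required feature is missing. All missing keys are
            named in the message, instead of failing on the first lookup.
    """
    # Presumably this order matches the column order used when training
    # 'stk_model.pkl' -- confirm against the training code.
    feature_order = (
        "packet_size",
        "src_packet_freq",
        "dst_packet_freq",
        "traffic_volume",
    )
    missing = [name for name in feature_order if name not in packet_data]
    if missing:
        raise KeyError(f"Missing feature(s) for prediction: {', '.join(missing)}")
    processed_features = [packet_data[name] for name in feature_order]
    # Lazy %-args: the message is only rendered when DEBUG is enabled.
    logging.debug("Preprocessed features: %s", processed_features)
    return processed_features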
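def make_prediction(packets):
    """Score every packet carrying an ``IP`` (IPv4) layer with the loaded model.

    Per-address packet counts and the cumulative byte total are carried
    across the whole capture, so a packet's feature vector depends on the
    packets seen before it.

    Args:
        packets: Iterable of scapy packets, e.g. the result of ``rdpcap``.

    Returns:
        A list with one model output per IPv4 packet, in capture order.
        Frames without an ``IP`` layer are skipped and do not touch the
        running counts.
    """
    predictions = []
    packet_freqs = collections.defaultdict(int)
    total_traffic = 0
    skipped = 0
    for packet in packets:
        if IP not in packet:
            # Non-IPv4 frames (ARP, IPv6, ...) are routine in a capture, so
            # logging each one at ERROR buried real failures; count them and
            # report once at the end instead.
            skipped += 1
            logging.debug("Skipping packet without an IP layer.")
            continue
        ip_layer = packet[IP]
        src_ip = ip_layer.src
        dst_ip = ip_layer.dst
        packet_size = len(packet)
        # A packet whose src equals its dst is counted twice for that address.
        packet_freqs[src_ip] += 1
        packet_freqs[dst_ip] += 1
        total_traffic += packet_size
        packet_data = {
            "packet_size": packet_size,
            "src_packet_freq": packet_freqs[src_ip],
            "dst_packet_freq": packet_freqs[dst_ip],
            "traffic_volume": total_traffic,
        }
        features_for_model = preprocess(packet_data)
        prediction = model.predict([features_for_model])
        predictions.append(prediction[0])
    if skipped:
        logging.warning("Skipped %d packet(s) without an IP layer.", skipped)
    return predictions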
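def main(pcap_file):
    """Read a capture file, score its packets and log the results.

    Args:
        pcap_file: Path to a capture file readable by scapy's ``rdpcap``.

    Returns:
        True when the analysis ran to completion, False when any exception
        was raised while reading or scoring (the exception is logged with
        its traceback and not re-raised).
    """
    logging.info("Analyzing traffic from: %s", pcap_file)
    try:
        packets = rdpcap(pcap_file)
        logging.info("Starting traffic analysis...")
        predictions = make_prediction(packets)
        logging.info("Predictions: %s", predictions)
        logging.info("Traffic analysis complete")
    except Exception as e:
        # Top-level boundary: the broad catch stays, but logging.exception
        # keeps the traceback that logging.error(str(e)) discarded.
        logging.exception("An error occurred during analysis: %s", e)
        return False
    return True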
if __name__ == "__main__":
    # The capture path is the only, and required, positional argument.
    if not sys.argv[1:]:
        logging.error("Usage: python analyze.py <pcap_file>")
        sys.exit(1)
    pcap_file = sys.argv[1]
    main(pcap_file)