-
Notifications
You must be signed in to change notification settings - Fork 1
/
pcap_helper.py
64 lines (45 loc) · 1.64 KB
/
pcap_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import scapy
import scapy.layers.l2
import scapy.layers.inet
from scapy.utils import PcapReader
from scapy.all import *
def print_psuedo_header(p):
    """Print a one-line summary of a packet's addressing information.

    For IPv4 packets carrying UDP or TCP the line shows the protocol, the
    source and destination ``address:port`` pairs and a length. Any other
    packet falls back to printing the ``src`` and ``dst`` attributes of the
    packet itself.

    Args:
        p: A scapy packet.
    """
    # Require an IP layer before indexing p[IP]; without it (e.g. UDP over
    # IPv6) the lookup would raise, so such packets use the generic branch.
    has_ip = IP in p
    if has_ip and UDP in p:
        print(''.join(["UDP: (SRC:", p[IP].src, ':', str(p[UDP].sport),
                       ' DST:', p[IP].dst, ':', str(p[UDP].dport), ')',
                       ' LEN: ', str(p[UDP].len)]))
    elif has_ip and TCP in p:
        # The TCP header has no ``len`` field, so report the size of the
        # TCP layer (header plus payload), mirroring what UDP's ``len`` covers.
        print(''.join(["TCP: (SRC:", p[IP].src, ':', str(p[TCP].sport),
                       ' DST:', p[IP].dst, ':', str(p[TCP].dport), ')',
                       ' LEN: ', str(len(p[TCP]))]))
    else:
        print(''.join(["Proto: (", p.src, ', ', p.dst, ')']))
def psuedo_header(p):
    """Return the flow key of a packet, or ``None`` if it has none.

    Args:
        p: A scapy packet.

    Returns:
        A ``(src_addr, dst_addr, src_port, dst_port)`` tuple when the packet
        has an IP layer and a UDP or TCP layer; ``None`` otherwise.
    """
    if IP not in p:
        return None
    # Test each layer explicitly: ``any((UDP, TCP)) in p`` collapses to
    # ``True in p`` and never checks for the layers themselves.
    if UDP in p:
        transport = p[UDP]
    elif TCP in p:
        transport = p[TCP]
    else:
        return None
    ip = p[IP]
    return (ip.src, ip.dst, transport.sport, transport.dport)
def parse_tcp_flows(packets):
    """Group packets by the key returned from ``psuedo_header``.

    Args:
        packets: An iterable of scapy packets.

    Returns:
        A dict mapping each distinct ``psuedo_header(p)`` value to the list
        of packets that produced it, in their original order. Packets for
        which ``psuedo_header`` returns ``None`` are collected under the
        ``None`` key.
    """
    flows = {}
    for p in packets:
        # Compute the key once per packet instead of on every dict access.
        key = psuedo_header(p)
        flows.setdefault(key, []).append(p)
    return flows
def load_udp_packets(location):
    """Load the packets from ``location`` that contain a UDP layer.

    Delegates to ``load_packets`` with a UDP-only predicate.
    """
    def has_udp(pkt):
        # Normalise the membership result to a strict boolean.
        return bool(UDP in pkt)

    return load_packets(location, filter=has_udp)
def load_tcp_packets(location):
    """Load the packets from ``location`` that contain a TCP layer.

    Delegates to ``load_packets`` with a TCP-only predicate.
    """
    def has_tcp(pkt):
        # Normalise the membership result to a strict boolean.
        return bool(TCP in pkt)

    return load_packets(location, filter=has_tcp)
def get_time_values(pkts):
    """Return the ``time`` attribute of every packet, in input order.

    Args:
        pkts: An iterable of objects exposing a ``time`` attribute.

    Returns:
        A list with one ``time`` value per input item.
    """
    times = []
    for pkt in pkts:
        times.append(pkt.time)
    return times
def load_packets(location, filter=lambda x: x is not None):
    """Read a capture with ``rdpcap`` and keep the packets ``filter`` accepts.

    Args:
        location: Passed straight to ``rdpcap`` to locate the capture.
        filter: Predicate called with each packet; packets for which it
            returns a truthy value are kept. The default keeps every packet
            that is not ``None``.

    Returns:
        A plain list of the accepted packets, in capture order.
    """
    return [pkt for pkt in rdpcap(location) if filter(pkt)]
def get_duration(packets):
    """Return the time span covered by a collection of packets.

    Args:
        packets: An iterable of objects exposing a ``time`` attribute.

    Returns:
        The difference between the largest and smallest ``time`` values,
        or ``0`` when ``packets`` is empty.
    """
    # Materialise the timestamps once so generators are supported and the
    # input is only traversed a single time.
    times = [p.time for p in packets]
    if not times:
        return 0
    return max(times) - min(times)
def get_total_size(packets, proto='tcp'):
    """Return the sum of the IP ``len`` field across ``packets``.

    Args:
        packets: An iterable of packets, each indexable by ``IP``.
        proto: Accepted for interface compatibility; this implementation
            does not use it.

    Returns:
        The accumulated ``p[IP].len`` values (``0`` for no packets).
    """
    total = 0
    for pkt in packets:
        total += pkt[IP].len
    return total