-
Notifications
You must be signed in to change notification settings - Fork 4
/
nix_common_api.py
176 lines (134 loc) · 5.3 KB
/
nix_common_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Author: Armon Dadgar
Start Date: April 16th, 2009
Description:
Houses code which is common between the Linux, Darwin, and FreeBSD API's to avoid redundancy.
"""
import ctypes # Allows us to make C calls
import ctypes.util # Helps to find the C library
# Import for Popen
import portable_popen
# Seattlelib text-processing library (not a Python stdlib):
import textops
# Get the standard library
# AR: work around find_library deficiencies on Android
try:
import android
libc = ctypes.CDLL("/system/lib/libc.so")
except ImportError:
libc = ctypes.CDLL(ctypes.util.find_library("c"))
# Functions
_strerror = libc.strerror
_strerror.restype = ctypes.c_char_p
# This functions helps to conveniently retrieve the errno
# of the last call. This is a bit tedious to do, since
# Python doesn't understand that this is a globally defined int
def get_ctypes_errno():
errno_pointer = ctypes.cast(libc.errno, ctypes.POINTER(ctypes.c_int))
err_val = errno_pointer.contents
return err_val.value
# Returns the string version of the errno
def get_ctypes_error_str():
errornum = get_ctypes_errno()
return _strerror(errornum)
def exists_outgoing_network_socket(localip, localport, remoteip, remoteport):
"""
<Purpose>
Determines if there exists a network socket with the specified unique tuple.
Assumes TCP.
<Arguments>
localip: The IP address of the local socket
localport: The port of the local socket
remoteip: The IP of the remote host
remoteport: The port of the remote host
<Returns>
A Tuple, indicating the existence and state of the socket. E.g. (Exists (True/False), State (String or None))
"""
# This only works if all are not of the None type
if not (localip and localport and remoteip and remoteport):
return (False, None)
# Grab netstat output.
netstat_process = portable_popen.Popen(["netstat", "-an"])
netstat_stdout, _ = netstat_process.communicate()
netstat_lines = textops.textops_rawtexttolines(netstat_stdout)
# Search for things matching the local and remote ip+port we are trying to get
# information about.
target_lines = textops.textops_grep(localip + ':' + str(localport), netstat_lines) + \
textops.textops_grep(localip + '.' + str(localport), netstat_lines)
target_lines = textops.textops_grep(remoteip + ':' + str(remoteport), target_lines) + \
textops.textops_grep(remoteip + '.' + str(remoteport), target_lines)
# Only tcp connections.
target_lines = textops.textops_grep('tcp', target_lines)
# Check if there is any entries
if len(target_lines) > 0:
line = target_lines[0]
# Replace tabs with spaces, explode on spaces
parts = line.replace("\t","").strip("\n").split()
# Get the state
socket_state = parts[-1]
return (True, socket_state)
else:
return (False, None)
def exists_listening_network_socket(ip, port, tcp):
"""
<Purpose>
Determines if there exists a network socket with the specified ip and port which is the LISTEN state.
<Arguments>
ip: The IP address of the listening socket
port: The port of the listening socket
tcp: Is the socket of TCP type, else UDP
<Returns>
True or False.
"""
# This only works if both are not of the None type
if not (ip and port):
return False
# UDP connections are stateless, so for TCP check for the LISTEN state
# and for UDP, just check that there exists a UDP port
if tcp:
grep_terms = ["tcp", "LISTEN"]
else:
grep_terms = ["udp"]
# Launch up a shell, get the feedback
netstat_process = portable_popen.Popen(["netstat", "-an"])
netstat_stdout, _ = netstat_process.communicate()
netstat_lines = textops.textops_rawtexttolines(netstat_stdout)
# Search for things matching the ip+port we are trying to get
# information about.
target_lines = textops.textops_grep(ip + ':' + str(port), netstat_lines) + \
textops.textops_grep(ip + '.' + str(port), netstat_lines)
for term in grep_terms:
target_lines = textops.textops_grep(term, target_lines)
number_of_sockets = len(target_lines)
return (number_of_sockets > 0)
def get_available_interfaces():
"""
<Purpose>
Returns a list of available network interfaces.
<Returns>
An array of string interfaces
"""
# Common headers
# This list contains common header elements so that they can be stripped
common_headers_list = ["Name", "Kernel", "Iface"]
# Netstat will return all interfaces, but also has some duplication.
# Cut will get the first field from each line, which is the interface name.
# Sort prepares the input for uniq, which only works on sorted lists.
# Uniq, is somewhat obvious, it will only return the unique interfaces to remove duplicates.
# Launch up a shell, get the feedback
netstat_process = portable_popen.Popen(["netstat", "-i"])
netstat_stdout, _ = netstat_process.communicate()
netstat_lines = textops.textops_rawtexttolines(netstat_stdout)
target_lines = textops.textops_cut(netstat_lines, delimiter=" ", fields=[0])
unique_lines = set(target_lines)
# Create an array for the interfaces
interfaces_list = []
for line in unique_lines:
# Strip the newline
line = line.strip("\n")
# Check if this is a header
if line in common_headers_list:
continue
interfaces_list.append(line)
# Done, return the interfaces
return interfaces_list