-
Notifications
You must be signed in to change notification settings - Fork 1
/
service
executable file
·101 lines (88 loc) · 2.96 KB
/
service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/python
import os, sys, fcntl, socket, struct, requests, time, traceback
from functools import partial
from heapq import heappush, heapreplace
# Single UDP socket shared by every send() call for the lifetime of the process.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
def send(key, val):
    """Log a key/value pair to stderr and emit it as a UDP datagram.

    The datagram payload is ``install/update/<key>:<val>`` and is sent through
    the module-level ``sock`` to 127.0.0.1 port 4444.
    """
    log_line = "sending %s->%s\n" % (key, val)
    sys.stderr.write(log_line)

    payload = 'install/update/%s:%s' % (key, val)
    destination = ('127.0.0.1', 4444)
    sock.sendto(payload, destination)
def readfile(filename):
    """Return the complete contents of *filename*, read in binary mode."""
    with open(filename, "rb") as handle:
        contents = handle.read()
    return contents
def get_default_gateway(route_file="/proc/net/route"):
    """Return the IPv4 default gateway as a dotted-quad string, or None.

    Scans a routing table in the Linux ``/proc/net/route`` text format and
    returns the gateway of the first entry whose destination is ``00000000``
    and whose flags have bit 0x2 (RTF_GATEWAY) set.

    Args:
        route_file: Path of the routing table to parse. Defaults to the
            kernel's ``/proc/net/route``.

    Returns:
        The gateway address as a string, or None when the table contains no
        default route through a gateway.

    Raises:
        IOError/OSError: If *route_file* cannot be opened.
        ValueError: If a flags or gateway column is not valid hexadecimal.
    """
    with open(route_file) as fh:
        for line in fh:
            fields = line.split()
            # Blank or truncated lines carry no route; skip them rather than
            # failing with IndexError.
            if len(fields) < 4:
                continue
            # The header row is skipped here as well: its second column is
            # the literal word "Destination", not 00000000.
            if fields[1] != '00000000':
                continue
            if not int(fields[3], 16) & 2:
                continue
            # The kernel prints the address as a host-byte-order integer, so
            # pack it with native ordering to recover the network-order bytes.
            return socket.inet_ntoa(struct.pack("=L", int(fields[2], 16)))
    return None
def get_mac(ifname):
    """Return the hardware (MAC) address of a network interface.

    Issues ioctl 0x8927 (SIOCGIFHWADDR on Linux) on a throw-away UDP socket
    and formats bytes 18..23 of the reply as lowercase colon-separated hex.

    Args:
        ifname: Interface name such as "eth0" (text or bytes). Only the
            first 15 bytes are used.

    Returns:
        The address as "aa:bb:cc:dd:ee:ff"; the string "<no mac>" when the
        socket or ioctl call fails with an I/O error (for example the
        interface does not exist); None on any other error.
    """
    try:
        # Accept both text and byte strings so struct.pack works on
        # Python 2 and Python 3 alike.
        name = ifname if isinstance(ifname, bytes) else ifname.encode('utf-8')
        request = struct.pack('256s', name[:15])
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            info = fcntl.ioctl(s.fileno(), 0x8927, request)
        finally:
            # This runs once per polling interval; always release the
            # descriptor instead of leaving it to garbage collection.
            s.close()
        # bytearray yields integers on both Python 2 and Python 3.
        return ':'.join('%02x' % octet for octet in bytearray(info[18:24]))
    except IOError:
        return "<no mac>"
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # must still be able to stop the service.
        return None
def get_ipv4(ifname):
    """Return the IPv4 address of a network interface in CIDR notation.

    Issues ioctl 0x8915 (SIOCGIFADDR on Linux) for the address and ioctl
    0x891b (SIOCGIFNETMASK) for the netmask on a throw-away UDP socket. The
    prefix length is the number of set bits in the netmask.

    Args:
        ifname: Interface name such as "eth0" (text or bytes). Only the
            first 15 bytes are used, for both ioctl requests.

    Returns:
        A string like "192.168.1.10/24"; the string "<no ipv4>" when the
        socket or an ioctl call fails with an I/O error (for example the
        interface has no address); None on any other error.
    """
    try:
        # Accept both text and byte strings so struct.pack works on
        # Python 2 and Python 3 alike.
        name = ifname if isinstance(ifname, bytes) else ifname.encode('utf-8')
        # One truncated request buffer is shared by both ioctls so they are
        # guaranteed to query the same interface name.
        request = struct.pack('256s', name[:15])
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            fd = s.fileno()
            addr_info = fcntl.ioctl(fd, 0x8915, request)
            mask_info = fcntl.ioctl(fd, 0x891b, request)
        finally:
            # This runs once per polling interval; always release the
            # descriptor instead of leaving it to garbage collection.
            s.close()
        ip = socket.inet_ntoa(addr_info[20:24])
        mask_bits = struct.unpack('>I', mask_info[20:24])[0]
        prefix_len = bin(mask_bits).count('1')
        return "%s/%d" % (ip, prefix_len)
    except IOError:
        return "<no ipv4>"
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # must still be able to stop the service.
        return None
def check_network(config_path='/sd/config/network'):
    """Report which network configuration mode is in effect.

    Args:
        config_path: Path whose existence marks a static configuration.
            Defaults to ``/sd/config/network``.

    Returns:
        "static" if *config_path* exists, otherwise "dhcp".
    """
    return "static" if os.path.exists(config_path) else "dhcp"
def check_internet():
    """Probe internet connectivity with an HTTP request to the ping endpoint.

    Returns:
        "online" when the endpoint answers with the exact body ``pong``,
        "filtered" when the request succeeds but the body differs, and
        "offline" when the request fails or returns an HTTP error status
        (the traceback is printed to stderr in that case).
    """
    try:
        r = requests.get("http://ping.infobeamer.com/ping", timeout=10, headers={
            'User-Agent': 'info-beamer syncer/network-check',
        })
        r.raise_for_status()
    except Exception:
        # Not a bare except: KeyboardInterrupt/SystemExit raised during the
        # (up to 10 second) request must still propagate to the caller.
        traceback.print_exc()
        return "offline"
    # Response.content is a byte string; comparing against a bytes literal is
    # correct on both Python 2 and Python 3.
    return "online" if r.content == b"pong" else "filtered"
# Probe table: (interval in seconds, key passed to send(), zero-argument
# callable that produces the value to report).
tests = [
    (1, "ethmac", partial(get_mac, "eth0")),
    (1, "wlanmac", partial(get_mac, "wlan0")),
    (1, "ethip", partial(get_ipv4, "eth0")),
    (1, "wlanip", partial(get_ipv4, "wlan0")),
    (1, "gw", get_default_gateway),
    (10, "online", check_internet),
    (5, "network", check_network),
]

# Min-heap of (due_time, test) pairs. Every probe is initially due at start-up
# time, so each one runs once right away.
q = []
now = time.time()
for interval, key, val_fn in tests:
    heappush(q, (now, (interval, key, val_fn)))
def run_next_test():
    """Run the earliest-due probe from the schedule heap, if it is due.

    The probe is rescheduled (current time plus its interval) before its
    callable is invoked, so a probe that raises is still retried later. A
    result of None is not reported; any other result is passed to send().

    Returns:
        False when the earliest probe is not due yet, True when a probe ran.
    """
    current = time.time()
    due_at, entry = q[0]
    if due_at > current:
        return False

    period, name, probe = entry
    heapreplace(q, (current + period, entry))

    result = probe()
    if result is not None:
        send(name, result)
    return True
if __name__ == "__main__":
    # Service loop: run probes as they become due, idling briefly when none
    # is ready. A failing probe is logged and the loop backs off for a second.
    while True:
        try:
            if not run_next_test():
                time.sleep(0.2)
        except Exception:
            # KeyboardInterrupt and SystemExit are not subclasses of Exception,
            # so they propagate and stop the service instead of being logged
            # and retried forever.
            traceback.print_exc()
            time.sleep(1)