-
Notifications
You must be signed in to change notification settings - Fork 20
/
captive_portal.py
169 lines (141 loc) · 5.56 KB
/
captive_portal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import gc
import network
import ubinascii as binascii
import uselect as select
import utime as time
from captive_dns import DNSServer
from captive_http import HTTPServer
from credentials import Creds
class CaptivePortal:
    """Bring an ESP device onto WiFi, falling back to a captive portal.

    `start()` first tries the credentials returned by `Creds`. If that does
    not produce a connection, an open access point is started together with
    an HTTP and a DNS server, and the poll loop runs until a connection to
    a WiFi network has been made.
    """

    AP_IP = "192.168.4.1"
    # Milliseconds to keep the access point up once the station interface is
    # connected. A plain integer is used: the value is only ever read through
    # `self.`, and `const` is not imported anywhere in this file.
    AP_OFF_DELAY = 10 * 1000
    # Number of 2-second waits `connect_to_wifi` performs before giving up.
    MAX_CONN_ATTEMPTS = 10

    def __init__(self, essid=None):
        """Create the WLAN interface handles and the (not yet started) servers.

        Args:
            essid: Name to advertise for the access point. When None, a name
                is built from the last three bytes of the AP MAC address.
        """
        self.local_ip = self.AP_IP
        self.sta_if = network.WLAN(network.STA_IF)
        self.ap_if = network.WLAN(network.AP_IF)

        if essid is None:
            essid = b"ESP8266-%s" % binascii.hexlify(self.ap_if.config("mac")[-3:])
        self.essid = essid

        self.creds = Creds()

        # servers are created lazily by captive_portal()
        self.dns_server = None
        self.http_server = None
        self.poller = select.poll()

        # ticks_ms() timestamp of the first poll pass that saw the station
        # connected while the access point was still active
        self.conn_time_start = None

    def start_access_point(self):
        """Turn the access point on and configure it as an open network."""
        # sometimes need to turn off AP before it will come up properly
        self.ap_if.active(False)
        while not self.ap_if.active():
            print("Waiting for access point to turn on")
            self.ap_if.active(True)
            time.sleep(1)
        # IP address, netmask, gateway, DNS
        self.ap_if.ifconfig(
            (self.local_ip, "255.255.255.0", self.local_ip, self.local_ip)
        )
        self.ap_if.config(essid=self.essid, authmode=network.AUTH_OPEN)
        print("AP mode configured:", self.ap_if.ifconfig())

    def connect_to_wifi(self):
        """Try to join the network described by `self.creds`.

        Polls the station interface up to MAX_CONN_ATTEMPTS times, waiting
        two seconds after each unsuccessful check, and checks once more
        after the final wait.

        Returns:
            True when connected (`self.local_ip` is then the station IP).
            False otherwise; in that case the credentials are removed and
            the station interface is switched off.
        """
        # the password is deliberately kept out of the console output
        print("Trying to connect to SSID '{:s}'".format(self.creds.ssid))

        # initiate the connection
        self.sta_if.active(True)
        self.sta_if.connect(self.creds.ssid, self.creds.password)

        attempts = 1
        while attempts <= self.MAX_CONN_ATTEMPTS:
            if self.sta_if.isconnected():
                break
            print("Connection attempt {:d}/{:d} ...".format(attempts, self.MAX_CONN_ATTEMPTS))
            time.sleep(2)
            attempts += 1

        # Evaluated after the loop so that a link which came up during the
        # last wait is still recognised instead of being treated as failure.
        if self.sta_if.isconnected():
            print("Connected to {:s}".format(self.creds.ssid))
            self.local_ip = self.sta_if.ifconfig()[0]
            return True

        print(
            "Failed to connect to {:s}. WLAN status={:d}".format(
                self.creds.ssid, self.sta_if.status()
            )
        )
        # forget the credentials since they didn't work, and turn off station mode
        self.creds.remove()
        self.sta_if.active(False)
        return False

    def check_valid_wifi(self):
        """Run one pass of connection bookkeeping for the portal loop.

        Returns:
            True only when a connection attempt made during this call
            succeeded. Every other path returns False, including the one
            where the station is already connected and the access point is
            merely being switched off after AP_OFF_DELAY.
        """
        if not self.sta_if.isconnected():
            if self.creds.load().is_valid():
                # have credentials to connect, but not yet connected
                # return value based on whether the connection was successful
                return self.connect_to_wifi()
            # not connected, and no credentials to connect yet
            return False

        if not self.ap_if.active():
            # access point is already off; do nothing
            return False

        # already connected to WiFi, so turn off Access Point after a delay
        if self.conn_time_start is None:
            self.conn_time_start = time.ticks_ms()
            remaining = self.AP_OFF_DELAY
        else:
            remaining = self.AP_OFF_DELAY - time.ticks_diff(
                time.ticks_ms(), self.conn_time_start
            )
        if remaining <= 0:
            self.ap_if.active(False)
            print("Turned off access point")
        return False

    def captive_portal(self):
        """Start the access point and serve DNS/HTTP until WiFi is connected.

        The loop ends when `check_valid_wifi()` reports a fresh connection
        or when a KeyboardInterrupt is raised.
        """
        print("Starting captive portal")
        self.start_access_point()

        if self.http_server is None:
            self.http_server = HTTPServer(self.poller, self.local_ip)
            print("Configured HTTP server")
        if self.dns_server is None:
            self.dns_server = DNSServer(self.poller, self.local_ip)
            print("Configured DNS server")

        try:
            while True:
                gc.collect()
                # check for socket events and handle them
                for response in self.poller.ipoll(1000):
                    sock, event, *others = response
                    is_handled = self.handle_dns(sock, event, others)
                    if not is_handled:
                        self.handle_http(sock, event, others)

                if self.check_valid_wifi():
                    print("Connected to WiFi!")
                    self.http_server.set_ip(self.local_ip, self.creds.ssid)
                    self._stop_dns_server()
                    break
        except KeyboardInterrupt:
            print("Captive portal stopped")
        # NOTE(review): indentation was lost in the reviewed copy; cleanup is
        # assumed to run on both exits. It is safe either way because the
        # DNS server is only ever stopped once.
        self.cleanup()

    def handle_dns(self, sock, event, others):
        """Dispatch a poll event to the DNS server if it owns `sock`.

        Returns:
            True when `sock` is the DNS server's socket (the event is then
            consumed here), False when the event belongs to someone else or
            no DNS server is currently set.
        """
        if self.dns_server is None or sock is not self.dns_server.sock:
            return False

        # ignore UDP socket hangups
        if event != select.POLLHUP:
            self.dns_server.handle(sock, event, others)
        return True

    def handle_http(self, sock, event, others):
        """Forward a poll event to the HTTP server."""
        self.http_server.handle(sock, event, others)

    def _stop_dns_server(self):
        """Stop the DNS server, if one is set, and drop the reference to it."""
        if self.dns_server:
            self.dns_server.stop(self.poller)
            # cleared so a second stop is impossible and a later
            # captive_portal() call builds a fresh server
            self.dns_server = None

    def cleanup(self):
        """Stop the DNS server if it is still set and collect garbage."""
        print("Cleaning up")
        self._stop_dns_server()
        gc.collect()

    def try_connect_from_file(self):
        """Connect using the credentials returned by `self.creds.load()`.

        Returns:
            True when the loaded credentials are valid and the connection
            succeeded; False otherwise, after removing the credentials.
        """
        if self.creds.load().is_valid():
            if self.connect_to_wifi():
                return True

        # WiFi Connection failed - remove credentials from disk
        self.creds.remove()
        return False

    def start(self):
        """Entry point: try the loaded credentials, else run the captive portal."""
        # turn off station interface to force a reconnect
        self.sta_if.active(False)
        if not self.try_connect_from_file():
            self.captive_portal()