-
Notifications
You must be signed in to change notification settings - Fork 2
/
bleradio.py
283 lines (215 loc) · 7.96 KB
/
bleradio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from struct import pack_into, unpack
from time import ticks_diff, ticks_ms

import bluetooth
from micropython import const
# BLE IRQ event code that observe_irq reacts to; all other events are ignored.
_IRQ_SCAN_RESULT = const(5)
# Company identifier bytes expected at payload offsets 3 (MSB) and 2 (LSB).
_LEGO_ID_MSB = const(0x03)
_LEGO_ID_LSB = const(0x97)
# AD type expected at payload offset 1 (0xFF is "manufacturer specific data").
_MANUFACTURER_DATA = const(0xFF)
# Arguments passed to gap_scan() as duration, interval and window.
# NOTE(review): per the MicroPython docs a duration of 0 scans indefinitely.
_DURATION = const(0)
_INTERVAL_US = const(30000)
_WINDOW_US = const(30000)
# Time constant of the RSSI smoothing filter applied in observe_irq.
_RSSI_FILTER_WINDOW_MS = const(512)
# Observed data older than this is reported as "nothing received".
_OBSERVED_DATA_TIMEOUT_MS = const(1000)
# Initial RSSI value; also used as the marker for "no recent data".
_RSSI_MIN = const(-128)
# Object type identifiers, stored in the upper three bits of a header byte.
_ADVERTISING_OBJECT_SINGLE = const(0x00)
_ADVERTISING_OBJECT_TRUE = const(0x01)
_ADVERTISING_OBJECT_FALSE = const(0x02)
_ADVERTISING_OBJECT_INT = const(0x03)
_ADVERTISING_OBJECT_FLOAT = const(0x04)
_ADVERTISING_OBJECT_STRING = const(0x05)
_ADVERTISING_OBJECT_BYTES = const(0x06)
# Size of the send and receive buffers, in bytes.
_ADV_MAX_SIZE = const(31)
# Leading bytes before the encoded values: length, AD type, two company
# identifier bytes and the channel number.
_ADV_HEADER_SIZE = const(5)
# struct format used to copy a received payload into a receive buffer.
_ADV_COPY_FMT = const("31s")
# Indices into the per-channel info list stored in observed_data.
_LEN = const(0)
_DATA = const(1)
_TIME = const(2)
_RSSI = const(3)
# Maps the byte length of an encoded integer to its struct format code.
INT_FORMATS = {
1: "b",
2: "h",
4: "i",
}
# Maps channel number -> [length, data buffer, timestamp, rssi]. It is
# replaced when a BLERadio is created and updated by observe_irq.
observed_data = {}
def observe_irq(event, data):
    """Process a BLE IRQ event and store matching broadcast data.

    Only scan results whose payload is a single manufacturer-specific AD
    structure carrying the expected company identifier, and whose channel
    has an entry in ``observed_data``, are stored. Everything else is
    ignored.

    Args:
        event: IRQ event code; only ``_IRQ_SCAN_RESULT`` is handled.
        data: Scan result tuple of
            ``(addr_type, addr, adv_type, rssi, adv_data)``.

    Returns:
        The channel number when a message was stored, otherwise ``None``.
        This lets a wrapping handler run its own code on reception.
    """
    if event != _IRQ_SCAN_RESULT:
        return

    _, _, _, rssi, adv_data = data
    adv_len = len(adv_data)

    # Analyze only advertisements matching Pybricks scheme. Payloads that
    # do not fit the receive buffer are rejected as well, since the copy
    # below would otherwise truncate them while recording the full length.
    if (
        adv_len <= _ADV_HEADER_SIZE
        or adv_len > _ADV_MAX_SIZE
        or adv_data[1] != _MANUFACTURER_DATA
        or adv_data[2] != _LEGO_ID_LSB
        or adv_data[3] != _LEGO_ID_MSB
    ):
        return

    # The length byte must account for the whole payload, i.e. the
    # advertisement holds exactly one AD structure.
    if adv_len - 1 != adv_data[0]:
        return

    # Get channel buffer, if allocated.
    channel = adv_data[4]
    if channel not in observed_data:
        return
    info = observed_data[channel]

    # Update time interval. ticks_ms() wraps around, so the elapsed time
    # must come from ticks_diff() rather than plain subtraction.
    now = ticks_ms()
    diff = ticks_diff(now, info[_TIME])
    info[_TIME] = now

    # A negative difference means the previous timestamp is too old to be
    # measured; treat it like any other gap longer than the filter window.
    if diff < 0 or diff > _RSSI_FILTER_WINDOW_MS:
        diff = _RSSI_FILTER_WINDOW_MS

    # Approximate a slow moving average to make RSSI more stable.
    info[_RSSI] = (
        info[_RSSI] * (_RSSI_FILTER_WINDOW_MS - diff) + rssi * diff
    ) // _RSSI_FILTER_WINDOW_MS

    # Copy advertising data without allocation.
    info[_LEN] = adv_len - _ADV_HEADER_SIZE
    pack_into(_ADV_COPY_FMT, info[_DATA], 0, adv_data)

    # Allow handler to run other callback code on successfully
    # receiving a broadcasted message.
    return channel
def get_data_info(info_byte: int):
    """Split an object header byte into ``(type, length)``.

    The upper three bits hold the object type and the lower five bits hold
    the payload length.
    """
    return info_byte >> 5, info_byte & 0x1F
def unpack_one(data_type: int, data: memoryview):
    """Decode one value from its type identifier and payload bytes.

    Args:
        data_type: One of the ``_ADVERTISING_OBJECT_*`` identifiers.
        data: The payload bytes belonging to this value.

    Returns:
        ``True``/``False`` for the boolean types, an ``int``, ``float``,
        ``str`` or ``bytes`` for the corresponding types, and ``None`` for
        the single-object marker, for an empty payload, for an unknown
        type, or for a payload whose size does not match its type.
    """
    # Types that carry no payload.
    if data_type == _ADVERTISING_OBJECT_TRUE:
        return True
    if data_type == _ADVERTISING_OBJECT_FALSE:
        return False
    if data_type == _ADVERTISING_OBJECT_SINGLE:
        return None

    # Remaining types require data.
    size = len(data)
    if size == 0:
        return None

    if data_type == _ADVERTISING_OBJECT_INT:
        if size in INT_FORMATS:
            return unpack(INT_FORMATS[size], data)[0]
        return None

    if data_type == _ADVERTISING_OBJECT_FLOAT:
        # The "f" format needs exactly four bytes; anything else is a
        # malformed message and must not raise in the caller.
        if size == 4:
            return unpack("f", data)[0]
        return None

    if data_type == _ADVERTISING_OBJECT_STRING:
        return bytes(data).decode("utf-8")

    if data_type == _ADVERTISING_OBJECT_BYTES:
        # Return a copy: ``data`` may be a view into a receive buffer that
        # is overwritten when the next message arrives.
        return bytes(data)

    return None
def decode(data: memoryview):
    """Decode the value section of a received message.

    Args:
        data: The encoded values, without the advertisement header.

    Returns:
        A single decoded value when the message starts with the
        single-object marker, otherwise a list of decoded values. ``None``
        is returned for an empty buffer or for a single value that is
        missing or truncated.
    """
    total = len(data)

    # Nothing to decode.
    if total == 0:
        return None

    first_type, _ = get_data_info(data[0])

    # Case of one value instead of tuple.
    if first_type == _ADVERTISING_OBJECT_SINGLE:
        # Only proceed if this has some data.
        if total < 2:
            return None
        value_type, value_length = get_data_info(data[1])
        # Do not decode a value whose payload was cut short, in line with
        # how the iterative path below treats truncated data.
        if 2 + value_length > total:
            return None
        return unpack_one(value_type, data[2 : 2 + value_length])

    # Unpack iteratively.
    unpacked = []
    index = 0
    while index < total:
        data_type, data_length = get_data_info(data[index])
        end = index + 1 + data_length

        # Check if there is enough data left.
        if end > total:
            break

        # Unpack the value.
        unpacked.append(unpack_one(data_type, data[index + 1 : end]))
        index = end

    return unpacked
def smallest_format(n):
    """Pick the narrowest signed struct format that can hold ``n``.

    Returns a ``(format_code, size_in_bytes)`` pair: ``("b", 1)`` or
    ``("h", 2)`` when ``n`` fits a signed 8-bit or 16-bit integer, and
    ``("i", 4)`` for every other value.
    """
    for code, width in (("b", 1), ("h", 2)):
        limit = 1 << (8 * width - 1)
        if -limit <= n < limit:
            return code, width
    return "i", 4
def get_data_info(info_byte: int):
    """Return the ``(type, length)`` pair packed into a header byte.

    The type is the value of the upper three bits; the length is the value
    of the lower five bits.
    """
    # NOTE: this module defines get_data_info twice with the same body;
    # this later definition is the one bound at import time.
    length_mask = (1 << 5) - 1
    return info_byte >> 5, info_byte & length_mask
def encode_one_object(obj, buffer, offset):
    """Encode one value into ``buffer`` starting at ``offset``.

    A header byte is written first, holding the object type in its upper
    three bits and the payload length in its lower five bits. The payload,
    if any, follows directly after it.

    Args:
        obj: The value to encode: ``bool``, ``int``, ``float``, ``str``,
            ``bytes`` or ``bytearray``.
        buffer: Writable buffer that receives the encoded bytes.
        offset: Index in ``buffer`` of the header byte.

    Returns:
        The number of bytes written, including the header byte.

    Raises:
        ValueError: If the type is not supported, if an integer does not
            fit in 32 bits, or if the payload does not fit in the length
            field or in the remaining buffer space.
    """
    fmt = None
    payload = None

    # bool is tested first because it is a subclass of int.
    if isinstance(obj, bool):
        type_id = _ADVERTISING_OBJECT_TRUE if obj else _ADVERTISING_OBJECT_FALSE
        size = 0
    elif isinstance(obj, int):
        # The widest supported integer format is four bytes.
        if not -(1 << 31) <= obj < (1 << 31):
            raise ValueError("Integer does not fit in 32 bits")
        type_id = _ADVERTISING_OBJECT_INT
        fmt, size = smallest_format(obj)
        payload = obj
    elif isinstance(obj, float):
        type_id = _ADVERTISING_OBJECT_FLOAT
        fmt = "f"
        size = 4
        payload = obj
    elif isinstance(obj, str):
        type_id = _ADVERTISING_OBJECT_STRING
        payload = obj.encode("utf-8")
        size = len(payload)
        fmt = str(size) + "s"
    elif isinstance(obj, (bytes, bytearray)):
        type_id = _ADVERTISING_OBJECT_BYTES
        payload = obj
        size = len(payload)
        fmt = str(size) + "s"
    else:
        raise ValueError("Data type not supported")

    # The length shares the header byte with the type, so it is limited to
    # five bits; a larger value would corrupt the type bits. The value
    # must also fit in what is left of the buffer.
    if size > 0x1F or offset + 1 + size > len(buffer):
        raise ValueError("Payload too large")

    buffer[offset] = (type_id << 5) + size
    if size:
        pack_into(fmt, buffer, offset + 1, payload)
    return 1 + size
class BLERadio:
    """Broadcast and observe small data messages over BLE advertisements.

    Received messages are stored in the module-level ``observed_data``
    dictionary, which is replaced every time an instance is created.
    """

    def __init__(self, broadcast_channel: int = None, observe_channels=(), ble=None):
        """Validate the channels, allocate buffers and set up BLE.

        Args:
            broadcast_channel: Channel used by ``broadcast``, an integer
                from 0 to 255, or ``None`` to disable broadcasting.
            observe_channels: Iterable of channels, each an integer from 0
                to 255, for which a receive buffer is allocated.
            ble: Optional BLE object to use. When omitted, a
                ``bluetooth.BLE`` instance is created, activated, given
                ``observe_irq`` as IRQ handler and set to scan. A provided
                object is stored as is and is not configured here.

        Raises:
            ValueError: If a channel has the wrong type or is out of range.
        """
        # Materialize once so that any iterable can be both validated and
        # used to allocate the buffers.
        observe_channels = tuple(observe_channels)

        for channel in observe_channels:
            if not isinstance(channel, int) or not 0 <= channel <= 255:
                raise ValueError(
                    "Observe channel must be list of integers from 0 to 255."
                )
        if broadcast_channel is not None and (
            not isinstance(broadcast_channel, int)
            or not 0 <= broadcast_channel <= 255
        ):
            raise ValueError("Broadcast channel must be None or integer from 0 to 255.")

        # Each entry is [length, data buffer, timestamp, rssi].
        global observed_data
        observed_data = {
            ch: [0, bytearray(_ADV_MAX_SIZE), 0, _RSSI_MIN] for ch in observe_channels
        }

        self.broadcast_channel = broadcast_channel
        self.send_buffer = memoryview(bytearray(_ADV_MAX_SIZE))

        if ble is None:
            # BLE not given, so initialize our own instance.
            self.ble = bluetooth.BLE()
            self.ble.active(True)
            self.ble.irq(observe_irq)
            self.ble.gap_scan(_DURATION, _INTERVAL_US, _WINDOW_US)
        else:
            # Use externally provided BLE, configured and
            # controlled by user.
            self.ble = ble

    def _current_info(self, channel: int):
        """Return the info list of ``channel``, expiring stale data first."""
        if channel not in observed_data:
            raise ValueError("Channel not allocated.")
        info = observed_data[channel]

        # ticks_ms() wraps around, so the age is computed with ticks_diff().
        if ticks_diff(ticks_ms(), info[_TIME]) > _OBSERVED_DATA_TIMEOUT_MS:
            info[_RSSI] = _RSSI_MIN
        return info

    def observe(self, channel: int):
        """Return the most recent data received on ``channel``.

        Returns:
            The decoded value or list of values, or ``None`` when nothing
            was received within the timeout.

        Raises:
            ValueError: If no buffer was allocated for ``channel``.
        """
        info = self._current_info(channel)

        # The minimum RSSI doubles as the marker for "no recent data".
        if info[_RSSI] == _RSSI_MIN:
            return None

        data = memoryview(info[_DATA])
        return decode(data[_ADV_HEADER_SIZE : info[_LEN] + _ADV_HEADER_SIZE])

    def signal_strength(self, channel: int):
        """Return the filtered RSSI of ``channel``.

        Returns:
            The smoothed RSSI, or ``_RSSI_MIN`` (-128) when nothing was
            received within the timeout.

        Raises:
            ValueError: If no buffer was allocated for ``channel``.
        """
        return self._current_info(channel)[_RSSI]

    def broadcast(self, data):
        """Start advertising ``data`` on the broadcast channel.

        Args:
            data: A single ``int``, ``float``, ``bool``, ``str``, ``bytes``
                or ``bytearray``, an iterable of such values, or ``None``
                to call ``gap_advertise(None)`` instead of sending data.

        Raises:
            RuntimeError: If no broadcast channel was configured.
        """
        if self.broadcast_channel is None:
            raise RuntimeError("Broadcast channel not configured.")

        if data is None:
            self.ble.gap_advertise(None)
            return

        send_buffer = self.send_buffer
        size = _ADV_HEADER_SIZE

        if isinstance(data, (int, float, bool, str, bytes, bytearray)):
            # A single value is prefixed with a marker byte so the receiver
            # returns it as is rather than as a one-element sequence.
            send_buffer[_ADV_HEADER_SIZE] = _ADVERTISING_OBJECT_SINGLE
            size += 1 + encode_one_object(data, send_buffer, _ADV_HEADER_SIZE + 1)
        else:
            for value in data:
                size += encode_one_object(value, send_buffer, size)

        # Header: length of everything after the length byte, AD type,
        # company identifier and channel.
        send_buffer[0] = size - 1
        send_buffer[1] = _MANUFACTURER_DATA
        send_buffer[2] = _LEGO_ID_LSB
        send_buffer[3] = _LEGO_ID_MSB
        send_buffer[4] = self.broadcast_channel

        # The first argument is the advertising interval in microseconds.
        self.ble.gap_advertise(40000, send_buffer[0:size])