-
Notifications
You must be signed in to change notification settings - Fork 0
/
wanderer.py
331 lines (287 loc) · 10.8 KB
/
wanderer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
import argparse
import csv
import os
from dataclasses import dataclass
from struct import unpack
from time import sleep
import pendulum
import serial
from pendulum.datetime import DateTime
from pendulum.duration import Duration
DEBUG = os.getenv("DEBUG", False)
class ResponseError(Exception):
pass
def debug(*args, **kwargs):
if DEBUG:
print(*args, **kwargs)
@dataclass
class Sample:
time: DateTime
temp: float
vib: float
@dataclass
class Measurement:
resolution_temperature: int
resolution_vibration: int
period_sampling: Duration
period_storing: Duration
time_start: DateTime
time_current: DateTime
samples: list[Sample]
class Wanderer:
FMT_TIME = "YYMMDDHHmmss"
def __init__(self, port: str):
self.port = port
def _write(self, buf: bytes | str):
# Wanderer seems to be pretty picky when accepting writes. One
# character at a time seemed most robust based on my tests.
if isinstance(buf, str):
buf = buf.encode()
debug(f"write: [{len(buf)}] {buf!r}")
for i in range(len(buf)):
c = buf[i : i + 1]
self.s.write(c)
self.s.write(b"\r")
def _read(self, n: int) -> bytes:
r = self.s.read(n)
if len(r) != n:
raise ResponseError(f"wanted {n} bytes, got {len(r)}")
return r
def __enter__(self):
s = serial.Serial()
print(f"Connecting to port {self.port}...")
s.port = self.port
s.baudrate = 9600
s.bytesize = serial.EIGHTBITS
s.parity = serial.PARITY_NONE
s.stopbits = serial.STOPBITS_ONE
s.xonxoff = False
s.rtscts = False
s.dsrdtr = False
s.timeout = 2
s.rts = True # Seems to use this one for extra power
s.dtr = False # Not connected in original straight cable
s.open()
self.s = s
print("Reading NUL byte after init: ", self._read(1))
return self
def __exit__(self, type, value, traceback):
self.s.close()
@staticmethod
def time_format(dt) -> str:
return dt.format(Wanderer.FMT_TIME)
@staticmethod
def time_parse(raw: str | bytes):
if isinstance(raw, bytes):
raw = raw.decode("utf-8")
return pendulum.from_format(raw, Wanderer.FMT_TIME, tz="local")
def _expect(self, what: bytes | str, extra: int = 0) -> bytes:
if isinstance(what, str):
what = what.encode("utf-8")
debug(f"expect: {what!r}+{extra}")
r = self._read(len(what) + extra)
if r[: len(what)] != what:
raise ResponseError(f"expecting {what!r}, got {r[:len(what)]!r}")
return r
@staticmethod
def transform_raw_temp(raw: int) -> float:
return raw / 2.0 - 30
@staticmethod
def transform_raw_vib(raw: int) -> float:
# XXX This is a guess based on a few example around 1-3 G.
return raw / 14.5
def battery(self) -> int:
self._write("BA")
r = self._expect("BA", extra=4)
val = int(r[3:5], base=16) - 100
if val < 0:
raise ResponseError(f"battery level less than zero: {val}")
return val
def measure(
self,
start,
measure: Duration,
period_sample: int,
period_store: int,
res_temp: int,
res_vib: int,
):
debug(
f"measure: start={start}, measure={measure}\n"
+ f" period_sample={period_sample}, period_store={period_store}\n"
+ f" res_temp={res_temp}, res_vib={res_vib}"
)
# ??
self._write("LN")
self._expect("LN", extra=1)
# ??
self._write("EQ")
self._expect("EQ")
# Time Current
self._write("TC " + Wanderer.time_format(pendulum.now()))
self._expect("TC")
# Time Start
self._write("TS " + Wanderer.time_format(start))
self._expect("TS")
# Record Length
hours = measure.hours if measure.hours > 0 else 1
self._write(f"TL {hours:04}")
self._expect("TL")
# Sampling Period
#
# Manual says this specifies "how often sensors are read",
# and can be between 1..10 sec.
#
self._write(f"PS {period_sample:04}")
self._expect("PS")
# Store Period
#
# Manual says this specifies "how often Wanderer unit stores the sensor
# readings, or samples, to memory"
#
# It also says that the Wanderer has memory for 6540 samples.
#
self._write(f"PM {period_store:04}")
self._expect("PM")
# Vibration/Temperature Resolution
#
# Manual says "Resolution" means the minimum relative deviation from
# previous sample that we record a new value. Resolution of 1 seems to
# have a special meaning of "no change is too small", but 2 means a
# minimum deviation of 2 %, 3 means 3 % etc. Temperature and vibration
# resolutions have identical logic.
#
# In practice this means that larger values for resolution means we
# accept more variance in values before recording a new entry.
#
self._write(f"RE {res_vib:02X}{res_temp:02X}")
self._expect("RE")
def read(self) -> Measurement:
# If there's an ongoing measurement when we do a read, it will be
# stopped. Wanderer will maintain the measurement values until a new
# one is programmed or it loses power.
debug("read")
# ??
self._write("EQ")
self._expect("EQ")
# ??
self._write("AP")
# ??
self._expect("SW ", extra=4 + 1)
tc = Wanderer.time_parse(self._expect("TC ", extra=12 + 1)[3:-1])
ts = Wanderer.time_parse(self._expect("TS ", extra=12 + 1)[3:-1])
tl = pendulum.duration(hours=int(self._expect("TL ", extra=4 + 1)[3:-1]))
ps = pendulum.duration(seconds=int(self._expect("PS ", extra=4 + 1)[3:-1]))
pm = pendulum.duration(seconds=int(self._expect("PM ", extra=4 + 1)[3:-1]))
re = self._expect("RE ", extra=4 + 1)[3:-1]
re_vib = int(re[0:2], base=16)
re_temp = int(re[2:4], base=16)
# Maybe "How many values in each sample?"
vs = self._expect("VS ", extra=2 + 1)
# Sample Number, that is, amount of samples
sn = self._expect("SN ", extra=4 + 1)
debug(f"read: Time Current: {tc}")
debug(f"read: Time Start: {ts}")
debug(f"read: Time Length: {tl}")
debug(f"read: Sampling Period: {ps}")
debug(f"read: Store Period: {pm}")
debug(f"read: Vibration resolution: {re_vib}")
debug(f"read: Temperature resolution: {re_temp}")
s = int(vs[3 : 3 + 2])
n = int(sn[3 : 3 + 4])
debug(f"read: vs={s} ({vs!r})")
debug(f"read: sn={n} ({sn!r})")
samples = []
temps = []
vibs = []
if n >= 1:
raw_samples = self._read(5 * n + 1)[:-1]
for i in range(0, len(raw_samples), 5):
sample_slice = raw_samples[i : i + 5]
# XXX Time shift may also include more bytes.
one, unitless_time, raw_temp, raw_vib = unpack(">BHBB", sample_slice)
temp = Wanderer.transform_raw_temp(raw_temp)
vib = Wanderer.transform_raw_vib(raw_vib)
temps.append(temp)
vibs.append(vib)
print(f"{one:04} {unitless_time:08} {temp:04} {vib:04}")
samples.append(
Sample(
time=ts + pm * unitless_time,
temp=temp,
vib=vib,
)
)
print(f"vib_max={max(vibs)}, temp_min={min(temps)}, temp_max={max(temps)}")
return Measurement(
resolution_temperature=re_temp,
resolution_vibration=re_vib,
period_sampling=ps,
period_storing=pm,
time_start=ts,
time_current=tc,
samples=samples,
)
if __name__ == "__main__":
p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
p.add_argument("--read", action="store_true", help="Read current measurement")
p.add_argument("--measure", action="store_true", help="Program a new measurment")
p.add_argument(
"--res-temp",
type=int,
default=1,
help="Temperature resolution [deg]",
)
p.add_argument("--res-vib", type=int, default=1, help="Vibration resolution [G]")
p.add_argument("--period-sample", type=int, default=1, help="Sampling period [s]")
p.add_argument(
"--period-store", type=int, default=1, help="Memory store period [s]"
)
p.add_argument("--measure-hours", type=int, default=1, help="Time to measure [h]")
p.add_argument(
"--output-csv",
type=str,
help="When reading, store measurement data as CSV to this filepath",
)
p.add_argument(
"--port", type=str, default="com1", help="Serial port to use with Wanderer"
)
args = p.parse_args()
mt = pendulum.duration(hours=args.measure_hours)
with Wanderer(args.port) as k:
sleep(0.5)
print(f"Battery level at start: {k.battery()} %")
if args.measure:
print("Programming a new measurement...")
k.measure(
pendulum.now(),
mt,
args.period_sample,
args.period_store,
args.res_temp,
args.res_vib,
)
if args.read:
print("Reading measurement...")
m = k.read()
print(f"Got {len(m.samples)} samples starting from {m.time_start}.")
if len(m.samples) > 0 and args.output_csv:
print(f"Writing samples to CSV file: {args.output_csv}")
with open(args.output_csv, "w", newline="") as f:
fieldnames = ["timestamp", "temperature", "vibration"]
sw = csv.DictWriter(
f, quoting=csv.QUOTE_MINIMAL, fieldnames=fieldnames
)
sw.writeheader()
for sample in m.samples:
sw.writerow(
{
"timestamp": sample.time.to_iso8601_string(),
"temperature": sample.temp,
"vibration": sample.vib,
}
)
# It's beneficial to try reading battery level even if the information
# isn't interesting because it tells us that we're correctly parsing
# the serial stream from our Wanderer.
print(f"Battery level at end: {k.battery()} %")