This repository has been archived by the owner on Jun 13, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathHuawei-TCX-Converter.py
1564 lines (1322 loc) · 78 KB
/
Huawei-TCX-Converter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Huawei-TCX-Converter.py
# Copyright (c) 2019 Ari Cooper-Davis / Christoph Vanthuyne - github.com/aricooperdavis/Huawei-TCX-Converter
import argparse
import collections
import csv
import datetime
import json
import logging
import math
import operator
import os
import re
import sys
import tarfile
import tempfile
# lib for time procedure
import time
import urllib.request as url_req
import xml.etree.cElementTree as xml_et
from datetime import datetime as dts
from datetime import timedelta as dts_delta
# External libraries that require installation
from typing import List, Optional
try:
    import xmlschema  # (only) needed to validate the generated TCX XML.
except Exception:
    # Optional dependency: report the problem and continue instead of aborting. 'Exception' (rather than a
    # bare 'except') keeps this best-effort behaviour without swallowing KeyboardInterrupt or SystemExit.
    print('Info - External library xmlschema could not be imported.\n' +
          'It is required when using the --validate_xml argument.\n' +
          'It can be installed using: pip install xmlschema')
# Global Constants
# Program identification and version/build strings.
PROGRAM_NAME = 'Huawei-TCX-Converter'
PROGRAM_MAJOR_VERSION = '3'
PROGRAM_MINOR_VERSION = '0'
PROGRAM_MAJOR_BUILD = '1912'
PROGRAM_MINOR_BUILD = '1901'
PROGRAM_DAN67_BUILD = '20191019'
# Relative path of the output directory (presumably where converted files are written - TODO confirm).
OUTPUT_DIR = './output'
# Maximum time since the last known position before HiActivity._calc_segments_and_distances derives the
# distance from speed ('rs') records instead.
GPS_TIMEOUT = dts_delta(seconds=10)
class HiActivity:
"""" This class represents all the data contained in a HiTrack file."""
TYPE_WALK = 'Walk'
TYPE_RUN = 'Run'
TYPE_CYCLE = 'Cycle'
TYPE_POOL_SWIM = 'Swim_Pool'
TYPE_OPEN_WATER_SWIM = 'Swim_Open_Water'
TYPE_UNKNOWN = '?'
_ACTIVITY_TYPE_LIST = (TYPE_WALK, TYPE_RUN, TYPE_CYCLE, TYPE_POOL_SWIM, TYPE_OPEN_WATER_SWIM)
def __init__(self, activity_id: str, activity_type: str = TYPE_UNKNOWN):
logging.debug('New HiTrack activity to process <%s>', activity_id)
self.activity_id = activity_id
if activity_type == self.TYPE_UNKNOWN:
self._activity_type = self.TYPE_UNKNOWN
else:
self.set_activity_type(activity_type) # validate and set activity type of the activity
# Will hold a set of parameters to auto-determine activity type
self.activity_params = {}
self.pool_length = -1
self.start = None
self.stop = None
self.distance = -1
# Create an empty segment and segment list
self._current_segment = None
self._segment_list: List = None
# Create an empty detail data dictionary. key = timestamp, value = dict{t, lat, lon, alt, hr)
self.data_dict = {}
# Private variable to temporarily hold the last parsed SWOLF data during parsing of swimming activities
self.last_swolf_data = None
# Data from JSON
self.JSON_timeOffset = 0
self.JSON_timeZone = 'Z'
self.JSON_swim_pool_length = -1
def get_activity_type(self) -> str:
if self._activity_type == self.TYPE_UNKNOWN:
# Perform activity type detection only once.
self._activity_type = self._detect_activity_type()
return self._activity_type
def set_activity_type(self, activity_type: str):
if activity_type in self._ACTIVITY_TYPE_LIST:
logging.info('Setting activity type of activity %s to %s', self.activity_id, activity_type)
self._activity_type = activity_type
else:
logging.error('Invalid activity type <%s>', activity_type)
raise Exception('Invalid activity type <%s>', activity_type)
def set_pool_length(self, pool_length: int):
logging.info('Setting pool length of activity %s to %d', self.activity_id, pool_length)
self.pool_length = pool_length
if not self.get_activity_type() == self.TYPE_POOL_SWIM:
logging.warning('Pool length for activity %s of type %s will not be used. It is not a pool swimming \
activity', self.activity_id, self._activity_type)
def _add_segment_start(self, segment_start: datetime):
if self._current_segment:
logging.error('Request to start segment at %s when there is already a current segment active',
segment_start)
return
logging.debug('Adding segment start at %s', segment_start)
# No current segment, create one
self._current_segment = {'start': segment_start, 'stop': None}
# Add it to the segment list (note: if no explicit stop record is found, the segment will exist and stay 'open')
if not self._segment_list:
self._segment_list = []
self._segment_list.append(self._current_segment)
if not self.start:
# Set activity start
self.start = segment_start
def _add_segment_stop(self, segment_stop: datetime, segment_distance: int = -1):
logging.debug('Adding segment stop at %s', segment_stop)
if not self._current_segment:
logging.error('Request to stop segment at %s when there is no current segment active', segment_stop)
return
# Set stop of current segment, add it to the segment list and clear the current segment
self._current_segment['stop'] = segment_stop
self._current_segment['duration'] = int((segment_stop - self._current_segment['start']).total_seconds())
if not segment_distance == -1:
self._current_segment['distance'] = segment_distance
self._current_segment = None
# TODO Verify if something useful can be done with the (optional) altitude data in the tp=lbs records
def add_location_data(self, data: []):
""""Add location data from a tp=lbs record in the HiTrack file.
Information:
- When tracking an activity with a mobile phone only, the HiTrack files seem to contain altitude
information in the alt data tag (in ft). This seems not to be the case when an activity is started from a
tracking device.
- When tracking an activity with a mobile phone only, the HiTrack files seem to contain stop records (see below)
with a valid timestamp. This is not the case when a tracking device is used, where the timestamp of these
records = 0
- When tracking an activity with a tracking the device, the records in the HiTrack file seem to be ordered by
record type. This seems not to be the case when using a mobile phone only, where records seem to be added in
order of the timestamp they occurred.
- Location records are NOT ordered by timestamp when the activity contains loops of the same track.
- Pause and stop records are identified by tp=lbs;lat=90;lon=-80;alt=0;t=<valid epoch time value or zero>
"""
logging.debug('Adding location data %s', data)
try:
# Create a dictionary from the key value pairs
location_data = dict(data)
# All raw values are floats (timestamp will be converted later)
for keys in location_data:
location_data[keys] = float(location_data[keys])
except Exception as e:
logging.error('One or more required data fields (t, lat, lon) missing or invalid in location data %s\n%s',
data,
e)
raise Exception('One or more required data fields (t, lat, lon) missing or invalid in location data %s',
data)
if location_data['t'] == 0 and location_data['lat'] == 90 and location_data['lon'] == -80:
# Pause/stop record without a valid epoch timestamp. Set it to the last timestamp recorded.
location_data['t'] = self.stop
else:
# Regular location record or pause/stop record with valid epoch timestamp.
# Convert the timestamp to a datetime
location_data['t'] = _convert_hitrack_timestamp(location_data['t'])
self.activity_params['gps'] = True
# Only add location data with a valid timestamp (ignore GPS loss or pause records at start of the location data)
if location_data['t']:
self._add_data_detail(location_data)
def _get_last_location(self) -> Optional[dict]:
""" Returns the last location record in the data dictionary """
if self.data_dict:
reverse_sorted_data = sorted(self.data_dict.items(), key=operator.itemgetter(0), reverse=True)
for t, data in reverse_sorted_data:
if 'lat' in data:
return data
# Empty data dictionary or no last location found in dictionary
return None
def _vincenty(self, point1: tuple, point2: tuple) -> float:
"""
Determine distance between two coordinates
Parameters
----------
point1 : Tuple
[Latitude of first point, Longitude of first point]
point2: Tuple
[Latitude of second point, Longitude of second point]
Returns
-------
s : float
distance in m between point1 and point2
"""
# WGS 84
a = 6378137
f = 1 / 298.257223563
b = 6356752.314245
MAX_ITERATIONS = 200
CONVERGENCE_THRESHOLD = 1e-12
if point1[0] == point2[0] and point1[1] == point2[1]:
return 0.0
U1 = math.atan((1 - f) * math.tan(math.radians(point1[0])))
U2 = math.atan((1 - f) * math.tan(math.radians(point2[0])))
L = math.radians(point2[1] - point1[1])
Lambda = L
sinU1 = math.sin(U1)
cosU1 = math.cos(U1)
sinU2 = math.sin(U2)
cosU2 = math.cos(U2)
for iteration in range(MAX_ITERATIONS):
sinLambda = math.sin(Lambda)
cosLambda = math.cos(Lambda)
sinSigma = math.sqrt((cosU2 * sinLambda) ** 2 +
(cosU1 * sinU2 - sinU1 * cosU2 * cosLambda) ** 2)
if sinSigma == 0:
return 0.0
cosSigma = sinU1 * sinU2 + cosU1 * cosU2 * cosLambda
sigma = math.atan2(sinSigma, cosSigma)
sinAlpha = cosU1 * cosU2 * sinLambda / sinSigma
cosSqAlpha = 1 - sinAlpha ** 2
try:
cos2SigmaM = cosSigma - 2 * sinU1 * sinU2 / cosSqAlpha
except ZeroDivisionError:
cos2SigmaM = 0
C = f / 16 * cosSqAlpha * (4 + f * (4 - 3 * cosSqAlpha))
LambdaPrev = Lambda
Lambda = L + (1 - C) * f * sinAlpha * (sigma + C * sinSigma *
(cos2SigmaM + C * cosSigma *
(-1 + 2 * cos2SigmaM ** 2)))
if abs(Lambda - LambdaPrev) < CONVERGENCE_THRESHOLD:
break
else:
logging.error('Failed to calculate distance between %s and %s', point1, point2)
raise Exception('Failed to calculate distance between %s and %s', point1, point2)
uSq = cosSqAlpha * (a ** 2 - b ** 2) / (b ** 2)
A = 1 + uSq / 16384 * (4096 + uSq * (-768 + uSq * (320 - 175 * uSq)))
B = uSq / 1024 * (256 + uSq * (-128 + uSq * (74 - 47 * uSq)))
deltaSigma = B * sinSigma * (cos2SigmaM + B / 4 * (cosSigma *
(-1 + 2 * cos2SigmaM ** 2) - B / 6 * cos2SigmaM *
(-3 + 4 * sinSigma ** 2) * (-3 + 4 * cos2SigmaM ** 2)))
s = b * A * (sigma - deltaSigma)
return round(s, 6)
def add_heart_rate_data(self, data: []):
"""Add heart rate data from a tp=h-r record in the HiTrack file
"""
# Create a dictionary from the key value pairs
logging.debug('Adding heart rate data %s', data)
try:
hr_data = dict(data)
# Use unique keys. Update keys k -> t and v -> hr
hr_data['t'] = _convert_hitrack_timestamp(float(hr_data.pop('k')))
hr_data['hr'] = int(hr_data.pop('v'))
# Ignore invalid heart rate data (for export)
if hr_data['hr'] < 1 or hr_data['hr'] > 254:
logging.warning('Invalid heart rate data detected and ignored in data %s', data)
except Exception as e:
logging.error('One or more required data fields (k, v) missing or invalid in heart rate data %s\n%s',
data,
e)
raise Exception('One or more required data fields (k, v) missing or invalid in heart rate data %s\n%s',
data)
# Add heart rate data
self._add_data_detail(hr_data)
def add_altitude_data(self, data: []):
"""Add altitude data from a tp=alti record in a HiTrack file"""
# Create a dictionary from the key value pairs
logging.debug('Adding altitude data %s', data)
try:
alti_data = dict(data)
# Use unique keys. Update keys k -> t and v -> hr
alti_data['t'] = _convert_hitrack_timestamp(float(alti_data.pop('k')))
alti_data['alti'] = float(alti_data.pop('v'))
# Ignore invalid heart rate data (for export)
if alti_data['alti'] < -1000 or alti_data['alti'] > 10000:
logging.warning('Invalid altitude data detected and ignored in data %s', data)
return
except Exception as e:
logging.error('One or more required data fields (k, v) missing or invalid in altitude data %s\n%s',
data,
e)
raise Exception('One or more required data fields (k, v) missing or invalid in altitude data %s\n%s', data)
# Add altitude data
self._add_data_detail(alti_data)
# TODO Further verification of assumptions and testing required related to auto activity type detection
# TODO For activities that were tracked using a phone only without a fitness device, there are no s-r records. Hence, in these cases auto detection should use a 'fallback mode' e.g. by using the p-m records (and assume that swimming activities with phone only won't occur)
def add_step_frequency_data(self, data: []):
"""Add step frequency data from a tp=s-r record in a HiTrack file.
The unit of measure of the step frequency is steps/minute.
Assumptions:
- Cycling activities have s-r records with value = 0 (and Huawei/Honor doesn't seem to sell cadence meters)
- Swimming activities have s-r records but no lbs records. The s-r records have negative values
(indicating the stroke type). It seems that s-r records are used to indicate
the start of a new segments for swimming.
"""
logging.debug('Adding step frequency data or detect cycling or swimming activities %s', data)
try:
# Create a dictionary from the key value pairs
step_freq_data = dict(data)
# Use unique keys. Update keys k -> t and v -> s_r
step_freq_data['t'] = _convert_hitrack_timestamp(float(step_freq_data.pop('k')))
step_freq_data['s-r'] = int(step_freq_data.pop('v'))
except Exception as e:
logging.error('One or more required data fields (k, v) missing or invalid in step frequency data %s\n%s',
data,
e)
raise Exception('One or more required data fields (k, v) missing or invalid in step frequency data %s\n%s',
data)
# Keep track of minimum, maximum and average step frequency data for activity type auto-detection.
# Ignore negative values since these belong to swimming activities and are not important to recognize the
# swimming activity.
if step_freq_data['s-r'] >= 0:
if 'step frequency min' not in self.activity_params:
self.activity_params['step frequency min'] = step_freq_data['s-r']
self.activity_params['step frequency max'] = step_freq_data['s-r']
self.activity_params['step frequency data'] = []
elif step_freq_data['s-r'] < self.activity_params['step frequency min']:
self.activity_params['step frequency min'] = step_freq_data['s-r']
elif step_freq_data['s-r'] > self.activity_params['step frequency max']:
self.activity_params['step frequency max'] = step_freq_data['s-r']
# Add step frequency data detail to activity parameters for later average step frequency calculation.
self.activity_params['step frequency data'].append(step_freq_data['s-r'])
# Add step frequency data.
self._add_data_detail(step_freq_data)
def add_swolf_data(self, data: []):
""" Add SWOLF (swimming) data from a tp=swf record in a HiTrack file
SWOLF value = time to swim one pool length + number of strokes
"""
logging.debug('Adding SWOLF swim data %s', data)
try:
# Create a dictionary from the key value pairs
swolf_data = dict(data)
# Use unique keys. Update keys k -> t and v -> swf
# Time of SWOLF swimming data is relative to activity start.
# The first record with k=0 is the value registered after 5 seconds of activity.
swolf_data['t'] = self.start + dts_delta(seconds=int(swolf_data.pop('k')) + 5)
swolf_data['swf'] = int(swolf_data.pop('v'))
self.activity_params['swim'] = True
# If there is no last swf record or the last added swf record had a different swf value, then this record
# belongs to a new lap (segment)
# TODO There is a chance that checking on SWOLF only might miss a lap in case two consecutive laps have the same SWOLF (but then again, chances are that stroke and speed data are also identical)
# TODO Since SWOLF value contains both time and strokes, add extra check to not process consecutive same time laps beyond the SWOLF value.
if not self._current_segment:
# First record of first lap. Start new segment (lap)
self._add_segment_start(swolf_data['t'] - dts_delta(seconds=5))
else:
if self.last_swolf_data['swf'] != swolf_data['swf']:
# New lap detected.
# Close segment of previous lap. Since the current lap starts at the exact same time
self._current_segment['stop'] = self.last_swolf_data['t']
self._current_segment = None
# Open new segment for this lap. End of previous lap is start of current lap.
# Add 1 microsecond to split the lap data correctly.
self._add_segment_start(swolf_data['t'] + dts_delta(microseconds=1))
# Remember this SWOLF data as last parsed SWOLF data.
self.last_swolf_data = swolf_data
except Exception as e:
logging.error('One or more required data fields (k, v) missing or invalid in SWOLF data %s\n%s',
data,
e)
raise Exception('One or more required data fields (k, v) missing or invalid in SWOLF data %s\n%s',
data)
# Add SWOLF data
self._add_data_detail(swolf_data)
def add_stroke_frequency_data(self, data: []):
""" Add stroke frequency (swimming) data (in strokes/minute) from a tp=p-f record in a HiTrack file """
logging.debug('Adding stroke frequency swim data %s', data)
try:
# Create a dictionary from the key value pairs
stroke_freq_data = dict(data)
# Use unique keys. Update keys k -> t and v -> p-f
# Time of stroke frequency swimming data is relative to activity start.
# The first record with k=0 is the value registered after 5 seconds of activity.
stroke_freq_data['t'] = self.start + dts_delta(seconds=int(stroke_freq_data.pop('k')) + 5)
stroke_freq_data['p-f'] = int(stroke_freq_data.pop('v'))
except Exception as e:
logging.error('One or more required data fields (k, v) missing or invalid in stroke frequency data %s\n%s',
data,
e)
raise Exception(
'One or more required data fields (k, v) missing or invalid in stroke frequency data %s\n%s',
data)
# Add stroke frequency data
self._add_data_detail(stroke_freq_data)
def add_speed_data(self, data: []):
""" Add speed data (in decimeter/second) from a tp=rs record in a HiTrack file """
logging.debug('Adding speed data %s', data)
try:
# Create a dictionary from the key value pairs
speed_data = dict(data)
# Use unique keys. Update keys k -> t and v -> p-f
# Time of speed data is relative to activity start.
# The first record with k=0 is the value registered after 5 seconds of activity.
speed_data['t'] = self.start + dts_delta(seconds=int(speed_data.pop('k')) + 5)
speed_data['rs'] = int(speed_data.pop('v'))
except Exception as e:
logging.error('One or more required data fields (k, v) missing or invalid in speed data %s\n%s',
data,
e)
raise Exception('One or more required data fields (k, v) missing or invalid in speed data %s\n%s',
data)
# Add speed data
self._add_data_detail(speed_data)
def _add_data_detail(self, data: dict):
# Add the data to the data dictionary.
if data['t'] not in self.data_dict:
# No data for timestamp. Create a new record for it.
self.data_dict[data['t']] = data
else:
# Existing data for timestamp. Add the new data to the existing record.
self.data_dict[data['t']].update(data)
# Records are NOT necessarily in chronological order.
# Update start of the activity when a record with an earlier timestamp is added.
if not self.start or self.start > data['t']:
self.start = data['t']
# Update stop of the activity when a record with a later timestamp is added.
if not self.stop or self.stop < data['t']:
self.stop = data['t']
def get_segments(self) -> list:
"""" Returns the segment list.
- For swimming activities, the segments were identified during parsing of the SWOLF data.
- For walking, running and cycling activities, the segments must be calculated once based on the parsed
location data. Because the location data is not (always) in chronological order (e.g. loops in the track),
for these activities
"""
# Make sure calculation of segments is done.
self._calc_segments_and_distances()
return self._segment_list
def _reset_segments(self):
self._segment_list = None
self._current_segment = None
def _detect_activity_type(self) -> str:
""""Auto-detection of the activity type. Only valid when called after all data has been parsed."""
logging.debug('Detecting activity type for activity %s with parameters %s',
self.activity_id, self.activity_params)
# Filter out swimming
if 'swim' in self.activity_params:
# Swimming detected
if 'gps' not in self.activity_params:
self._activity_type = self.TYPE_POOL_SWIM
else:
self._activity_type = self.TYPE_OPEN_WATER_SWIM
logging.debug('Activity type %s detected for activity %s', self._activity_type, self.activity_id)
return self._activity_type
# Walk / Run / Cycle
if 'step frequency min' in self.activity_params:
# Walk / Run / Cycle - Step frequency data available
# For walking and running, the assumption is that step frequency data is available regardless whether
# a fitness tracking device is used or not.
# Calculate average step frequency
step_freq_sum = 0
for n, step_freq in enumerate(self.activity_params['step frequency data']):
step_freq_sum += step_freq
step_freq_avg = step_freq_sum / (n + 1)
logging.debug('Activity %s has a calculated average step frequency of %d', self.activity_id, step_freq_avg)
if self.activity_params['step frequency min'] == 0 and self.activity_params['step frequency max'] == 0:
# Specific check for cycling - all step frequency records being zero
self._activity_type = self.TYPE_CYCLE
elif self.activity_params['step frequency min'] == 0 and step_freq_avg < 70:
# TODO This condition will have to be confirmed in practice whether a long pause during walking would cause it to be detected as cycling
# Some walking on foot during cycling activity - detect it as cycling
# See https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5435734/ - Figure 2 extrapolated theoretical stride
# frequency of 35 at speed 0.
self._activity_type = self.TYPE_CYCLE
elif self.activity_params['step frequency max'] < 135:
# See https://www.ncbi.nlm.nih.gov/pmc/articles/PMC5435734/ - Walk-to-run stride frequency of 70.6 +- 3.2
self._activity_type = self.TYPE_WALK
else:
self._activity_type = self.TYPE_RUN
logging.debug('Activity type %s detected using step frequency data for activity %s',
self._activity_type, self.activity_id)
return self._activity_type
else:
# Walk / Run / Cycle - no step frequency data available (e.g. activities registered using phone only).
# See above, since it is assumed that walking or running activities will always have step frequency records
# regardless whether a fitness tracking device was used or not, this must be a cycling activity.
self._activity_type = self.TYPE_CYCLE
logging.debug('Activity type %s detected using step frequency data for activity %s',
self._activity_type, self.activity_id)
return self._activity_type
def _calc_segments_and_distances(self):
"""" Perform the following detailed data calculations for walk, run, or cycle activities:
- segment list
- segment start, stop, duration and cumulative distance
- detailed track point cumulative distances
- total distance
Calculations change/add the following class attributes in place:
- _segment_list
- data_dict : sorted by timestamp and distances added
- distance
"""
# Calculate only once
if self._segment_list:
return
logging.debug('Calculating segment and distance data for activity %s', self.activity_id)
# Sort the data dictionary by timestamp
self.data_dict = collections.OrderedDict(sorted(self.data_dict.items()))
# Do calculations
last_location = None
# Start first segment at earliest data found while adding the data
self._add_segment_start(self.start)
for key, data in self.data_dict.items():
if 'lat' in data: # This is a location record
if last_location:
if data['lat'] == 90 and data['lon'] == -80:
# Pause or stop records (lat = 90, long = -80, alt = 0) and handle segment data creation
# Use timestamp and distance of last (location) record
self._add_segment_stop(last_location['t'], last_location['distance'])
elif 'lat' not in last_location:
# GPS was lost and is now back. Set distance to last known distance and use this record as the
# last known location.
logging.debug('GPS signal available at %s in %s. Calculating distance using location data.',
data['t'], self.activity_id)
data['distance'] = last_location['distance']
# If no current segment, create one
if not self._current_segment:
self._add_segment_start(data['t'])
last_location = data
else:
# Regular location record. If no current segment, create one
if not self._current_segment:
self._add_segment_start(data['t'])
# Calculate and set the accumulative distance of the location record
data['distance'] = self._vincenty((last_location['lat'], last_location['lon']),
(data['lat'], data['lon'])) + \
last_location['distance']
last_location = data
else:
# First location. Set distance 0
data['distance'] = 0
last_location = data
elif 'rs' in data:
if last_location:
time_delta = data['t'] - last_location['t']
if 'lat' not in last_location or time_delta > GPS_TIMEOUT:
# GPS signal lost for more than the GPS timeout period. Calculate distance based on speed records
logging.debug('No GPS signal between %s and %s in %s. Calculating distance using speed data '
'(%s dm/s)',
last_location['t'], data['t'], self.activity_id, data['rs'])
# If no current segment, create one
if not self._current_segment:
self._add_segment_start(data['t'])
data['distance'] = last_location['distance'] + (data['rs'] * time_delta.seconds / 10)
last_location = data
else:
# No location records processed and speed record available = start without GPS or no GPS at all.
# Set distance 0
data['distance'] = 0
last_location = data
# Close last segment if it is still open
if self._current_segment:
# If the segment is open (no stop record for end of activity), use timestamp and distance of last location
# record.
self._add_segment_stop(last_location['t'], last_location['distance'])
# Set the total distance of the activity
self.distance = int(last_location['distance'])
def get_segment_data(self, segment: dict) -> list:
"""" Returns a filtered and sorted data set containing all raw parsed data from the requested segment """
# Filter data
if segment['stop']:
segment_data_dict = {k: v for k, v in self.data_dict.items()
if segment['start'] <= k <= segment['stop']}
else:
# E.g for swimming activities, the last segment is not closed due to no stop record nor valid record that
# indicates the end of the activity. Return all remaining data starting from the start timestamp
segment_data_dict = {k: v for k, v in self.data_dict.items()
if segment['start'] <= k}
# Sort data by timestamp (sort on key in data dictionary)
segment_data = [value for (key, value) in sorted(segment_data_dict.items())]
return segment_data
def get_swim_data(self) -> Optional[list]:
if self.get_activity_type() == self.TYPE_POOL_SWIM:
return self._get_pool_swim_data()
elif self.get_activity_type() == self.TYPE_OPEN_WATER_SWIM:
return self._get_open_water_swim_data()
else:
return None
def _get_pool_swim_data(self) -> list:
"""" Calculates the real swim (lap) data based on the raw parsed pool swim data
The following calculation steps on the raw parsed data is applied.
1. Starting point is the raw parsed data per lap (segment). The data consists of multiple data records
with a 5 second time interval containing the same SWOLF and stroke frequency (in strokes/minute) values.
2. Calculate the number of strokes in the lap.
Number of strokes = stroke frequency x (last - first lqp timestamp) / 60
3. Calculate the lap time: lap time = SWOLF - number of strokes
:return
A list of lap data dictionaries containing the following data:
'lap' : lap number in the activity
'start' : Start timestamp of the lap
'stop' : Stop timestamp of the lap
'duration' : lap duration in seconds
'swolf' : lap SWOLF value (duration + number of strokes in lap)
'strokes' : number of strokes in lap
'speed' : estimated average speed during the lap in m/s.
Note: this is an approximate value as the minimum resolution of the raw speed data is 1 dm/s
'distance' : estimated distance based on the average speed and the lap duration.
Note: this is an approximate value as the minimum resolution of the raw speed data is 1 dm/s
"""
logging.info('Calculating swim data for activity %s', self.activity_id)
swim_data = []
# Sort the data dictionary by timestamp
self.data_dict = collections.OrderedDict(sorted(self.data_dict.items()))
total_distance = 0
for n, segment in enumerate(self._segment_list):
segment_data = self.get_segment_data(segment)
first_swf_index = 0
while 'swf' not in segment_data[first_swf_index]:
first_swf_index += 1
first_lap_record = segment_data[first_swf_index]
last_lap_record = segment_data[-1]
# First record is after 5 s in lap
raw_data_duration = (last_lap_record['t'] - first_lap_record['t']).total_seconds() + 5
lap_data = {}
lap_data['lap'] = n + 1
lap_data['swolf'] = first_lap_record['swf']
lap_data['strokes'] = round(
first_lap_record['p-f'] * raw_data_duration / 60) # Convert strokes/min -> strokes/lap
lap_data['duration'] = lap_data['swolf'] - lap_data['strokes'] # Derive lap time from SWOLF - strokes
if self.pool_length < 1:
# Pool length not set. Derive estimated distance from raw speed data
lap_data['speed'] = first_lap_record['rs'] / 10 # estimation in m/s
lap_data['distance'] = lap_data['speed'] * lap_data['duration']
else:
lap_data['distance'] = self.pool_length
lap_data['speed'] = self.pool_length / lap_data['duration']
total_distance += lap_data['distance']
# Start timestamp of lap
if not swim_data:
lap_data['start'] = self.start
else:
# Start of this lap is stop of previous lap
lap_data['start'] = swim_data[-1]['stop']
# Stop timestamp of lap
lap_data['stop'] = lap_data['start'] + dts_delta(seconds=lap_data['duration'])
logging.debug('Calculated swim data for lap %d : %s', n + 1, lap_data)
swim_data.append(lap_data)
# Update activity distance
self.distance = total_distance
return swim_data
def _get_open_water_swim_data(self) -> list:
    """
    Calculates the real swim (lap) data based on the raw parsed open water swim data.

    The segment list generated from the SWOLF data is discarded and the segments and distances are
    recalculated before the lap is built. The complete activity is reported as one single lap.

    Returns a list holding one dictionary with the keys:
    'lap' : lap number (always 1)
    'start' : start timestamp of the activity
    'stop' : stop timestamp of the activity
    'duration' : elapsed time between start and stop in whole seconds
    'distance' : activity distance as available after the recalculation
    """
    logging.info('Calculating swim data for activity %s', self.activity_id)
    # Sort the data dictionary by timestamp
    self.data_dict = collections.OrderedDict(sorted(self.data_dict.items()))
    # The generated segment list based on the SWOLF data is unusable for open water swim activities.
    # Reset it and recalculate segments and distances based on the GPS location data.
    self._reset_segments()
    self._calc_segments_and_distances()
    # Create 1 large lap
    lap_data = {
        'lap': 1,
        'start': self.start,
        'stop': self.stop,
        # total_seconds() also accounts for the days part of the timedelta, which the .seconds attribute
        # silently drops. Truncated to int to keep the duration in whole seconds.
        'duration': int((self.stop - self.start).total_seconds()),
        'distance': self.distance,
    }
    return [lap_data]
def __repr__(self):
    """Returns a multi-line summary of the activity: class name, ID, type, date, duration and distance."""
    activity_date = dts.strftime(self.start, "%Y-%m-%d")
    elapsed = str(self.stop - self.start)
    summary_lines = (
        self.__class__.__name__,
        'ID : ' + self.activity_id,
        'Type : ' + self._activity_type,
        'Date : ' + activity_date + ' (YYYY-MM-DD)',
        'Duration : ' + elapsed + ' (H:MM:SS)',
        'Distance : ' + str(self.distance) + 'm',
    )
    # Every field goes on its own line, the class name first
    return '\n'.join(summary_lines)
class HiTrackFile:
    """The HiTrackFile class represents a single HiTrack file. It contains all file handling and parsing methods."""

    # Supported record types. Key: first field of a HiTrack line. Value: the positions of the 'key=value' fields
    # that are parsed from the line and the name of the HiActivity method that receives the parsed pairs.
    _RECORD_PARSERS = {
        # Location line format: tp=lbs;k=_;lat=_;lon=_;alt=_;t=_ -> parameters t, lat, lon (alt not parsed)
        'tp=lbs': ((5, 2, 3), 'add_location_data'),
        # Heart rate line format: tp=h-r;k=_;v=_ -> parameters k (timestamp) and v (heart rate)
        'tp=h-r': ((1, 2), 'add_heart_rate_data'),
        # Altitude line format: tp=alti;k=_;v=_ -> parameters k (timestamp) and v (altitude)
        'tp=alti': ((1, 2), 'add_altitude_data'),
        # Step frequency (steps/minute) format: tp=s-r;k=_;v=_ -> parameters k (timestamp) and v (step frequency)
        'tp=s-r': ((1, 2), 'add_step_frequency_data'),
        # SWOLF format: tp=swf;k=_;v=_ -> parameters k (timestamp) and v (SWOLF)
        'tp=swf': ((1, 2), 'add_swolf_data'),
        # Stroke frequency (strokes/minute) format: tp=p-f;k=_;v=_ -> parameters k (timestamp) and v (stroke freq.)
        'tp=p-f': ((1, 2), 'add_stroke_frequency_data'),
        # Speed (decimeter/second) format: tp=rs;k=_;v=_ -> parameters k (timestamp) and v (speed)
        'tp=rs': ((1, 2), 'add_speed_data'),
    }

    def __init__(self, hitrack_filename: str, activity_type: str = HiActivity.TYPE_UNKNOWN):
        """
        Opens the HiTrack file for reading and tries to derive the activity start and stop from its filename.

        Raises an Exception when the file can't be opened.
        """
        # Defined before opening the file so that _close_file() (also called from __del__) can rely on the
        # attribute when opening fails.
        self.hitrack_file = None
        # Validate the file parameter and (try to) open the file for reading
        if not hitrack_filename:
            logging.error('Parameter HiTrack filename is missing')
        try:
            self.hitrack_file = open(hitrack_filename, 'r')
        except Exception as e:
            logging.error('Error opening HiTrack file <%s>\n%s', hitrack_filename, e)
            raise Exception('Error opening HiTrack file <%s>' % (hitrack_filename,)) from e

        self.activity = None
        self.activity_type = activity_type

        # Try to parse activity start and stop datetime from the filename.
        # Original HiTrack filename is: HiTrack_<12 digit start datetime><12 digit stop datetime><5 digit unknown>
        # Get start timestamp from file in seconds (10 digits)
        self.start = self._timestamp_from_filename(8, 18)
        # Get stop timestamp from file in seconds (10 digits)
        self.stop = self._timestamp_from_filename(20, 30)

    def _timestamp_from_filename(self, first: int, last: int):
        """Converts the characters [first:last] of the file's base name to a timestamp. Returns None on failure."""
        try:
            return _convert_hitrack_timestamp(float(os.path.basename(self.hitrack_file.name)[first:last]))
        except Exception:
            # Best effort only: the filename doesn't have to follow the original HiTrack naming.
            return None

    def parse(self) -> HiActivity:
        """
        Parses the HiTrack file and returns the parsed data in a HiActivity object

        Lines with an unsupported record type and blank lines are skipped. The file is closed when parsing ends,
        whether it succeeded or not. Raises an Exception when a line can't be parsed.
        """
        if self.activity:
            return self.activity  # No need to parse a second time if the file was already parsed

        logging.info('Parsing file <%s>', self.hitrack_file.name)

        # Create a new activity object for the file
        self.activity = HiActivity(os.path.basename(self.hitrack_file.name), self.activity_type)

        line_number = 0
        line = ''
        try:
            csv_reader = csv.reader(self.hitrack_file, delimiter=';')
            for line_number, line in enumerate(csv_reader, start=1):
                if not line:
                    continue  # The csv reader returns an empty list for a blank line
                record_parser = self._RECORD_PARSERS.get(line[0])
                if record_parser is None:
                    continue  # Unsupported record type
                field_positions, add_method_name = record_parser
                # Parse key value pairs
                data_list = [line[position].split('=') for position in field_positions]
                getattr(self.activity, add_method_name)(data_list)
        except Exception as e:
            logging.error('Error parsing file <%s> at line <%d>\nCSV data: %s\n%s',
                          self.hitrack_file.name, line_number, line, e)
            raise Exception('Error parsing file <%s> at line <%d>\n%s'
                            % (self.hitrack_file.name, line_number, e)) from e
        finally:
            self._close_file()

        return self.activity

    def _close_file(self):
        """Closes the HiTrack file if it is open. Errors while closing are logged, not raised."""
        # getattr: the attribute is missing when the object was never (completely) initialized
        hitrack_file = getattr(self, 'hitrack_file', None)
        if not hitrack_file or hitrack_file.closed:
            return
        try:
            hitrack_file.close()
            logging.debug('HiTrack file <%s> closed', hitrack_file.name)
        except Exception as e:
            logging.error('Error closing HiTrack file <%s>\n%s', hitrack_file.name, e)

    def __del__(self):
        self._close_file()
class HiTarBall:
    """
    The HiTarBall class represents a tarball that contains HiTrack files. It looks up the HiTrack files in the
    tarball, extracts them to the extraction directory and parses them.
    """
    _TAR_HITRACK_DIR = 'com.huawei.health/files'
    _HITRACK_FILE_START = 'HiTrack_'

    def __init__(self, tarball_filename: str, extract_dir: str = OUTPUT_DIR):
        """
        Opens the tarball for reading.

        Raises an Exception when the tarball can't be opened.
        """
        # Defined before opening the tarball so that _close_tarball() (also called from __del__) can rely on the
        # attribute when opening fails.
        self.tarball = None
        # Validate the tarball file parameter
        if not tarball_filename:
            logging.error('Parameter HiHealth tarball filename is missing')
        try:
            self.tarball = tarfile.open(tarball_filename, 'r')
        except Exception as e:
            logging.error('Error opening tarball file <%s>\n%s', tarball_filename, e)
            raise Exception('Error opening tarball file <%s>' % (tarball_filename,)) from e

        self.extract_dir = extract_dir
        self.hi_activity_list = []

    def parse(self, from_date: dts = None) -> list:
        """
        Extracts and parses the HiTrack files in the tarball and returns the list of parsed activities.

        from_date : when set, HiTrack files with a start date (taken from the filename) before this date are
                    skipped.

        Raises an Exception when the tarball can't be processed. An error in a single HiTrack file is only logged.
        """
        try:
            prefix_length = len(self._HITRACK_FILE_START)
            # Look for HiTrack files in directory com.huawei.health/files in tarball
            tar_info: tarfile.TarInfo
            for tar_info in self.tarball.getmembers():
                hitrack_filename = os.path.basename(tar_info.path)
                if not (tar_info.path.startswith(self._TAR_HITRACK_DIR)
                        and hitrack_filename.startswith(self._HITRACK_FILE_START)):
                    continue
                logging.info('Found HiTrack file <%s> in tarball <%s>', hitrack_filename, self.tarball.name)
                if from_date:
                    # Is file from or later than start date parameter?
                    hitrack_file_date = _convert_hitrack_timestamp(
                        float(hitrack_filename[prefix_length:prefix_length + 10]))
                    if hitrack_file_date < from_date:
                        logging.info(
                            'Skipped parsing HiTrack file <%s> being an activity from %s before %s (YYYYMMDD).',
                            hitrack_filename, hitrack_file_date.isoformat(), from_date.isoformat())
                        continue
                # Parse HiTrack file from tar ball
                self._extract_and_parse_hitrack_file(tar_info)
            return self.hi_activity_list
        except Exception as e:
            logging.error('Error parsing tarball <%s>\n%s', self.tarball.name, e)
            raise Exception('Error parsing tarball <%s>' % (self.tarball.name,)) from e

    def _extract_and_parse_hitrack_file(self, tar_info):
        """Extracts a single HiTrack file from the tarball, parses it and adds the activity to the activity list."""
        try:
            # Flatten directory structure in the TarInfo object to extract the file directly in the extraction directory
            tar_info.name = os.path.basename(tar_info.name)
            # NOTE(review): members are extracted without an extraction filter. Consider tarfile's 'data' filter
            # when the tarball can come from an untrusted source and the minimum Python version supports it.
            self.tarball.extract(tar_info, self.extract_dir)
            hitrack_file = HiTrackFile(os.path.join(self.extract_dir, tar_info.path))
            self.hi_activity_list.append(hitrack_file.parse())
        except Exception as e:
            # One unreadable HiTrack file must not stop the processing of the other files in the tarball
            logging.error('Error parsing HiTrack file <%s> in tarball <%s>\n%s',
                          tar_info.path, self.tarball.name, e)

    def _close_tarball(self):
        """Closes the tarball if it is open. Errors while closing are logged, not raised."""
        # getattr: the attribute is missing when the object was never (completely) initialized
        tarball = getattr(self, 'tarball', None)
        if not tarball or tarball.closed:
            return
        try:
            tarball.close()
            logging.debug('Tarball <%s> closed', tarball.name)
        except Exception as e:
            logging.error('Error closing tarball <%s>\n%s', tarball.name, e)

    def __del__(self):
        self._close_tarball()
class HiJson:
def __init__(self, json_filename: str, output_dir: str = OUTPUT_DIR):
    """
    Opens the JSON file for reading and makes sure the output directory exists.

    Raises an Exception when the JSON file can't be opened.
    """
    # Validate the JSON file parameter
    if not json_filename:
        logging.error('Parameter for JSON filename is missing')
    try:
        self.json_file = open(json_filename, 'r')
    except Exception as e:
        logging.error('Error opening JSON file <%s>\n%s', json_filename, e)
        # Format the message here: Exception() doesn't interpolate printf-style arguments itself
        raise Exception('Error opening JSON file <%s>' % (json_filename,)) from e

    self.output_dir = output_dir

    # If output directory doesn't exist, make it.
    if not os.path.exists(self.output_dir):
        os.makedirs(self.output_dir)

    self.hi_activity_list = []
def parse(self, from_date: dts = None, usetimezone : bool = False) -> list:
try:
# Look for HiTrack information in JSON file
# The JSON file from Huawei contains invalid formatting in the 'partTimeMap' data (missing double quotes
# for the keys). For now, remove the invalid parts using a regular expression.
json_string = self.json_file.read()
json_string = re.sub('\"partTimeMap\"\:{(.*?)}\,', '', json_string)
data = json.loads(json_string)
# JSON data structure
# data {list}
# 00 {dict}
# motionPathData {list}
# 0 {dict)
# sportType {int}
# attribute {str} 'HW_EXT_TRACK_DETAIL@is<HiTrack File Data>&&HW_EXT_TRACK_SIMPLIFY@is<Other Data>
# 1 {dict)
# sportType {int}
# attribute {str} 'HW_EXT_TRACK_DETAIL@is<HiTrack File Data>&&HW_EXT_TRACK_SIMPLIFY@is<Other Data>
# 2 {dict)
# sportType {int}
# attribute {str} 'HW_EXT_TRACK_DETAIL@is<HiTrack File Data>&&HW_EXT_TRACK_SIMPLIFY@is<Other Data>
# ...
# sportType {int}
# timeZone {string} '+0200'
# recordDay {int} 'YYYYMMDD'
for n, activity_dict in enumerate(data):
activity_date = dts.strptime(str(activity_dict['recordDay']), "%Y%m%d")
if activity_date >= from_date:
# add sub/level for multisport day...
for y in range(len(activity_dict["motionPathData"])):