This repository has been archived by the owner on May 28, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdata_handling.py
252 lines (206 loc) · 9.74 KB
/
data_handling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
"""This module provide the class and functions needed to handle the data used to generate a dataset for activity
recognition.
"""
import os
import csv
import platform
from datetime import datetime
from datetime import timedelta
class TimecodePresentError(Exception):
    """Error raised when timecodes are requested for data that already has some.

    Parameters
    ----------
    message : str
        Description of the error. It is kept on the `message` attribute and forwarded to `Exception` so that
        `args` is populated as well.
    """

    def __init__(self, message):
        # Forward the message to the base class: without this `args` stays empty, which breaks `repr()`,
        # pickling and any tooling that inspects `args`.
        super().__init__(message)
        self.message = message

    def __str__(self):
        # The quoted (repr) form of the message is the historical string form of this exception.
        return repr(self.message)
class TooFewColumnInFile(Exception):
    """Error raised when an imported CSV file does not contain enough columns.

    Parameters
    ----------
    message : str
        Description of the error. It is kept on the `message` attribute and forwarded to `Exception` so that
        `args` is populated as well.
    """

    def __init__(self, message):
        # Forward the message to the base class: without this `args` stays empty, which breaks `repr()`,
        # pickling and any tooling that inspects `args`.
        super().__init__(message)
        self.message = message

    def __str__(self):
        # The quoted (repr) form of the message is the historical string form of this exception.
        return repr(self.message)
def _parse_timecode(value: str) -> datetime:
    """Convert one raw 'TIMECODE' cell into a `datetime`.

    A value containing ':' is read as 'hours:minutes:seconds:milliseconds' on the 1970-01-01 date, any other
    value is read as a number of seconds since 1970-01-01 00:00:00.

    Raises
    ------
    ValueError
        The value is neither a number nor a correctly formatted timecode.
    """
    if ':' in value:
        fields = value.split(':')
        if len(fields) < 4:
            raise ValueError('The timecode "{}" is not formatted as "HH:MM:SS:mmm".'.format(value))
        hours, minutes, seconds, milliseconds = (int(field) for field in fields[:4])
        return datetime(1970, 1, 1, hours, minutes, seconds, milliseconds * 1000)
    # Epoch arithmetic gives the same naive UTC datetime as `datetime.utcfromtimestamp` without relying on the
    # platform C library (and without the deprecation attached to that function).
    return datetime(1970, 1, 1) + timedelta(seconds=float(value))


def import_unified_file(path_to_file: str) -> dict:
    """Import the parameters from a unified CSV file.

    It will read the file, create an array of each column_value (ordered) and store the array in a dictionary with
    the key set to the column_name (upper_case).
    At least one column_name different of 'TIMECODE' and 'TAG' must be present in the file.

    The file has to be correctly formatted :
        1st line : column_name1, column_name2, column_name3, ...
        following lines : value1, value2, value3, ...

    Parameters
    ----------
    path_to_file : str
        The absolute path to the CSV file to import.

    Returns
    -------
    parameter_dict : dict
        The imported parameters as a dictionary built as follows:
        parameter_dict['COLUMN_NAME'] = [value_row1, value_row2, value_row3, ...]
        The keys 'TIMECODE' and 'TAG' are added if not present in the CSV file.
        The 'TIMECODE' values are converted to `datetime` objects; empty or missing timecode cells are skipped.
        Every other value is kept as the string read from the file; a cell missing at the end of a row is
        stored as 'N/A' and the cells found beyond the last column of a row are stored, as a list, under the
        'UNKNOWNNAME' key.

    Raises
    ------
    TypeError
        You have provided a wrong parameter.
    FileNotFoundError
        The given file does not exist or the path is mis-formatted.
    IOError
        The file can't be opened because of an OS restriction or error.
    csv.Error
        If the csv library builtin functions fail.
    TooFewColumnInFile
        When the CSV file contains too few columns (none, or only "TIMECODE" and/or "TAG").
    ValueError
        When a 'TIMECODE' value is neither a number of seconds nor formatted as 'HH:MM:SS:mmm'.
    """
    # Test if the given parameters have the right type
    if not isinstance(path_to_file, str):
        raise TypeError('The "path_to_file" must be a string', type(path_to_file))

    # Normalize path for Windows: drop a leading '/' (presumably left in front of the drive letter by the
    # caller). `startswith` keeps an empty path from raising an IndexError here.
    if platform.system() == 'Windows' and path_to_file.startswith('/'):
        path_to_file = path_to_file[1:]

    missing_cell = 'N/A'
    parameter_dict = {}
    row_count = 0

    # Read the input file
    with open(path_to_file, newline='') as unified_file:
        reader = csv.DictReader(unified_file, restkey='UnknownName', restval=missing_cell)

        # One (empty) list per column; `fieldnames` is None when the file has no header line at all
        for fieldname in reader.fieldnames or ():
            parameter_dict[str(fieldname).upper()] = []

        # Test if the CSV file contains enough data
        if not parameter_dict.keys() - {'TIMECODE', 'TAG'}:
            raise TooFewColumnInFile('Their is too few parameter to import in the given file.')

        # Add the values to the dictionaries
        for row in reader:
            row_count += 1
            for key, value in row.items():
                key = str(key).upper()
                # An empty or missing timecode means "no timecode recorded": it is not stored
                if key == 'TIMECODE' and value in ('', missing_cell):
                    continue
                # `setdefault` because the rest key is not part of the header columns
                parameter_dict.setdefault(key, []).append(value)

    # Add the default dictionaries if not present in the imported CSV file
    if 'TIMECODE' not in parameter_dict:
        parameter_dict['TIMECODE'] = []
    else:
        # Store the converted values back in the dictionary
        parameter_dict['TIMECODE'] = [_parse_timecode(value) for value in parameter_dict['TIMECODE']]
    if 'TAG' not in parameter_dict:
        # One empty tag per imported row
        parameter_dict['TAG'] = [''] * row_count
    return parameter_dict
def generate_timecodes(parameter_dict: dict, sampling_rate: int, force=False) -> None:
    """Generate the data timecodes with a specific sampling rate

    It will generate the timecodes for the `parameter_dict` passed as parameter using a given `sampling_rate`.
    Existing timecodes can be overwritten by changing the `force` parameter.
    The 'TIMECODE' list of `parameter_dict` is modified in place.

    Parameters
    ----------
    parameter_dict : dict
        A parameter dictionary generated by one of the import function.
    sampling_rate : int
        The sampling rate of the given data (record per second).
    force : bool
        Set to True to overwrite the existing timecode.

    Raises
    ------
    TypeError
        You have provided a wrong parameter, or `parameter_dict` holds no column other than 'TIMECODE' and 'TAG'.
    ValueError
        The `sampling_rate` is not greater than zero.
    TimecodePresentError
        The given `parameter_dict` already have timecodes set.

    Notes
    -----
    One timecode is generated per value of the first column that is neither 'TIMECODE' nor 'TAG'.
    The timecode of the record number `i` (starting at 0) is calculated with the formula :
        timecode = 1970-01-01 00:00:00 + (i / sampling_rate) seconds
    rounded to the microsecond. Each timecode is computed from its index rather than by adding a rounded delta
    again and again, so the rounding error does not accumulate.
    """
    # Test if the given parameters have the right type
    if not isinstance(parameter_dict, dict) or 'TIMECODE' not in parameter_dict:
        raise TypeError('The "parameter_dict" must be a dictionary generated by one of the import function.')
    # Exact type tests on purpose: a bool is an int subclass and must not be accepted as a sampling rate
    if type(sampling_rate) is not int:
        raise TypeError('The "sampling_rate" must be an int.')
    if type(force) is not bool:
        raise TypeError('The "force" must be a boolean.')
    if sampling_rate <= 0:
        raise ValueError('The "sampling_rate" must be greater than zero.')

    # Test if there are values in parameter_dict['TIMECODE']
    timecodes = parameter_dict['TIMECODE']
    if timecodes and not force:
        raise TimecodePresentError('The "TIMECODE" parameter already contains entries.')

    # Get the number of timecodes to generate from the first data column
    data_columns = [key for key in parameter_dict if key not in ('TIMECODE', 'TAG')]
    if not data_columns:
        raise TypeError('The "parameter_dict" must be a dictionary generated by one of the import function.')
    amount = len(parameter_dict[data_columns[0]])

    # Generate timecodes. The slice assignment replaces the previous content (so forcing really overwrites
    # instead of appending) while keeping the same list object.
    epoch = datetime(1970, 1, 1, 0, 0, 0, 0)
    timecodes[:] = [epoch + timedelta(seconds=index) / sampling_rate for index in range(amount)]
def _dataset_cell(value):
    """Return the text written in the dataset file for one value.

    A `datetime` is written as 'hours:minutes:seconds:milliseconds' (milliseconds are not zero padded); any
    other value is returned unchanged.
    """
    if isinstance(value, datetime):
        return value.strftime('%H:%M:%S:') + str(value.microsecond // 1000)
    return value


def export_dataset(parameter_dict: dict, selected_parameter: list, output_dir: str) -> None:
    """Export the given parameter to a dataset

    It will generate a dataset using the `parameter_dict` passed as a parameter in the selected directory.
    The dataset is written to the file 'dataset.csv' with the columns 'TIMECODE', the selected parameters (in
    the given order) and 'TAG'. One row is written per entry of parameter_dict['TIMECODE'].

    Parameters
    ----------
    parameter_dict : dict
        A parameter dictionary generated by one of the import function.
    selected_parameter : list
        List of parameter selected for exportation.
    output_dir : str
        Path to the output directory for the generated dataset.

    Raises
    ------
    TypeError
        You have provided a wrong parameter.
    KeyError
        'TIMECODE', 'TAG' or one of the selected parameters is not present in `parameter_dict`.
    IndexError
        One of the exported columns holds fewer values than the 'TIMECODE' column.
    """
    # Test if the given parameters have the right type
    if not isinstance(parameter_dict, dict):
        raise TypeError('The "parameter_dict" must be a dictionary', type(parameter_dict))
    if not isinstance(selected_parameter, list):
        raise TypeError('The "selected_parameter" must be a list', type(selected_parameter))
    if not isinstance(output_dir, str):
        raise TypeError('The "output_dir" must be a string', type(output_dir))

    # Columns of the dataset file, in output order
    column_names = ['TIMECODE'] + selected_parameter + ['TAG']

    # Check the data before touching the disk, so a bad request never leaves a half written file behind
    missing = [name for name in column_names if name not in parameter_dict]
    if missing:
        raise KeyError('Parameter(s) not present in the "parameter_dict": ' + ', '.join(map(str, missing)))
    row_count = len(parameter_dict['TIMECODE'])
    for name in column_names:
        if len(parameter_dict[name]) < row_count:
            raise IndexError('The parameter "{}" contains fewer values than "TIMECODE".'.format(name))

    # Normalize the path for Windows (`startswith` keeps an empty path from raising an IndexError)
    if platform.system() == 'Windows' and output_dir.startswith('/'):
        output_dir = output_dir[1:]

    # Write the output file
    columns = [parameter_dict[name] for name in column_names]
    with open(output_dir + os.path.sep + 'dataset.csv', 'w', newline='') as output_file:
        writer = csv.writer(output_file, dialect='excel')
        writer.writerow(column_names)
        # `zip` stops at the shortest column, which is 'TIMECODE' thanks to the check above
        for cells in zip(*columns):
            writer.writerow([_dataset_cell(cell) for cell in cells])
def _separated_file_cell(value):
    """Return the text written in a per-parameter file for one value.

    A `datetime` is written as 'hours:minutes:seconds:milliseconds' (milliseconds are not zero padded); any
    other value is returned unchanged.
    """
    if isinstance(value, datetime):
        return value.strftime('%H:%M:%S:') + str(value.microsecond // 1000)
    return value


def export_dataset_separated_files(parameter_dict: dict, selected_parameter: list, output_dir: str) -> None:
    """Export the given parameter to a dataset as separated files

    It will generate a dataset using the `parameter_dict` passed as a parameter in the selected directory.
    One file named '<parameter>.csv' is written per selected parameter, with the columns 'TIMECODE',
    '<parameter>' and 'TAG'. One row is written per entry of parameter_dict['TIMECODE'].

    Parameters
    ----------
    parameter_dict : dict
        A parameter dictionary generated by one of the import function.
    selected_parameter : list
        List of parameter selected for exportation.
    output_dir : str
        Path to the output directory for the generated dataset.

    Raises
    ------
    TypeError
        You have provided a wrong parameter.
    KeyError
        'TIMECODE', 'TAG' or one of the selected parameters is not present in `parameter_dict`.
    IndexError
        One of the exported columns holds fewer values than the 'TIMECODE' column.
    """
    # Test if the given parameters have the right type
    if not isinstance(parameter_dict, dict):
        raise TypeError('The "parameter_dict" must be a dictionary', type(parameter_dict))
    if not isinstance(selected_parameter, list):
        raise TypeError('The "selected_parameter" must be a list', type(selected_parameter))
    if not isinstance(output_dir, str):
        raise TypeError('The "output_dir" must be a string', type(output_dir))

    # Check the data before touching the disk, so a bad request never leaves a partial set of files behind
    required = ['TIMECODE', 'TAG'] + selected_parameter
    missing = [name for name in required if name not in parameter_dict]
    if missing:
        raise KeyError('Parameter(s) not present in the "parameter_dict": ' + ', '.join(map(str, missing)))
    row_count = len(parameter_dict['TIMECODE'])
    for name in required:
        if len(parameter_dict[name]) < row_count:
            raise IndexError('The parameter "{}" contains fewer values than "TIMECODE".'.format(name))

    # Normalize the path for Windows (`startswith` keeps an empty path from raising an IndexError)
    if platform.system() == 'Windows' and output_dir.startswith('/'):
        output_dir = output_dir[1:]

    # Write the files
    for parameter in selected_parameter:
        column_names = ['TIMECODE', parameter, 'TAG']
        # Each cell is read from its own column, so the timecodes and the tags are written next to the values
        columns = [parameter_dict[name] for name in column_names]
        with open(output_dir + os.path.sep + parameter + '.csv', 'w', newline='') as output_file:
            writer = csv.writer(output_file, dialect='excel')
            writer.writerow(column_names)
            # `zip` stops at the shortest column, which is 'TIMECODE' thanks to the check above
            for cells in zip(*columns):
                writer.writerow([_separated_file_cell(cell) for cell in cells])