text_file_reader.py
#!/usr/bin/env python3

import glob
import logging
import sys
import time

from os.path import dirname, realpath
sys.path.append(dirname(dirname(dirname(realpath(__file__)))))
from logger.utils.formats import Text  # noqa: E402
from logger.readers.reader import StorageReader  # noqa: E402


################################################################################
# Open and read single-line records from one or more text files.
class TextFileReader(StorageReader):
"""Read lines from one or more text files. Sequentially open all
files that match the file_spec.
"""
############################
def __init__(self, file_spec=None, tail=False, refresh_file_spec=False,
retry_interval=0.1, interval=0, eol=None):
"""
```
file_spec Possibly wildcarded string speficying files to be opened.
Special case: if file_spec is None, read from stdin.
tail If False, return None upon reaching end of last file; if
True, block upon reaching EOF of last file and wait for
more records.
refresh_file_spec
If True, refresh the search for matching filenames when
reaching last EOF to see if any new matching files have
appeared in the interim.
retry_interval
If tail and/or refresh_file_spec are True, how long to
wait before looking to see if any new records or files
have shown up.
interval
How long to sleep between returning records. In general
this should be zero except for debugging purposes.
eol Optional character by which to recognize the end of a record
```
Note that the order in which files are opened will probably be in
alphanumeric by filename, but this is not strictly enforced and
depends on how glob returns them.
"""
super().__init__(output_format=Text)
self.file_spec = file_spec
self.tail = tail
self.refresh_file_spec = refresh_file_spec
self.retry_interval = retry_interval
self.interval = interval
self.eol = eol
# If interval != 0, we need to keep track of our last_read to know
# how long to sleep
self.last_read = 0
# The file we're currently using
self.current_file = None
self.pos = 0
self.start_pos = {}
self.end_pos = {}
# Special case if file_spec is None
if file_spec is None:
self.current_file = sys.stdin
self.used_file_list = []
self.unused_file_list = []
self.tail = True
return
# Which files will we use, which haven't we used yet?
self.unused_file_list = sorted(glob.glob(file_spec))
if not self.unused_file_list:
logging.warning('TextFileReader: file_spec "%s" matches no files',
file_spec)
self.used_file_list = []
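
    # Illustrative construction (sketch only - the 'logs/*.txt' path below is
    # hypothetical): a reader that works through every matching file in sorted
    # order, then keeps watching for new records and newly-appearing files:
    #
    #   reader = TextFileReader(file_spec='logs/*.txt', tail=True,
    #                           refresh_file_spec=True)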

    ############################
    def _get_next_file(self):
        """Internal - Open and assign the next unused file to
        self.current_file if we can find one. Return None (and don't mess
        with current_file) if we can't find a next one.
        """
        # If no more unused files, but refresh_file_spec is specified, see
        # if more files have shown up
        if not self.unused_file_list and self.refresh_file_spec:
            matching_files = sorted(glob.glob(self.file_spec))
            self.unused_file_list = [f for f in matching_files
                                     if f not in self.used_file_list]
            logging.info('TextFileReader found %d new files matching spec "%s": %s',
                         len(self.unused_file_list), self.file_spec,
                         self.unused_file_list)

        # Are there any more files? If so, get the next one and open it
        if self.unused_file_list:
            # First, save the record count for the file we're about to close.
            if self.used_file_list:
                prev_filename = self.used_file_list[-1]
                self.end_pos[prev_filename] = self.pos

            next_filename = self.unused_file_list.pop(0)
            logging.info('TextFileReader opening next file "%s"', next_filename)
            self.start_pos[next_filename] = self.pos
            self.current_file = open(next_filename, 'r')
            self.used_file_list.append(next_filename)
            return self.current_file

        # If here, we've found no unused next file. Give up
        return None

    ############################
    def read(self):
        """Get the next line of text. Return None if there are no more
        records. To test EOF you'll need to test

          if record is None:
            no more records...

        rather than simply

          if not record:
            could be EOF or simply an empty next line
        """
        if self.interval:
            now = time.time()
            sleep_time = max(0, self.interval - (now - self.last_read))
            logging.debug('Sleeping %f seconds', sleep_time)
            if sleep_time:
                time.sleep(sleep_time)

        record = None
        while not record:
            # If we've got a current file, or if _get_next_file() gets one
            # for us, try to read a record.
            if self.current_file or self._get_next_file():
                if not self.eol:
                    record = self.current_file.readline()
                else:
                    record = self._read_until_eol()
                if record:
                    self.last_read = time.time()
                    record = record.rstrip('\n')
                    logging.debug('TextFileReader got record "%s"', record)
                    self.pos += 1
                    return record

            # No record: our current_file has reached EOF. See if there are
            # more files we should try to read.
            if self._get_next_file():
                # Found a new file to read - loop again right away
                continue

            # EOF when we're reading from stdin means we're done
            if not self.file_spec:
                return None

            # No record, no new files, no tail or refresh directive -
            # there's nothing left for us to try. Go home empty-handed.
            if not self.refresh_file_spec and not self.tail:
                return None

            # User wants refresh or tail, so sleep and try again.
            logging.debug('TextFileReader - tail/refresh specified, so sleeping '
                          '%f seconds before trying again', self.retry_interval)
            time.sleep(self.retry_interval)
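
    # Illustrative read loop (sketch; 'reader' and 'process' are hypothetical):
    # iterate until read() returns None, which signals true end-of-data - an
    # empty string just means the file contained a blank line.
    #
    #   while True:
    #       record = reader.read()
    #       if record is None:
    #           break             # no more records
    #       process(record)       # hypothetical per-record callback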

    ############################
    # If self.eol is a string instead of None, read until we've consumed that
    # string or reached EOF, and return that as a record.
    def _read_until_eol(self):
        if not self.eol:
            logging.fatal('Code error: called _read_until_eol, but no eol string specified')
            return

        record = ''
        eol_index = 0  # we're going to count our way through eol characters
        while eol_index < len(self.eol):
            # read by character
            char = self.current_file.read(1)
            if char == '':
                break
            elif char == self.eol[eol_index]:
                eol_index += 1
                record += char
            else:
                eol_index = 0
                record += char

        # If we're here because we did in fact get a full eol string,
        # retroactively snip it from our record.
        if eol_index == len(self.eol):
            record = record[:-eol_index]
        return record
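
    # Example of the eol behavior (hypothetical stream contents): with
    # eol='\r\n', a file containing 'abc\r\ndef\r\n' yields 'abc' on the
    # first call and 'def' on the second - the eol string itself is
    # snipped off before the record is returned.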

    ############################
    # Current behavior is to just go to the end if we run out of records,
    # as io.IOBase.seek() does.
    #
    # QUESTION: To really behave like seek(), we'd have to keep track of self.pos
    # beyond the end of the file, e.g. seek(100, 'start') would always return
    # 100, even if there are < 100 records. Is this what we want?
    def _seek_forward_from_current(self, offset=0):
        if offset == 0:
            return
        if offset < 0:
            return self._seek_back_from_current(offset)

        i = 0
        while i < offset:
            if self.current_file or self._get_next_file():
                if self.current_file.readline():
                    i += 1
                    self.pos += 1
                else:
                    if self._get_next_file() is None:
                        break
            else:
                # No current file and nothing left to open - stop rather than spin
                break

        # TODO: take advantage of self.start_pos and self.end_pos if we've
        # already processed later files.

    ############################
    def _seek_back_from_current(self, offset=0):
        if offset == 0:
            return
        if offset > 0:
            return self._seek_forward_from_current(offset)

        target = self.pos + offset
        if target < 0:
            raise ValueError("Can't back up past earliest record")

        # Find the right file.
        current_filename = self.used_file_list[-1]
        while target < self.start_pos[current_filename]:
            self.unused_file_list.insert(0, current_filename)
            self.used_file_list.pop()
            current_filename = self.used_file_list[-1]

        self.current_file = open(current_filename, 'r')

        # TODO: implement backwards search within the file
        for _ in range(target - self.start_pos[current_filename]):
            self.current_file.readline()
        self.pos = target

    ############################
    def _save_state(self):
        state = {
            'used_file_list': self.used_file_list[:],
            'unused_file_list': self.unused_file_list[:],
            'pos': self.pos
        }
        if self.current_file:
            state['current_filename'] = self.used_file_list[-1]
            state['current_file_pos'] = self.current_file.tell()
        return state

    ############################
    def _restore_state(self, state):
        self.used_file_list = state['used_file_list']
        self.unused_file_list = state['unused_file_list']
        if 'current_filename' in state:
            self.current_file = open(state['current_filename'], 'r')
            self.current_file.seek(state['current_file_pos'])
        else:
            self.current_file = None
        self.pos = state['pos']

    ############################
    # Behavior is intended to mimic file seek() behavior but with
    # respect to records: 'offset' means number of records, and origin
    # is either 'start', 'current' or 'end'.
    def seek(self, offset=0, origin='current'):
        original_state = self._save_state()
        try:
            if origin == 'start':
                if offset < 0:
                    raise ValueError("Can't back up past earliest record")
                self.used_file_list = []
                self.unused_file_list = sorted(glob.glob(self.file_spec))
                self.current_file = None
                self.pos = 0
                self._seek_forward_from_current(offset)

            elif origin == 'current':
                if offset >= 0:
                    self._seek_forward_from_current(offset)
                else:
                    self._seek_back_from_current(offset)

            elif origin == 'end':
                # Have to count lines in all files that haven't been processed yet.
                # TODO: take self.refresh_file_spec into account
                file_list = sorted(glob.glob(self.file_spec))
                pos = 0
                for filename in file_list:
                    if filename in self.end_pos:
                        pos = self.end_pos[filename]
                    else:
                        self.start_pos[filename] = pos
                        # TODO: this can be made faster, if needed
                        n = 0  # stays 0 if the file turns out to be empty
                        with open(filename) as f:
                            for n, _ in enumerate(f, 1):
                                pass
                        pos += n
                        self.end_pos[filename] = pos
                self.used_file_list = file_list
                self.unused_file_list = []
                self.current_file = None
                self.pos = pos
                self._seek_back_from_current(offset)

            else:
                raise ValueError('Unknown origin value: "%s"' % origin)
        except:  # noqa: E722
            self._restore_state(original_state)
            raise

        return self.pos
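
    # Illustrative seek() calls (sketch; 'reader' is a hypothetical
    # TextFileReader instance):
    #
    #   reader.seek(0, 'start')     # rewind to the first record
    #   reader.seek(5, 'current')   # skip forward five records
    #   reader.seek(-10, 'end')     # position ten records before the end
    #
    # Each call returns the resulting record position.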

    ############################
    def read_range(self, start=None, stop=None):
        """
        Read a range of records beginning with record number start, and ending
        *before* record number stop.
        """
        if start is None:
            start = 0
        if stop is None:
            stop = sys.maxsize

        self.seek(start, 'start')

        records = []
        for _ in range(stop - start):
            record = self.read()
            if record is None:
                break
            records.append(record)
        return records
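

################################################################################
# Illustrative usage sketch: dump the first few records from whatever matches
# a wildcarded path given on the command line. The path and --count flag here
# are assumptions for demonstration, e.g.
#   text_file_reader.py 'logs/*.txt' --count 20
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Dump records from text files')
    parser.add_argument('file_spec',
                        help='Possibly wildcarded path of files to read')
    parser.add_argument('--count', type=int, default=10,
                        help='Number of records to read')
    args = parser.parse_args()

    # Default reader: no tail, no refresh, so read_range() returns as soon as
    # the matched files are exhausted.
    reader = TextFileReader(file_spec=args.file_spec)
    for record in reader.read_range(start=0, stop=args.count):
        print(record)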