forked from OceanDataTools/openrvdas
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.py
127 lines (101 loc) · 4.88 KB
/
listener.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
import logging
import logging.handlers
import sys
import time
import traceback
from os.path import dirname, realpath
sys.path.append(dirname(dirname(dirname(realpath(__file__)))))
from logger.readers.composed_reader import ComposedReader # noqa: E402
from logger.writers.composed_writer import ComposedWriter # noqa: E402
################################################################################
class Listener:
    """Listener is a simple, yet relatively self-contained class that
    takes a list of one or more Readers, a list of zero or more
    Transforms, and a list of zero or more Writers. It calls the Readers
    (in parallel) to acquire records, passes those records through the
    Transforms (in series), and sends the resulting records to the Writers
    (in parallel).
    """
    ############################
    def __init__(self, readers=[], transforms=[], writers=[], stderr_writers=[],
                 host_id='', interval=0, name=None, check_format=False):
        """listener = Listener(readers, transforms=[], writers=[],
                               interval=0, check_format=False)

        readers        A single Reader or a list of Readers.

        transforms     A single Transform or a list of zero or more Transforms

        writers        A single Writer or a list of zero or more Writers

        stderr_writers A single Writer or a list of zero or more Writers to which
                       the logger's stderr should be written. NOTE: accepted for
                       interface compatibility; this constructor does not
                       reference it.

        host_id        Accepted for interface compatibility; this constructor
                       does not reference it.

        interval       How long to sleep before reading sequential records

        name           Optional human-readable short name to be used in displays

        check_format   If True, attempt to check that Reader/Transform/Writer
                       formats are compatible, and throw a ValueError if they
                       are not. If check_format is False (the default) the
                       output_format() of the whole reader will be
                       formats.Unknown.

        Sample use:

          listener = Listener(readers=[NetworkReader(':6221'),
                                       NetworkReader(':6223')],
                              transforms=[TimestampTransform()],
                              writers=[TextFileWriter('/logs/network_recs'),
                                       TextFileWriter(None)],
                              interval=0.2)
          listener.run()

        Calling listener.quit() from another thread will cause the run() loop
        to exit.
        """
        logging.info('Instantiating %s logger', name or 'unnamed')

        ###########
        # Create readers, writers, etc.
        self.reader = ComposedReader(readers=readers, check_format=check_format)
        self.writer = ComposedWriter(transforms=transforms, writers=writers,
                                     check_format=check_format)
        self.interval = interval
        self.name = name or 'Unnamed listener'

        # Timestamp (time.time()) of the most recent read; 0 until run() reads.
        self.last_read = 0
        # Flag polled by the run() loop; set by quit().
        self.quit_signalled = False

    ############################
    def quit(self):
        """
        Signal 'quit' by setting the flag that the run() loop checks before
        each read, so the loop exits once the current iteration completes.
        """
        self.quit_signalled = True

        # NIWA: make logger shutdown info visible in cached stderr logs for
        # the UI to retrieve, then drop back to warnings and errors only.
        logging.root.setLevel(logging.INFO)
        logging.info('Shutting down %s', self.name)
        logging.root.setLevel(logging.WARNING)

    ############################
    def run(self):
        """
        Read/transform/write until either quit() is called in a separate
        thread, or ComposedReader returns None, indicating that all its
        component readers have returned EOF.

        A KeyboardInterrupt ends the loop quietly. Any other exception is
        logged with its traceback and then re-raised to the caller.
        """
        # NIWA: make logger starting info visible in cached stderr logs for
        # the UI to retrieve
        logging.root.setLevel(logging.INFO)
        logging.info('Running %s', self.name)
        if not self.reader and not self.writer:
            logging.info('No readers or writers defined - exiting.')
            return

        # NIWA: now only show warnings or errors from the logger
        logging.root.setLevel(logging.WARNING)

        # Any non-None starting value lets the loop run its first iteration.
        record = ''
        try:
            while not self.quit_signalled and record is not None:
                record = self.reader.read()
                self.last_read = time.time()
                logging.debug('ComposedReader read: "%s"', record)

                # Empty/falsy records are read but not passed to the writer.
                if record:
                    self.writer.write(record)

                if self.interval:
                    # Sleep only for whatever remains of the interval after
                    # the time already spent since the read completed.
                    time_to_sleep = self.interval - (time.time() - self.last_read)
                    time.sleep(max(time_to_sleep, 0))

        # Exit in an orderly fashion if someone hits Ctl-C
        except KeyboardInterrupt:
            logging.info('Listener %s received KeyboardInterrupt - exiting.',
                         self.name or '')
        except Exception:
            # The root logger was set to WARNING above, so this must be logged
            # at ERROR level or the traceback would be filtered out.
            logging.error('Listener %s received exception: %s',
                          self.name, traceback.format_exc())
            raise