-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathexample_aiogps.py
118 lines (101 loc) · 3.62 KB
/
example_aiogps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2019 Grand Joldes (grandwork2@yahoo.com).
#
# This file is Copyright 2019 by the GPSD project
#
# SPDX-License-Identifier: BSD-2-clause
# This code run compatibly under Python 3.x for x >= 6.
"""Example of using the asyncio Python interface to GPSD.
This example demonstrates error handling by the application code when
aiogps is not configured to handle automatic re-connection.
"""
import asyncio
import logging
import gps.aiogps
async def get_gps_updates(gpsd: gps.aiogps.aiogps) -> None:
"""Receives and prints messages from GPSD..
The GPS status information is updated within aiogps every time a new
message is received.
This function also demonstrates what error messages can be expected when
auto reconnection is not used in aiogps (reconnect = 0).
"""
while True:
try:
async for msg in gpsd:
# Print received message
logging.info(f'Received: {msg}')
except asyncio.CancelledError:
return
except asyncio.IncompleteReadError:
logging.info('Connection closed by server')
except asyncio.TimeoutError:
logging.error('Timeout waiting for gpsd to respond')
except Exception as exc: # pylint: disable=W0703
logging.error(f'Error: {exc}')
# Try again in 1s
await asyncio.sleep(1)
async def print_gps_info(gpsd: gps.aiogps.aiogps) -> None:
"""Prints GPS status every 5s."""
while True:
try:
await asyncio.sleep(5)
logging.info(f'\nGPS status:\n{gpsd}')
except asyncio.CancelledError:
return
except Exception as exc: # pylint: disable=W0703
logging.error(f'Error: {exc}')
async def main():
"""Main coroutine - executes 2 asyncio tasks in parallel."""
try:
# Example of using custom connection configuration
async with gps.aiogps.aiogps(
connection_args={
'host': '127.0.0.1',
'port': 2947
},
connection_timeout=5,
reconnect=0, # do not reconnect, raise errors
alive_opts={
'rx_timeout': 5
}
) as gpsd:
# These tasks will be executed in parallel
await asyncio.gather(
get_gps_updates(gpsd),
print_gps_info(gpsd),
return_exceptions=True
)
except asyncio.CancelledError:
return
except Exception as exc: # pylint: disable=W0703
logging.error(f'Error: {exc}')
def run():
"""
Main function.
Because this code only compiles on Python versions >= 3.6,
it is not run directly, but through the example_aiogps_run wrapper,
which fails gracefully on unsupported Python versions.
"""
# Set up logging program logging
logging.basicConfig()
logging.root.setLevel(logging.INFO)
# Example of setting up logging level for aiogps - this setting will
# prevent all aiogps events from being logged
logging.getLogger('gps.aiogps').setLevel(logging.CRITICAL)
loop = asyncio.events.new_event_loop()
try:
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())
except KeyboardInterrupt:
print('Got keyboard interrupt.')
finally:
print('Exiting.')
try:
for task in asyncio.Task.all_tasks():
task.cancel()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
asyncio.events.set_event_loop(None)
loop.close()
# vim: set expandtab shiftwidth=4