-
Notifications
You must be signed in to change notification settings - Fork 35
/
Copy pathpolygon.py
63 lines (51 loc) · 1.68 KB
/
polygon.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/usr/bin/python3.6
import asyncio
import os
import signal
import sys
from nats.aio.client import Client as NATS
# Be sure to: pip install asyncio-nats-client
# Echo the API key taken from the first command-line argument.
# NOTE(review): this writes a credential to stdout, and it raises IndexError
# at import time when the script is started without an argument.
print("API KEY: {0}".format(sys.argv[1]))
async def run(loop):
    """Connect to the Polygon NATS servers and print messages on ``C.*``.

    The API key is read from ``sys.argv[1]`` and embedded as the user part of
    each server URL. After connecting, a subscription on the subject ``C.*``
    is registered whose handler prints every received message, and handlers
    for SIGINT and SIGTERM are installed on ``loop`` to close the connection.

    This is a native coroutine (``async def``) rather than a generator-based
    one: ``@asyncio.coroutine`` and the ``loop`` argument of
    ``asyncio.sleep`` no longer exist in current Python releases.

    Args:
        loop: The asyncio event loop that drives the NATS client. It is
            passed to the client as ``io_loop``, stopped once the connection
            has been closed, and used to register the signal handlers.

    Raises:
        IndexError: If no API key was given on the command line.
    """
    nc = NATS()

    async def closed_cb():
        # Invoked by the client once the connection is closed; stopping the
        # loop ends the ``run_forever()`` call in the script entry point.
        print("Connection to NATS is closed.")
        # Runs inside ``loop``, so the sleep is scheduled on that same loop.
        await asyncio.sleep(0.1)
        loop.stop()

    api_key = sys.argv[1]
    options = {
        "servers": [
            "nats://{0}@nats1.polygon.io:30401".format(api_key),
            "nats://{0}@nats2.polygon.io:30402".format(api_key),
            "nats://{0}@nats3.polygon.io:30403".format(api_key),
        ],
        "io_loop": loop,
        "closed_cb": closed_cb,
    }
    # NOTE(review): the printed server URLs contain the API key.
    print( str(options) )

    await nc.connect(**options)
    print("Connected to NATS at {}...".format(nc.connected_url.netloc))

    async def subscribe_handler(msg):
        # Print subject, reply subject and the decoded payload of a message.
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Subscribe to the wildcard subject 'C.*'; every message delivered for it
    # is handed to subscribe_handler.
    await nc.subscribe("C.*", cb=subscribe_handler)

    def signal_handler():
        # Nothing to do if the connection is already closed.
        if nc.is_closed:
            return
        print("Disconnecting...")
        # Signal handlers are plain callbacks, so closing is scheduled as a
        # task instead of being awaited here.
        loop.create_task(nc.close())

    for sig in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, sig), signal_handler)
if __name__ == '__main__':
    # Script entry point: connect and subscribe, then keep the loop running
    # until something stops it (run() installs a closed-connection callback
    # that calls loop.stop()).
    loop = asyncio.get_event_loop()
    try:
        # The setup call is inside the try block so that the loop is closed
        # even when connecting or subscribing raises.
        loop.run_until_complete(run(loop))
        loop.run_forever()
    finally:
        loop.close()