-
Notifications
You must be signed in to change notification settings - Fork 0
/
subscribe_c.py
32 lines (27 loc) · 1.08 KB
/
subscribe_c.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio
import nats
async def main():
    """Consume messages from subject ``c.io`` on the ``cio`` JetStream stream.

    Connects to ``demo.nats.io``, creates a pull subscription and then loops
    forever: each iteration fetches a batch of up to 1000 messages (waiting
    at most one second), acknowledges every message and prints its decoded
    payload.

    Error handling:
        * A fetch timeout means no messages were available and is ignored.
        * Any other error while fetching or processing is printed and the
          pull subscription is re-created before the loop continues.
        * An error while subscribing (initially or when re-subscribing) is
          printed and ends the function.

    The connection is closed on every exit path.
    """
    nc = await nats.connect("demo.nats.io")
    try:
        js = nc.jetstream()

        # Create pull-based consumer on 'cio'.
        psub = await js.pull_subscribe("c.io", stream="cio")
        print("Subscribed to NATS subject. [from subscribe_c]")

        while True:
            try:
                # Fetch and ack messages from consumer.
                msgs = await psub.fetch(1000, timeout=1)
                for msg in msgs:
                    await msg.ack()
                    print(f"Received message from consumer c: {msg.data.decode()}")
            except nats.errors.TimeoutError:
                # No new messages within the timeout; poll again.
                pass
            except Exception as e:
                print(f"Error fetching and processing messages: {e}")
                # Re-create the subscription in case the old one is no longer
                # usable. A failure here propagates to the outer handler.
                psub = await js.pull_subscribe("c.io", stream="cio")
    except Exception as e:
        print(f"Error in NATS subscription: {e}")
    finally:
        # Always release the connection, including when the task is
        # cancelled, so the socket is not leaked.
        await nc.close()
if __name__ == "__main__":
    # Script entry point: run the consumer coroutine on a fresh event loop
    # only when this file is executed directly, not when it is imported.
    asyncio.run(main())