-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue_server.py
executable file
·127 lines (105 loc) · 3.61 KB
/
queue_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#!/usr/bin/env python3
import pickle
import uuid
import pika
from typing import TYPE_CHECKING, List, Union
from typing_extensions import Self
if TYPE_CHECKING:
import pika.spec
from pika.adapters.blocking_connection import BlockingChannel
# Names of the RabbitMQ queues used by this module. QueueServer declares both
# by default, and send_to_colmap / send_to_tensorf use them as routing keys.
COLMAP_QUEUE = "colmap_queue"
TENSORF_QUEUE = "tensorf_queue"
class QueueServer:
    """
    Small publisher wrapper around a blocking pika (RabbitMQ) connection.

    Constructing an instance opens a connection and a channel, then declares
    every requested queue as durable. Messages are published through the
    default exchange ("") with persistent delivery mode.

    Instances can also be used as context managers; the connection is closed
    when the ``with`` block exits.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 5672,
        queues: Union[List[str], None] = None,
    ) -> None:
        """
        Unless you really need to modify something, call this method with the
        default arguments, i.e. QueueServer().

        Args:
            host: Hostname of the RabbitMQ broker.
            port: TCP port of the RabbitMQ broker.
            queues: Names of the queues to declare. ``None`` (the default)
                means ``[COLMAP_QUEUE, TENSORF_QUEUE]``.

        Raises:
            ValueError: If ``queues`` is an empty sequence.
        """
        # A None sentinel avoids sharing one mutable default list between
        # every instance of the class.
        if queues is None:
            queues = [COLMAP_QUEUE, TENSORF_QUEUE]
        if not queues:
            raise ValueError("At least one queue is required")

        self.host = host
        self.port = port
        # Copy so later mutation of the caller's list does not affect us.
        self.queues = list(queues)

        # The port must be forwarded explicitly; otherwise pika silently
        # falls back to its own default regardless of what the caller asked.
        self.connection: "pika.BlockingConnection" = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host, port=self.port)
        )
        self.channel: "BlockingChannel" = self.connection.channel()
        self.declare()

    def __enter__(self) -> "QueueServer":
        """Return this instance so it can be bound by a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when leaving a ``with`` block."""
        self.close()

    def start(self, count: int = 10) -> None:
        """
        Publish ``count`` demo messages, alternating between the two queues.

        Each message is a freshly generated UUID4 string. Even iterations go
        to the tensorf queue and odd iterations go to the colmap queue. This
        method only publishes; it does not consume anything.

        Args:
            count: Number of demo messages to publish (default 10).
        """
        for i in range(count):
            uid = str(uuid.uuid4())
            if i % 2:
                print(f"Sending to colmap: {uid}")
                self.send_to_colmap(uid)
            else:
                print(f"Sending to tensorf: {uid}")
                self.send_to_tensorf(uid)

    def declare(self) -> None:
        """
        Declare queues, can add more later if needed.

        Every name in ``self.queues`` is declared as a durable queue on the
        channel, and a confirmation line is printed for each.
        """
        for queue in self.queues:
            self.channel.queue_declare(queue=queue, durable=True)
            print(f" [*] Declared {queue}.")

    def _publish(self, queue: str, message: Union[str, bytes]) -> None:
        """Publish ``message`` to ``queue`` via the default exchange, marked persistent."""
        self.channel.basic_publish(
            exchange="",
            routing_key=queue,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
            ),
        )

    def send_to_colmap(self, message: Union[str, bytes]) -> None:
        """
        Send a message to the colmap queue.

        As of now the message should be a string or bytes, but it can be a
        serialized object (see: Serialize and Deserialize).
        """
        self._publish(COLMAP_QUEUE, message)

    def send_to_tensorf(self, message: Union[str, bytes]) -> None:
        """
        Send a message to the tensorf queue.

        As of now the message should be a string or bytes, but it can be a
        serialized object (see: Serialize and Deserialize).
        """
        self._publish(TENSORF_QUEUE, message)

    @staticmethod
    def Serialize(data) -> bytes:
        """
        Serialize ``data`` to bytes using pickle.

        Example::

            qs = QueueServer()
            data = qs.Serialize("Hello World")
            QueueServer.Serialize("Hello World")
        """
        return pickle.dumps(data)

    @staticmethod
    def Deserialize(data: bytes) -> object:
        """
        Deserialize bytes produced by ``Serialize`` back into an object.

        The result is equal to, but not the same object as, the value that
        was serialized.

        Example::

            data = QueueServer.Serialize({"job": 1})
            job = QueueServer.Deserialize(data)

        SECURITY: ``pickle.loads`` can execute arbitrary code. Only call this
        on payloads from a trusted producer.
        """
        # NOTE(review): if these bytes ever come from an untrusted broker or
        # client, switch to a safe format (e.g. JSON) instead of pickle.
        return pickle.loads(data)

    def close(self) -> None:
        """
        Close this object's connection to the RabbitMQ server.

        Calling it again after the connection is already closed is a no-op.
        """
        if self.connection.is_open:
            self.connection.close()
if __name__ == "__main__":
    # Demo entry point: connect with the default settings, publish the demo
    # messages, then disconnect.
    qs = QueueServer()
    try:
        qs.start()
    finally:
        # Release the broker connection even if publishing raises.
        qs.close()
    print("Done")