-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_market_data.py
71 lines (56 loc) · 2.01 KB
/
test_market_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import time
import multiprocessing
from multiprocessing import Queue
from market_data import PolygonWebsocketProvider
from utils import PrintLogger
def market_data_process(logger, directives_queue, receivers):
    """Run the Polygon websocket provider until it stops, then close it.

    Used in this file as the target of a ``multiprocessing.Process``.

    Args:
        logger: Object exposing ``info``; it is also handed to the provider.
        directives_queue: Queue handed to the provider. The consumers in this
            file put their subscribe directives on it.
        receivers: Object handed to the provider. The script in this file
            passes a dict mapping each consumer id to that consumer's queue.
    """
    data_provider = PolygonWebsocketProvider(
        logger, directives_queue, receivers, "wss://socket.polygon.io/stocks"
    )
    try:
        logger.info("Starting market data")
        # Blocks
        data_provider.run()
    finally:
        # Close the provider even when run() raises, so a failure inside the
        # blocking call does not skip the cleanup step.
        data_provider.close()
def consumer_process(logger, observation, directives_queue, market_data_queue):
    """Subscribe to one stream and log the latency of every message received.

    Sleeps ``observation["id"]`` seconds, puts a single subscribe directive on
    ``directives_queue``, then reads ``market_data_queue`` forever. The
    function never returns normally; it ends only when the process is
    terminated or an exception propagates out of the loop.

    Args:
        logger: Object exposing ``info`` and ``error``.
        observation: Dict with ``"id"`` (used as the consumer id and as the
            start-up delay in seconds) and ``"sub"`` (the stream name sent in
            the subscribe directive).
        directives_queue: Queue on which the subscribe directive is put.
        market_data_queue: Queue from which incoming messages are read.
    """
    id_ob = observation["id"]
    sub_ob = observation["sub"]

    logger.info(f"Ob {id_ob} - Sleeping for {id_ob} sec")
    time.sleep(id_ob)
    directives_queue.put([{"action": "subscribe", "stream": sub_ob, "id": id_ob}])

    while True:
        msg = market_data_queue.get()
        try:
            ts_cons_recv = time.time()
            # The arithmetic treats msg["t"] as epoch milliseconds and
            # msg["ts_recv"] as epoch seconds; both lags are logged in ms.
            lag_pg = ts_cons_recv * 1000 - msg["t"]
            lag_int = (ts_cons_recv - msg["ts_recv"]) * 1000
            logger.info(f"Ob {id_ob} - {lag_pg:.1f} ms, {lag_int:.3f} ms")
        except Exception:
            # Best effort: a message that cannot be measured (missing key,
            # wrong type, ...) is logged and skipped. Only Exception is caught
            # so KeyboardInterrupt / SystemExit can still stop the loop.
            logger.error(f"Ob {id_ob} - {msg}")
if __name__ == "__main__":
    # One observation per ticker: the list index is the consumer id and the
    # stream name is "Q.<ticker>".
    symbols = ["QQQ", "AAPL", "SPY", "MSFT", "IWM"]
    observations = []
    for idx, symbol in enumerate(symbols):
        observations.append({"id": idx, "sub": f"Q.{symbol}"})

    log = PrintLogger()

    # Shared queue for directives, plus one queue per consumer keyed by its id.
    directives = Queue(-1)
    inboxes = {}
    for observation in observations:
        inboxes[observation["id"]] = Queue(-1)

    log.info("Starting market data process")
    provider_proc = multiprocessing.Process(
        target=market_data_process,
        args=(log, directives, inboxes),
        daemon=True,
    )
    provider_proc.start()

    log.info("Starting consumer processes")
    # Build every consumer process first, then start them in the same order.
    consumer_procs = []
    for observation in observations:
        inbox = inboxes[observation["id"]]
        proc = multiprocessing.Process(
            target=consumer_process,
            args=(log, observation, directives, inbox),
            daemon=True,
        )
        consumer_procs.append(proc)
    for proc in consumer_procs:
        proc.start()

    # Let the children run for a fixed 20-second window.
    time.sleep(20)

    log.info("Joining processes")
    # Bounded joins: wait at most 5 s for the provider and 1 s per consumer.
    provider_proc.join(timeout=5)
    for proc in consumer_procs:
        proc.join(timeout=1)
    log.info("Exiting")