-
Notifications
You must be signed in to change notification settings - Fork 4
/
glvis-browser-server
executable file
·159 lines (132 loc) · 5.32 KB
/
glvis-browser-server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#!/usr/bin/env python3
# Copyright (c) 2010-2024, Lawrence Livermore National Security, LLC. Produced
# at the Lawrence Livermore National Laboratory. All Rights reserved. See files
# LICENSE and NOTICE for details. LLNL-CODE-443271.
#
# This file is part of the GLVis visualization tool and library. For more
# information and source code availability see https://glvis.org.
#
# GLVis is free software; you can redistribute it and/or modify it under the
# terms of the BSD-3 license. We welcome feedback and contributions, see file
# CONTRIBUTING.md for details.
# The glvis-browser-server script is an MFEM socket-to-websocket forwarder to
# make it easier to use glvis-js as a viewer for inline MFEM visualization.
#
# The input is assumed to be on the default MFEM port of 19916, the default host
# and websocket port is localhost:8080. The glvis-browser-server script must be
# run before sending data to the MFEM socket.
#
# The script requires the websockets Python library (pip install websockets) and
# have been tested with Python 3.7. Earlier versions may work.
import argparse
import asyncio
import logging
import websockets
import json
logger = logging.getLogger("websockets.server")
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler())
parser = argparse.ArgumentParser()
parser.add_argument("--ws-port", type=int, default=8080)
parser.add_argument("--port", type=int, default=19916)
args = parser.parse_args()
async def ws_handler(queue, websocket, path):
"""Write serial stream messages or encode and write stream-list messages"""
print("websocket client connected")
while True:
msg = await queue.get()
if type(msg) == list:
msg = json.dumps(msg)
print(f"sending {msg[0:min(len(msg), 20)]}...")
try:
await websocket.send(msg)
except (
websockets.exceptions.ConnectionClosedOK,
websockets.exceptions.ConnectionClosedError,
):
# if there is only one message we can requeue it, otherwise drop it
if queue.empty():
queue.put_nowait(msg)
break
print("websocked client disconnected")
async def mfem_handler(queue, reader, writer, timeout=1, block_size=1024):
"""Read and queue streams from mfem"""
peername = writer.get_extra_info("peername")
if peername is not None:
client, port = peername
else:
client, port = "unknown", 0
print(f"new mfem client: {client}@{port}")
msg = b""
while True:
try:
block = await asyncio.wait_for(reader.read(block_size), timeout)
if not block:
break
idx = block.rfind(b"parallel")
if idx == -1: idx = block.rfind(b"solution")
if (idx != -1 and msg) or idx > 0:
msg += block[:idx]
queue.put_nowait(msg.decode())
msg = block[idx:]
else:
msg += block
except asyncio.TimeoutError:
if len(msg) > 0:
queue.put_nowait(msg.decode())
msg = b""
if msg:
queue.put_nowait(msg.decode())
print(f"mfem client disconnected: {client}@{port}")
async def create_mfem_server(queue):
"""Helper method: start mfem_handler server"""
print("mfem server: starting")
async def handler_wrap(reader, writer):
await mfem_handler(queue, reader, writer)
server = await asyncio.create_task(
asyncio.start_server(handler_wrap, host="0.0.0.0", port=args.port)
)
await server.serve_forever()
print("mfem server: complete")
async def parallel_message_builder(pqueues, ws_queue):
"""Process queued parallel streams and build/queue stream-list messages"""
print("parallel_message_builder: starting")
try:
while True:
stream_list = []
for q in pqueues:
stream_list.append(await q.get())
ws_queue.put_nowait(stream_list)
except asyncio.CancelledError:
print("parallel_message_builder: stopping")
async def message_handler(mfem_queue, ws_queue):
"""Forward serial stream messages and queue parallel streams"""
print("message handler: starting")
pqueues = ptask = None
while True:
msg = await mfem_queue.get()
if msg.startswith("parallel"):
info = msg[:msg.find("\n")].split(" ")
nproc = int(info[1])
rank = int(info[2])
if pqueues is None or len(pqueues) != nproc:
print(f"message_handler: now handling parallel nproc={nproc} messages")
pqueues = [asyncio.Queue() for i in range(nproc)]
if ptask is not None: ptask.cancel()
ptask = asyncio.create_task(parallel_message_builder(pqueues, ws_queue))
pqueues[rank].put_nowait(msg)
else:
ws_queue.put_nowait(msg)
await ptask
async def main():
mfem_queue = asyncio.Queue()
ws_queue = asyncio.Queue()
async def ws_handler_wrap(websocket, path):
await ws_handler(ws_queue, websocket, path)
ws_server = websockets.serve(ws_handler_wrap, "0.0.0.0", args.ws_port)
mfem_server = asyncio.create_task(create_mfem_server(mfem_queue))
msg_handler = asyncio.create_task(message_handler(mfem_queue, ws_queue))
await ws_server
await mfem_server
await msg_handler
asyncio.run(main())