Skip to content

Commit

Permalink
Merge pull request #6 from Spico197/dev
Browse files Browse the repository at this point in the history
v0.3.3
  • Loading branch information
Spico197 authored Dec 12, 2020
2 parents 4f8ae47 + 19ac94c commit 95b1cee
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 36 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,5 @@ dmypy.json

# self-defined
.vscode/
data/
data/
watchmen_server.pid
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ send_email(
To get more reminders, please check `watchmen/reminder.py`.

## UPDATE
- v0.3.3: fix the `check_finished` bug on the server side, quit the main thread if a sub-thread exits, and remove the interactive backend command prompt from the main thread
- v0.3.2: fix `WatchClient` bug
- v0.3.1: change `Client` into `WatchClient`, fix `ClientCollection` and `send_email` bug
- v0.3.0: support gpu scheduling, fix blank input output, fix `check_gpus_existence`
Expand All @@ -131,6 +132,7 @@ To get more reminders, please check `watchmen/reminder.py`.
- [ ] test and support distributed model parallel configurations (with `python -m torch.distributed.launch`)
- [ ] prettify the web page and divide functions into different tabs
- [ ] gpu using stats for each user and process
- [x] quit the main thread if a sub-thread exits
- [x] change `Client` into `WatchClient`, in case of any ambiguity
- [x] `ClientCollection/__contains__` function should not include `finished_queue`, to help the `id` releases
- [x] subject bug in `reminder/send_email()`
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name='gpu-watchmen',
version='0.3.2',
version='0.3.3',
author="Tong Zhu",
author_email="tzhu1997@outlook.com",
description="watchmen for GPU scheduling",
Expand Down
75 changes: 41 additions & 34 deletions watchmen/server.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import sys
import json
import queue
import logging
Expand All @@ -24,7 +26,16 @@
)


logging.getLogger('apscheduler').setLevel(logging.ERROR)
apscheduler_logger = logging.getLogger('apscheduler')
apscheduler_logger.setLevel(logging.ERROR)
logger = logging.getLogger("common")
logger.setLevel(logging.INFO)
fmt = "[%(asctime)-15s]-%(levelname)s-%(filename)s-%(lineno)d-%(process)d: %(message)s"
datefmt = "%a %d %b %Y %H:%M:%S"
formatter = logging.Formatter(fmt, datefmt)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

app = Flask("watchmen.server")
gpu_queue = queue.Queue()
Expand Down Expand Up @@ -251,15 +262,16 @@ def check_finished(status_queue_keep_time):
marked_delete_ids = []
for client_id, client in cc.finished_queue.items():
time_delta = datetime.datetime.now() - client.last_request_time
if time_delta.hours >= status_queue_keep_time:
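# timedelta has no `hours` attribute, so convert seconds to hours (1 hour = 3600 seconds)
# NOTE(review): `timedelta.seconds` excludes whole days (range 0..86399); `total_seconds()` covers the full duration - confirm which is intended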
if time_delta.seconds/3600 >= status_queue_keep_time:
marked_delete_ids.append(client_id)
for client_id in marked_delete_ids:
cc.finished_queue.pop(client_id)
client_queue.put(cc)


def regular_check(request_interval, queue_timeout, status_queue_keep_time):
scheduler = BlockingScheduler()
scheduler = BlockingScheduler(logger=apscheduler_logger)
scheduler.add_job(check_gpu_info,
trigger='interval',
seconds=request_interval,
Expand All @@ -278,12 +290,12 @@ def regular_check(request_interval, queue_timeout, status_queue_keep_time):


def api_server(host, port):
app.run(host="0.0.0.0", port=port)
app.run(host=host, port=port)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, default="127.0.0.1",
parser.add_argument("--host", type=str, default="0.0.0.0",
help="host address for api server")
parser.add_argument("--port", type=str, default=62333,
help="port for api server")
Expand All @@ -294,7 +306,13 @@ def api_server(host, port):
parser.add_argument("--status_queue_keep_time", type=int, default=48,
help="hours for keeping the client status")
args = parser.parse_args()

logger.info(f"Running at: {args.host}:{args.port}")
logger.info(f"Current pid: {os.getpid()} > watchmen_server.pid")
with open("watchmen_server.pid", "wt", encoding="utf-8") as fout:
fout.write(f"{os.getpid()}")

# daemon threads will end automatically if the main thread ends
# thread 1: check gpu and client info regularly
check_worker = threading.Thread(name="check",
target=regular_check,
Expand All @@ -303,40 +321,29 @@ def api_server(host, port):
args.status_queue_keep_time),
daemon=True)

# thread 2: fastapi backend
# thread 2: main server api backend
api_server_worker = threading.Thread(name="api",
target=api_server,
args=(args.host, args.port),
daemon=True)

check_worker.start()
logger.info("check worker started")
api_server_worker.start()

in_str = ''
while in_str != 'exit':
in_str = input('`help` >')
if in_str == 'help':
print("show work: show working queue")
print("show finished: show finished queue")
print("show gpus: show gpu status")
print("exit: exit server")
elif in_str == "exit":
logger.info("api server started")

while True:
try:
if not check_worker.is_alive():
logger.error("check worker is not alive, server quit")
raise RuntimeError("check worker is not alive, server quit")
if not api_server_worker.is_alive():
logger.error("api server worker is not alive, server quit")
raise RuntimeError("api server worker is not alive, server quit")
except RuntimeError as err:
logger.error("runtime error, kill the server")
break
elif in_str == "show work":
cc = client_queue.get()
for cid in cc.work_queue:
print(cc[cid])
client_queue.put(cc)
elif in_str == "show finished":
cc = client_queue.get()
for cid in cc.finished_queue:
print(cc[cid])
client_queue.put(cc)
elif in_str == "show gpus":
gpu_info = gpu_queue.get()
gpu_info.gs.print_formatted()
gpu_queue.put(gpu_info)
elif len(in_str.strip()) == 0:
continue
else:
print("Not understand orz")
except KeyboardInterrupt:
logger.error("keyboard interrupted, kill the server")
break
logger.error("bye")

0 comments on commit 95b1cee

Please sign in to comment.