Skip to content

Commit

Permalink
Merge pull request #11 from Spico197/dev
Browse files Browse the repository at this point in the history
lock free version
  • Loading branch information
Spico197 authored Mar 26, 2021
2 parents fb07144 + 35822f1 commit 875eb6b
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 34 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ send_email(
To get more reminders, please check `watchmen/reminder.py`.

## UPDATE
- v0.3.7: much faster thanks to lock-free changes; fix timeout and scheduling bugs
- v0.3.6: fix front-end api hostname bug
- v0.3.5: fix front-end api port bug
- v0.3.4: refreshed interface, add `register_time` field, fix `check_finished` bug
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import setuptools

from watchmen import __version__


with open("README.md", "r") as fh:
long_description = fh.read()

setuptools.setup(
name='gpu-watchmen',
version='0.3.6',
version=__version__,
author="Tong Zhu",
author_email="tzhu1997@outlook.com",
description="watchmen for GPU scheduling",
Expand Down
2 changes: 1 addition & 1 deletion watchmen/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .client import WatchClient
from .client import ClientMode

__version__ = "0.3.6"
__version__ = "0.3.7"
2 changes: 1 addition & 1 deletion watchmen/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, id: str, gpus: List[int],
server_host: str, server_port: int,
mode: Optional[ClientMode] = ClientMode.QUEUE,
req_gpu_num: Optional[int] = 0,
timeout: Optional[int] = 10):
timeout: Optional[int] = 60):
self.base_url = f"http://{server_host}:{server_port}"
self.id = f"{getpass.getuser()}@{id}"
if self._validate_gpus(gpus):
Expand Down
46 changes: 17 additions & 29 deletions watchmen/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@

app = Flask("watchmen.server")
gpu_queue = queue.Queue()
gpu_queue.put(GPUInfo())
gpu_info = GPUInfo()
gpu_queue.put(1)
client_queue = queue.Queue()
client_queue.put(ClientCollection())
cc = ClientCollection()
client_queue.put(1)

APP_PORT = None

Expand Down Expand Up @@ -105,7 +107,6 @@ def client_ping():
available_gpus = []
msg = ""
client_id = client_info.id
cc = client_queue.get()
if client_id in cc:
cc[client_id].last_request_time = datetime.datetime.now()
status = "ok"
Expand All @@ -114,7 +115,6 @@ def client_ping():
else:
status = "err"
msg = "cannot ping before register"
client_queue.put(cc)
return jsonify({"status": status,
"available_gpus": available_gpus,
"msg": msg})
Expand All @@ -139,7 +139,6 @@ def client_register():
status = "err"
msg = "`req_gpu_num` is not valid"
else:
cc = client_queue.get()
if client_info.id not in cc:
client = ClientModel(
id=client_info.id,
Expand All @@ -156,56 +155,46 @@ def client_register():
else:
status = "err"
msg = f"client_id: {client_info.id} has been registered!"
client_queue.put(cc)
return jsonify({"status": status, "msg": msg})


@app.route("/show/work", methods=["GET"])
def show_work():
    """Return every client currently in the work queue as JSON.

    Reads the module-level ``cc`` collection directly; ``client_queue`` only
    holds a placeholder value, so nothing is fetched from or returned to it.

    Returns:
        A JSON response ``{"status": ..., "msg": ...}``. On success ``status``
        is ``"ok"`` and ``msg`` is a list with the ``dict()`` form of each
        entry in ``cc.work_queue``; on failure ``status`` is ``"err"`` and
        ``msg`` is the error text.
    """
    try:
        msg = [x.dict() for x in cc.work_queue.values()]
        status = "ok"
    except Exception as err:
        # HTTP boundary: report the failure in the payload instead of
        # letting the request crash.
        status = "err"
        msg = str(err)
    return jsonify({"status": status, "msg": msg})


@app.route("/show/finished", methods=["GET"])
def show_finished():
    """Return every client currently in the finished queue as JSON.

    Reads the module-level ``cc`` collection directly; ``client_queue`` only
    holds a placeholder value, so nothing is fetched from or returned to it.

    Returns:
        A JSON response ``{"status": ..., "msg": ...}``. On success ``status``
        is ``"ok"`` and ``msg`` is a list with the ``dict()`` form of each
        entry in ``cc.finished_queue``; on failure ``status`` is ``"err"``
        and ``msg`` is the error text.
    """
    try:
        msg = [x.dict() for x in cc.finished_queue.values()]
        status = "ok"
    except Exception as err:
        # HTTP boundary: report the failure in the payload instead of
        # letting the request crash.
        status = "err"
        msg = str(err)
    return jsonify({"status": status, "msg": msg})


@app.route("/show/gpus", methods=["GET"])
def show_gpus():
    """Return the most recent GPU status snapshot as JSON.

    Reads the module-level ``gpu_info`` object directly; ``gpu_queue`` only
    holds a placeholder value, so nothing is fetched from or returned to it.

    Returns:
        A JSON response ``{"status": ..., "msg": ...}``. On success ``status``
        is ``"ok"`` and ``msg`` is the result of ``gpu_info.gs.jsonify()``
        with ``query_time`` converted to a string; on failure ``status`` is
        ``"err"`` and ``msg`` is the error text.
    """
    try:
        msg = gpu_info.gs.jsonify()
        # Stringify the query time before handing the payload to jsonify.
        msg["query_time"] = str(msg["query_time"])
        status = "ok"
    except Exception as err:
        # HTTP boundary: report the failure in the payload instead of
        # letting the request crash.
        status = "err"
        msg = str(err)
    return jsonify({"status": status, "msg": msg})


Expand Down Expand Up @@ -250,18 +239,18 @@ def old_index():


def check_gpu_info():
    """Refresh the module-level GPU snapshot and log that it happened.

    Calls ``new_query()`` on the shared ``gpu_info`` object exactly once.
    ``gpu_queue`` is not touched: it only holds a placeholder value, so
    pulling an item from it would not yield the GPU info object.
    """
    gpu_info.new_query()
    logger.info("check gpu info")


def check_work(queue_timeout):
cc = client_queue.get()
logger.info("regular check")
marked_finished = []
reserved_gpus = set() # whether there can be multiple `ok` in one scan
queue_num = 0
for client_id, client in cc.work_queue.items():
time_delta = datetime.datetime.now() - client.last_request_time
logger.info(f"client: {client.id}, time_delta.seconds: {time_delta.seconds}, time_delta: {time_delta}")
if time_delta.seconds > queue_timeout:
if client.status != ClientStatus.OK:
client.status = ClientStatus.TIMEOUT
Expand All @@ -272,9 +261,8 @@ def check_work(queue_timeout):
ok = False
available_gpus = []
if client.status == ClientStatus.OK:
reserved_gpus |= set(client.gpus)
reserved_gpus |= set(client.available_gpus)
else:
gpu_info = gpu_queue.get()
try:
if client.mode == 'queue':
ok = gpu_info.is_gpus_available(client.gpus)
Expand All @@ -290,29 +278,29 @@ def check_work(queue_timeout):
client.msg = str(err)
except RuntimeError as err:
client.msg = str(err)
finally:
gpu_queue.put(gpu_info)
if ok and len(set(client.gpus) & reserved_gpus) <= 0:

if ok and len(set(available_gpus) & reserved_gpus) <= 0:
client.status = ClientStatus.OK
client.available_gpus = available_gpus
reserved_gpus |= set(client.gpus)
reserved_gpus |= set(client.available_gpus)
logger.info(f"client: {client.id} is ready, available gpus: {client.available_gpus}")
queue_num += 1

for client_id in marked_finished:
logger.info(f"client {client.id} marked as finished, status: {client.status}")
cc.mark_finished(client_id)
client_queue.put(cc)


def check_finished(status_queue_keep_time):
    """Drop finished clients whose last request is older than the keep time.

    Works on the module-level ``cc`` collection directly; ``client_queue``
    only holds a placeholder value, so nothing is fetched from or returned
    to it.

    Args:
        status_queue_keep_time: Retention period in hours. A finished client
            is removed once the time since its ``last_request_time`` is
            greater than or equal to this value.
    """
    logger.info("check out-dated finished clients")
    now = datetime.datetime.now()
    # Collect first, remove afterwards: the mapping must not change size
    # while it is being iterated.
    expired = [
        (client_id, client)
        for client_id, client in cc.finished_queue.items()
        if (now - client.last_request_time).total_seconds() / 3600
        >= status_queue_keep_time
    ]
    for client_id, client in expired:
        cc.finished_queue.pop(client_id)
        # Log the client that was actually removed, not whichever one the
        # scan loop happened to visit last.
        logger.info(f"remove {client.id} from finished queue")


def regular_check(request_interval, queue_timeout, status_queue_keep_time):
Expand Down Expand Up @@ -347,7 +335,7 @@ def api_server(host, port):
help="host address for api server")
parser.add_argument("--port", type=str, default=62333,
help="port for api server")
parser.add_argument("--queue_timeout", type=int, default=60,
parser.add_argument("--queue_timeout", type=int, default=120,
help="timeout for queue waiting (seconds)")
parser.add_argument("--request_interval", type=int, default=1,
help="interval for gpu status requesting (seconds)")
Expand Down
4 changes: 2 additions & 2 deletions watchmen/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@
}

async function getInfoAndUpdate() {
await timeout(5000, fetch(`http://${window.location.hostname}:${port}/api`))
await timeout(30000, fetch(`http://${window.location.hostname}:${port}/api`))
.then(async (response) => {
if (!response.ok) {
connection.classList.remove("green")
Expand All @@ -301,7 +301,7 @@
})
}

setInterval(getInfoAndUpdate, 10000)
setInterval(getInfoAndUpdate, 60000)

homepageElem.addEventListener("click", (e) => {
selected = "homepage-template"
Expand Down

0 comments on commit 875eb6b

Please sign in to comment.