Merge pull request #7 from Spico197/dev
refreshed interface and bug fixed
Spico197 authored Mar 13, 2021
2 parents 95b1cee + c191cc7 commit 31247eb
Showing 15 changed files with 606 additions and 187 deletions.
31 changes: 21 additions & 10 deletions README.md
@@ -1,4 +1,4 @@
# watchmen
# Watchmen
A simple and easy-to-use toolkit for GPU scheduling.

## Dependencies
@@ -38,7 +38,7 @@ $ python -m watchmen.server

If you want the server to run in the background, try:
```bash
$ nohup python -m watchmen.server &
$ nohup python -m watchmen.server 1>watchmen.log 2>&1 &
```

There are several configuration options for the server:
@@ -57,7 +57,8 @@ optional arguments:
--request_interval REQUEST_INTERVAL
interval for gpu status requesting (seconds)
--status_queue_keep_time STATUS_QUEUE_KEEP_TIME
hours for keeping the client status
hours for keeping the client status. set `-1` to keep
all clients' status
```
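The `--status_queue_keep_time` behavior described above — drop finished client records older than the given number of hours, with `-1` keeping everything — could be sketched roughly as follows. This is an illustrative sketch, not watchmen's actual server code; the `prune_finished` helper and the record shape are assumptions.

```python
import datetime

def prune_finished(clients, keep_hours, now=None):
    """Drop finished client records older than `keep_hours` hours.

    `keep_hours == -1` keeps all clients' status, mirroring the
    `--status_queue_keep_time` flag's documented behavior.
    """
    if keep_hours == -1:
        return dict(clients)
    now = now or datetime.datetime.now()
    horizon = datetime.timedelta(hours=keep_hours)
    # Keep only records whose last request is within the horizon.
    return {cid: c for cid, c in clients.items()
            if now - c["last_request_time"] <= horizon}
```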

2. Modify the source code in your project:
@@ -70,15 +71,21 @@ client = WatchClient(id="short description of this running", gpus=[1],
client.wait()
```

When the program goes on after `client.wait()`, you are in the queue.
When the program goes on after `client.wait()`, you are in the working queue.
Watchmen supports two requesting modes:
- `queue` mode means you are waiting for the GPUs listed in the `gpus` argument.
- `schedule` mode means you are waiting for the server to allocate `req_gpu_num` available GPUs from `gpus`.
You can check examples in `example/` for further reading.

```bash
# single card queue mode
$ cd example && python single_card_mnist.py --id="single" --cuda=0 --wait
# single card schedule mode
$ cd example && python single_card_mnist.py --id="single schedule" --cuda=0,2,3 --req_gpu_num=1 --wait_mode="schedule" --wait
# queue mode
$ cd example && python multi_card_mnist.py --id="multi" --cuda=2,3 --wait
# schedule mode
$ cd example && python multi_card_mnist.py --id='multi card scheduling wait' --cuda=1,0,3 --req_gpu_num=2 --wait=schedule
$ cd example && python multi_card_mnist.py --id='multi card scheduling wait' --cuda=1,0,3 --req_gpu_num=2 --wait="schedule"
```
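The example commands above pass `--cuda` as a comma-separated string of device indices. Parsing it might look like the sketch below; the `parse_cuda` helper is illustrative (the shipped examples do this inline), not part of watchmen's API.

```python
def parse_cuda(cuda_arg: str):
    """Turn a `--cuda` value like "1,0,3" into a list of GPU indices."""
    return [int(idx) for idx in cuda_arg.split(",") if idx.strip()]
```

The resulting list can then be handed to `WatchClient(gpus=...)` in either mode.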

3. Check the queue in browser.
@@ -88,15 +95,18 @@ Open the following link in your browser: `http://<server ip address>:<server port>`
And you will see a page like the demo below.
Please be aware that the page does not refresh automatically, so refresh it manually to check the latest status.

New Demo (scheduling mode supported)
Home page: GPU status

![Demo](demo.png)
![HomePage](homepage.png)

Old Demo (queue mode supported)
Working queue:
![WorkingQueue](working_queue.png)

![Old Demo](demo_old.png)
Finished queue:
![FinishedQueue](finished_queue.png)

4. Reminder when program is finished.

1. Reminder when program is finished.

`watchmen` also supports email and other kinds of reminders for message notification.
For example, you can send yourself an email when the program is finished.
@@ -120,6 +130,7 @@ send_email(
To get more reminders, please check `watchmen/reminder.py`.

## UPDATE
- v0.3.4: refreshed interface, add `register_time` field, fix `check_finished` bug
- v0.3.3: fix `check_finished` bug in server end, quit the main thread if the sub-thread is quit, and remove the backend cmd in the main thread
- v0.3.2: fix `WatchClient` bug
- v0.3.1: change `Client` into `WatchClient`, fix `ClientCollection` and `send_email` bug
Binary file removed demo.png
Binary file removed demo_old.png
37 changes: 27 additions & 10 deletions example/single_card_mnist.py
@@ -3,7 +3,10 @@
https://github.com/pytorch/examples/blob/master/mnist/main.py
"""
from __future__ import print_function

import sys
import argparse

import torch
import torch.nn as nn
import torch.nn.functional as F
@@ -12,6 +15,7 @@
from torch.optim.lr_scheduler import StepLR

from watchmen import WatchClient
from watchmen.client import ClientMode


class Net(nn.Module):
@@ -99,36 +103,49 @@ def main():
help='For Saving the current Model')
parser.add_argument("--id", type=str, default="id",
help="identifier")
parser.add_argument("--cuda", type=int, default=2,
help="cuda device")
parser.add_argument("--cuda", type=str, default="0",
help="cuda devices, separated by `,` with no spaces")
parser.add_argument("--wait", action="store_true",
help="wait for watchmen signal")
parser.add_argument("--wait_mode", type=str,
choices=["queue", "schedule"], default="queue",
help="gpu waiting mode")
args = parser.parse_args()
torch.manual_seed(args.seed)

device = torch.device(f"cuda:{args.cuda}")

"""WATCHMEN"""
if args.wait:
client = WatchClient(id=f"mnist single card {args.id} cuda={args.cuda}", gpus=[args.cuda],
if args.wait_mode == 'queue':
waiting_mode = ClientMode.QUEUE
else:
waiting_mode = ClientMode.SCHEDULE
client = WatchClient(id=f"mnist single card {args.id} cuda={args.cuda}",
gpus=[int(c) for c in args.cuda.split(",")],
req_gpu_num=1, mode=waiting_mode,
server_host="127.0.0.1", server_port=62333)
client.wait()
# client.register()
available_gpus = []
available_gpus = client.wait()
if len(available_gpus) <= 0:
sys.exit(1)
else:
device = torch.device(f"cuda:{available_gpus[0]}")
"""END OF WATCHMEN"""

train_kwargs = {'batch_size': args.batch_size}
test_kwargs = {'batch_size': args.test_batch_size}

cuda_kwargs = {'num_workers': 1,
'pin_memory': True,
'shuffle': True}
'pin_memory': True,
'shuffle': True}
train_kwargs.update(cuda_kwargs)
test_kwargs.update(cuda_kwargs)

transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])

dataset1 = datasets.MNIST('../data', train=True, download=True,
transform=transform)
dataset2 = datasets.MNIST('../data', train=False,
Binary file added finished_queue.png
Binary file added homepage.png
4 changes: 2 additions & 2 deletions setup.py
@@ -4,8 +4,8 @@
long_description = fh.read()

setuptools.setup(
name='gpu-watchmen',
version='0.3.3',
name='gpu-watchmen',
version='0.3.4',
author="Tong Zhu",
author_email="tzhu1997@outlook.com",
description="watchmen for GPU scheduling",
2 changes: 2 additions & 0 deletions watchmen/__init__.py
@@ -1,2 +1,4 @@
from .client import WatchClient
from .client import ClientMode

__version__ = "0.3.4"
26 changes: 13 additions & 13 deletions watchmen/client.py
@@ -27,22 +27,23 @@ def has_value(cls, value):


class ClientModel(BaseModel):
id: str # identifier in string format
mode: Optional[ClientMode] = ClientMode.QUEUE # `queue` (wait for specific gpus) or `schedule` (schedule by the server automatically)
id: str # identifier in string format
mode: Optional[ClientMode] = ClientMode.QUEUE # `queue` (wait for specific gpus) or `schedule` (schedule by the server automatically)
register_time: Optional[datetime.datetime] = None # datetime.datetime
last_request_time: Optional[datetime.datetime] = None # datetime.datetime
status: Optional[ClientStatus] = ClientStatus.WAITING # `waiting`, `timeout`, `ok`
queue_num: Optional[int] = 0 # queue number
gpus: Optional[List[int]] = [] # `queue` mode: gpus for requesting to run on; `schedule` mode: available gpu scope.
msg: Optional[str] = "" # error or status message
req_gpu_num : Optional[int] = 0 # `schedule` mode: how many gpus are requested
msg: Optional[str] = "" # error or status message
req_gpu_num: Optional[int] = 0 # `schedule` mode: how many gpus are requested
available_gpus: Optional[List[int]] = []


class ClientCollection(object):
def __init__(self):
self.work_queue = OrderedDict() # only `ok` and `waiting`
self.work_queue = OrderedDict() # only `ok` and `waiting`
self.finished_queue = OrderedDict()

def mark_finished(self, client_id: str):
self.finished_queue[client_id] = self.work_queue[client_id]
self.work_queue.pop(client_id)
@@ -53,14 +54,12 @@ def get_all_clients(self):
all_clients.sort(key=lambda x: x.last_request_time)
all_clients.extend(list(self.work_queue.values()))
return all_clients

def __getitem__(self, index: str):
if index in self.work_queue:
return self.work_queue[index]
elif index in self.finished_queue:
return self.finished_queue[index]
else:
raise IndexError(f"index: {index} does not exist")
raise IndexError(f"index: {index} does not exist or has finished")

def __contains__(self, index: str):
return index in self.work_queue
@@ -89,10 +88,10 @@ def __init__(self, id: str, gpus: List[int],

def _validate_gpus(self, gpus: List[int]):
return check_gpus_existence(gpus)

def _validate_mode(self, mode: ClientMode):
return ClientMode.has_value(mode)

def _validate_req_gpu_num(self, req_gpu_num: int):
return check_req_gpu_num(req_gpu_num)

@@ -122,11 +121,12 @@ def ping(self):
elif result["msg"] == ClientStatus.OK:
return True, result["available_gpus"]
elif result["msg"] == ClientStatus.TIMEOUT:
raise RuntimeError(f"status changed to TIMEOUT")
raise RuntimeError("status changed to TIMEOUT")

def wait(self):
self.register()
flag = False
available_gpus = []
while not flag:
flag, available_gpus = self.ping()
time.sleep(10)
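The client's `queue`/`schedule` distinction can be sketched as a pure decision function. This is illustrative only — `gpus_granted` is not part of watchmen's API, and the server's real logic lives elsewhere; it merely restates the documented semantics of the two `ClientMode` values.

```python
from typing import List

def gpus_granted(mode: str, requested: List[int], req_gpu_num: int,
                 free: List[int]) -> List[int]:
    """Return the GPUs a client would be granted, or [] if it must keep waiting.

    `queue`: wait until every GPU in `requested` is free, then take exactly those.
    `schedule`: wait until any `req_gpu_num` GPUs in the `requested` scope are free.
    """
    if mode == "queue":
        return requested if all(g in free for g in requested) else []
    if mode == "schedule":
        available = [g for g in requested if g in free]
        return available[:req_gpu_num] if len(available) >= req_gpu_num else []
    raise ValueError(f"unknown mode: {mode}")
```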
19 changes: 9 additions & 10 deletions watchmen/listener.py
@@ -6,7 +6,7 @@

def is_single_gpu_totally_free(gpu_index: int):
gs = GPUStatCollection.new_query()

if not isinstance(gpu_index, int):
raise ValueError(f"gpu_index: {gpu_index} is not int")
if gpu_index >= len(gs.gpus) or gpu_index < 0:
@@ -15,8 +15,8 @@ def is_single_gpu_totally_free(gpu_index: int):
gpu = gs.gpus[gpu_index]
if len(gpu.processes) <= 0 \
and gpu.utilization <= 10 \
and (float(gpu.memory_used)/float(gpu.memory_total) <= 1e-3
or gpu.memory_used < 50):
and (float(gpu.memory_used) / float(gpu.memory_total) <= 1e-3 or gpu.memory_used < 50):
return True
else:
return False
@@ -46,30 +45,30 @@ def new_query(self):
gs = GPUStatCollection.new_query()
self.gpus = gs.gpus
self.gs = gs

def _is_totally_free(self, gpu_index: int):
self.new_query()
gpu = self.gpus[gpu_index]
if len(gpu.processes) <= 0 \
and gpu.utilization <= 10 \
and (float(gpu.memory_used)/float(gpu.memory_total) <= 1e-3
or gpu.memory_used < 50):
and (float(gpu.memory_used) / float(gpu.memory_total) <= 1e-3 or gpu.memory_used < 50):
return True
else:
return False

def is_gpus_available(self, gpus: List[int]):
stts = []
for gpu in gpus:
stts.append(self._is_totally_free(gpu))
return all(stts)

def get_available_gpus_in_scope(self, gpu_scope: List[int]):
available_gpus = []
for gpu in gpu_scope:
if self._is_totally_free(gpu):
available_gpus.append(gpu)
return available_gpus

def is_req_gpu_num_satisfied(self, gpu_scope: List[int], req_gpu_num: int):
ok = False
available_gpus = self.get_available_gpus_in_scope(gpu_scope)
@@ -80,7 +79,7 @@ def is_req_gpu_num_satisfied(self, gpu_scope: List[int], req_gpu_num: int):

def __getitem__(self, index: int):
return self._is_totally_free(index)

def __str__(self):
tmp = self.gs.jsonify()
tmp["query_time"] = str(tmp["query_time"])
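The "totally free" criterion appears twice in the listener above; isolating it over plain numbers makes the thresholds easier to read. A sketch only — `looks_totally_free` is not part of the library, and it assumes `memory_used`/`memory_total` are in MB as gpustat reports them.

```python
def looks_totally_free(num_processes: int, utilization: int,
                       memory_used: int, memory_total: int) -> bool:
    """Mirror the listener's emptiness check: no running processes,
    utilization at most 10%, and essentially no memory in use
    (<= 0.1% of total, or under 50 MB absolute)."""
    return (num_processes <= 0
            and utilization <= 10
            and (memory_used / memory_total <= 1e-3 or memory_used < 50))
```

The absolute 50 MB fallback matters on large cards, where driver bookkeeping alone can exceed 0.1% of total memory.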
57 changes: 30 additions & 27 deletions watchmen/reminder.py
@@ -12,13 +12,13 @@


def send_email(
host: str, # email host to login, like `smtp.163.com`
port: int, # email port to login, like `25`
user: str, # user email address for login, like `***@163.com`
password: str, # password or auth code for login
receiver: str, # receiver email address
html_message: str, # content, html format supported
subject: Optional[str] = "Notice" # email subject
host: str, # email host to login, like `smtp.163.com`
port: int, # email port to login, like `25`
user: str, # user email address for login, like `***@163.com`
password: str, # password or auth code for login
receiver: str, # receiver email address
html_message: str, # content, html format supported
subject: Optional[str] = "Notice" # email subject
):
# set up the SMTP server
s = smtplib.SMTP(host=host, port=port)
@@ -37,33 +37,36 @@


def send_dingtalk_msg(
dingtalk_user_mentions: List[str], # which user to mention, like `[183********]`
dingtalk_secret: str, #like SEc1f****
dingtalk_webhook_url: str, # like `https://oapi.dingtalk.com/robot/send?access_token=***`
message: str # message content
dingtalk_user_mentions: List[str], # which user to mention, like `[183********]`
dingtalk_secret: str, # like SEc1f****
dingtalk_webhook_url: str, # like `https://oapi.dingtalk.com/robot/send?access_token=***`
message: str # message content
):
r"""
Reference:
- https://github.com/huggingface/knockknock
"""
msg_template = {
"msgtype": "text",
"msgtype": "text",
"text": {
"content": message
},
},
"at": {
"atMobiles": dingtalk_user_mentions,
"isAtAll": False
}
}
def _construct_encrypted_url():
'''
Visit https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq for details
'''
timestamp = round(datetime.datetime.now().timestamp() * 1000)
secret_enc = dingtalk_secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, dingtalk_secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
encrypted_url = dingtalk_webhook_url + '&timestamp={}'\
.format(timestamp) + '&sign={}'.format(sign)
return encrypted_url
postto = _construct_encrypted_url()
'''
construct_encrypted_url
Visit https://ding-doc.dingtalk.com/doc#/serverapi2/qf2nxq for details
'''
timestamp = round(datetime.datetime.now().timestamp() * 1000)
secret_enc = dingtalk_secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, dingtalk_secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
encrypted_url = dingtalk_webhook_url + '&timestamp={}'\
.format(timestamp) + '&sign={}'.format(sign)
postto = encrypted_url
requests.post(postto, json=msg_template)
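The signing logic inlined above follows DingTalk's HMAC-SHA256 webhook security scheme. Isolated as a sketch for clarity — the `dingtalk_sign` helper name is illustrative, not part of watchmen:

```python
import base64
import hashlib
import hmac
import urllib.parse

def dingtalk_sign(secret: str, timestamp_ms: int) -> str:
    """Sign "{timestamp}\n{secret}" with HMAC-SHA256 keyed by the secret,
    then URL-encode the base64 digest, as the webhook URL requires."""
    string_to_sign = "{}\n{}".format(timestamp_ms, secret)
    digest = hmac.new(secret.encode("utf-8"),
                      string_to_sign.encode("utf-8"),
                      hashlib.sha256).digest()
    return urllib.parse.quote_plus(base64.b64encode(digest))
```

The signature is deterministic for a given secret and timestamp, which is what lets the server verify it independently.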
