Skip to content

Commit

Permalink
Set dist env concurrently. (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
adoda authored Sep 3, 2024
1 parent e1f98c7 commit f616134
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
14 changes: 1 addition & 13 deletions .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ name: Unit Tests

on:
workflow_dispatch:
pull_request_review:
types: [submitted]
branches:
- main
pull_request:
paths-ignore:
- 'docs/**'
push:
Expand All @@ -22,23 +19,14 @@ concurrency:

jobs:
run-shell-script:
if: github.event.review.state == 'approved'
runs-on: self-hosted

steps:
- name: Check for 2 approvals
id: approvals
run: |
REVIEW_API_URL="https://api.github.com/repos/${{ github.repository }}/pulls/${{ github.event.pull_request.number }}/reviews"
reviews=$(curl -s -H "Authorization: token ${{ secrets._GITHUB_TOKEN }}" $REVIEW_API_URL)
approve_count=$(echo "$reviews" | jq '[.[] | select(.state == "APPROVED")] | length')
echo "approve_count=$approve_count" >> $GITHUB_ENV

- name: Checkout code
uses: actions/checkout@v3

- name: Run unit test
if: env.approve_count == '2'
run: |
docker pull $UT_IMAGE
docker run -v $PWD:$PWD -w $PWD --net host --ipc host --shm-size 80G -t --rm --gpus all $UT_IMAGE bash -c 'make test'
Expand Down
41 changes: 34 additions & 7 deletions chatlearn/schedule/model_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
# ==============================================================================
"""model manager"""

import concurrent.futures
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

import ray
import ray.experimental.state.api
Expand Down Expand Up @@ -92,9 +94,10 @@ def remote(self) -> list:
f"while the number of required gpus is {total_gpu_required}, " + \
f"there is {self.resouce_manager.total_gpu - total_gpu_required} wasted gpus")

env_list = []
for group in self.runtime_args.colocation:
colocate_models = [self._name2distmodel[name] for name in group]
self.place_models_to_remote_devices(colocate_models)
self.place_models_to_remote_devices(colocate_models, env_list)
if len(colocate_models) > 1:
for model in colocate_models:
model.is_colocate = True
Expand All @@ -103,7 +106,8 @@ def remote(self) -> list:
for model in self.dist_models:
# place non-colocate models
if model.name not in remote_states:
self.place_models_to_remote_devices([model])
self.place_models_to_remote_devices([model], env_list)
self.set_dist_env_concurrent(env_list)
self.converted = True
return self.dist_models

Expand Down Expand Up @@ -270,7 +274,7 @@ def find_model_packing_strategy(self, models, total_gpu):
final_packs.extend(packs_list)
return final_packs

def place_gpu_models(self, gpu_models):
def place_gpu_models(self, gpu_models, env_list=None):
if not gpu_models:
return
max_gpu = max(m.total_gpu for m in gpu_models)
Expand Down Expand Up @@ -330,8 +334,11 @@ def _get_model_replica_from_pack(gpu_index, model_pack):
reverse_gpu_placement = True
else:
reverse_gpu_placement = False
for replica in model.replicas:
replica.set_dist_env(reverse_gpu_placement)
if env_list is None:
for replica in model.replicas:
replica.set_dist_env(reverse_gpu_placement)
else:
env_list.append((model, reverse_gpu_placement))

def place_cpu_models(self, cpu_models):
if not cpu_models:
Expand Down Expand Up @@ -360,15 +367,35 @@ def place_cpu_models(self, cpu_models):
if i >= len(placement_groups):
i = 0

def place_models_to_remote_devices(self, models):
def place_models_to_remote_devices(self, models, env_list=None):
    """Place the given dist models onto remote devices.

    GPU models (``total_gpu > 0``) are placed first, then CPU-only
    models; afterwards every replica of every model runs its actor
    preprocessing.

    Args:
        models: list of dist models to place.
        env_list: optional list that ``place_gpu_models`` appends
            (model, reverse_gpu_placement) pairs to, deferring the
            per-replica env setup to ``set_dist_env_concurrent``.
    """
    cpu_models = []
    gpu_models = []
    for model in models:
        if model.total_gpu > 0:
            gpu_models.append(model)
        elif model.total_gpu == 0:
            cpu_models.append(model)
    self.place_gpu_models(gpu_models, env_list)
    self.place_cpu_models(cpu_models)
    # Preprocess actors only after all placement is done.
    for model in models:
        for model_replica in model.replicas:
            model_replica.preprocess_actors()

def _set_dist_env(self, model, reverse):
for replica in model.replicas:
replica.set_dist_env(reverse)

def set_dist_env_concurrent(self, env_list):
    """Set the distributed env for all queued models in parallel.

    Each entry collected by ``place_gpu_models`` is dispatched to a
    worker thread that calls ``self._set_dist_env`` for that model.

    Args:
        env_list: list of (model, reverse_gpu_placement) tuples.

    Raises:
        RuntimeError: if setting the dist env for any model fails; the
            original exception is chained as the cause.
    """
    if not env_list:
        return
    # One worker per model so every model's env is set concurrently.
    with ThreadPoolExecutor(max_workers=len(env_list)) as executor:
        futures = [
            executor.submit(self._set_dist_env, model, reverse)
            for model, reverse in env_list
        ]
        # as_completed consumes every future, so no extra wait() is
        # needed; chain the cause instead of suppressing the lint.
        for _future in concurrent.futures.as_completed(futures):
            try:
                _future.result()
            except Exception as e:
                raise RuntimeError(f"Set dist env generated an exception: {e}") from e

def clean(self):
for group in self.parameter_sync_groups.values():
group.destroy_collective_group()
Expand Down
2 changes: 1 addition & 1 deletion tests/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ function run_test {
echo "$file fail, retry ($attempts/3)..."
else
echo "$file fail, exit ..."
exit 1
exit 1
fi
done
ray stop
Expand Down

0 comments on commit f616134

Please sign in to comment.