feat: Add Ray cluster management scripts for PBS
adigitoleo committed Apr 8, 2024
1 parent 4dffdf2 commit 493c80c
Showing 4 changed files with 178 additions and 0 deletions.
3 changes: 3 additions & 0 deletions tests/test_simple_shear_3d.py
@@ -166,6 +166,9 @@ def test_direction_change(
            switch_time_Ma * 1e6,
            _id,
        )
        if HAS_RAY:
            ray.init(address="auto")
            _log.info("using Ray cluster with %s", ray.cluster_resources())
        with Pool(processes=ncpus) as pool:
            for s, out in enumerate(pool.imap_unordered(_run, _seeds)):
                olivine, enstatite = out
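For context, the hunk above guards ray.init behind HAS_RAY but keeps using Pool for the worker fan-out, which suggests the module selects its Pool implementation at import time. A minimal sketch of that assumed pattern (the actual import block is not shown in this diff; ray.util.multiprocessing.Pool is Ray's drop-in replacement for multiprocessing.Pool):

    # Sketch only: assumed optional-dependency guard, not part of this commit.
    try:
        import ray
        from ray.util.multiprocessing import Pool  # distributes pool tasks across the Ray cluster
        HAS_RAY = True
    except ImportError:
        from multiprocessing import Pool  # single-node fallback
        HAS_RAY = False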
22 changes: 22 additions & 0 deletions tools/pbs_scripts/run_test_ray_cluster_setup.sh
@@ -0,0 +1,22 @@
#!/bin/bash
#PBS -P xd2
#PBS -q normalbw
#PBS -l walltime=00:10:00
#PBS -l ncpus=56
#PBS -l mem=256GB
#PBS -l jobfs=400GB
#PBS -l storage=scratch/xd2+gdata/xd2
#PBS -l wd
#PBS -o test_ray_cluster_setup.log
#PBS -e test_ray_cluster_setup.err
#PBS -N test_ray_cluster_setup

module purge
module load python3/3.11.7 python3-as-python
# NOTE: First run pip install 'ray[default]' in this python environment.

source pbs_start_ray_cluster.sh

python -c 'import ray; ray.init(address="auto"); print(f"nodes in cluster: {ray.nodes()}"); print(f"cluster resources: {ray.cluster_resources()}")'

ray stop
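Beyond this smoke test, any Python driver launched inside the job can attach to the same cluster with ray.init(address="auto") and distribute work across the reserved nodes. A minimal hedged sketch (the task body is purely illustrative and not part of this commit):

    import ray

    ray.init(address="auto")  # attach to the cluster started by pbs_start_ray_cluster.sh

    @ray.remote
    def square(x):
        return x * x

    # Fan a trivial workload out across the workers and gather the results.
    print(ray.get([square.remote(i) for i in range(8)]))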
129 changes: 129 additions & 0 deletions tools/pbs_start_ray_cluster.sh
@@ -0,0 +1,129 @@
#!/bin/bash
set -u
readonly SCRIPTNAME="${0##*/}"
usage() {
    printf 'Usage: source %s\n' "$SCRIPTNAME"
    echo
    echo "Source this script in your PBS job file to set up a Ray cluster."
    echo "Stop the Ray cluster with 'ray stop' as the last line of the job script."
    echo "If the environment variable RAY_NWORKERS is set to an integer,"
    echo "only that many workers will be started. Otherwise, the number of"
    echo "workers will be set to PBS_NCPUS."
}
warn() { >&2 printf '%s\n' "$SCRIPTNAME: $1"; }

# Check for the main prerequisites.
[ $# -gt 0 ] && { usage; exit 1; }
module prepend-path PATH "$HOME/.local/bin"
command -v ray || { warn "unknown command 'ray'"; exit 1; }
[ -x "pbs_start_ray_worker.sh" ] || { warn "cannot execute pbs_start_ray_worker.sh"; exit 1; }

# https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html
ulimit -n 65535

USER_CFG=$PBS_O_WORKDIR/.cfg_${PBS_JOBID}

NWORKERS=${RAY_NWORKERS:-${PBS_NCPUS}}
NGPUS=$((PBS_NGPUS/PBS_NNODES))
NCPUS=$((NWORKERS/PBS_NNODES))
unset NWORKERS

# Choose Ray scheduler port in the range 8701-8800.
SCHEDULER_PORT=$(shuf -n 1 -i 8701-8800)
while ss -lntupw 2>/dev/null | grep -q ":$SCHEDULER_PORT"; do
    SCHEDULER_PORT=$(shuf -n 1 -i 8701-8800)  # SCHEDULER_PORT is taken, try another.
done

# Choose Ray dashboard port in the range 8801-8900.
DASH_PORT=$(shuf -n 1 -i 8801-8900)
while ss -lntupw 2>/dev/null | grep -q ":$DASH_PORT"; do
    DASH_PORT=$(shuf -n 1 -i 8801-8900)  # DASH_PORT is taken, try another.
done

LOG_FILE=${USER_CFG}/head_node.log
if [ ! -d ${USER_CFG} ]; then
    mkdir ${USER_CFG}
fi
touch $LOG_FILE

# Each node needs to load the modules as well.
module save ${USER_CFG}/module_coll
echo "#!/bin/bash" >${USER_CFG}/jobpython
echo "module restore ${USER_CFG}/module_coll" >> ${USER_CFG}/jobpython
echo "module prepend-path PATH $HOME/.local/bin" >> ${USER_CFG}/jobpython
echo " python \$* " >> ${USER_CFG}/jobpython
chmod 755 ${USER_CFG}/jobpython

# Parameters used to wait for Ray worker connection.
TIMEOUT=300
INTERVAL=2

IP_PREFIX=`hostname -i`
IP_HEAD=${IP_PREFIX}:${SCHEDULER_PORT}

# Set resource numbers (per worker) for ray workers to pick up.
echo ${IP_HEAD} > ${USER_CFG}/ip_head
echo ${NGPUS} > ${USER_CFG}/ngpus
echo ${NCPUS} > ${USER_CFG}/ncpus
if [ ${PBS_NGPUS} -gt 0 ]; then
    GPU_MEM=$(( PBS_VMEM / PBS_NNODES / NGPUS ))
    echo ${GPU_MEM} > ${USER_CFG}/mem_proc
else
    PROC_MEM=$(( PBS_VMEM / PBS_NNODES / NCPUS ))
    echo ${PROC_MEM} > ${USER_CFG}/mem_proc
fi

# Start Ray scheduler on the head node.
ray start --head --node-ip-address=${IP_PREFIX} --port=${SCHEDULER_PORT} \
    --dashboard-host=${IP_PREFIX} --dashboard-port=${DASH_PORT} \
    --num-cpus ${NCPUS} --num-gpus ${NGPUS} &>> ${LOG_FILE}

((t = TIMEOUT))
while [ ! -f ${LOG_FILE} ]; do
    sleep ${INTERVAL}
    ((t -= INTERVAL))
    if ((t <= 0)); then
        warn "scheduler failed to start up within $TIMEOUT seconds, aborting."
        exit 1
    fi
done

((t = TIMEOUT))
while ! grep -q "Ray runtime started." ${LOG_FILE} 2>/dev/null; do
    sleep ${INTERVAL}
    ((t -= INTERVAL))
    if ((t <= 0)); then
        warn "no Ray runtime established within $TIMEOUT seconds, aborting."
        exit 1
    fi
done

# File to store the ssh command that forwards the Ray dashboard from the head node.
# Appending creates the file if it does not exist yet.
echo "ssh -N -L ${DASH_PORT}:$(hostname):${DASH_PORT} ${USER}@gadi.nci.org.au" >> ${USER_CFG}/client_cmd

TOT_NPROCS=0
# Start Ray workers on the remaining nodes.
for node in $(uniq "$PBS_NODEFILE"); do
    if [ "$node" != "$(hostname)" ]; then
        pbs_tmrsh ${node} "${PBS_O_WORKDIR}/pbs_start_ray_worker.sh" &
    fi
    if [ ${PBS_NGPUS} -gt 0 ]; then
        TOT_NPROCS=$(( TOT_NPROCS + NGPUS ))
    else
        TOT_NPROCS=$(( TOT_NPROCS + NCPUS ))
    fi
done

echo "========== RAY cluster resources =========="
if [ ${PBS_NGPUS} -gt 0 ]; then
    echo "RAY NODE: GPU"
    echo "RAY WORKERS: ${NGPUS}/Node, ${TOT_NPROCS} in total."
    echo "RAY MEMORY: $(( GPU_MEM / 1024 / 1024 / 1024 ))GiB/worker, $(( PBS_VMEM / 1024 / 1024 / 1024 ))GiB in total."
else
    echo "RAY NODE: CPU"
    echo "RAY WORKERS: ${NCPUS}/Node, ${TOT_NPROCS} in total."
    echo "RAY MEMORY: $(( PROC_MEM / 1024 / 1024 / 1024 ))GiB/worker, $(( PBS_VMEM / 1024 / 1024 / 1024 ))GiB in total."
fi
24 changes: 24 additions & 0 deletions tools/pbs_start_ray_worker.sh
@@ -0,0 +1,24 @@
#!/bin/bash
set -e
readonly SCRIPTNAME="${0##*/}"
warn() { >&2 printf '%s\n' "$SCRIPTNAME: $1"; }
[ $# -gt 0 ] && { warn "do not run this script directly, see pbs_start_ray_cluster.sh instead"; exit 1; }

# This sets up the PBS commands like 'module' which we need below.
source /etc/bashrc

ulimit -s unlimited
ulimit -n 65535
USER_CFG=$PBS_O_WORKDIR/.cfg_${PBS_JOBID}
HOSTNAME=`hostname`
JOBDIR=$PBS_JOBFS
cd $JOBDIR

NCPUS=`cat ${USER_CFG}/ncpus`
NGPUS=`cat ${USER_CFG}/ngpus`
IP_HEAD=`cat ${USER_CFG}/ip_head`

module restore ${USER_CFG}/module_coll >& ${USER_CFG}/worker.${HOSTNAME}.log
module prepend-path PATH "$HOME/.local/bin"
echo "$SCRIPTNAME: starting worker $IP_HEAD with ${NCPUS} CPUs and ${NGPUS} GPUs" >> ${USER_CFG}/worker.${HOSTNAME}.log
ray start --address=$IP_HEAD --num-cpus ${NCPUS} --num-gpus ${NGPUS} --block &>> ${USER_CFG}/worker.${HOSTNAME}.log
