Skip to content

Commit

Permalink
queue run (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
joglekara authored Nov 20, 2023
1 parent 7b38762 commit e33bfab
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 23 deletions.
12 changes: 8 additions & 4 deletions env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@ dependencies:
- pip
- pip:
- "jax[cpu]"
- jaxopt
- numpy
- scipy
- matplotlib
- pyhdf
- xlrd
- pyyaml
- mlflow
- boto3
- flatdict
- typing-extensions
- optax
- tqdm
- xarray
- diffrax
- h5netcdf
- optax
- jaxopt
- mlflow_export_import
- pint
- diffrax
22 changes: 12 additions & 10 deletions env_gpu.yaml
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
name: adept
name: adept-gpu
channels:
- conda-forge
- "nvidia/label/cuda-11.7.0"
- nvidia
dependencies:
- python=3.10
- ipython
- ipykernel
- cuda-nvcc
- cudatoolkit==11.7.0
- pip
- pip:
# works for regular pip packages
- --find-links https://storage.googleapis.com/jax-releases/jax_cuda_releases.html
- "jax[cuda11_cudnn82]"
- jaxlib==0.4.20+cuda12.cudnn89
- jax==0.4.20
- jaxopt
- numpy
- scipy
- matplotlib
- pyhdf
- xlrd
- pyyaml
- mlflow
- boto3
- flatdict
- typing-extensions
- optax
- tqdm
- xarray
- mlflow_export_import
- pint
- diffrax
- h5netcdf
- optax
- jaxopt
- flatdict
12 changes: 12 additions & 0 deletions nersc-cpu.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
#SBATCH --qos=debug
#SBATCH --time=5
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --constraint=cpu
# Slurm job template for CPU runs: debug QOS, one task on one CPU node, and a
# 5-minute limit (Slurm reads a bare integer --time as minutes).
# NOTE(review): presumably the file CPU_BASE_JOB_FILE points to, in which case
# queue_adept.py appends the `srun python run.py ...` line after the last line
# of this script -- confirm.

# BASE_TEMPDIR is read by the Python code as the parent directory for temporary
# files; MLFLOW_TRACKING_URI tells mlflow where to store runs.
export BASE_TEMPDIR="$PSCRATCH/tmp/"
export MLFLOW_TRACKING_URI="$PSCRATCH/mlflow"

# Activate the virtualenv inside the adept checkout and run from that checkout.
source /global/u2/a/archis/adept/venv/bin/activate
cd /global/u2/a/archis/adept/
20 changes: 20 additions & 0 deletions nersc-gpu.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
#SBATCH -A m4434_g
#SBATCH -C gpu
#SBATCH -q shared
#SBATCH -t 2:00:00
#SBATCH -n 1
#SBATCH -c 32
#SBATCH --gpus-per-task=1
# Slurm job template for GPU runs: account m4434_g, shared QOS on GPU nodes,
# one task with 32 CPUs and one GPU, 2-hour limit.
# NOTE(review): presumably the file GPU_BASE_JOB_FILE points to, in which case
# queue_adept.py appends the `srun python run.py ...` line after the last line
# of this script -- confirm.

# SLURM_CPU_BIND sets srun's default CPU binding. BASE_TEMPDIR is read by the
# Python code as the parent directory for temporary files; MLFLOW_TRACKING_URI
# tells mlflow where to store runs.
export SLURM_CPU_BIND="cores"
export BASE_TEMPDIR="$PSCRATCH/tmp/"
export MLFLOW_TRACKING_URI="$PSCRATCH/mlflow"

# Load the Python module plus the cuDNN 8.9.3 / CUDA 12 toolchain modules.
module load python
module load cudnn/8.9.3_cuda12.lua
module load cudatoolkit/12.0.lua

# Activate the adept-gpu environment and run from the adept checkout.
mamba activate adept-gpu
cd /global/u2/a/archis/adept/
58 changes: 58 additions & 0 deletions queue_adept.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import argparse
import os
import tempfile
import time

import mlflow
import yaml

# Optional parent directory for temporary files, taken from the environment.
# None (variable unset) lets tempfile fall back to its default location.
BASE_TEMPDIR = os.environ.get("BASE_TEMPDIR")


def _queue_run_(machine, run_id):
if "cpu" in machine:
base_job_file = os.environ["CPU_BASE_JOB_FILE"]
elif "gpu" in machine:
base_job_file = os.environ["GPU_BASE_JOB_FILE"]
else:
raise NotImplementedError

with open(base_job_file, "r") as fh:
base_job = fh.read()

with open(os.path.join(os.getcwd(), "new_job.sh"), "w") as job_file:
job_file.write(base_job)
job_file.writelines(f"srun python run.py --mode remote --run_id {run_id}")

os.system(f"sbatch new_job.sh")
time.sleep(0.1)
os.system("sqs")


def load_and_make_folders(cfg_path):
    """Create an MLflow run tagged as queued that carries the config as an artifact.

    Reads ``<cfg_path>.yaml``, selects the experiment named in the config,
    starts a run with the configured run name, logs the config as
    ``config.yaml`` and tags the run with ``sim_status: queued``.

    :param cfg_path: path to the config file without the ``.yaml`` extension
    :return: tuple of (loaded config, id of the MLflow run that was created)
    """
    config_file = f"{cfg_path}.yaml"
    with open(config_file, "r") as stream:
        cfg = yaml.safe_load(stream)

    mlflow_cfg = cfg["mlflow"]
    mlflow.set_experiment(mlflow_cfg["experiment"])

    with mlflow.start_run(run_name=mlflow_cfg["run"]) as queued_run:
        # Stage the config in a scratch directory so it can be logged as an artifact.
        with tempfile.TemporaryDirectory(dir=BASE_TEMPDIR) as staging_dir:
            staged_cfg = os.path.join(staging_dir, "config.yaml")
            with open(staged_cfg, "w") as out:
                yaml.dump(cfg, out)
            mlflow.log_artifacts(staging_dir)
        mlflow.set_tags({"sim_status": "queued"})

    return cfg, queued_run.info.run_id


if __name__ == "__main__":
    # Command-line entry point: register the run in MLflow, then submit it to Slurm.
    parser = argparse.ArgumentParser(description="Automatic Differentiation Enabled Plasma Transport")
    # --cfg is mandatory: without it the script would try to open "None.yaml".
    parser.add_argument("--cfg", required=True, help="enter path to cfg")
    args = parser.parse_args()

    cfg, run_id = load_and_make_folders(args.cfg)
    # The "machine.calculator" config entry selects the cpu or gpu job template.
    _queue_run_(cfg["machine"]["calculator"], run_id)
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ flatdict
h5netcdf
optax
jaxopt
boto3
boto3
pint
mlflow_export_import
30 changes: 27 additions & 3 deletions run.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
import argparse
import os

from jax import config

config.update("jax_enable_x64", True)
# config.update("jax_disable_jit", True)

import yaml, mlflow
from utils.runner import run
from utils.runner import run, run_job
from utils.misc import export_run

if __name__ == "__main__":

def _run_(cfg_path):
    """Execute the job described by a YAML config inside a new MLflow run.

    :param cfg_path: path to the config file without the ``.yaml`` extension
    :return: id of the MLflow run that was created
    """
    # Only the requested config is opened; the earlier hard-coded
    # "configs/vlasov-2d/base.yaml" open was a leftover and has been dropped.
    with open(f"{cfg_path}.yaml", "r") as fi:
        cfg = yaml.safe_load(fi)

    mlflow.set_experiment(cfg["mlflow"]["experiment"])
    with mlflow.start_run(run_name=cfg["mlflow"]["run"]) as mlflow_run:
        # The return value of run() is not used here.
        run(cfg)

    return mlflow_run.info.run_id


if __name__ == "__main__":
    # Command-line entry point: either start a fresh run from a config file
    # ("local") or continue an already-registered MLflow run ("remote").
    parser = argparse.ArgumentParser(description="Automatic Differentiation Enabled Plasma Transport")
    parser.add_argument("--cfg", help="enter path to cfg")
    parser.add_argument("--run_id", help="enter run_id to continue")
    # --mode was read below but never declared, so every invocation failed with
    # AttributeError. "local" is the default so `run.py --cfg <path>` works as-is.
    parser.add_argument(
        "--mode",
        default="local",
        choices=("local", "remote"),
        help="local: start a new run from --cfg; remote: continue the run given by --run_id",
    )
    args = parser.parse_args()

    if args.mode == "local":
        if args.cfg is None:
            parser.error("--cfg is required when --mode is local")
        run_id = _run_(args.cfg)
    else:  # "remote" -- guaranteed by `choices` above
        if args.run_id is None:
            parser.error("--run_id is required when --mode is remote")
        run_job(args.run_id, nested=None)
        run_id = args.run_id

    # Exporting is opt-in: it only happens when MLFLOW_EXPORT is set in the environment.
    if "MLFLOW_EXPORT" in os.environ:
        export_run(run_id)
52 changes: 47 additions & 5 deletions utils/misc.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import flatdict, mlflow, os, boto3, botocore, shutil, pickle, yaml, operator
import flatdict, mlflow, os, boto3, botocore, shutil, pickle, yaml, time, tempfile
from urllib.parse import urlparse
from mlflow.tracking import MlflowClient
import jax
import equinox as eqx
from mlflow_export_import.run.export_run import RunExporter


def log_params(cfg):
Expand Down Expand Up @@ -53,14 +54,13 @@ def download_file(fname, artifact_uri, destination_path):
s3.download_file(bucket_name, rest_of_path[1:], dest_file_path)
except botocore.exceptions.ClientError as e:
return None
elif "file" in artifact_uri:
file_uri = file_uri[7:]
else:
if "file" in artifact_uri:
file_uri = file_uri[7:]
if os.path.exists(file_uri):
shutil.copyfile(file_uri, dest_file_path)
else:
return None
else:
raise NotImplementedError

return dest_file_path

Expand Down Expand Up @@ -143,3 +143,45 @@ def queue_sim(sim_request):
submissionResult = client.submit_job(**job_template)

return submissionResult


def upload_dir_to_s3(local_directory: str, bucket: str, destination: str, run_id: str):
    """
    Uploads directory to s3 bucket for ingestion into mlflow on remote / cloud side
    This requires you to have permission to access the s3 bucket

    Every file below ``local_directory`` is uploaded to ``<destination>/<path relative to local_directory>``.
    Afterwards a marker file ``ingest-<run_id>.txt`` containing ``ready`` is written into ``local_directory``
    and uploaded to the top level of the bucket.

    :param local_directory: directory whose files are uploaded recursively
    :param bucket: name of the destination s3 bucket
    :param destination: key prefix under which the files are stored
    :param run_id: run identifier used to name the ``ingest-<run_id>.txt`` marker object
    :return: None
    """
    import posixpath  # s3 keys always use "/" separators, whatever the local platform uses

    client = boto3.client("s3")

    # enumerate local files recursively
    for root, _dirs, files in os.walk(local_directory):
        for filename in files:
            # construct the full local path
            local_path = os.path.join(root, filename)

            # construct the s3 key: prefix + path relative to the uploaded directory,
            # joined with "/" rather than the OS separator
            relative_path = os.path.relpath(local_path, local_directory)
            s3_key = posixpath.join(destination, *relative_path.split(os.sep))
            client.upload_file(local_path, bucket, s3_key)

    # The marker is written and uploaded only after all files are uploaded.
    # NOTE(review): presumably it signals the remote side that ingestion can start -- confirm.
    marker_name = f"ingest-{run_id}.txt"
    marker_path = os.path.join(local_directory, marker_name)
    with open(marker_path, "w") as fi:
        fi.write("ready")

    client.upload_file(marker_path, bucket, marker_name)


def export_run(run_id):
    """Export an MLflow run to a temporary directory and upload it to s3.

    The export is written with ``RunExporter`` and then pushed to the
    ``remote-mlflow-staging`` bucket under ``artifacts/<run_id>``. The time
    taken by each of the two steps is printed.

    :param run_id: id of the MLflow run to export
    """
    export_start = time.time()
    exporter = RunExporter(mlflow_client=mlflow.MlflowClient())
    with tempfile.TemporaryDirectory() as export_dir:
        exporter.export_run(run_id, export_dir)
        print(f"Export took {round(time.time() - export_start, 2)} s")
        upload_start = time.time()
        upload_dir_to_s3(export_dir, "remote-mlflow-staging", f"artifacts/{run_id}", run_id)
    print(f"Uploading took {round(time.time() - upload_start, 2)} s")
12 changes: 12 additions & 0 deletions utils/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@

from utils import misc

# Optional parent directory for temporary files, taken from the environment.
# None (variable unset) lets tempfile fall back to its default location.
BASE_TEMPDIR = os.environ.get("BASE_TEMPDIR")


def get_helpers(mode):
if mode == "tf-1d":
Expand Down Expand Up @@ -80,6 +85,13 @@ def write_units(cfg, td):
yaml.dump(all_quantities, fi)


def run_job(run_id, nested):
    """Resume an existing MLflow run and execute the job from its stored config.

    The config is fetched from the run's artifact location via ``misc.get_cfg``
    (using a temporary directory under ``BASE_TEMPDIR``) and passed to ``run``.

    :param run_id: id of the MLflow run to resume
    :param nested: forwarded unchanged to ``mlflow.start_run``
    """
    # The active run is bound to `mlflow_run`, not `run`: binding it to `run`
    # made that name local to this function and shadowed the module-level
    # run() below, so `run(cfg)` tried to call the MLflow run object and
    # raised TypeError.
    with mlflow.start_run(run_id=run_id, nested=nested) as mlflow_run:
        with tempfile.TemporaryDirectory(dir=BASE_TEMPDIR) as temp_path:
            cfg = misc.get_cfg(artifact_uri=mlflow_run.info.artifact_uri, temp_path=temp_path)
        run(cfg)


def run(cfg: Dict) -> Tuple[Solution, Dict]:
helpers = get_helpers(cfg["mode"])

Expand Down

0 comments on commit e33bfab

Please sign in to comment.