Add userbenchmark to run a group of userbenchmarks
Summary:
Add a userbenchmark that runs a group of dynamo benchmarks.

Save the output to a user-specified directory.
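
For orientation, a minimal invocation sketch (assuming the standard torchbench entry point, `python run_benchmark.py group_userbench ...`, dispatches to the `run()` function added below; the direct call is equivalent):

# Hypothetical direct call from the repo root; a relative --config is resolved
# against userbenchmark/group_userbench/configs/ (see load_group_config below).
from userbenchmark.group_userbench.run import run

# Dry run: each expanded config is printed and marked [skip_by_dryrun].
run(["--config", "torch_ao.yaml", "--dryrun"])

# Real run: results land in the debug output dir derived from --output
# (which defaults to get_default_output_json_path("group_userbench")).
run(["--config", "torch_ao.yaml"])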

Differential Revision: D52783199
xuzhao9 authored and facebook-github-bot committed Jan 15, 2024
1 parent 4ffb8a9 commit 7c875af
Showing 6 changed files with 216 additions and 4 deletions.
2 changes: 1 addition & 1 deletion userbenchmark/group_bench/run.py
@@ -180,7 +180,7 @@ def parse_args(args: List[str]):
parser.add_argument("--config", "-c", required=True, help="YAML config to specify group of tests to run.")
parser.add_argument("--dryrun", action="store_true", help="Dryrun the command.")
parser.add_argument("--debug", action="store_true", help="Save the debug output.")
parser.add_argument("--output", default=f"/tmp/{BM_NAME}", help="Output torchbench userbenchmark metrics file path.")
parser.add_argument("--output", default=get_default_output_json_path(BM_NAME), help="Output torchbench userbenchmark metrics file path.")
return parser.parse_args(args)

def run(args: List[str]):
1 change: 1 addition & 0 deletions userbenchmark/group_userbench/__init__.py
@@ -0,0 +1 @@
BM_NAME = "group_userbench"
6 changes: 6 additions & 0 deletions userbenchmark/group_userbench/configs/torch_ao.yaml
@@ -0,0 +1,6 @@
name: dynamo
base_args: --bfloat16 --inductor --performance --inference --inductor-compile-mode max-autotune
test_group:
  test_default_batch_size:
    subgroup:
      - extra_args: --tag baseline
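
For reference, a sketch of how load_group_config (added in run.py below) expands this file: each subgroup yields one TBUserbenchmarkConfig whose args are base_args plus the group's optional extra_args plus the subgroup's extra_args.

# Expansion of the single subgroup above (no group-level extra_args is set):
expected_args = (
    "--bfloat16 --inductor --performance --inference --inductor-compile-mode max-autotune".split(" ")
    + "--tag baseline".split(" ")
)
# -> one TBUserbenchmarkConfig(name="dynamo", args=expected_args), stored under
#    group_configs["test_default_batch_size"] in the resulting TBUserbenchmarkGroupConfig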
121 changes: 121 additions & 0 deletions userbenchmark/group_userbench/run.py
@@ -0,0 +1,121 @@
"""
Run grouped userbenchmarks.
"""
import argparse
import ast
import copy
import dataclasses
import itertools
import json
import os
import pathlib
import re
import shutil

from typing import Any, Dict, List, Optional, Union

import numpy
import yaml

from torchbenchmark.util.experiment.metrics import (
    get_model_accuracy,
    get_model_test_metrics,
    TorchBenchModelMetrics,
)

from ..task import TBUserbenchmarkConfig, TBUserTask

from ..utils import (
    add_path,
    get_default_debug_output_dir,
    get_default_output_json_path,
    get_output_json,
    REPO_PATH,
)
from . import BM_NAME

CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
DEFAULT_CONFIG_DIR = os.path.join(CURRENT_DIR, "configs")


@dataclasses.dataclass
class TBUserbenchmarkGroupConfig:
    group_configs: Dict[str, List[TBUserbenchmarkConfig]]

    @property
    def configs(self):
        return [ c for configs in self.group_configs.values() for c in configs ]


def init_output_dir(group_config: TBUserbenchmarkGroupConfig, output_dir: pathlib.Path):
    for group_name in group_config.group_configs:
        configs = group_config.group_configs[group_name]
        for config in configs:
            config_str = config.output_dir_name
            config.output_dir = output_dir.joinpath(group_name, config_str)
            if config.output_dir.exists():
                shutil.rmtree(config.output_dir)
            config.output_dir.mkdir(parents=True)


def run_config(config: TBUserbenchmarkConfig, dryrun: bool=False) -> None:
    print(f"Running {config} ...", end='', flush=True)
    if dryrun:
        print(" [skip_by_dryrun]", flush=True)
        return
    # We do not allow RuntimeError in this test
    try:
        # load the userbenchmark and run it
        task = TBUserTask(config)
        task.run(config.args)
        print(" [done]", flush=True)
        return
    except NotImplementedError:
        print(" [not_implemented]", flush=True)
        return
    except OSError:
        print(" [oserror]", flush=True)
        return

def load_group_config(config_file: str) -> TBUserbenchmarkGroupConfig:
    if not os.path.exists(config_file):
        config_file = os.path.join(DEFAULT_CONFIG_DIR, config_file)
    with open(config_file, "r") as fp:
        data = yaml.safe_load(fp)
    baseline_config = TBUserbenchmarkConfig(
        name=data["name"],
        args=data["base_args"].split(" "),
    )
    group_configs = {}
    for group_name in data["test_group"]:
        group_configs[group_name] = []
        group_extra_args = list(filter(lambda x: x, data["test_group"][group_name].get("extra_args", "").split(" ")))
        subgroup_extra_args_list = list(map(lambda x: x["extra_args"].split(" "), data["test_group"][group_name]["subgroup"]))
        for subgroup_extra_args in subgroup_extra_args_list:
            subgroup_config = copy.deepcopy(baseline_config)
            subgroup_config.args.extend(group_extra_args)
            subgroup_config.args.extend(subgroup_extra_args)
            group_configs[group_name].append(subgroup_config)
    return TBUserbenchmarkGroupConfig(group_configs)

def parse_args(args: List[str]):
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", "-c", required=True, help="YAML config to specify group of tests to run.")
    parser.add_argument("--dryrun", action="store_true", help="Dryrun the command.")
    parser.add_argument("--output", default=get_default_output_json_path(BM_NAME), help="Output torchbench userbenchmark metrics file path.")
    return parser.parse_args(args)

def run(args: List[str]):
    args = parse_args(args)
    group_config: TBUserbenchmarkGroupConfig = load_group_config(args.config)
    output_dir = get_default_debug_output_dir(args.output)
    init_output_dir(group_config, output_dir)

    try:
        for config in group_config.configs:
            run_config(config, dryrun=args.dryrun)
    except KeyboardInterrupt:
        print("User keyboard interrupted!")

    print(f"Benchmark results are saved to the output dir: {output_dir}")
79 changes: 79 additions & 0 deletions userbenchmark/task.py
@@ -0,0 +1,79 @@
import dataclasses
import gc
import threading

from pathlib import Path
from typing import Dict, List, Optional

from components._impl.tasks import base as base_task
from components._impl.workers import subprocess_worker

from torchbenchmark import ModelDetails, Worker


@dataclasses.dataclass
class TBUserbenchmarkConfig:
    name: str
    args: List[str]
    output_dir: Optional[Path] = None

    @property
    def output_dir_name(self) -> str:
        return self.name + " " + " ".join(self.args)

class TBUserTask(base_task.TaskBase):

    # The worker may (and often does) consume significant system resources.
    # In order to ensure that runs do not interfere with each other, we only
    # allow a single UserTask to exist at a time.
    _lock = threading.Lock()

    def __init__(
        self,
        config: TBUserbenchmarkConfig,
        timeout: Optional[float] = None,
        extra_env: Optional[Dict[str, str]] = None,
    ) -> None:
        # gc.collect() # Make sure previous task has a chance to release the lock
        assert self._lock.acquire(blocking=False), "Failed to acquire lock."
        self._worker = Worker(timeout=timeout, extra_env=extra_env, save_output_dir=config.output_dir)
        self._details = config
        self._maybe_import_userbenchmark(config.name)

    def __del__(self) -> None:
        self._lock.release()

    @property
    def worker(self) -> subprocess_worker.SubprocessWorker:
        return self._worker

    def __str__(self) -> str:
        return f"TBUserTask(Name: {self._details.name}, Details: {self._details})"

    # =========================================================================
    # == Run the userbenchmark in subprocess ================================
    # =========================================================================
    @base_task.run_in_worker(scoped=True)
    @staticmethod
    def _maybe_import_userbenchmark(ub_name: str) -> None:
        import importlib
        import os
        import traceback
        try:
            module = importlib.import_module(f'.{ub_name}.run', package="userbenchmark")
            run_func = getattr(module, 'run', None)
        except ModuleNotFoundError:
            traceback.print_exc()
            exit(-1)

        # Populate the worker's global namespace so subsequent calls to `run` can access `_run_func`
        globals()["_run_func"] = run_func
        return

    @base_task.run_in_worker(scoped=True)
    @staticmethod
    def run(args: List[str]) -> None:
        import gc
        gc.collect()
        run_func = globals()["_run_func"]
        run_func(args)
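
Putting task.py together with group_userbench/run.py above, the per-config execution reduces to roughly the following sketch (the args and output path are illustrative):

# Illustrative standalone use of TBUserTask, mirroring run_config() above.
from pathlib import Path
from userbenchmark.task import TBUserbenchmarkConfig, TBUserTask

config = TBUserbenchmarkConfig(
    name="dynamo",
    args=["--bfloat16", "--inductor", "--performance", "--inference", "--tag", "baseline"],
    output_dir=Path("/tmp/group_userbench/test_default_batch_size/example"),
)
config.output_dir.mkdir(parents=True, exist_ok=True)  # init_output_dir() prepares this per config

task = TBUserTask(config)   # acquires the class lock, spawns the worker, imports userbenchmark.dynamo.run
task.run(config.args)       # calls that userbenchmark's run(args) inside the worker process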
11 changes: 8 additions & 3 deletions userbenchmark/utils.py
@@ -93,9 +93,14 @@ def get_output_json(bm_name, metrics) -> Dict[str, Any]:
    }


-def get_output_dir(bm_name) -> Path:
-    current_dir = Path(os.path.dirname(os.path.abspath(__file__)))
-    target_dir = current_dir.parent.joinpath(USERBENCHMARK_OUTPUT_PREFIX, bm_name)
+def get_output_dir(bm_name: str) -> Path:
+    import torch
+    IS_FBCODE = False if hasattr(torch.version, "git_version") else True
+    if not IS_FBCODE:
+        current_dir = Path(os.path.dirname(os.path.abspath(__file__)))
+        target_dir = current_dir.parent.joinpath(USERBENCHMARK_OUTPUT_PREFIX, bm_name)
+    else:
+        target_dir = Path(f"/tmp/{bm_name}")
    target_dir.mkdir(exist_ok=True, parents=True)
    return target_dir
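
The new branch routes output by build flavor; a minimal sketch of the detection, assuming only OSS PyTorch builds define torch.version.git_version:

import torch

# OSS builds ship torch/version.py with a git_version attribute; the internal
# fbcode build does not, so the attribute's absence selects the /tmp fallback.
if hasattr(torch.version, "git_version"):
    print("OSS build: outputs go under <userbenchmark parent>/<USERBENCHMARK_OUTPUT_PREFIX>/<bm_name>")
else:
    print("fbcode build: outputs go under /tmp/<bm_name>")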

