
Merge pull request #21 from josenavas/pbs
Add queue interaction support
antgonza committed Mar 5, 2014
2 parents 3154c16 + ff0dd82 commit 231eda8
Showing 15 changed files with 874 additions and 344 deletions.
60 changes: 60 additions & 0 deletions scaling/cluster_util.py
@@ -0,0 +1,60 @@
#!/usr/bin/env python

__author__ = "Jose Antonio Navas Molina"
__copyright__ = "Copyright 2014, The QIIME-Scaling Project"
__credits__ = ["Jose Antonio Navas Molina", "Daniel McDonald"]
__license__ = "BSD"
__version__ = "0.0.1-dev"
__maintainer__ = "Jose Antonio Navas Molina"
__email__ = "josenavasmolina@gmail.com"
__status__ = "Development"

from subprocess import Popen, PIPE
from os import environ
from time import sleep


def check_status(jobs_to_monitor):
    """Check the status of the passed list of jobs

    Inputs:
        jobs_to_monitor: list of job ids

    Returns:
        A subset of jobs_to_monitor containing those jobs that are still
        queued or running
    """
    # Get all the jobs of the current user
    user = environ['USER']
    qstat_cmd = "qstat | grep %s" % user
    proc = Popen(qstat_cmd, stdout=PIPE, stderr=PIPE, shell=True)
    (stdout, stderr) = proc.communicate()
    # Parse the qstat output
    lines = stdout.splitlines()
    running_jobs = []
    for l in lines:
        job_id, job_name, user, time, status, queue = l.split()
        job_id = job_id.split('.')[0]
        # Check whether this job is one of the jobs that we have to
        # monitor and whether it is still running or queued
        if job_id in jobs_to_monitor and status in ['R', 'Q']:
            running_jobs.append(job_id)
    # Return the list of monitored jobs that we're still waiting for
    return running_jobs


def wait_on(jobs_to_monitor, poll_interval=5):
    """Block while the jobs to monitor are still queued or running

    Inputs:
        jobs_to_monitor: list of job ids
        poll_interval: interval between status checks, in seconds
    """
    # Get the job ids up to the first '.' character
    jobs_to_monitor = [job.split('.')[0] for job in jobs_to_monitor]
    # Loop while there are still jobs to monitor
    while jobs_to_monitor:
        # Sleep before the next status check
        sleep(poll_interval)
        # Check the job statuses and get the new set of jobs to wait on
        jobs_to_monitor = check_status(jobs_to_monitor)
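
A minimal usage sketch of these helpers (not part of the commit), assuming a PBS/Torque-style cluster where qsub prints the id of the newly queued job (e.g. "12345.headnode") on stdout; the script names are hypothetical:

    #!/usr/bin/env python
    # Hypothetical example: submit two PBS scripts and block until both leave the queue.
    from subprocess import Popen, PIPE

    from scaling.cluster_util import wait_on

    job_ids = []
    for script in ["bench_0.pbs", "bench_1.pbs"]:  # hypothetical script names
        # qsub prints the id of the newly queued job on stdout
        proc = Popen("qsub %s" % script, stdout=PIPE, stderr=PIPE, shell=True)
        stdout, stderr = proc.communicate()
        job_ids.append(stdout.strip())

    # Poll qstat every 10 seconds until none of the jobs are queued or running
    wait_on(job_ids, poll_interval=10)
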
18 changes: 10 additions & 8 deletions scaling/commands/bench_results_comparator.py
@@ -10,18 +10,19 @@
__email__ = "josenavasmolina@gmail.com"
__status__ = "Development"

-from pyqi.core.command import (Command, CommandIn, CommandOut, 
+from pyqi.core.command import (Command, CommandIn, CommandOut,
                               ParameterCollection)
from pyqi.core.exception import CommandError
from scaling.process_results import compare_benchmark_results
from matplotlib.figure import Figure


class BenchResultsComparator(Command):
    BriefDescription = "Compare different runs results of the same bench suite"
    LongDescription = ("Takes a list with paths to directories with benchmark "
-        "results and generates a plot with the wall time and a plot with the "
-        "memory consumption of the different runs, allowing performance "
-        "comparison between them.")
+                       "results and generates a plot with the wall time and a "
+                       "plot with the memory consumption of the different "
+                       "runs, allowing performance comparison between them.")
    CommandIns = ParameterCollection([
        CommandIn(Name='input_dirs', DataType=list,
                  Description='List with the path to the directories with the '
@@ -34,9 +35,10 @@ class BenchResultsComparator(Command):

    CommandOuts = ParameterCollection([
        CommandOut(Name="time_fig", DataType=Figure,
-            Description="matplotlib figure with the wall time plot"),
+                   Description="matplotlib figure with the wall time plot"),
        CommandOut(Name="mem_fig", DataType=Figure,
-            Description="matplotlib figure with the memory consumption plot"),
+                   Description="matplotlib figure with the memory consumption "
+                               "plot"),
    ])
@@ -47,13 +49,13 @@ def run(self, **kwargs):

        if len(input_dirs) < 2:
            raise CommandError("You should provide at least two directories "
-                "with the benchmark results")
+                               "with the benchmark results")

        time_fig, mem_fig = compare_benchmark_results(input_dirs, labels)

        result['time_fig'] = time_fig
        result['mem_fig'] = mem_fig

        return result

CommandConstructor = BenchResultsComparator
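
A minimal sketch (not part of the commit) of driving the comparator from Python instead of the optparse interface; the directory names and labels are hypothetical:

    from scaling.commands.bench_results_comparator import BenchResultsComparator

    # Compare two runs of the same benchmark suite; at least two directories
    # with timing results are required.
    cmd = BenchResultsComparator()
    result = cmd.run(input_dirs=["timing_run1", "timing_run2"],
                     labels=["run1", "run2"])
    time_fig = result['time_fig']  # matplotlib Figure with the wall time comparison
    mem_fig = result['mem_fig']    # matplotlib Figure with the memory comparison
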
36 changes: 22 additions & 14 deletions scaling/commands/bench_results_processer.py
@@ -10,44 +10,52 @@
__email__ = "josenavasmolina@gmail.com"
__status__ = "Development"

-from pyqi.core.command import (Command, CommandIn, CommandOut, 
-        ParameterCollection)
-from scaling.process_results import process_benchmark_results
+from pyqi.core.command import (Command, CommandIn, CommandOut,
+                               ParameterCollection)
from matplotlib.figure import Figure
+from scaling.process_results import process_benchmark_results
+from scaling.cluster_util import wait_on


class BenchResultsProcesser(Command):
    BriefDescription = "Processes the benchmark suite results"
-    LongDescription = "Takes the benchmark suite output directory and " +\
-        "processes the benchmark measurements, creating plots and collapsing" +\
-        " results in a usable form."
+    LongDescription = ("Takes the benchmark suite output directory and "
+                       "processes the benchmark measurements, creating plots "
+                       "and collapsing results in a usable form.")
    CommandIns = ParameterCollection([
        CommandIn(Name='input_dir', DataType=str,
                  Description='Path to the directory with the time results',
                  Required=True),
+        CommandIn(Name='job_ids', DataType=list,
+                  Description='List of job ids to wait for if running in a '
+                  'pbs cluster', Required=False, Default=[])
    ])

    CommandOuts = ParameterCollection([
        CommandOut(Name="bench_data", DataType=dict,
-            Description="Dictionary with the benchmark results"),
+                   Description="Dictionary with the benchmark results"),
        CommandOut(Name="time_fig", DataType=Figure,
-            Description="Figure with the execution time results"),
+                   Description="Figure with the execution time results"),
        CommandOut(Name="time_str", DataType=str,
-            Description="String with the best polynomial fit to the benchmark "
-            "execution time results"),
+                   Description="String with the best polynomial fit to the "
+                   "benchmark execution time results"),
        CommandOut(Name="mem_fig", DataType=Figure,
-            Description="Figure with the memory consumption results"),
+                   Description="Figure with the memory consumption results"),
        CommandOut(Name="mem_str", DataType=str,
-            Description="String with the best polynomial fit to the benchmark "
-            "memory consumption results")
+                   Description="String with the best polynomial fit to the "
+                   "benchmark memory consumption results")
    ])

    def run(self, **kwargs):
        result = {}

        input_dir = kwargs['input_dir']
+        job_ids = kwargs['job_ids']
+
+        wait_on(job_ids)

        data, time_fig, time_str, mem_fig, mem_str = \
-                process_benchmark_results(input_dir)
+            process_benchmark_results(input_dir)

        result['bench_data'] = data
        result['time_fig'] = time_fig
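
With the new job_ids input, the results processer blocks on the listed PBS jobs before collapsing the measurements. A minimal sketch (not part of the commit) of calling it directly from Python; the directory path and job ids are hypothetical:

    from scaling.commands.bench_results_processer import BenchResultsProcesser

    # wait_on() polls qstat until the listed jobs leave the queue, then the
    # timing files under input_dir are processed as before.
    cmd = BenchResultsProcesser()
    results = cmd.run(input_dir="timing_results", job_ids=["12345", "12346"])
    bench_data = results['bench_data']  # dict with the collapsed measurements
    time_fig = results['time_fig']      # matplotlib Figure with the wall time results
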
95 changes: 66 additions & 29 deletions scaling/commands/bench_suite_maker.py
@@ -11,70 +11,107 @@
__status__ = "Development"

from pyqi.core.command import (Command, CommandIn, CommandOut,
-        ParameterCollection)
+                               ParameterCollection)
from pyqi.core.exception import CommandError
from scaling.make_bench_suite import (make_bench_suite_files,
-        make_bench_suite_parameters)
+                                      make_bench_suite_parameters)


class BenchSuiteMaker(Command):
    BriefDescription = "Generates a benchmark suite file"
    LongDescription = ("Given a command and a list of benchmarks files or a "
-        "dictionary with the options to test, this command generates a shell "
-        "script that executes a complete benchmark suite.")
+                       "dictionary with the options to test, generates a shell"
+                       " script that executes a complete benchmark suite.")
    CommandIns = ParameterCollection([
        CommandIn(Name='command', DataType=str,
-            Description='command to benchmark', Required=True),
+                  Description='command to benchmark', Required=True),
        CommandIn(Name='parameters', DataType=dict,
-            Description='dictionary where the keys are the parameters to test '
-            'and the values are a list of values for such parameter.',
-            DefaultDescription='No parameters used', Default=None),
+                  Description='dictionary where the keys are the parameters '
+                  'to test and the values are a list of values for such '
+                  'parameter.',
+                  DefaultDescription='No parameters used', Default=None),
        CommandIn(Name='bench_files', DataType=list,
-            Description='List of lists of paths to the benchmark files to use '
-            'as input for the command. Each inner list is a test case and '
-            'should have the same length as the in_opts parameter.',
-            DefaultDescription='No bench_files used',
-            Required=False, Default=None),
+                  Description='List of lists of paths to the benchmark files '
+                  'to use as input for the command. Each inner list is a test '
+                  'case and should have the same length as the in_opts '
+                  'parameter.',
+                  DefaultDescription='No bench_files used',
+                  Required=False, Default=None),
        CommandIn(Name='in_opts', DataType=list,
-            Description='list of options used for providing the benchmark files'
-            ' to the command. It should have the same length and order than the'
-            ' inner lists of bench_files.',
-            DefaultDescription='["-i"] is used as a default',
-            Required=False, Default=["-i"]),
+                  Description='list of options used for providing the '
+                  'benchmark files to the command. It should have the same '
+                  'length and order than the inner lists of bench_files.',
+                  DefaultDescription='["-i"] is used as a default',
+                  Required=False, Default=["-i"]),
        CommandIn(Name='out_opt', DataType=str,
-            Description='Option used for providing the output path to the '
-            'command to benchmark.',
-            DefaultDescription='"-o" is used as default',
-            Required=False, Default="-o")
+                  Description='Option used for providing the output path to '
+                  'the command to benchmark.',
+                  DefaultDescription='"-o" is used as default',
+                  Required=False, Default="-o"),
+        CommandIn(Name='pbs', DataType=bool,
+                  Description='Flag to determine if the benchmark suite will '
+                  'run in a PBS cluster environment',
+                  DefaultDescription='False: run serially in bash',
+                  Required=False, Default=False),
+        CommandIn(Name='job_prefix', DataType=str,
+                  Description='Prefix for the job name in case of a PBS '
+                  'cluster environment',
+                  DefaultDescription='"bench_" is used as a default prefix',
+                  Required=False, Default="bench_"),
+        CommandIn(Name='queue', DataType=str,
+                  Description='PBS queue to submit jobs',
+                  DefaultDescription='"" is used as default, which will submit'
+                  ' the jobs to the system default queue',
+                  Required=False, Default=""),
+        CommandIn(Name='pbs_extra_args', DataType=str,
+                  Description='Any extra arguments needed to qsub',
+                  DefaultDescription='No extra arguments are used',
+                  Required=False, Default="")
    ])
    CommandOuts = ParameterCollection([
        CommandOut(Name='bench_suite', DataType=str,
-            Description='String with the benchmark suite')])
+                   Description='String with the benchmark suite')])

    def run(self, **kwargs):
        result = {}

        # Get command parameters
        command = kwargs['command']
        out_opt = kwargs['out_opt']
        parameters = kwargs['parameters']
        bench_files = kwargs['bench_files']
        in_opts = kwargs['in_opts']
+        pbs = kwargs['pbs']
+        job_prefix = kwargs['job_prefix']
+        queue = kwargs['queue']
+        pbs_extra_args = kwargs['pbs_extra_args']

        # Check which type of bench suite are we generating
        if parameters:
            # We are generating a benchmark suite based on different parameter
            # values. In such case, the user should not provide any bench file
            if bench_files:
                raise CommandError("Parameters or bench_files should be "
-                    "provided, but not both.")
+                                   "provided, but not both.")
            bench_str = make_bench_suite_parameters(command, parameters,
-                out_opt)
+                                                    out_opt, pbs, job_prefix,
+                                                    queue, pbs_extra_args)
        elif bench_files:
            # We are generating a benchmark suite based on input files,
            # Check that the number of benchmark files for test case match
            # the number of options to provide the input files
            if not all(len(x) == len(in_opts) for x in bench_files):
-                raise CommandError("The length of bench_files and in_opts must "
-                    "be the same.")
+                raise CommandError("The length of bench_files and in_opts "
+                                   "must be the same.")
            bench_str = make_bench_suite_files(command, in_opts, bench_files,
-                out_opt)
+                                               out_opt, pbs, job_prefix, queue,
+                                               pbs_extra_args)
        else:
            # Not enough parameters!
            raise CommandError("Must specify parameters or bench_files.")

        result['bench_suite'] = bench_str

        return result

-CommandConstructor = BenchSuiteMaker
+CommandConstructor = BenchSuiteMaker
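
A minimal sketch (not part of the commit) of driving the new PBS options programmatically; the benchmarked command and parameter values are hypothetical, and every input is spelled out because run() reads each kwarg directly:

    from scaling.commands.bench_suite_maker import BenchSuiteMaker

    cmd = BenchSuiteMaker()
    result = cmd.run(command="pick_otus.py",  # hypothetical command to benchmark
                     parameters={"similarity": ["0.94", "0.97", "0.99"]},
                     bench_files=None,
                     in_opts=["-i"],
                     out_opt="-o",
                     pbs=True,  # generate qsub submissions instead of a serial bash run
                     job_prefix="bench_",
                     queue="",  # empty string submits to the system default queue
                     pbs_extra_args="")
    bench_suite = result['bench_suite']  # shell script text for the whole suite
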
46 changes: 27 additions & 19 deletions scaling/interfaces/optparse/config/compare_bench_results.py
@@ -15,6 +15,7 @@
from pyqi.core.command import (make_command_in_collection_lookup_f,
                               make_command_out_collection_lookup_f)
from pyqi.core.interfaces.optparse.input_handler import string_list_handler
+
from scaling.commands.bench_results_comparator import CommandConstructor
from scaling.interfaces.optparse.output_handler import write_matplotlib_figure

@@ -25,31 +26,38 @@
# Examples of how the command can be used from the command line using an
# optparse interface.
usage_examples = [
-    OptparseUsageExample(ShortDesc="A short single sentence description of the example",
-                         LongDesc="A longer, more detailed description",
-                         Ex="%prog --foo --bar some_file")
+    OptparseUsageExample(ShortDesc="Compare different runs results of the same"
+                         " bench suite",
+                         LongDesc="Takes a comma-separated list with paths to "
+                                  "directories with benchmark results and generates a "
+                                  "plot with the wall time and a plot with the memory "
+                                  "consumption of the different runs, allowing "
+                                  "performance comparison between them.",
+                         Ex="%prog -i timing1,timing2 -l run1,run2 -o plots")
]

# inputs map command line arguments and values onto Parameters. It is possible
# to define options here that do not exist as parameters, e.g., an output file.
inputs = [
    OptparseOption(Parameter=cmd_in_lookup('input_dirs'),
                   Type='existing_dirpaths',
-                   Action='store', # default is 'store', change if desired
-                   Handler=None, # must be defined if desired
-                   ShortName='i', # must be defined if desired
-                   # Name='input_dirs', # implied by Parameter
-                   # Required=True, # implied by Parameter
-                   # Help='List with the path to the directories with the time results of different runs of the same bench suite', # implied by Parameter
+                   Action='store',
+                   Handler=None,
+                   ShortName='i',
+                   # Name='input_dirs',
+                   # Required=True,
+                   # Help='List with the path to the directories with the time
+                   # results of different runs of the same bench suite',
                   ),
    OptparseOption(Parameter=cmd_in_lookup('labels'),
                   Type='str',
-                   Action='store', # default is 'store', change if desired
-                   Handler=string_list_handler, # must be defined if desired
-                   ShortName='l', # must be defined if desired
-                   # Name='labels', # implied by Parameter
-                   # Required=True, # implied by Parameter
-                   # Help='List of strings to label each data series on the plot', # implied by Parameter
+                   Action='store',
+                   Handler=string_list_handler,
+                   ShortName='l',
+                   # Name='labels',
+                   # Required=True,
+                   # Help='List of strings to label each data series on the
+                   # plot'
                   ),
    OptparseOption(Parameter=None,
                   Type='new_dirpath',
@@ -64,9 +72,9 @@
# inputs list (above).
outputs = [
    OptparseResult(Parameter=cmd_out_lookup('mem_fig'),
-        Handler=write_matplotlib_figure,
-        InputName='output-dir'),
+                   Handler=write_matplotlib_figure,
+                   InputName='output-dir'),
    OptparseResult(Parameter=cmd_out_lookup('time_fig'),
-        Handler=write_matplotlib_figure,
-        InputName='output-dir'),
+                   Handler=write_matplotlib_figure,
+                   InputName='output-dir'),
]
