diff --git a/sup3r/bias/bias_calc.py b/sup3r/bias/bias_calc.py index 054b5b26c..d909e45b4 100644 --- a/sup3r/bias/bias_calc.py +++ b/sup3r/bias/bias_calc.py @@ -211,7 +211,8 @@ def get_node_cmd(cls, config): f"{fun_str};\n" "t_elap = time.time() - t0;\n") - cmd = BaseCLI.add_status_cmd(config, ModuleName.BIAS_CALC, cmd) + pipeline_step = config.get('pipeline_step') or ModuleName.BIAS_CALC + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') diff --git a/sup3r/bias/bias_calc_cli.py b/sup3r/bias/bias_calc_cli.py index 23c4b3771..13288f277 100644 --- a/sup3r/bias/bias_calc_cli.py +++ b/sup3r/bias/bias_calc_cli.py @@ -34,7 +34,7 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run sup3r bias correction calculation from a config file.""" config = BaseCLI.from_config_preflight(ModuleName.BIAS_CALC, ctx, config_file, verbose) @@ -56,6 +56,7 @@ def from_config(ctx, config_file, verbose=False, **__): name = ('{}_{}'.format(basename, str(i_node).zfill(6))) ctx.obj['NAME'] = name node_config['job_name'] = name + node_config["pipeline_step"] = pipeline_step cmd = BiasCalcClass.get_node_cmd(node_config) @@ -63,13 +64,14 @@ def from_config(ctx, config_file, verbose=False, **__): logger.debug(f'Running command:\n\t{cmd_log}') if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS: - kickoff_slurm_job(ctx, cmd, **exec_kwargs) + kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs) else: - kickoff_local_job(ctx, cmd) + kickoff_local_job(ctx, cmd, pipeline_step) -def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, - feature=None, stdout_path='./stdout/'): +def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', + memory=None, walltime=4, feature=None, + stdout_path='./stdout/'): """Run sup3r on HPC via SLURM job submission. Parameters @@ -79,6 +81,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, cmd : str Command to be submitted in SLURM shell script. Example: 'python -m sup3r.cli forward_pass -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. alloc : str HPC project (allocation) handle. Example: 'sup3r'. memory : int @@ -92,10 +98,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, Path to print .stdout and .stderr files. """ BaseCLI.kickoff_slurm_job(ModuleName.BIAS_CALC, ctx, cmd, alloc, memory, - walltime, feature, stdout_path) + walltime, feature, stdout_path, pipeline_step) -def kickoff_local_job(ctx, cmd): +def kickoff_local_job(ctx, cmd, pipeline_step=None): """Run sup3r bias calc locally. Parameters @@ -105,8 +111,12 @@ def kickoff_local_job(ctx, cmd): cmd : str Command to be submitted in shell script. Example: 'python -m sup3r.cli forward_pass -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. 
""" - BaseCLI.kickoff_local_job(ModuleName.BIAS_CALC, ctx, cmd) + BaseCLI.kickoff_local_job(ModuleName.BIAS_CALC, ctx, cmd, pipeline_step) if __name__ == '__main__': diff --git a/sup3r/pipeline/forward_pass.py b/sup3r/pipeline/forward_pass.py index 3d14502f8..89b90291c 100644 --- a/sup3r/pipeline/forward_pass.py +++ b/sup3r/pipeline/forward_pass.py @@ -1761,7 +1761,8 @@ def get_node_cmd(cls, config): f"{cls.__name__}.run(strategy, {node_index});\n" "t_elap = time.time() - t0;\n") - cmd = BaseCLI.add_status_cmd(config, ModuleName.FORWARD_PASS, cmd) + pipeline_step = config.get('pipeline_step') or ModuleName.FORWARD_PASS + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') diff --git a/sup3r/pipeline/forward_pass_cli.py b/sup3r/pipeline/forward_pass_cli.py index 5b21f46bd..3cb33b3b2 100644 --- a/sup3r/pipeline/forward_pass_cli.py +++ b/sup3r/pipeline/forward_pass_cli.py @@ -35,7 +35,7 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run sup3r forward pass from a config file.""" config = BaseCLI.from_config_preflight(ModuleName.FORWARD_PASS, ctx, @@ -66,19 +66,21 @@ def from_config(ctx, config_file, verbose=False, **__): name = ('{}_{}'.format(basename, str(i_node).zfill(6))) ctx.obj['NAME'] = name node_config['job_name'] = name + node_config["pipeline_step"] = pipeline_step cmd = ForwardPass.get_node_cmd(node_config) cmd_log = '\n\t'.join(cmd.split('\n')) logger.debug(f'Running command:\n\t{cmd_log}') if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS: - kickoff_slurm_job(ctx, cmd, **exec_kwargs) + kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs) else: - kickoff_local_job(ctx, cmd) + kickoff_local_job(ctx, cmd, pipeline_step) -def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, - feature=None, stdout_path='./stdout/'): +def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', + memory=None, walltime=4, feature=None, + stdout_path='./stdout/'): """Run sup3r on HPC via SLURM job submission. Parameters @@ -88,6 +90,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, cmd : str Command to be submitted in SLURM shell script. Example: 'python -m sup3r.cli forward_pass -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. alloc : str HPC project (allocation) handle. Example: 'sup3r'. memory : int @@ -101,10 +107,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, Path to print .stdout and .stderr files. """ BaseCLI.kickoff_slurm_job(ModuleName.FORWARD_PASS, ctx, cmd, alloc, memory, - walltime, feature, stdout_path) + walltime, feature, stdout_path, pipeline_step) -def kickoff_local_job(ctx, cmd): +def kickoff_local_job(ctx, cmd, pipeline_step=None): """Run sup3r forward pass locally. Parameters @@ -114,8 +120,12 @@ def kickoff_local_job(ctx, cmd): cmd : str Command to be submitted in shell script. Example: 'python -m sup3r.cli forward_pass -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. 
""" - BaseCLI.kickoff_local_job(ModuleName.FORWARD_PASS, ctx, cmd) + BaseCLI.kickoff_local_job(ModuleName.FORWARD_PASS, ctx, cmd, pipeline_step) if __name__ == '__main__': diff --git a/sup3r/postprocessing/collection.py b/sup3r/postprocessing/collection.py index bcbaef968..7a6977dca 100644 --- a/sup3r/postprocessing/collection.py +++ b/sup3r/postprocessing/collection.py @@ -76,8 +76,8 @@ def get_node_cmd(cls, config): "t_elap = time.time() - t0;\n" ) - cmd = BaseCLI.add_status_cmd(config, ModuleName.DATA_COLLECT, cmd) - + pipeline_step = config.get('pipeline_step') or ModuleName.DATA_COLLECT + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') @@ -755,6 +755,7 @@ def collect( log_file=None, write_status=False, job_name=None, + pipeline_step=None, join_times=False, target_final_meta_file=None, n_writes=None, @@ -786,6 +787,10 @@ def collect( Flag to write status file once complete if running from pipeline. job_name : str Job name for status file if running from pipeline. + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``"collect``, + mimicking old reV behavior. By default, ``None``. join_times : bool Option to split full file list into chunks with each chunk having the same temporal_chunk_index. The number of writes will then be @@ -909,8 +914,9 @@ def collect( 'job_status': 'successful', 'runtime': (time.time() - t0) / 60, } + pipeline_step = pipeline_step or 'collect' Status.make_single_job_file( - os.path.dirname(out_file), 'collect', job_name, status + os.path.dirname(out_file), pipeline_step, job_name, status ) logger.info('Finished file collection.') diff --git a/sup3r/postprocessing/data_collect_cli.py b/sup3r/postprocessing/data_collect_cli.py index 5133f5586..ac148dd53 100644 --- a/sup3r/postprocessing/data_collect_cli.py +++ b/sup3r/postprocessing/data_collect_cli.py @@ -33,7 +33,7 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run sup3r data collection from a config file. If dset_split is True this each feature will be collected into a separate file.""" config = BaseCLI.from_config_preflight(ModuleName.DATA_COLLECT, ctx, @@ -62,18 +62,19 @@ def from_config(ctx, config_file, verbose=False, **__): for config in configs: ctx.obj['NAME'] = config['job_name'] + config['pipeline_step'] = pipeline_step cmd = Collector.get_node_cmd(config) cmd_log = '\n\t'.join(cmd.split('\n')) logger.debug(f'Running command:\n\t{cmd_log}') if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS: - kickoff_slurm_job(ctx, cmd, **exec_kwargs) + kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs) else: - kickoff_local_job(ctx, cmd) + kickoff_local_job(ctx, cmd, pipeline_step) -def kickoff_local_job(ctx, cmd): +def kickoff_local_job(ctx, cmd, pipeline_step=None): """Run sup3r data collection locally. Parameters @@ -83,12 +84,17 @@ def kickoff_local_job(ctx, cmd): cmd : str Command to be submitted in shell script. Example: 'python -m sup3r.cli data_collect -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. 
""" - BaseCLI.kickoff_local_job(ModuleName.DATA_COLLECT, ctx, cmd) + BaseCLI.kickoff_local_job(ModuleName.DATA_COLLECT, ctx, cmd, pipeline_step) -def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, - feature=None, stdout_path='./stdout/'): +def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', + memory=None, walltime=4, feature=None, + stdout_path='./stdout/'): """Run sup3r on HPC via SLURM job submission. Parameters @@ -98,6 +104,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, cmd : str Command to be submitted in SLURM shell script. Example: 'python -m sup3r.cli data-collect -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. alloc : str HPC project (allocation) handle. Example: 'sup3r'. memory : int @@ -111,7 +121,7 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, Path to print .stdout and .stderr files. """ BaseCLI.kickoff_slurm_job(ModuleName.DATA_COLLECT, ctx, cmd, alloc, memory, - walltime, feature, stdout_path) + walltime, feature, stdout_path, pipeline_step) if __name__ == '__main__': diff --git a/sup3r/preprocessing/data_extract_cli.py b/sup3r/preprocessing/data_extract_cli.py index 4aec216cb..8e182b19f 100644 --- a/sup3r/preprocessing/data_extract_cli.py +++ b/sup3r/preprocessing/data_extract_cli.py @@ -32,7 +32,7 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run sup3r data extraction from a config file. Parameters @@ -46,6 +46,7 @@ def from_config(ctx, config_file, verbose=False, **__): """ config = BaseCLI.from_config_preflight(ModuleName.DATA_EXTRACT, ctx, config_file, verbose) + config["pipeline_step"] = pipeline_step exec_kwargs = config.get('execution_control', {}) hardware_option = exec_kwargs.pop('option', 'local') @@ -63,7 +64,7 @@ def from_config(ctx, config_file, verbose=False, **__): kickoff_local_job(ctx, cmd) -def kickoff_local_job(ctx, cmd): +def kickoff_local_job(ctx, cmd, pipeline_step=None): """Run sup3r data extraction locally. Parameters @@ -73,12 +74,17 @@ def kickoff_local_job(ctx, cmd): cmd : str Command to be submitted in shell script. Example: 'python -m sup3r.cli data_extract -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. """ - BaseCLI.kickoff_local_job(ModuleName.DATA_EXTRACT, ctx, cmd) + BaseCLI.kickoff_local_job(ModuleName.DATA_EXTRACT, ctx, cmd, pipeline_step) -def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, - feature=None, stdout_path='./stdout/'): +def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', + memory=None, walltime=4, feature=None, + stdout_path='./stdout/'): """Run sup3r on HPC via SLURM job submission. Parameters @@ -88,6 +94,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, cmd : str Command to be submitted in SLURM shell script. Example: 'python -m sup3r.cli data_extract -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. 
By default, ``None``. alloc : str HPC project (allocation) handle. Example: 'sup3r'. memory : int @@ -101,7 +111,7 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, Path to print .stdout and .stderr files. """ BaseCLI.kickoff_slurm_job(ModuleName.DATA_EXTRACT, ctx, cmd, alloc, memory, - walltime, feature, stdout_path) + walltime, feature, stdout_path, pipeline_step) if __name__ == '__main__': diff --git a/sup3r/preprocessing/data_handling/base.py b/sup3r/preprocessing/data_handling/base.py index 94eea1673..0fa62d0fd 100644 --- a/sup3r/preprocessing/data_handling/base.py +++ b/sup3r/preprocessing/data_handling/base.py @@ -816,8 +816,8 @@ def get_node_cmd(cls, config): f"data_handler = {dh_init_str};\n" "t_elap = time.time() - t0;\n") - cmd = BaseCLI.add_status_cmd(config, ModuleName.DATA_EXTRACT, cmd) - + pipeline_step = config.get('pipeline_step') or ModuleName.DATA_EXTRACT + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') diff --git a/sup3r/qa/qa.py b/sup3r/qa/qa.py index 6576d6392..c4ca1ae92 100644 --- a/sup3r/qa/qa.py +++ b/sup3r/qa/qa.py @@ -570,7 +570,8 @@ def get_node_cmd(cls, config): "t_elap = time.time() - t0;\n" ) - cmd = BaseCLI.add_status_cmd(config, ModuleName.QA, cmd) + pipeline_step = config.get('pipeline_step') or ModuleName.QA + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') diff --git a/sup3r/qa/qa_cli.py b/sup3r/qa/qa_cli.py index 02ca2c3f6..8713a42b9 100644 --- a/sup3r/qa/qa_cli.py +++ b/sup3r/qa/qa_cli.py @@ -32,9 +32,10 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run the sup3r QA module from a config file.""" - BaseCLI.from_config(ModuleName.QA, Sup3rQa, ctx, config_file, verbose) + BaseCLI.from_config(ModuleName.QA, Sup3rQa, ctx, config_file, verbose, + pipeline_step) if __name__ == '__main__': diff --git a/sup3r/qa/stats.py b/sup3r/qa/stats.py index 348dca486..58d8eeccf 100644 --- a/sup3r/qa/stats.py +++ b/sup3r/qa/stats.py @@ -154,7 +154,8 @@ def get_node_cmd(cls, config): "t_elap = time.time() - t0;\n" ) - cmd = BaseCLI.add_status_cmd(config, ModuleName.STATS, cmd) + pipeline_step = config.get('pipeline_step') or ModuleName.STATS + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') diff --git a/sup3r/qa/stats_cli.py b/sup3r/qa/stats_cli.py index 6bdd618f9..793b15039 100644 --- a/sup3r/qa/stats_cli.py +++ b/sup3r/qa/stats_cli.py @@ -32,10 +32,10 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. 
Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run the sup3r WindStats module from a config file.""" BaseCLI.from_config(ModuleName.STATS, Sup3rStatsMulti, ctx, - config_file, verbose) + config_file, verbose, pipeline_step) if __name__ == '__main__': diff --git a/sup3r/qa/visual_qa.py b/sup3r/qa/visual_qa.py index a18fcabf4..e0118459c 100644 --- a/sup3r/qa/visual_qa.py +++ b/sup3r/qa/visual_qa.py @@ -275,7 +275,8 @@ def get_node_cmd(cls, config): "t_elap = time.time() - t0;\n" ) - cmd = BaseCLI.add_status_cmd(config, ModuleName.VISUAL_QA, cmd) + pipeline_step = config.get('pipeline_step') or ModuleName.VISUAL_QA + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') diff --git a/sup3r/qa/visual_qa_cli.py b/sup3r/qa/visual_qa_cli.py index cc9f9b299..ae38db9e6 100644 --- a/sup3r/qa/visual_qa_cli.py +++ b/sup3r/qa/visual_qa_cli.py @@ -31,10 +31,10 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run the sup3r visual QA module from a config file.""" BaseCLI.from_config(ModuleName.VISUAL_QA, Sup3rVisualQa, ctx, config_file, - verbose) + verbose, pipeline_step) if __name__ == '__main__': diff --git a/sup3r/solar/solar.py b/sup3r/solar/solar.py index a7378a594..c847cf96b 100644 --- a/sup3r/solar/solar.py +++ b/sup3r/solar/solar.py @@ -524,10 +524,11 @@ def get_node_cmd(cls, config): ) job_name = config.get('job_name', None) + pipeline_step = config.get('pipeline_step') or ModuleName.SOLAR if job_name is not None: status_dir = config.get('status_dir', None) status_file_arg_str = f'"{status_dir}", ' - status_file_arg_str += f'pipeline_step="{ModuleName.SOLAR}", ' + status_file_arg_str += f'pipeline_step="{pipeline_step}", ' status_file_arg_str += f'job_name="{job_name}", ' status_file_arg_str += 'attrs=job_attrs' diff --git a/sup3r/solar/solar_cli.py b/sup3r/solar/solar_cli.py index 85c8d0097..25c6c216d 100644 --- a/sup3r/solar/solar_cli.py +++ b/sup3r/solar/solar_cli.py @@ -34,7 +34,7 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. 
Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run sup3r solar from a config file.""" config = BaseCLI.from_config_preflight(ModuleName.SOLAR, ctx, config_file, verbose) @@ -56,6 +56,7 @@ def from_config(ctx, config_file, verbose=False, **__): name = ('{}_{}'.format(basename, str(i_node).zfill(6))) ctx.obj['NAME'] = name node_config['job_name'] = name + node_config["pipeline_step"] = pipeline_step node_config['temporal_id'] = temporal_id cmd = Solar.get_node_cmd(node_config) @@ -64,13 +65,14 @@ def from_config(ctx, config_file, verbose=False, **__): logger.debug(f'Running command:\n\t{cmd_log}') if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS: - kickoff_slurm_job(ctx, cmd, **exec_kwargs) + kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs) else: - kickoff_local_job(ctx, cmd) + kickoff_local_job(ctx, cmd, pipeline_step) -def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, - feature=None, stdout_path='./stdout/'): +def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', + memory=None, walltime=4, feature=None, + stdout_path='./stdout/'): """Run sup3r on HPC via SLURM job submission. Parameters @@ -80,6 +82,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, cmd : str Command to be submitted in SLURM shell script. Example: 'python -m sup3r.cli forward_pass -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. alloc : str HPC project (allocation) handle. Example: 'sup3r'. memory : int @@ -93,10 +99,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, Path to print .stdout and .stderr files. """ BaseCLI.kickoff_slurm_job(ModuleName.SOLAR, ctx, cmd, alloc, memory, - walltime, feature, stdout_path) + walltime, feature, stdout_path, pipeline_step) -def kickoff_local_job(ctx, cmd): +def kickoff_local_job(ctx, cmd, pipeline_step=None): """Run sup3r solar locally. Parameters @@ -106,8 +112,12 @@ def kickoff_local_job(ctx, cmd): cmd : str Command to be submitted in shell script. Example: 'python -m sup3r.cli forward_pass -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. """ - BaseCLI.kickoff_local_job(ModuleName.SOLAR, ctx, cmd) + BaseCLI.kickoff_local_job(ModuleName.SOLAR, ctx, cmd, pipeline_step) if __name__ == '__main__': diff --git a/sup3r/utilities/__init__.py b/sup3r/utilities/__init__.py index 35c8842ef..95a54ccd2 100644 --- a/sup3r/utilities/__init__.py +++ b/sup3r/utilities/__init__.py @@ -47,6 +47,12 @@ class ModuleName(str, Enum): VISUAL_QA = 'visual-qa' REGRID = 'regrid' + def __str__(self): + return self.value + + def __format__(self, format_spec): + return str.__format__(self.value, format_spec) + @classmethod def all_names(cls): """All module names. 
diff --git a/sup3r/utilities/cli.py b/sup3r/utilities/cli.py index 74582349d..16abd7326 100644 --- a/sup3r/utilities/cli.py +++ b/sup3r/utilities/cli.py @@ -44,7 +44,8 @@ class BaseCLI: """Base CLI class used to create CLI for modules in ModuleName""" @classmethod - def from_config(cls, module_name, module_class, ctx, config_file, verbose): + def from_config(cls, module_name, module_class, ctx, config_file, verbose, + pipeline_step=None): """Run sup3r module from a config file. @@ -61,10 +62,15 @@ def from_config(cls, module_name, module_class, ctx, config_file, verbose): Path to config file provided all needed inputs to module_class verbose : bool Whether to run in verbose mode. + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. """ config = cls.from_config_preflight( module_name, ctx, config_file, verbose ) + config['pipeline_step'] = pipeline_step exec_kwargs = config.get('execution_control', {}) hardware_option = exec_kwargs.pop('option', 'local') @@ -75,9 +81,10 @@ def from_config(cls, module_name, module_class, ctx, config_file, verbose): logger.debug(f'Running command:\n\t{cmd_log}') if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS: - cls.kickoff_slurm_job(module_name, ctx, cmd, **exec_kwargs) + cls.kickoff_slurm_job(module_name, ctx, cmd, + pipeline_step=pipeline_step, **exec_kwargs) else: - cls.kickoff_local_job(module_name, ctx, cmd) + cls.kickoff_local_job(module_name, ctx, cmd, pipeline_step) @classmethod def from_config_preflight(cls, module_name, ctx, config_file, verbose): @@ -170,6 +177,7 @@ def kickoff_slurm_job( walltime=4, feature=None, stdout_path='./stdout/', + pipeline_step=None, ): """Run sup3r module on HPC via SLURM job submission. @@ -193,8 +201,14 @@ def kickoff_slurm_job( or "--depend=[state:job_id]". Default is None. stdout_path : str Path to print .stdout and .stderr files. + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. """ cls.check_module_name(module_name) + if pipeline_step is None: + pipeline_step = module_name name = ctx.obj['NAME'] out_dir = ctx.obj['OUT_DIR'] @@ -205,7 +219,7 @@ def kickoff_slurm_job( status = Status.retrieve_job_status( out_dir, - pipeline_step=module_name, + pipeline_step=pipeline_step, job_name=name, subprocess_manager=slurm_manager, ) @@ -224,8 +238,11 @@ def kickoff_slurm_job( 'resubmitting' ) else: + job_info = f"{module_name}" + if pipeline_step != module_name: + job_info = f"{job_info} (pipeline step {pipeline_step!r})" logger.info( - f'Running sup3r {module_name} on SLURM with node ' + f'Running sup3r {job_info} on SLURM with node ' f'name "{name}".' ) out = slurm_manager.sbatch( @@ -239,14 +256,14 @@ def kickoff_slurm_job( )[0] if out: msg = ( - f'Kicked off sup3r {module_name} job "{name}" ' + f'Kicked off sup3r {job_info} job "{name}" ' f'(SLURM jobid #{out}).' ) # add job to sup3r status file. Status.mark_job_as_submitted( out_dir, - pipeline_step=module_name, + pipeline_step=pipeline_step, job_name=name, replace=True, job_attrs={'job_id': out, 'hardware': 'kestrel'}, ) @@ -256,7 +273,7 @@ def kickoff_slurm_job( logger.info(msg) @classmethod - def kickoff_local_job(cls, module_name, ctx, cmd): + def kickoff_local_job(cls, module_name, ctx, cmd, pipeline_step=None): """Run sup3r module locally.
Parameters @@ -268,15 +285,21 @@ def kickoff_local_job(cls, module_name, ctx, cmd): cmd : str Command to be submitted in shell script. Example: 'python -m sup3r.cli -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. """ cls.check_module_name(module_name) + if pipeline_step is None: + pipeline_step = module_name name = ctx.obj['NAME'] out_dir = ctx.obj['OUT_DIR'] subprocess_manager = SubprocessManager status = Status.retrieve_job_status( - out_dir, pipeline_step=module_name, job_name=name + out_dir, pipeline_step=pipeline_step, job_name=name ) job_failed = 'fail' in str(status).lower() job_submitted = status != 'not submitted' @@ -293,21 +316,27 @@ def kickoff_local_job(cls, module_name, ctx, cmd): 'resubmitting' ) else: + job_info = f"{module_name}" + if pipeline_step != module_name: + job_info = f"{job_info} (pipeline step {pipeline_step!r})" logger.info( - f'Running sup3r {module_name} locally with job ' + f'Running sup3r {job_info} locally with job ' f'name "{name}".' ) Status.mark_job_as_submitted( - out_dir, pipeline_step=module_name, job_name=name, replace=True + out_dir, + pipeline_step=pipeline_step, + job_name=name, + replace=True ) subprocess_manager.submit(cmd) - msg = f'Completed sup3r {module_name} job "{name}".' + msg = f'Completed sup3r {job_info} job "{name}".' click.echo(msg) logger.info(msg) @classmethod - def add_status_cmd(cls, config, module_name, cmd): + def add_status_cmd(cls, config, pipeline_step, cmd): """Append status file command to command for executing given module Parameters @@ -315,8 +344,8 @@ def add_status_cmd(cls, config, module_name, cmd): config : dict sup3r config with all necessary args and kwargs to run given module. - module_name : str - Module name string from :class:`sup3r.utilities.ModuleName`. + pipeline_step : str + Name of the pipeline step being run. cmd : str String including command to execute given module. @@ -330,7 +359,7 @@ def add_status_cmd(cls, config, module_name, cmd): status_dir = config.get('status_dir', None) if job_name is not None and status_dir is not None: status_file_arg_str = f'"{status_dir}", ' - status_file_arg_str += f'pipeline_step="{module_name}", ' + status_file_arg_str += f'pipeline_step="{pipeline_step}", ' status_file_arg_str += f'job_name="{job_name}", ' status_file_arg_str += 'attrs=job_attrs' diff --git a/sup3r/utilities/regridder.py b/sup3r/utilities/regridder.py index f73cfc74d..b1b898893 100644 --- a/sup3r/utilities/regridder.py +++ b/sup3r/utilities/regridder.py @@ -666,7 +666,8 @@ def get_node_cmd(cls, config): f"regrid_output.run({node_index});\n" "t_elap = time.time() - t0;\n") - cmd = BaseCLI.add_status_cmd(config, ModuleName.REGRID, cmd) + pipeline_step = config.get('pipeline_step') or ModuleName.REGRID + cmd = BaseCLI.add_status_cmd(config, pipeline_step, cmd) cmd += ";\'\n" return cmd.replace('\\', '/') diff --git a/sup3r/utilities/regridder_cli.py b/sup3r/utilities/regridder_cli.py index 0e54b1d0c..5b1c623a0 100644 --- a/sup3r/utilities/regridder_cli.py +++ b/sup3r/utilities/regridder_cli.py @@ -35,7 +35,7 @@ def main(ctx, verbose): @click.option('-v', '--verbose', is_flag=True, help='Flag to turn on debug logging. 
Default is not verbose.') @click.pass_context -def from_config(ctx, config_file, verbose=False, **__): +def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run sup3r regrid from a config file.""" config = BaseCLI.from_config_preflight(ModuleName.REGRID, ctx, @@ -66,19 +66,21 @@ def from_config(ctx, config_file, verbose=False, **__): name = ('{}_{}'.format(basename, str(i_node).zfill(6))) ctx.obj['NAME'] = name node_config['job_name'] = name + node_config["pipeline_step"] = pipeline_step cmd = RegridOutput.get_node_cmd(node_config) cmd_log = '\n\t'.join(cmd.split('\n')) logger.debug(f'Running command:\n\t{cmd_log}') if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS: - kickoff_slurm_job(ctx, cmd, **exec_kwargs) + kickoff_slurm_job(ctx, cmd, pipeline_step, **exec_kwargs) else: - kickoff_local_job(ctx, cmd) + kickoff_local_job(ctx, cmd, pipeline_step) -def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, - feature=None, stdout_path='./stdout/'): +def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', + memory=None, walltime=4, feature=None, + stdout_path='./stdout/'): """Run sup3r on HPC via SLURM job submission. Parameters @@ -88,6 +90,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, cmd : str Command to be submitted in SLURM shell script. Example: 'python -m sup3r.cli regrid -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. alloc : str HPC project (allocation) handle. Example: 'sup3r'. memory : int @@ -101,10 +107,10 @@ def kickoff_slurm_job(ctx, cmd, alloc='sup3r', memory=None, walltime=4, Path to print .stdout and .stderr files. """ BaseCLI.kickoff_slurm_job(ModuleName.REGRID, ctx, cmd, alloc, memory, - walltime, feature, stdout_path) + walltime, feature, stdout_path, pipeline_step) -def kickoff_local_job(ctx, cmd): +def kickoff_local_job(ctx, cmd, pipeline_step=None): """Run sup3r regrid locally. Parameters @@ -114,8 +120,12 @@ def kickoff_local_job(ctx, cmd): cmd : str Command to be submitted in shell script. Example: 'python -m sup3r.cli regrid -c ' + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``.
""" - BaseCLI.kickoff_local_job(ModuleName.REGRID, ctx, cmd) + BaseCLI.kickoff_local_job(ModuleName.REGRID, ctx, cmd, pipeline_step) if __name__ == '__main__': diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 6e162ef4a..52b195a15 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -8,6 +8,7 @@ import click import numpy as np from rex import ResourceX +from rex.utilities.loggers import LOGGERS from gaps import Pipeline from sup3r import CONFIG_DIR, TEST_DATA_DIR @@ -117,3 +118,158 @@ def test_fwp_pipeline(): assert all(s not in str(status) for s in ('fail', 'pending', 'submitted')) assert 'successful' in str(status) + + +def test_multiple_fwp_pipeline(): + """Test sup3r pipeline with multiple fwp steps""" + + fp_gen = os.path.join(CONFIG_DIR, 'spatiotemporal/gen_3x_4x_2f.json') + fp_disc = os.path.join(CONFIG_DIR, 'spatiotemporal/disc.json') + + Sup3rGan.seed() + model = Sup3rGan(fp_gen, fp_disc, learning_rate=1e-4) + _ = model.generate(np.ones((4, 8, 8, 4, len(FEATURES)))) + input_resolution = {'spatial': '12km', 'temporal': '60min'} + model.meta['input_resolution'] = input_resolution + assert model.input_resolution == input_resolution + assert model.output_resolution == {'spatial': '4km', 'temporal': '15min'} + _ = model.generate(np.ones((4, 8, 8, 4, len(FEATURES)))) + model.meta['training_features'] = FEATURES + model.meta['output_features'] = FEATURES[:2] + assert model.s_enhance == 3 + assert model.t_enhance == 4 + + test_context = click.Context(click.Command("pipeline"), obj={}) + with tempfile.TemporaryDirectory() as td, test_context as ctx: + ctx.obj["NAME"] = "test" + ctx.obj["VERBOSE"] = False + + input_files = make_fake_nc_files(td, INPUT_FILE, 20) + out_dir = os.path.join(td, 'st_gan') + model.save(out_dir) + + fp_chunk_shape = (4, 4, 3) + shape = (8, 8) + target = (19.3, -123.5) + n_tsteps = 10 + t_slice = slice(5, 5 + n_tsteps) + t_enhance = 4 + + input_handler_kwargs = dict(target=target, shape=shape, + overwrite_cache=True, + time_chunk_size=10, + worker_kwargs=dict(max_workers=1), + temporal_slice=[t_slice.start, + t_slice.stop]) + + sub_dir_1 = os.path.join(td, 'dir1') + os.mkdir(sub_dir_1) + cache_pattern = os.path.join(sub_dir_1, 'cache') + log_prefix = os.path.join(td, 'log1') + out_files = os.path.join(sub_dir_1, 'fp_out_{file_id}.h5') + config = {'worker_kwargs': {'max_workers': 1}, + 'file_paths': input_files, + 'model_kwargs': {'model_dir': out_dir}, + 'out_pattern': out_files, + 'cache_pattern': cache_pattern, + 'log_level': "DEBUG", + 'log_pattern': log_prefix, + 'fwp_chunk_shape': fp_chunk_shape, + 'input_handler_kwargs': input_handler_kwargs, + 'spatial_pad': 2, + 'temporal_pad': 2, + 'overwrite_cache': True, + 'execution_control': { + "nodes": 1, + "option": "local"}, + 'max_nodes': 1} + + fp_config_path_1 = os.path.join(td, 'fp_config1.json') + with open(fp_config_path_1, 'w') as fh: + json.dump(config, fh) + + sub_dir_2 = os.path.join(td, 'dir2') + os.mkdir(sub_dir_2) + cache_pattern = os.path.join(sub_dir_2, 'cache') + log_prefix = os.path.join(td, 'log2') + out_files = os.path.join(sub_dir_2, 'fp_out_{file_id}.h5') + config = {'worker_kwargs': {'max_workers': 1}, + 'file_paths': input_files, + 'model_kwargs': {'model_dir': out_dir}, + 'out_pattern': out_files, + 'cache_pattern': cache_pattern, + 'log_level': "DEBUG", + 'log_pattern': log_prefix, + 'fwp_chunk_shape': fp_chunk_shape, + 'input_handler_kwargs': input_handler_kwargs, + 'spatial_pad': 2, + 'temporal_pad': 2, + 
'overwrite_cache': True, + 'execution_control': { + "nodes": 1, + "option": "local"}, + 'max_nodes': 1} + + fp_config_path_2 = os.path.join(td, 'fp_config2.json') + with open(fp_config_path_2, 'w') as fh: + json.dump(config, fh) + + out_files_1 = os.path.join(sub_dir_1, 'fp_out_*.h5') + features = ['windspeed_100m', 'winddirection_100m'] + fp_out_1 = os.path.join(sub_dir_1, 'out_combined.h5') + config = {'max_workers': 1, + 'file_paths': out_files_1, + 'out_file': fp_out_1, + 'features': features, + 'log_file': os.path.join(td, 'log.log'), + 'execution_control': {"option": "local"}} + + collect_config_path_1 = os.path.join(td, 'collect_config1.json') + with open(collect_config_path_1, 'w') as fh: + json.dump(config, fh) + + out_files_2 = os.path.join(sub_dir_2, 'fp_out_*.h5') + fp_out_2 = os.path.join(sub_dir_2, 'out_combined.h5') + config = {'max_workers': 1, + 'file_paths': out_files_2, + 'out_file': fp_out_2, + 'features': features, + 'log_file': os.path.join(td, 'log2.log'), + 'execution_control': {"option": "local"}} + + collect_config_path_2 = os.path.join(td, 'collect_config2.json') + with open(collect_config_path_2, 'w') as fh: + json.dump(config, fh) + + pipe_config = {"logging": {"log_file": None, "log_level": "INFO"}, + "pipeline": [{'fp1': fp_config_path_1, + 'command': 'forward-pass'}, + {'fp2': fp_config_path_2, + 'command': 'forward-pass'}, + {'data-collect': collect_config_path_1}, + {'collect2': collect_config_path_2, + 'command': 'data-collect'}]} + + tmp_fpipeline = os.path.join(td, 'config_pipeline.json') + with open(tmp_fpipeline, 'w') as fh: + json.dump(pipe_config, fh) + + Pipeline.run(tmp_fpipeline, monitor=True) + + for fp_out in [fp_out_1, fp_out_2]: + assert os.path.exists(fp_out) + with ResourceX(fp_out) as f: + assert len(f.time_index) == t_enhance * n_tsteps + + status_fps = glob.glob(f'{td}/.gaps/*status*.json') + assert len(status_fps) == 1 + status_file = status_fps[0] + with open(status_file, 'r') as fh: + status = json.load(fh) + expected_names = {'fp1', 'fp2', 'data-collect', 'collect2'} + assert all(s in status for s in expected_names) + assert all(s not in str(status) + for s in ('fail', 'pending', 'submitted')) + assert 'successful' in str(status) + + LOGGERS.clear()
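
Note on the mechanics above (an illustrative sketch, not part of the diff): the status-file key for a job now resolves to the user-chosen pipeline step name when one is supplied, and falls back to the module name otherwise. The fallback renders cleanly because of the new ``ModuleName.__str__``/``__format__`` overrides. Assuming only the ``ModuleName`` enum shown in sup3r/utilities/__init__.py:

    from sup3r.utilities import ModuleName

    # A pipeline config key like 'fp1' above is forwarded into the node
    # config, so the status entry is keyed by the step name:
    config = {'job_name': 'fp1_000000', 'pipeline_step': 'fp1'}
    step = config.get('pipeline_step') or ModuleName.FORWARD_PASS
    print(f'pipeline_step="{step}"')  # pipeline_step="fp1"

    # With no explicit step name, the module name is used. The __str__ and
    # __format__ overrides make the enum render as its value rather than
    # the default 'ModuleName.FORWARD_PASS' enum repr:
    config = {'job_name': 'fp_000000'}
    step = config.get('pipeline_step') or ModuleName.FORWARD_PASS
    print(f'pipeline_step="{step}"')  # pipeline_step="forward-pass"

This is the behavior the test asserts: the status file carries the custom keys 'fp1', 'fp2', and 'collect2' alongside the default-style 'data-collect' key.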