From 0ec54f1c5aa227da93789722fee44f289aaa9e0b Mon Sep 17 00:00:00 2001 From: akremin Date: Wed, 2 Oct 2024 17:05:18 -0700 Subject: [PATCH 1/9] add desi_update_processing_table_statuses script --- bin/desi_update_processing_table_statuses | 59 +++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100755 bin/desi_update_processing_table_statuses diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_processing_table_statuses new file mode 100755 index 000000000..c0a29f3d2 --- /dev/null +++ b/bin/desi_update_processing_table_statuses @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# coding: utf-8 + +import argparse +import os + +from desiutil.log import get_logger +from desispec.workflow.tableio import load_table, write_table +from desispec.workflow.proctable import get_processing_table_pathname +from desispec.workflow.queue import update_from_queue + + +def parse_args(): # options=None): + """ + Get command line arguments for desi_update_processing_table_statuses + """ + parser = argparse.ArgumentParser(description="Update the STATUS of all jobs " + + "in a DESI processing table by " + + "querying Slurm.") + + parser.add_argument("-n","--night", type=str, default=None, + required=False, help="The night you want processed.") + parser.add_argument("--proc-table-pathname", type=str, required=False, default=None, + help="Directory name where the output processing table should be saved.") + parser.add_argument("--tab-filetype", type=str, required=False, default='csv', + help="File format and extension for the exp and proc tables.") + parser.add_argument("--dry-run", action="store_true", + help="Perform a dry run where the processing table is not written back out to disk.") + parser.add_argument("--check-complete-jobs", action="store_true", + help="Default is False. 
Query NERSC about jobs with STATUS 'COMPLETED'.") + args = parser.parse_args() + + return args + + +if __name__ == '__main__': + args = parse_args() + log = get_logger() + ptable_pathname = args.proc_table_pathname + if ptable_pathname is None: + if args.night is None: + ValueError("Either night or --proc-table-path must be specified") + ## Determine where the processing table will be written + ptable_pathname = get_processing_table_pathname(prodmod=args.night, + extension=args.tab_filetype) + + if not os.path.exists(ptable_pathname): + ValueError(f"Processing table: {ptable_pathname} doesn't exist.") + + ## Load in the files defined above + ptable = load_table(tablename=ptable_pathname, tabletype='proctable') + log.info(f"Identified ptable with {len(ptable)} entries.") + ptable = update_from_queue(ptable, dry_run=args.dry_run, + check_complete_jobs=args.check_complete_jobs) + + if not args.dry_run: + write_table(ptable, tablename=ptable_pathname) + + log.info(f"Done updating STATUS column for processing table: {ptable_pathname}") From c18c4709ba839e09e0d94929879274029e66daf3 Mon Sep 17 00:00:00 2001 From: akremin Date: Wed, 2 Oct 2024 17:06:41 -0700 Subject: [PATCH 2/9] add desi_update_processing_table_statuses script --- bin/desi_update_processing_table_statuses | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_processing_table_statuses index c0a29f3d2..0dff6671a 100755 --- a/bin/desi_update_processing_table_statuses +++ b/bin/desi_update_processing_table_statuses @@ -27,7 +27,9 @@ def parse_args(): # options=None): parser.add_argument("--dry-run", action="store_true", help="Perform a dry run where the processing table is not written back out to disk.") parser.add_argument("--check-complete-jobs", action="store_true", - help="Default is False. Query NERSC about jobs with STATUS 'COMPLETED'.") + help="Query NERSC about jobs with STATUS 'COMPLETED'" + + "in addition to all other jobs. Default is False, " + + "which skips COMPLETED jobs.") args = parser.parse_args() return args From 614972862ca4be370eaf1e286c0208e568ff2adf Mon Sep 17 00:00:00 2001 From: akremin Date: Thu, 3 Oct 2024 12:00:43 -0700 Subject: [PATCH 3/9] add option to update_statuses to print updated table --- bin/desi_update_processing_table_statuses | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_processing_table_statuses index 0dff6671a..3b2d94f21 100755 --- a/bin/desi_update_processing_table_statuses +++ b/bin/desi_update_processing_table_statuses @@ -3,6 +3,7 @@ import argparse import os +import numpy as np from desiutil.log import get_logger from desispec.workflow.tableio import load_table, write_table @@ -30,9 +31,10 @@ def parse_args(): # options=None): help="Query NERSC about jobs with STATUS 'COMPLETED'" + "in addition to all other jobs. 
Default is False, " + "which skips COMPLETED jobs.") - args = parser.parse_args() + parser.add_argument("--show-updated-table", action="store_true", + help="Print a subset of the columns from the ptable with updated statuses.") - return args + return parser.parse_args() if __name__ == '__main__': @@ -58,4 +60,10 @@ if __name__ == '__main__': if not args.dry_run: write_table(ptable, tablename=ptable_pathname) + if args.show_updated_table: + log.info("Updated processing table:") + cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID', + 'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS'] + log.info(np.array(cols)) + log.info(f"Done updating STATUS column for processing table: {ptable_pathname}") From 02e8fd3fd3353e331e9d63f4d01b5a46e74149a2 Mon Sep 17 00:00:00 2001 From: kremin Date: Thu, 3 Oct 2024 15:56:17 -0700 Subject: [PATCH 4/9] add option to update_statuses to print updated table --- bin/desi_update_processing_table_statuses | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_processing_table_statuses index 3b2d94f21..2bbe90a39 100755 --- a/bin/desi_update_processing_table_statuses +++ b/bin/desi_update_processing_table_statuses @@ -65,5 +65,8 @@ if __name__ == '__main__': cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID', 'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS'] log.info(np.array(cols)) - + for row in ptable: + log.info(np.array(row[cols])) + log.info("\n") + log.info(f"Done updating STATUS column for processing table: {ptable_pathname}") From cac44c1796b5700d1d885d6c8115098f7e474602 Mon Sep 17 00:00:00 2001 From: akremin Date: Fri, 4 Oct 2024 18:18:01 -0700 Subject: [PATCH 5/9] improve dry_run_level handling in queue.py --- bin/desi_resubmit_queue_failures | 18 ++++++----- bin/desi_update_processing_table_statuses | 17 +++++++--- py/desispec/scripts/proc_night.py | 29 ++++++++--------- py/desispec/test/test_workflow_queue.py | 12 +++---- py/desispec/workflow/processing.py | 38 +++++++++++++---------- py/desispec/workflow/queue.py | 6 ++-- 6 files changed, 68 insertions(+), 52 deletions(-) diff --git a/bin/desi_resubmit_queue_failures b/bin/desi_resubmit_queue_failures index 30c8a0464..0ce188221 100755 --- a/bin/desi_resubmit_queue_failures +++ b/bin/desi_resubmit_queue_failures @@ -31,9 +31,13 @@ def parse_args(): # options=None): help="File format and extension for the exp and proc tables.") parser.add_argument("-r", "--reservation", type=str, required=False, default=None, help="The reservation to submit jobs to. If None, it is not submitted to a reservation.") - parser.add_argument("--dry-run", action="store_true", - help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+ - "dry-run-level is defined as nonzero.") + parser.add_argument("--dry-run-level", type=int, default=0, + help="What level of dry_run to perform, if any. Default is 0. " + + "0 which runs the code normally. " + + "1 writes all files but doesn't submit any jobs to Slurm. " + + "2 writes tables but doesn't write scripts or submit anything. " + + "3 Doesn't write or submit anything but queries Slurm normally for job status. " + + "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ") parser.add_argument("--resub-states", type=str, default=None, required=False, help="The SLURM queue states that should be resubmitted. " + "E.g. 
UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " + @@ -74,9 +78,8 @@ if __name__ == '__main__': if not os.path.exists(ptable_pathname): ValueError(f"Processing table: {ptable_pathname} doesn't exist.") - if args.dry_run > 0 and args.dry_run < 3: - log.warning(f"{args.dry_run=} will be run with limited simulation " - f"because we don't want to write out incorrect queue information.") + if args.dry_run_level > 0: + log.info(f"{args.dry_run=}, so will be simulating some features.") ## Combine the table names and types for easier passing to io functions table_type = 'proctable' @@ -87,7 +90,8 @@ if __name__ == '__main__': ptable, nsubmits = update_and_recursively_submit(ptable, submits=0, resubmission_states=args.resub_states, no_resub_failed=args.no_resub_failed, - ptab_name=ptable_pathname, dry_run=args.dry_run, + ptab_name=ptable_pathname, + dry_run_level=args.dry_run_level, reservation=args.reservation) if not args.dry_run: diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_processing_table_statuses index 3b2d94f21..7e564c6c0 100755 --- a/bin/desi_update_processing_table_statuses +++ b/bin/desi_update_processing_table_statuses @@ -25,8 +25,13 @@ def parse_args(): # options=None): help="Directory name where the output processing table should be saved.") parser.add_argument("--tab-filetype", type=str, required=False, default='csv', help="File format and extension for the exp and proc tables.") - parser.add_argument("--dry-run", action="store_true", - help="Perform a dry run where the processing table is not written back out to disk.") + parser.add_argument("--dry-run-level", type=int, default=0, + help="What level of dry_run to perform, if any. Default is 0. " + + "0 which runs the code normally. " + + "1 writes all files but doesn't submit any jobs to Slurm. " + + "2 writes tables but doesn't write scripts or submit anything. " + + "3 Doesn't write or submit anything but queries Slurm normally for job status. " + + "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ") parser.add_argument("--check-complete-jobs", action="store_true", help="Query NERSC about jobs with STATUS 'COMPLETED'" + "in addition to all other jobs. Default is False, " @@ -51,13 +56,17 @@ if __name__ == '__main__': if not os.path.exists(ptable_pathname): ValueError(f"Processing table: {ptable_pathname} doesn't exist.") + if args.dry_run_level > 0: + log.info(f"{args.dry_run_level=}, so will be simulating some features." + + f" See parser for what each level limits.") + ## Load in the files defined above ptable = load_table(tablename=ptable_pathname, tabletype='proctable') log.info(f"Identified ptable with {len(ptable)} entries.") - ptable = update_from_queue(ptable, dry_run=args.dry_run, + ptable = update_from_queue(ptable, dry_run_level=args.dry_run_level, check_complete_jobs=args.check_complete_jobs) - if not args.dry_run: + if args.dry_run_level < 3: write_table(ptable, tablename=ptable_pathname) if args.show_updated_table: diff --git a/py/desispec/scripts/proc_night.py b/py/desispec/scripts/proc_night.py index 09aefcc38..0d79724e9 100644 --- a/py/desispec/scripts/proc_night.py +++ b/py/desispec/scripts/proc_night.py @@ -85,16 +85,15 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, override_pathname (str): Full path to the override file. update_exptable (bool): If true then the exposure table is updated. The default is False. - dry_run_level (int, optional): If nonzero, this is a simulated run. 
- If dry_run_level=1 the scripts will be written but not submitted. - If dry_run_level=2, the scripts will not be written nor submitted - but the processing_table is still created. - If dry_run_level=3, no output files are written. - Logging will remain the same for testing as though scripts are - being submitted. Default is 0 (false). + dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. dry_run (bool, optional): When to run without submitting scripts or not. If dry_run_level is defined, then it over-rides this flag. - dry_run_level not set and dry_run=True, dry_run_level is set to 2 + dry_run_level not set and dry_run=True, dry_run_level is set to 3 (no scripts generated or run). Default for dry_run is False. no_redshifts (bool, optional): Whether to submit redshifts or not. If True, redshifts are not submitted. @@ -191,7 +190,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ## Reconcile the dry_run and dry_run_level if dry_run and dry_run_level == 0: - dry_run_level = 2 + dry_run_level = 3 elif dry_run_level > 0: dry_run = True @@ -358,7 +357,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ## Update processing table tableng = len(ptable) if tableng > 0: - ptable = update_from_queue(ptable, dry_run=dry_run_level) + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname, tabletype='proctable') if any_jobs_failed(ptable['STATUS']): @@ -373,7 +372,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ptable, nsubmits = update_and_recursively_submit(ptable, no_resub_failed=no_resub_failed, ptab_name=proc_table_pathname, - dry_run=dry_run, + dry_run_level=dry_run_level, reservation=reservation) elif not ignore_proc_table_failures: err = "Some jobs have an incomplete job status. 
This script " \ @@ -588,11 +587,9 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs, ################### Wrap things up ################### unproc_table = None if len(ptable) > 0: - ## All jobs now submitted, update information from job queue and save - ## But only if actually submitting or fully simulating, don't simulate - ## outputs that will be written to disk (levels 1 and 2) - if dry_run_level < 1 or dry_run_level > 2: - ptable = update_from_queue(ptable, dry_run=dry_run_level) + ## All jobs now submitted, update information from job queue + ## If dry_run_level > 3, then Slurm isn't queried and these results are made up + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname, tabletype='proctable') ## Now that processing is complete, lets identify what we didn't process diff --git a/py/desispec/test/test_workflow_queue.py b/py/desispec/test/test_workflow_queue.py index 49924533d..ba6e79640 100644 --- a/py/desispec/test/test_workflow_queue.py +++ b/py/desispec/test/test_workflow_queue.py @@ -20,17 +20,17 @@ def setUp(self): def test_queue_info_from_qids(self): """Test queue_info_from_qids""" qids = [11,10,2,5] - qinfo = queue.queue_info_from_qids(qids, dry_run=3) + qinfo = queue.queue_info_from_qids(qids, dry_run_level=4) self.assertEqual(list(qinfo['JOBID']), qids) def test_queue_state_cache(self): """Test queue state cache""" # Query qids to get state into cache qids = [11,10,2,5] - qinfo = queue.queue_info_from_qids(qids, dry_run=3) + qinfo = queue.queue_info_from_qids(qids, dry_run_level=4) # check cache matches state - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) self.assertEqual(list(qinfo['STATE']), list(qstates.values())) # should be ['COMPLETED', 'COMPLETED', 'COMPLETED', 'COMPLETED'] @@ -39,13 +39,13 @@ def test_queue_state_cache(self): qinfo['STATE'][0] = 'PENDING' queue.update_queue_state_cache_from_table(qinfo) - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) self.assertEqual(list(qinfo['STATE']), list(qstates.values())) # should be ['PENDING', 'FAILED', 'FAILED', 'FAILED'] # update state of just one qid queue.update_queue_state_cache(10, 'COMPLETED') - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) # should be ['PENDING', 'COMPLETED', 'FAILED', 'FAILED'] self.assertEqual(qstates[11], 'PENDING') self.assertEqual(qstates[10], 'COMPLETED') @@ -55,7 +55,7 @@ def test_queue_state_cache(self): # Asking for qids not in the cache should requery sacct for all of them. # Since this is dry run, that will also reset all back to COMPLETED. 
qids.append(100) - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) # should be ['COMPLETED', 'COMPLETED', 'TIMEOUT', 'COMPLETED', 'COMPLETED'] for qid, state in qstates.items(): self.assertEqual(state, 'COMPLETED', f'{qid=} {state=} not COMPLETED') diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py index 1df250438..d72b4dec3 100644 --- a/py/desispec/workflow/processing.py +++ b/py/desispec/workflow/processing.py @@ -636,7 +636,7 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F failed_dependency = False if len(dep_qids) > 0 and not dry_run: non_final_states = get_non_final_states() - state_dict = get_queue_states_from_qids(dep_qids, dry_run=dry_run, use_cache=True) + state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True) still_depids = [] for depid in dep_qids: if depid in state_dict.keys(): @@ -1156,7 +1156,7 @@ def all_calibs_submitted(accounted_for, do_cte_flats): def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None, no_resub_failed=False, ptab_name=None, - dry_run=0, reservation=None): + dry_run_level=0, reservation=None): """ Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states). Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively @@ -1172,9 +1172,12 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non no_resub_failed: bool. Set to True if you do NOT want to resubmit jobs with Slurm status 'FAILED' by default. Default is False. ptab_name, str, the full pathname where the processing table should be saved. - dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. 
Returns: @@ -1193,7 +1196,7 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed) log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}") - proc_table = update_from_queue(proc_table, dry_run=dry_run) + proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level) log.info("Updated processing table queue information:") cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID', 'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS'] @@ -1207,12 +1210,12 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non proc_table, submits = recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name, resubmission_states, - reservation, dry_run) - proc_table = update_from_queue(proc_table, dry_run=dry_run) + reservation, dry_run_level) + proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level) return proc_table, submits def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None, - resubmission_states=None, reservation=None, dry_run=0): + resubmission_states=None, reservation=None, dry_run_level=0): """ Given a row of a processing table and the full processing table, this resubmits the given job. Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to @@ -1230,9 +1233,12 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= possible Slurm scheduler state, where you wish for jobs with that outcome to be resubmitted reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. - dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns: tuple: A tuple containing: @@ -1273,7 +1279,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= + f"fatal error." log.critical(msg) raise ValueError(msg) - reftab = update_from_queue(reftab, dry_run=dry_run) + reftab = update_from_queue(reftab, dry_run_level=dry_run_level) entry = reftab[reftab['INTID'] == idep][0] if entry['STATUS'] not in good_states: msg = f"Internal ID: {idep} not in id_to_row_map. 
" \ @@ -1311,7 +1317,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= proc_table, submits, id_to_row_map, reservation=reservation, - dry_run=dry_run) + dry_run_level=dry_run_level) ## Now that we've resubmitted the dependency if necessary, ## add the most recent QID to the list qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]]) @@ -1327,10 +1333,10 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}") proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation, - strictly_successful=True, dry_run=dry_run) + strictly_successful=True, dry_run=dry_run_level) submits += 1 - if not dry_run: + if dry_run_level < 3: if ptab_name is None: write_table(proc_table, tabletype='processing', overwrite=True) else: diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index 54718eb1c..cf0dadfa4 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -432,9 +432,9 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False, check_complete_jobs, bool. Default is False. Set to true if you want to also check QID's that currently have a STATUS "COMPLETED". in the ptable. - The following are only used if qtable is not provided: - dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default - table that doesn't query the Slurm scheduler. + dry_run, int. Used if qtable is not providedWhether this is a simulated run or real run. + If nonzero, it is a simulation and it returns a default + table that doesn't query the Slurm scheduler. Returns: ptab, Table. A opy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is From 877bb3f529e7dc909518ef88dadd49e2373e9b94 Mon Sep 17 00:00:00 2001 From: akremin Date: Mon, 7 Oct 2024 13:43:02 -0700 Subject: [PATCH 6/9] improve dry_run_level handling in queue.py --- py/desispec/workflow/processing.py | 87 +++++++++----- py/desispec/workflow/queue.py | 178 ++++++++++++++++++----------- 2 files changed, 166 insertions(+), 99 deletions(-) diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py index d72b4dec3..e85b2371a 100644 --- a/py/desispec/workflow/processing.py +++ b/py/desispec/workflow/processing.py @@ -266,9 +266,12 @@ def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0, desispect.workflow.proctable.get_processing_table_column_defs() queue (str, optional): The name of the NERSC Slurm queue to submit to. Default is the realtime queue. reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. - dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. 
joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be run with desi_proc_joint_fit. Default is False. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is @@ -422,9 +425,12 @@ def create_batch_script(prow, queue='realtime', dry_run=0, joint=False, prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in desispect.workflow.proctable.get_processing_table_column_defs() queue, str. The name of the NERSC Slurm queue to submit to. Default is the realtime queue. - dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. - If dry_run=2, the scripts will not be written nor submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be run with desi_proc_joint_fit when not using tilenight. Default is False. system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu @@ -608,9 +614,12 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F Args: prow, Table.Row or dict. Must include keyword accessible definitions for processing_table columns found in desispect.workflow.proctable.get_processing_table_column_defs() - dry_run, int. If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather @@ -1370,9 +1379,12 @@ def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_subm or 'flat' or 'nightlyflat'. z_submit_types (list of str, optional): The "group" types of redshifts that should be submitted with each exposure. If not specified or None, then no redshifts are submitted. - dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. 
+ 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -1511,9 +1523,12 @@ def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation, internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used). queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used. reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation. - dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -1583,9 +1598,12 @@ def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation, internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used). queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used. reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation. - dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. 
@@ -1704,9 +1722,12 @@ def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation, internal_id (int): the next internal id to be used for assignment (already incremented up from the last used id number used). queue (str): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used. reservation (str): The reservation to submit jobs to. If None, it is not submitted to a reservation. - dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -1751,7 +1772,7 @@ def science_joint_fit(ptable, sciences, internal_id, queue='realtime', reservati """ Wrapper function for desiproc.workflow.processing.joint_fit specific to the stdstarfit joint fit and redshift fitting. - All variables are the same except:: + All variables are the same except: Arg 'sciences' is mapped to the prows argument of joint_fit. The joint_fit argument descriptor is pre-defined as 'science'. @@ -1769,7 +1790,7 @@ def flat_joint_fit(ptable, flats, internal_id, queue='realtime', """ Wrapper function for desiproc.workflow.processing.joint_fit specific to the nightlyflat joint fit. - All variables are the same except:: + All variables are the same except: Arg 'flats' is mapped to the prows argument of joint_fit. The joint_fit argument descriptor is pre-defined as 'nightlyflat'. @@ -1787,7 +1808,7 @@ def arc_joint_fit(ptable, arcs, internal_id, queue='realtime', """ Wrapper function for desiproc.workflow.processing.joint_fit specific to the psfnight joint fit. - All variables are the same except:: + All variables are the same except: Arg 'arcs' is mapped to the prows argument of joint_fit. The joint_fit argument descriptor is pre-defined as 'psfnight'. @@ -1967,9 +1988,12 @@ def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs, is the smallest unassigned value. z_submit_types (list of str): The "group" types of redshifts that should be submitted with each exposure. If not specified or None, then no redshifts are submitted. - dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. 
+ 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used. reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is @@ -2079,9 +2103,12 @@ def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry (if currently processing that tile). May be empty if none identified yet. internal_id (int): an internal identifier unique to each job. Increments with each new job. This is the smallest unassigned value. - dry_run (int, optional): If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used. reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index cf0dadfa4..ddfee9d86 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -154,7 +154,7 @@ def get_non_final_states(): def queue_info_from_time_window(start_time=None, end_time=None, user=None, \ columns='jobid,jobname,partition,submit,eligible,'+ 'start,end,elapsed,state,exitcode', - dry_run=0): + dry_run_level=0): """ Queries the NERSC Slurm database using sacct with appropriate flags to get information within a specified time window of all jobs submitted or executed during that time. @@ -175,18 +175,22 @@ def queue_info_from_time_window(start_time=None, end_time=None, user=None, \ it should have MUST have columns "JOBID" and "STATE". Other columns available that aren't included in the default list are: jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode. Other options include: suspended,derivedexitcode,reason,priority,jobname. - dry_run : int - Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default - table that doesn't query the Slurm scheduler. + dry_run_level : int + If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. 
Returns ------- - Table + astropy.table.Table Table with the columns defined by the input variable 'columns' and information relating to all jobs submitted by the specified user in the specified time frame. """ # global queue_info_table - if dry_run: + if dry_run_level: string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n' string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\ +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\ @@ -233,7 +237,7 @@ def queue_info_from_time_window(start_time=None, end_time=None, user=None, \ return queue_info_table def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ - 'eligible,start,end,elapsed,state,exitcode', dry_run=0): + 'eligible,start,end,elapsed,state,exitcode', dry_run_level=0): """ Queries the NERSC Slurm database using sacct with appropriate flags to get information about specific jobs based on their jobids. @@ -247,13 +251,17 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ it should have MUST have columns "JOBID" and "STATE". Other columns available that aren't included in the default list are: jobid,jobname,partition,submit,eligible,start,end,elapsed,state,exitcode. Other options include: suspended,derivedexitcode,reason,priority,jobname. - dry_run : int - Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default - table that doesn't query the Slurm scheduler. + dry_run_level : int + If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns ------- - Table + astropy.table.Table Table with the columns defined by the input variable 'columns' and information relating to all jobs submitted by the specified user in the specified time frame. """ @@ -265,7 +273,8 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ if len(qids) > nmax: results = list() for i in range(0, len(qids), nmax): - results.append(queue_info_from_qids(qids[i:i+nmax], columns=columns, dry_run=dry_run)) + results.append(queue_info_from_qids(qids[i:i+nmax], columns=columns, + dry_run_level=dry_run_level)) results = vstack(results) return results elif len(qids) == 0: @@ -277,7 +286,7 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,', f'--format={columns}', '-j', qid_str] - if dry_run: + if dry_run_level: log.info("Dry run, would have otherwise queried Slurm with the" +f" following: {' '.join(cmd_as_list)}") ### Set a random 5% of jobs as TIMEOUT, set seed for reproducibility @@ -325,7 +334,7 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ return queue_info_table -def get_queue_states_from_qids(qids, dry_run=0, use_cache=False): +def get_queue_states_from_qids(qids, dry_run_level=0, use_cache=False): """ Queries the NERSC Slurm database using sacct with appropriate flags to get information on the job STATE. If use_cache is set and all qids have cached @@ -335,10 +344,15 @@ def get_queue_states_from_qids(qids, dry_run=0, use_cache=False): ---------- jobids : list or array of ints Slurm QID's at NERSC that you want to return information about. 
- dry_run : int - Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default - table that doesn't query the Slurm scheduler. - use_cache, bool. If True the code first looks for a cached status + dry_run_level : int + If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + use_cache : bool + If True the code first looks for a cached status for the qid. If unavailable, then it queries Slurm. Default is False. Returns @@ -360,11 +374,11 @@ def get_queue_states_from_qids(qids, dry_run=0, use_cache=False): for qid in qids: outdict[qid] = _cached_slurm_states[qid] else: - if dry_run > 2 or dry_run < 1: - outtable = queue_info_from_qids(qids, columns='jobid,state', dry_run=dry_run) - for row in outtable: - if int(row['JOBID']) != def_qid: - outdict[int(row['JOBID'])] = row['STATE'] + outtable = queue_info_from_qids(qids, columns='jobid,state', + dry_run_level=dry_run_level) + for row in outtable: + if int(row['JOBID']) != def_qid: + outdict[int(row['JOBID'])] = row['STATE'] return outdict def update_queue_state_cache_from_table(queue_info_table): @@ -415,31 +429,42 @@ def clear_queue_state_cache(): _cached_slurm_states.clear() -def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False, +def update_from_queue(ptable, qtable=None, dry_run_level=0, ignore_scriptnames=False, check_complete_jobs=False): """ Given an input prcessing table (ptable) and query table from the Slurm queue (qtable) it cross matches the Slurm job ID's and updates the 'state' in the table using the current state in the Slurm scheduler system. - Args: - ptable, Table. Processing table that contains the jobs you want updated with the most recent queue table. Must - have at least columnns 'LATEST_QID' and 'STATUS'. - qtable, Table. Table with the columns defined by the input variable 'columns' and information relating - to all jobs submitted by the specified user in the specified time frame. - ignore_scriptnames, bool. Default is False. Set to true if you do not - want to check whether the scriptname matches the jobname - return by the slurm scheduler. - check_complete_jobs, bool. Default is False. Set to true if you want to - also check QID's that currently have a STATUS "COMPLETED". - in the ptable. - dry_run, int. Used if qtable is not providedWhether this is a simulated run or real run. - If nonzero, it is a simulation and it returns a default - table that doesn't query the Slurm scheduler. + Parameters + ---------- + ptable : astropy.table.Table + Processing table that contains the jobs you want updated with the most recent queue table. Must + have at least columnns 'LATEST_QID' and 'STATUS'. + qtable : astropy.table.Table + Table with the columns defined by the input variable 'columns' and information relating + to all jobs submitted by the specified user in the specified time frame. + ignore_scriptnames : bool + Default is False. Set to true if you do not + want to check whether the scriptname matches the jobname + return by the slurm scheduler. + check_complete_jobs: bool + Default is False. Set to true if you want to + also check QID's that currently have a STATUS "COMPLETED". + in the ptable. 
+ dry_run_level : int + If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. - Returns: - ptab, Table. A opy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is - updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable - and "JOBID" in the qtable). + Returns + ------- + ptab : astropy.table.Table + A opy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is + updated based on the 'STATE' in the qtable (as matched by "LATEST_QID" in the ptable + and "JOBID" in the qtable). """ log = get_logger() ptab = ptable.copy() @@ -458,7 +483,7 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False, if len(qids) == 0: log.info(f"No QIDs left to query. Returning the original table.") return ptab - qtable = queue_info_from_qids(qids, dry_run=dry_run) + qtable = queue_info_from_qids(qids, dry_run_level=dry_run_level) log.info(f"Slurm returned information on {len(qtable)} jobs out of " +f"{len(ptab)} jobs in the ptab. Updating those now.") @@ -483,8 +508,7 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False, state = str(row['STATE']).split(' ')[0] ## Since dry run 1 and 2 save proc tables, don't alter the ## states for these when simulating - if dry_run > 2 or dry_run < 1: - ptab['STATUS'][ind] = state + ptab['STATUS'][ind] = state return ptab @@ -494,15 +518,20 @@ def any_jobs_not_complete(statuses, termination_states=None): (as based on the list of acceptable final states, termination_states, given as an argument. These should be states that are viewed as final, as opposed to job states that require resubmission. - Args: - statuses, Table.Column or list or np.array. The statuses in the processing table "STATUS". Each element should - be a string. - termination_states, list or np.array. Each element should be a string signifying a state that is returned - by the Slurm scheduler that should be deemed terminal state. + Parameters + ---------- + statuses : Table.Column or list or np.array + The statuses in the processing table "STATUS". Each element should + be a string. + termination_states : list or np.array + Each element should be a string signifying a state that is returned + by the Slurm scheduler that should be deemed terminal state. - Returns: - bool. True if any of the statuses of the jobs given in statuses are NOT a member of the termination states. - Otherwise returns False. + Returns + ------- + bool + True if any of the statuses of the jobs given in statuses are NOT a member of the termination states. + Otherwise returns False. """ if termination_states is None: termination_states = get_termination_states() @@ -516,16 +545,21 @@ def any_jobs_failed(statuses, failed_states=None): should be states that are viewed as final, as opposed to job states that require resubmission. - Args: - statuses, Table.Column or list or np.array. The statuses in the - processing table "STATUS". Each element should be a string. - failed_states, list or np.array. Each element should be a string - signifying a state that is returned by the Slurm scheduler that - should be consider failing or problematic. 
+ Parameters + ---------- + statuses : Table.Column or list or np.array + The statuses in the + processing table "STATUS". Each element should be a string. + failed_states : list or np.array + Each element should be a string + signifying a state that is returned by the Slurm scheduler that + should be consider failing or problematic. - Returns: - bool. True if any of the statuses of the jobs given in statuses are - a member of the failed_states. + Returns + ------- + bool + True if any of the statuses of the jobs given in statuses are + a member of the failed_states. """ if failed_states is None: failed_states = get_failed_states() @@ -544,13 +578,16 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): True if you want to include scron entries in the returned table. Default is False. dry_run_level : int - Whether this is a simulated run or real run. If nonzero, it is a - simulation and it returns a default table that doesn't query the - Slurm scheduler. + If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns ------- - Table + astropy.table.Table Table with the columns JOBID, PARTITION, RESERVATION, NAME, USER, ST, TIME, NODES, NODELIST(REASON) for the specified user. """ @@ -652,9 +689,12 @@ def check_queue_count(user=None, include_scron=False, dry_run_level=0): True if you want to include scron entries in the returned table. Default is False. dry_run_level : int - Whether this is a simulated run or real run. If nonzero, it is a - simulation and it returns a default table that doesn't query the - Slurm scheduler. + If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns ------- From 1c58fab57d73b58faf4b9d5ad6024a121160f790 Mon Sep 17 00:00:00 2001 From: akremin Date: Mon, 7 Oct 2024 22:56:31 -0700 Subject: [PATCH 7/9] fix queue bug and only update from queue in proc_night if dry run set appropriately --- bin/desi_proc_night | 18 ++--- bin/desi_resubmit_queue_failures | 15 ++--- bin/desi_update_processing_table_statuses | 3 +- py/desispec/scripts/daily_processing.py | 8 +-- py/desispec/scripts/proc_night.py | 13 ++-- py/desispec/test/test_proc_night.py | 19 ++++-- py/desispec/test/test_workflow_queue.py | 12 ++-- py/desispec/workflow/processing.py | 36 +++++++--- py/desispec/workflow/queue.py | 81 +++++++++++++++-------- 9 files changed, 130 insertions(+), 75 deletions(-) diff --git a/bin/desi_proc_night b/bin/desi_proc_night index 145ebecf4..52bdc9bf3 100755 --- a/bin/desi_proc_night +++ b/bin/desi_proc_night @@ -60,13 +60,14 @@ def parse_args(): parser.add_argument("--dry-run", action="store_true", help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+ "dry-run-level is defined as nonzero.") - parser.add_argument("--dry-run-level", type=int, default=0, required=False, - help="""If nonzero, this is a simulated run. - If level=1 the scripts will be written but not submitted. 
- If level=2, scripts will not be written or submitted but processing_table is created. - if level=3, no output files are written at all. - Logging will remain the same for testing as though scripts are being submitted. - Default is 0 (false).""") + parser.add_argument("--dry-run-level", type=int, default=0, + help="What level of dry_run to perform, if any. Default is 0. " + + "0 which runs the code normally. " + + "1 writes all files but doesn't submit any jobs to Slurm. " + + "2 writes tables but doesn't write scripts or submit anything. " + + "3 Doesn't write or submit anything but queries Slurm normally for job status. " + + "4 Doesn't write, submit jobs, or query Slurm." + + "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.") parser.add_argument("--no-redshifts", action="store_true", help="Whether to submit redshifts or not. If set, redshifts are not submitted.") parser.add_argument("--ignore-proc-table-failures", action="store_true", @@ -136,5 +137,4 @@ def parse_args(): return args if __name__ == '__main__': - args = parse_args() - proc_night(**args.__dict__) + args = parse \ No newline at end of file diff --git a/bin/desi_resubmit_queue_failures b/bin/desi_resubmit_queue_failures index 0ce188221..990fe7020 100755 --- a/bin/desi_resubmit_queue_failures +++ b/bin/desi_resubmit_queue_failures @@ -37,7 +37,8 @@ def parse_args(): # options=None): + "1 writes all files but doesn't submit any jobs to Slurm. " + "2 writes tables but doesn't write scripts or submit anything. " + "3 Doesn't write or submit anything but queries Slurm normally for job status. " - + "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ") + + "4 Doesn't write, submit jobs, or query Slurm." + + "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.") parser.add_argument("--resub-states", type=str, default=None, required=False, help="The SLURM queue states that should be resubmitted. " + "E.g. UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " + @@ -79,13 +80,11 @@ if __name__ == '__main__': ValueError(f"Processing table: {ptable_pathname} doesn't exist.") if args.dry_run_level > 0: - log.info(f"{args.dry_run=}, so will be simulating some features.") - - ## Combine the table names and types for easier passing to io functions - table_type = 'proctable' + log.info(f"{args.dry_run_level=}, so will be simulating some features." + + f" See parser for what each level limits.") ## Load in the files defined above - ptable = load_table(tablename=ptable_pathname, tabletype=table_type) + ptable = load_table(tablename=ptable_pathname, tabletype='proctable') log.info(f"Identified ptable with {len(ptable)} entries.") ptable, nsubmits = update_and_recursively_submit(ptable, submits=0, resubmission_states=args.resub_states, @@ -94,8 +93,8 @@ if __name__ == '__main__': dry_run_level=args.dry_run_level, reservation=args.reservation) - if not args.dry_run: + if args.dry_run_level < 3: write_table(ptable, tablename=ptable_pathname) log.info("Completed all necessary queue resubmissions from processing " - + f"table: {ptable_pathname}") + + f"table: {ptable_pathname}") diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_processing_table_statuses index 11e3942af..fe34b4dcb 100755 --- a/bin/desi_update_processing_table_statuses +++ b/bin/desi_update_processing_table_statuses @@ -31,7 +31,8 @@ def parse_args(): # options=None): + "1 writes all files but doesn't submit any jobs to Slurm. 
" + "2 writes tables but doesn't write scripts or submit anything. " + "3 Doesn't write or submit anything but queries Slurm normally for job status. " - + "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ") + + "4 Doesn't write, submit jobs, or query Slurm." + + "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.") parser.add_argument("--check-complete-jobs", action="store_true", help="Query NERSC about jobs with STATUS 'COMPLETED'" + "in addition to all other jobs. Default is False, " diff --git a/py/desispec/scripts/daily_processing.py b/py/desispec/scripts/daily_processing.py index 532aad91e..178aba3da 100644 --- a/py/desispec/scripts/daily_processing.py +++ b/py/desispec/scripts/daily_processing.py @@ -444,9 +444,9 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path dry_run=(dry_run and ())) if len(ptable) > 0: - ptable = update_from_queue(ptable, dry_run=dry_run_level) + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) # ptable, nsubmits = update_and_recursively_submit(ptable, - # ptab_name=proc_table_pathname, dry_run=dry_run_level) + # ptab_name=proc_table_pathname, dry_run_level=dry_run_level) ## Exposure table doesn't change in the interim, so no need to re-write it to disk if dry_run_level < 3: @@ -481,7 +481,7 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path resubmit_partial_complete=resubmit_partial_complete, z_submit_types=z_submit_types) ## All jobs now submitted, update information from job queue and save - ptable = update_from_queue(ptable, dry_run=dry_run_level) + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname) @@ -520,4 +520,4 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path print("Exiting") ## Flush the outputs sys.stdout.flush() - sys.stderr.flush() + sys.std \ No newline at end of file diff --git a/py/desispec/scripts/proc_night.py b/py/desispec/scripts/proc_night.py index 0d79724e9..48c092e3a 100644 --- a/py/desispec/scripts/proc_night.py +++ b/py/desispec/scripts/proc_night.py @@ -90,7 +90,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. dry_run (bool, optional): When to run without submitting scripts or not. If dry_run_level is defined, then it over-rides this flag. 
dry_run_level not set and dry_run=True, dry_run_level is set to 3 @@ -246,6 +247,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, resubmit_partial_complete = (not dont_resubmit_partial_jobs) require_cals = (not dont_require_cals) do_cte_flats = (not no_cte_flats) + ## False if not submitting or simulating + update_proctable = (dry_run_level == 0 or dry_run_level > 3) ## cte flats weren't available before 20211130 so hardcode that in if do_cte_flats and night < 20211130: @@ -357,7 +360,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ## Update processing table tableng = len(ptable) if tableng > 0: - ptable = update_from_queue(ptable, dry_run_level=dry_run_level) + if update_proctable: + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname, tabletype='proctable') if any_jobs_failed(ptable['STATUS']): @@ -588,8 +592,9 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs, unproc_table = None if len(ptable) > 0: ## All jobs now submitted, update information from job queue - ## If dry_run_level > 3, then Slurm isn't queried and these results are made up - ptable = update_from_queue(ptable, dry_run_level=dry_run_level) + ## If dry_run_level > 3, then Slurm isn't queried + if update_proctable: + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname, tabletype='proctable') ## Now that processing is complete, lets identify what we didn't process diff --git a/py/desispec/test/test_proc_night.py b/py/desispec/test/test_proc_night.py index 94934bbde..389064241 100644 --- a/py/desispec/test/test_proc_night.py +++ b/py/desispec/test/test_proc_night.py @@ -138,6 +138,15 @@ def test_proc_night_dryrun3(self): self.assertEqual(len(prodfiles), 1) self.assertTrue(prodfiles[0].endswith('exposure_tables')) + def test_proc_night_dryrun3(self): + """Test that dry_run_level=4 doesn't produce any output""" + proctable, unproctable = proc_night(self.night, z_submit_types=['cumulative',], + dry_run_level=4, sub_wait_time=0.0) + + prodfiles = glob.glob(self.proddir+'/*') + self.assertEqual(len(prodfiles), 1) + self.assertTrue(prodfiles[0].endswith('exposure_tables')) + def test_proc_night_noz(self): """Test that z_submit_types=None doesn't submit any redshift jobs""" @@ -203,7 +212,7 @@ def test_proc_night_resubmit_queue_failures(self): desispec.workflow.proctable.reset_tilenight_ptab_cache() ## test that the code runs - updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, dry_run=3) + updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, dry_run_level=4) self.assertFalse(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])), msg='No TIMEOUTs in nominal resubmission') @@ -214,7 +223,7 @@ def test_proc_night_resubmit_queue_failures(self): proctable2['STATUS'][proctable2['INTID']==cumulative2['INTID']] = 'TIMEOUT' updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, - dry_run=1) + dry_run_level=4) self.assertFalse(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])), msg='Cross night resubmission should leave no TIMEOUTs') @@ -237,11 +246,9 @@ def test_proc_night_resubmit_queue_failures(self): ## Run resubmission code updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, - dry_run=1) - self.assertTrue(np.any(np.in1d(updatedtable2['STATUS'], 
[b'DEP_NOT_SUBD', b'TIMEOUT'])), - msg='Cross night resubmission should leave two TIMEOUTs') + dry_run_level=4) self.assertTrue(np.sum(updatedtable2['STATUS'] == 'DEP_NOT_SUBD')==2, - msg='Cross night resubmission should have 2 TIMEOUTs' \ + msg='Cross night resubmission should have 2 DEP_NOT_SUBDs' \ + ' after forcing failed previous night jobs.') diff --git a/py/desispec/test/test_workflow_queue.py b/py/desispec/test/test_workflow_queue.py index ba6e79640..e4f2177e2 100644 --- a/py/desispec/test/test_workflow_queue.py +++ b/py/desispec/test/test_workflow_queue.py @@ -20,17 +20,17 @@ def setUp(self): def test_queue_info_from_qids(self): """Test queue_info_from_qids""" qids = [11,10,2,5] - qinfo = queue.queue_info_from_qids(qids, dry_run_level=4) + qinfo = queue.queue_info_from_qids(qids, dry_run_level=5) self.assertEqual(list(qinfo['JOBID']), qids) def test_queue_state_cache(self): """Test queue state cache""" # Query qids to get state into cache qids = [11,10,2,5] - qinfo = queue.queue_info_from_qids(qids, dry_run_level=4) + qinfo = queue.queue_info_from_qids(qids, dry_run_level=5) # check cache matches state - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=5) self.assertEqual(list(qinfo['STATE']), list(qstates.values())) # should be ['COMPLETED', 'COMPLETED', 'COMPLETED', 'COMPLETED'] @@ -39,13 +39,13 @@ def test_queue_state_cache(self): qinfo['STATE'][0] = 'PENDING' queue.update_queue_state_cache_from_table(qinfo) - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=5) self.assertEqual(list(qinfo['STATE']), list(qstates.values())) # should be ['PENDING', 'FAILED', 'FAILED', 'FAILED'] # update state of just one qid queue.update_queue_state_cache(10, 'COMPLETED') - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=5) # should be ['PENDING', 'COMPLETED', 'FAILED', 'FAILED'] self.assertEqual(qstates[11], 'PENDING') self.assertEqual(qstates[10], 'COMPLETED') @@ -55,7 +55,7 @@ def test_queue_state_cache(self): # Asking for qids not in the cache should requery sacct for all of them. # Since this is dry run, that will also reset all back to COMPLETED. qids.append(100) - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=5) # should be ['COMPLETED', 'COMPLETED', 'TIMEOUT', 'COMPLETED', 'COMPLETED'] for qid, state in qstates.items(): self.assertEqual(state, 'COMPLETED', f'{qid=} {state=} not COMPLETED') diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py index e85b2371a..2759c9d40 100644 --- a/py/desispec/workflow/processing.py +++ b/py/desispec/workflow/processing.py @@ -271,7 +271,8 @@ def create_and_submit(prow, queue='realtime', reservation=None, dry_run=0, 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. 
joint (bool, optional): Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be run with desi_proc_joint_fit. Default is False. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is @@ -430,7 +431,8 @@ def create_batch_script(prow, queue='realtime', dry_run=0, joint=False, 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. joint, bool. Whether this is a joint fitting job (the job involves multiple exposures) and therefore needs to be run with desi_proc_joint_fit when not using tilenight. Default is False. system_name (str): batch system name, e.g. cori-haswell or perlmutter-gpu @@ -619,7 +621,8 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather @@ -1186,7 +1189,8 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. Returns: @@ -1206,6 +1210,7 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}") proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level) + log.info("Updated processing table queue information:") cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID', 'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS'] @@ -1220,7 +1225,9 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non id_to_row_map, ptab_name, resubmission_states, reservation, dry_run_level) + proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level) + return proc_table, submits def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None, @@ -1247,7 +1254,8 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 
3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns: tuple: A tuple containing: @@ -1384,7 +1392,8 @@ def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, z_subm 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -1528,7 +1537,8 @@ def joint_cal_fit(descriptor, ptable, prows, internal_id, queue, reservation, 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -1603,7 +1613,8 @@ def submit_redshifts(ptable, prows, tnight, internal_id, queue, reservation, 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -1727,7 +1738,8 @@ def submit_tilenight(ptable, prows, calibjobs, internal_id, queue, reservation, 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. 
the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -1993,7 +2005,8 @@ def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, calibjobs, 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used. reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is @@ -2108,7 +2121,8 @@ def submit_tilenight_and_redshifts(ptable, sciences, calibjobs, internal_id, dry 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. queue (str, optional): The name of the queue to submit the jobs to. If None is given the current desi_proc default is used. reservation (str, optional): The reservation to submit jobs to. If None, it is not submitted to a reservation. strictly_successful (bool, optional): Whether all jobs require all inputs to have succeeded. For daily processing, this is diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index ddfee9d86..0ab1333c0 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -151,6 +151,36 @@ def get_non_final_states(): """ return ['PENDING', 'RUNNING', 'REQUEUED', 'RESIZING'] +def get_mock_slurm_data(): + """ + Returns a string of output that mimics what Slurm would return from + sacct -X --parsable2 --delimiter=, + --format=JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode -j + + Returns + ------- + str + Mock Slurm data csv format. 
+ """ + string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode\n' + string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02' \ + + 'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T' \ + + '18:48:32,00:11:59,COMPLETED,0:0' + '\n' + string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02' \ + + 'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T' \ + + '18:57:02,00:11:59,COMPLETED,0:0' + '\n' + string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02' \ + + 'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T' \ + + '19:06:17,00:11:59,COMPLETED,0:0' + '\n' + string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02' \ + + 'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T' \ + + '19:13:59,00:11:59,COMPLETED,0:0' + '\n' + string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02' \ + + 'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T' \ + + '19:24:49,00:11:59,COMPLETED,0:0' + return string + + def queue_info_from_time_window(start_time=None, end_time=None, user=None, \ columns='jobid,jobname,partition,submit,eligible,'+ 'start,end,elapsed,state,exitcode', @@ -181,7 +211,8 @@ def queue_info_from_time_window(start_time=None, end_time=None, user=None, \ 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns ------- @@ -190,24 +221,11 @@ def queue_info_from_time_window(start_time=None, end_time=None, user=None, \ to all jobs submitted by the specified user in the specified time frame. """ # global queue_info_table - if dry_run_level: - string = 'JobID,JobName,Partition,Submit,Eligible,Start,End,State,ExitCode\n' - string += '49482394,arc-20211102-00107062-a0123456789,realtime,2021-11-02'\ - +'T18:31:14,2021-11-02T18:36:33,2021-11-02T18:36:33,2021-11-02T'\ - +'18:48:32,COMPLETED,0:0' + '\n' - string += '49482395,arc-20211102-00107063-a0123456789,realtime,2021-11-02'\ - +'T18:31:16,2021-11-02T18:36:33,2021-11-02T18:48:34,2021-11-02T'\ - +'18:57:02,COMPLETED,0:0' + '\n' - string += '49482397,arc-20211102-00107064-a0123456789,realtime,2021-11-02'\ - +'T18:31:19,2021-11-02T18:36:33,2021-11-02T18:57:05,2021-11-02T'\ - +'19:06:17,COMPLETED,0:0' + '\n' - string += '49482398,arc-20211102-00107065-a0123456789,realtime,2021-11-02'\ - +'T18:31:24,2021-11-02T18:36:33,2021-11-02T19:06:18,2021-11-02T'\ - +'19:13:59,COMPLETED,0:0' + '\n' - string += '49482399,arc-20211102-00107066-a0123456789,realtime,2021-11-02'\ - +'T18:31:27,2021-11-02T18:36:33,2021-11-02T19:14:00,2021-11-02T'\ - +'19:24:49,COMPLETED,0:0' + if dry_run_level > 4: + string = get_mock_slurm_data() cmd_as_list = ['echo', string] + elif dry_run_level > 3: + cmd_as_list = ['echo', 'JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode'] else: if user is None: if 'USER' in os.environ: @@ -257,7 +275,8 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. 
- 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns ------- @@ -286,7 +305,7 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ cmd_as_list = ['sacct', '-X', '--parsable2', '--delimiter=,', f'--format={columns}', '-j', qid_str] - if dry_run_level: + if dry_run_level > 4: log.info("Dry run, would have otherwise queried Slurm with the" +f" following: {' '.join(cmd_as_list)}") ### Set a random 5% of jobs as TIMEOUT, set seed for reproducibility @@ -306,6 +325,8 @@ def queue_info_from_qids(qids, columns='jobid,jobname,partition,submit,'+ string += f'\n{jobid},{state}' # create command to run to exercise subprocess -> stdout parsing cmd_as_list = ['echo', string] + elif dry_run_level > 3: + cmd_as_list = ['echo', columns.lower()] else: log.info(f"Querying Slurm with the following: {' '.join(cmd_as_list)}") @@ -350,7 +371,8 @@ def get_queue_states_from_qids(qids, dry_run_level=0, use_cache=False): 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. use_cache : bool If True the code first looks for a cached status for the qid. If unavailable, then it queries Slurm. Default is False. @@ -457,7 +479,8 @@ def update_from_queue(ptable, qtable=None, dry_run_level=0, ignore_scriptnames=F 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns ------- @@ -583,7 +606,8 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. 
Returns ------- @@ -601,10 +625,11 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): cmd = f'squeue -u {user} -o "%i,%P,%v,%j,%u,%t,%M,%D,%R"' cmd_as_list = cmd.split() - if dry_run_level > 0: + header = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)' + if dry_run_level > 4: log.info("Dry run, would have otherwise queried Slurm with the" +f" following: {' '.join(cmd_as_list)}") - string = 'JOBID,PARTITION,RESERVATION,NAME,USER,ST,TIME,NODES,NODELIST(REASON)' + string = header string += f"27650097,cron,(null),scron_ar,{user},PD,0:00,1,(BeginTime)" string += f"27650100,cron,(null),scron_nh,{user},PD,0:00,1,(BeginTime)" string += f"27650098,cron,(null),scron_up,{user},PD,0:00,1,(BeginTime)" @@ -619,6 +644,9 @@ def get_jobs_in_queue(user=None, include_scron=False, dry_run_level=0): # create command to run to exercise subprocess -> stdout parsing cmd = 'echo ' + string cmd_as_list = ['echo', string] + elif dry_run_level > 3: + cmd = 'echo ' + header + cmd_as_list = ['echo', header] else: log.info(f"Querying jobs in queue with: {' '.join(cmd_as_list)}") @@ -694,7 +722,8 @@ def check_queue_count(user=None, include_scron=False, dry_run_level=0): 1 writes all files but doesn't submit any jobs to Slurm. 2 writes tables but doesn't write scripts or submit anything. 3 Doesn't write or submit anything but queries Slurm normally for job status. - 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. + 4 Doesn't write, submit jobs, or query Slurm. + 5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns ------- From 9e29ec418e5f694dc46f1e67417829fa42e7fee3 Mon Sep 17 00:00:00 2001 From: akremin Date: Tue, 8 Oct 2024 14:31:51 -0700 Subject: [PATCH 8/9] fix doc issue in queue.py --- py/desispec/workflow/queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index 0ab1333c0..8544c2d9e 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -154,8 +154,8 @@ def get_non_final_states(): def get_mock_slurm_data(): """ Returns a string of output that mimics what Slurm would return from - sacct -X --parsable2 --delimiter=, - --format=JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode -j + sacct -X --parsable2 --delimiter=, \ + --format=JobID,JobName,Partition,Submit,Eligible,Start,End,Elapsed,State,ExitCode -j Returns ------- From 5cb164db35bbc5f2bfa9d35d4824be8c1dad379b Mon Sep 17 00:00:00 2001 From: akremin Date: Wed, 9 Oct 2024 16:53:12 -0700 Subject: [PATCH 9/9] Fix end of file issues and add --outfile to desi_update_proctable_statuses --- bin/desi_proc_night | 3 ++- ...g_table_statuses => desi_update_proctable_statuses} | 10 ++++++++-- py/desispec/scripts/daily_processing.py | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) rename bin/{desi_update_processing_table_statuses => desi_update_proctable_statuses} (90%) diff --git a/bin/desi_proc_night b/bin/desi_proc_night index 52bdc9bf3..3a5d6f0ef 100755 --- a/bin/desi_proc_night +++ b/bin/desi_proc_night @@ -137,4 +137,5 @@ def parse_args(): return args if __name__ == '__main__': - args = parse \ No newline at end of file + args = parse_args() + proc_night(**args.__dict__) diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_proctable_statuses similarity index 90% rename from bin/desi_update_processing_table_statuses rename to bin/desi_update_proctable_statuses index 
fe34b4dcb..ef0afb438 100755 --- a/bin/desi_update_processing_table_statuses +++ b/bin/desi_update_proctable_statuses @@ -13,7 +13,7 @@ from desispec.workflow.queue import update_from_queue def parse_args(): # options=None): """ - Get command line arguments for desi_update_processing_table_statuses + Get command line arguments for desi_update_proctable_statuses """ parser = argparse.ArgumentParser(description="Update the STATUS of all jobs " + "in a DESI processing table by " @@ -21,6 +21,8 @@ def parse_args(): # options=None): parser.add_argument("-n","--night", type=str, default=None, required=False, help="The night you want processed.") + parser.add_argument("-o","--outfile", type=str, default=None, + required=False, help="Output filename, if different from default.") parser.add_argument("--proc-table-pathname", type=str, required=False, default=None, help="Directory name where the output processing table should be saved.") parser.add_argument("--tab-filetype", type=str, required=False, default='csv', @@ -68,7 +70,11 @@ if __name__ == '__main__': check_complete_jobs=args.check_complete_jobs) if args.dry_run_level < 3: - write_table(ptable, tablename=ptable_pathname) + if args.outfile is not None: + outfile = args.outfile + else: + outfile = ptable_pathname + write_table(ptable, tablename=outfile) if args.show_updated_table: log.info("Updated processing table:") diff --git a/py/desispec/scripts/daily_processing.py b/py/desispec/scripts/daily_processing.py index 178aba3da..bd2408f77 100644 --- a/py/desispec/scripts/daily_processing.py +++ b/py/desispec/scripts/daily_processing.py @@ -520,4 +520,4 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path print("Exiting") ## Flush the outputs sys.stdout.flush() - sys.std \ No newline at end of file + sys.stderr.flush()
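
The dry-run levels that these patches standardize map directly onto how queue.py shells out to Slurm: levels 0-3 run sacct with -X --parsable2 --delimiter=, and parse the comma-delimited rows, level 4 echoes only the header row so the parser returns an empty table, and level 5 substitutes fabricated rows (get_mock_slurm_data for time-window queries, randomized per-qid states in queue_info_from_qids). The sketch below illustrates that minimal query-and-parse pattern; the helper name sacct_job_states and the JobID/State-only column list are illustrative assumptions rather than the desispec API, which requests more columns and returns an astropy Table.

    import subprocess

    def sacct_job_states(qids):
        """Return {jobid (int): Slurm state (str)} for the given job ids."""
        cmd = ['sacct', '-X', '--parsable2', '--delimiter=,',
               '--format=JobID,State', '-j', ','.join(str(q) for q in qids)]
        # sacct prints one header line, then one delimited row per allocation
        out = subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
        rows = [line.split(',') for line in out.strip().splitlines()]
        header, body = rows[0], rows[1:]
        ijob, istate = header.index('JobID'), header.index('State')
        return {int(r[ijob]): r[istate] for r in body}

    # Hypothetical usage with qids taken from get_mock_slurm_data():
    #   states = sacct_job_states([49482394, 49482395])
    #   bad = [q for q, s in states.items() if s in ('FAILED', 'TIMEOUT', 'CANCELLED')]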