From cac44c1796b5700d1d885d6c8115098f7e474602 Mon Sep 17 00:00:00 2001 From: akremin Date: Fri, 4 Oct 2024 18:18:01 -0700 Subject: [PATCH] improve dry_run_level handling in queue.py --- bin/desi_resubmit_queue_failures | 18 ++++++----- bin/desi_update_processing_table_statuses | 17 +++++++--- py/desispec/scripts/proc_night.py | 29 ++++++++--------- py/desispec/test/test_workflow_queue.py | 12 +++---- py/desispec/workflow/processing.py | 38 +++++++++++++---------- py/desispec/workflow/queue.py | 6 ++-- 6 files changed, 68 insertions(+), 52 deletions(-) diff --git a/bin/desi_resubmit_queue_failures b/bin/desi_resubmit_queue_failures index 30c8a0464..0ce188221 100755 --- a/bin/desi_resubmit_queue_failures +++ b/bin/desi_resubmit_queue_failures @@ -31,9 +31,13 @@ def parse_args(): # options=None): help="File format and extension for the exp and proc tables.") parser.add_argument("-r", "--reservation", type=str, required=False, default=None, help="The reservation to submit jobs to. If None, it is not submitted to a reservation.") - parser.add_argument("--dry-run", action="store_true", - help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+ - "dry-run-level is defined as nonzero.") + parser.add_argument("--dry-run-level", type=int, default=0, + help="What level of dry_run to perform, if any. Default is 0. " + + "0 which runs the code normally. " + + "1 writes all files but doesn't submit any jobs to Slurm. " + + "2 writes tables but doesn't write scripts or submit anything. " + + "3 Doesn't write or submit anything but queries Slurm normally for job status. " + + "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ") parser.add_argument("--resub-states", type=str, default=None, required=False, help="The SLURM queue states that should be resubmitted. " + "E.g. 
UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " + @@ -74,9 +78,8 @@ if __name__ == '__main__': if not os.path.exists(ptable_pathname): ValueError(f"Processing table: {ptable_pathname} doesn't exist.") - if args.dry_run > 0 and args.dry_run < 3: - log.warning(f"{args.dry_run=} will be run with limited simulation " - f"because we don't want to write out incorrect queue information.") + if args.dry_run_level > 0: + log.info(f"{args.dry_run_level=}, so will be simulating some features.") ## Combine the table names and types for easier passing to io functions table_type = 'proctable' @@ -87,7 +90,8 @@ if __name__ == '__main__': ptable, nsubmits = update_and_recursively_submit(ptable, submits=0, resubmission_states=args.resub_states, no_resub_failed=args.no_resub_failed, - ptab_name=ptable_pathname, dry_run=args.dry_run, + ptab_name=ptable_pathname, + dry_run_level=args.dry_run_level, reservation=args.reservation) if not args.dry_run: diff --git a/bin/desi_update_processing_table_statuses b/bin/desi_update_processing_table_statuses index 3b2d94f21..7e564c6c0 100755 --- a/bin/desi_update_processing_table_statuses +++ b/bin/desi_update_processing_table_statuses @@ -25,8 +25,13 @@ def parse_args(): # options=None): help="Directory name where the output processing table should be saved.") parser.add_argument("--tab-filetype", type=str, required=False, default='csv', help="File format and extension for the exp and proc tables.") - parser.add_argument("--dry-run", action="store_true", - help="Perform a dry run where the processing table is not written back out to disk.") + parser.add_argument("--dry-run-level", type=int, default=0, + help="What level of dry_run to perform, if any. Default is 0. " + + "0 which runs the code normally. " + + "1 writes all files but doesn't submit any jobs to Slurm. " + + "2 writes tables but doesn't write scripts or submit anything. " + + "3 Doesn't write or submit anything but queries Slurm normally for job status. 
" + + "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ") parser.add_argument("--check-complete-jobs", action="store_true", help="Query NERSC about jobs with STATUS 'COMPLETED'" + "in addition to all other jobs. Default is False, " @@ -51,13 +56,17 @@ if __name__ == '__main__': if not os.path.exists(ptable_pathname): ValueError(f"Processing table: {ptable_pathname} doesn't exist.") + if args.dry_run_level > 0: + log.info(f"{args.dry_run_level=}, so will be simulating some features." + + f" See parser for what each level limits.") + ## Load in the files defined above ptable = load_table(tablename=ptable_pathname, tabletype='proctable') log.info(f"Identified ptable with {len(ptable)} entries.") - ptable = update_from_queue(ptable, dry_run=args.dry_run, + ptable = update_from_queue(ptable, dry_run_level=args.dry_run_level, check_complete_jobs=args.check_complete_jobs) - if not args.dry_run: + if args.dry_run_level < 3: write_table(ptable, tablename=ptable_pathname) if args.show_updated_table: diff --git a/py/desispec/scripts/proc_night.py b/py/desispec/scripts/proc_night.py index 09aefcc38..0d79724e9 100644 --- a/py/desispec/scripts/proc_night.py +++ b/py/desispec/scripts/proc_night.py @@ -85,16 +85,15 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, override_pathname (str): Full path to the override file. update_exptable (bool): If true then the exposure table is updated. The default is False. - dry_run_level (int, optional): If nonzero, this is a simulated run. - If dry_run_level=1 the scripts will be written but not submitted. - If dry_run_level=2, the scripts will not be written nor submitted - but the processing_table is still created. - If dry_run_level=3, no output files are written. - Logging will remain the same for testing as though scripts are - being submitted. Default is 0 (false). + dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0. 
+ 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. dry_run (bool, optional): When to run without submitting scripts or not. If dry_run_level is defined, then it over-rides this flag. - dry_run_level not set and dry_run=True, dry_run_level is set to 2 + dry_run_level not set and dry_run=True, dry_run_level is set to 3 (no scripts generated or run). Default for dry_run is False. no_redshifts (bool, optional): Whether to submit redshifts or not. If True, redshifts are not submitted. @@ -191,7 +190,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ## Reconcile the dry_run and dry_run_level if dry_run and dry_run_level == 0: - dry_run_level = 2 + dry_run_level = 3 elif dry_run_level > 0: dry_run = True @@ -358,7 +357,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ## Update processing table tableng = len(ptable) if tableng > 0: - ptable = update_from_queue(ptable, dry_run=dry_run_level) + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname, tabletype='proctable') if any_jobs_failed(ptable['STATUS']): @@ -373,7 +372,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None, ptable, nsubmits = update_and_recursively_submit(ptable, no_resub_failed=no_resub_failed, ptab_name=proc_table_pathname, - dry_run=dry_run, + dry_run_level=dry_run_level, reservation=reservation) elif not ignore_proc_table_failures: err = "Some jobs have an incomplete job status. 
This script " \ @@ -588,11 +587,9 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs, ################### Wrap things up ################### unproc_table = None if len(ptable) > 0: - ## All jobs now submitted, update information from job queue and save - ## But only if actually submitting or fully simulating, don't simulate - ## outputs that will be written to disk (levels 1 and 2) - if dry_run_level < 1 or dry_run_level > 2: - ptable = update_from_queue(ptable, dry_run=dry_run_level) + ## All jobs now submitted, update information from job queue + ## If dry_run_level > 3, then Slurm isn't queried and these results are made up + ptable = update_from_queue(ptable, dry_run_level=dry_run_level) if dry_run_level < 3: write_table(ptable, tablename=proc_table_pathname, tabletype='proctable') ## Now that processing is complete, lets identify what we didn't process diff --git a/py/desispec/test/test_workflow_queue.py b/py/desispec/test/test_workflow_queue.py index 49924533d..ba6e79640 100644 --- a/py/desispec/test/test_workflow_queue.py +++ b/py/desispec/test/test_workflow_queue.py @@ -20,17 +20,17 @@ def setUp(self): def test_queue_info_from_qids(self): """Test queue_info_from_qids""" qids = [11,10,2,5] - qinfo = queue.queue_info_from_qids(qids, dry_run=3) + qinfo = queue.queue_info_from_qids(qids, dry_run_level=4) self.assertEqual(list(qinfo['JOBID']), qids) def test_queue_state_cache(self): """Test queue state cache""" # Query qids to get state into cache qids = [11,10,2,5] - qinfo = queue.queue_info_from_qids(qids, dry_run=3) + qinfo = queue.queue_info_from_qids(qids, dry_run_level=4) # check cache matches state - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) self.assertEqual(list(qinfo['STATE']), list(qstates.values())) # should be ['COMPLETED', 'COMPLETED', 'COMPLETED', 'COMPLETED'] @@ -39,13 +39,13 @@ def 
test_queue_state_cache(self): qinfo['STATE'][0] = 'PENDING' queue.update_queue_state_cache_from_table(qinfo) - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) self.assertEqual(list(qinfo['STATE']), list(qstates.values())) # should be ['PENDING', 'FAILED', 'FAILED', 'FAILED'] # update state of just one qid queue.update_queue_state_cache(10, 'COMPLETED') - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) # should be ['PENDING', 'COMPLETED', 'FAILED', 'FAILED'] self.assertEqual(qstates[11], 'PENDING') self.assertEqual(qstates[10], 'COMPLETED') @@ -55,7 +55,7 @@ def test_queue_state_cache(self): # Asking for qids not in the cache should requery sacct for all of them. # Since this is dry run, that will also reset all back to COMPLETED. qids.append(100) - qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3) + qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4) # should be ['COMPLETED', 'COMPLETED', 'TIMEOUT', 'COMPLETED', 'COMPLETED'] for qid, state in qstates.items(): self.assertEqual(state, 'COMPLETED', f'{qid=} {state=} not COMPLETED') diff --git a/py/desispec/workflow/processing.py b/py/desispec/workflow/processing.py index 1df250438..d72b4dec3 100644 --- a/py/desispec/workflow/processing.py +++ b/py/desispec/workflow/processing.py @@ -636,7 +636,7 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F failed_dependency = False if len(dep_qids) > 0 and not dry_run: non_final_states = get_non_final_states() - state_dict = get_queue_states_from_qids(dep_qids, dry_run=dry_run, use_cache=True) + state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True) still_depids = [] for depid in dep_qids: if depid in state_dict.keys(): @@ -1156,7 +1156,7 
@@ def all_calibs_submitted(accounted_for, do_cte_flats): def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None, no_resub_failed=False, ptab_name=None, - dry_run=0, reservation=None): + dry_run_level=0, reservation=None): """ Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states). Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively @@ -1172,9 +1172,12 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non no_resub_failed: bool. Set to True if you do NOT want to resubmit jobs with Slurm status 'FAILED' by default. Default is False. ptab_name, str, the full pathname where the processing table should be saved. - dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. Default is 0 (false). + dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. 
Returns: @@ -1193,7 +1196,7 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed) log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}") - proc_table = update_from_queue(proc_table, dry_run=dry_run) + proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level) log.info("Updated processing table queue information:") cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID', 'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS'] @@ -1207,12 +1210,12 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non proc_table, submits = recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name, resubmission_states, - reservation, dry_run) - proc_table = update_from_queue(proc_table, dry_run=dry_run) + reservation, dry_run_level) + proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level) return proc_table, submits def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None, - resubmission_states=None, reservation=None, dry_run=0): + resubmission_states=None, reservation=None, dry_run_level=0): """ Given a row of a processing table and the full processing table, this resubmits the given job. Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to @@ -1230,9 +1233,12 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= possible Slurm scheduler state, where you wish for jobs with that outcome to be resubmitted reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. - dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If - dry_run=2, the scripts will not be writter or submitted. Logging will remain the same - for testing as though scripts are being submitted. 
Default is 0 (false). + dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0. + 0 which runs the code normally. + 1 writes all files but doesn't submit any jobs to Slurm. + 2 writes tables but doesn't write scripts or submit anything. + 3 Doesn't write or submit anything but queries Slurm normally for job status. + 4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. Returns: tuple: A tuple containing: @@ -1273,7 +1279,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= + f"fatal error." log.critical(msg) raise ValueError(msg) - reftab = update_from_queue(reftab, dry_run=dry_run) + reftab = update_from_queue(reftab, dry_run_level=dry_run_level) entry = reftab[reftab['INTID'] == idep][0] if entry['STATUS'] not in good_states: msg = f"Internal ID: {idep} not in id_to_row_map. " \ @@ -1311,7 +1317,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= proc_table, submits, id_to_row_map, reservation=reservation, - dry_run=dry_run) + dry_run_level=dry_run_level) ## Now that we've resubmitted the dependency if necessary, ## add the most recent QID to the list qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]]) @@ -1327,10 +1333,10 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name= log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}") proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation, - strictly_successful=True, dry_run=dry_run) + strictly_successful=True, dry_run=dry_run_level) submits += 1 - if not dry_run: + if dry_run_level < 3: if ptab_name is None: write_table(proc_table, tabletype='processing', overwrite=True) else: diff --git a/py/desispec/workflow/queue.py b/py/desispec/workflow/queue.py index 54718eb1c..cf0dadfa4 100644 --- a/py/desispec/workflow/queue.py +++ b/py/desispec/workflow/queue.py @@ -432,9 +432,9 @@ def update_from_queue(ptable, 
qtable=None, dry_run=0, ignore_scriptnames=False, check_complete_jobs, bool. Default is False. Set to true if you want to also check QID's that currently have a STATUS "COMPLETED". in the ptable. - The following are only used if qtable is not provided: - dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default - table that doesn't query the Slurm scheduler. + dry_run, int. Used if qtable is not provided. Whether this is a simulated run or real run. + If nonzero, it is a simulation and it returns a default + table that doesn't query the Slurm scheduler. Returns: ptab, Table. A opy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is