Skip to content

Commit

Permalink
improve dry_run_level handling in queue.py
Browse files Browse the repository at this point in the history
  • Loading branch information
akremin committed Oct 5, 2024
1 parent 6149728 commit cac44c1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 52 deletions.
18 changes: 11 additions & 7 deletions bin/desi_resubmit_queue_failures
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,13 @@ def parse_args(): # options=None):
help="File format and extension for the exp and proc tables.")
parser.add_argument("-r", "--reservation", type=str, required=False, default=None,
help="The reservation to submit jobs to. If None, it is not submitted to a reservation.")
parser.add_argument("--dry-run", action="store_true",
help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+
"dry-run-level is defined as nonzero.")
parser.add_argument("--dry-run-level", type=int, default=0,
help="What level of dry_run to perform, if any. Default is 0. "
+ "0 which runs the code normally. "
+ "1 writes all files but doesn't submit any jobs to Slurm. "
+ "2 writes tables but doesn't write scripts or submit anything. "
+ "3 Doesn't write or submit anything but queries Slurm normally for job status. "
+ "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ")
parser.add_argument("--resub-states", type=str, default=None, required=False,
help="The SLURM queue states that should be resubmitted. " +
"E.g. UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " +
Expand Down Expand Up @@ -74,9 +78,8 @@ if __name__ == '__main__':
if not os.path.exists(ptable_pathname):
ValueError(f"Processing table: {ptable_pathname} doesn't exist.")

if args.dry_run > 0 and args.dry_run < 3:
log.warning(f"{args.dry_run=} will be run with limited simulation "
f"because we don't want to write out incorrect queue information.")
if args.dry_run_level > 0:
log.info(f"{args.dry_run=}, so will be simulating some features.")

## Combine the table names and types for easier passing to io functions
table_type = 'proctable'
Expand All @@ -87,7 +90,8 @@ if __name__ == '__main__':
ptable, nsubmits = update_and_recursively_submit(ptable, submits=0,
resubmission_states=args.resub_states,
no_resub_failed=args.no_resub_failed,
ptab_name=ptable_pathname, dry_run=args.dry_run,
ptab_name=ptable_pathname,
dry_run_level=args.dry_run_level,
reservation=args.reservation)

if not args.dry_run:
Expand Down
17 changes: 13 additions & 4 deletions bin/desi_update_processing_table_statuses
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,13 @@ def parse_args(): # options=None):
help="Directory name where the output processing table should be saved.")
parser.add_argument("--tab-filetype", type=str, required=False, default='csv',
help="File format and extension for the exp and proc tables.")
parser.add_argument("--dry-run", action="store_true",
help="Perform a dry run where the processing table is not written back out to disk.")
parser.add_argument("--dry-run-level", type=int, default=0,
help="What level of dry_run to perform, if any. Default is 0. "
+ "0 which runs the code normally. "
+ "1 writes all files but doesn't submit any jobs to Slurm. "
+ "2 writes tables but doesn't write scripts or submit anything. "
+ "3 Doesn't write or submit anything but queries Slurm normally for job status. "
+ "4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs. ")
parser.add_argument("--check-complete-jobs", action="store_true",
help="Query NERSC about jobs with STATUS 'COMPLETED'"
+ "in addition to all other jobs. Default is False, "
Expand All @@ -51,13 +56,17 @@ if __name__ == '__main__':
if not os.path.exists(ptable_pathname):
ValueError(f"Processing table: {ptable_pathname} doesn't exist.")

if args.dry_run_level > 0:
log.info(f"{args.dry_run_level=}, so will be simulating some features."
+ f" See parser for what each level limits.")

## Load in the files defined above
ptable = load_table(tablename=ptable_pathname, tabletype='proctable')
log.info(f"Identified ptable with {len(ptable)} entries.")
ptable = update_from_queue(ptable, dry_run=args.dry_run,
ptable = update_from_queue(ptable, dry_run_level=args.dry_run_level,
check_complete_jobs=args.check_complete_jobs)

if not args.dry_run:
if args.dry_run_level < 3:
write_table(ptable, tablename=ptable_pathname)

if args.show_updated_table:
Expand Down
29 changes: 13 additions & 16 deletions py/desispec/scripts/proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,15 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
override_pathname (str): Full path to the override file.
update_exptable (bool): If true then the exposure table is updated.
The default is False.
dry_run_level (int, optional): If nonzero, this is a simulated run.
If dry_run_level=1 the scripts will be written but not submitted.
If dry_run_level=2, the scripts will not be written nor submitted
but the processing_table is still created.
If dry_run_level=3, no output files are written.
Logging will remain the same for testing as though scripts are
being submitted. Default is 0 (false).
dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
0 which runs the code normally.
1 writes all files but doesn't submit any jobs to Slurm.
2 writes tables but doesn't write scripts or submit anything.
3 Doesn't write or submit anything but queries Slurm normally for job status.
4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
dry_run (bool, optional): When to run without submitting scripts or
not. If dry_run_level is defined, then it over-rides this flag.
dry_run_level not set and dry_run=True, dry_run_level is set to 2
dry_run_level not set and dry_run=True, dry_run_level is set to 3
(no scripts generated or run). Default for dry_run is False.
no_redshifts (bool, optional): Whether to submit redshifts or not.
If True, redshifts are not submitted.
Expand Down Expand Up @@ -191,7 +190,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,

## Reconcile the dry_run and dry_run_level
if dry_run and dry_run_level == 0:
dry_run_level = 2
dry_run_level = 3
elif dry_run_level > 0:
dry_run = True

Expand Down Expand Up @@ -358,7 +357,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
## Update processing table
tableng = len(ptable)
if tableng > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
if any_jobs_failed(ptable['STATUS']):
Expand All @@ -373,7 +372,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
ptable, nsubmits = update_and_recursively_submit(ptable,
no_resub_failed=no_resub_failed,
ptab_name=proc_table_pathname,
dry_run=dry_run,
dry_run_level=dry_run_level,
reservation=reservation)
elif not ignore_proc_table_failures:
err = "Some jobs have an incomplete job status. This script " \
Expand Down Expand Up @@ -588,11 +587,9 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
################### Wrap things up ###################
unproc_table = None
if len(ptable) > 0:
## All jobs now submitted, update information from job queue and save
## But only if actually submitting or fully simulating, don't simulate
## outputs that will be written to disk (levels 1 and 2)
if dry_run_level < 1 or dry_run_level > 2:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
## All jobs now submitted, update information from job queue
## If dry_run_level > 3, then Slurm isn't queried and these results are made up
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
## Now that processing is complete, lets identify what we didn't process
Expand Down
12 changes: 6 additions & 6 deletions py/desispec/test/test_workflow_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ def setUp(self):
    def test_queue_info_from_qids(self):
        """Test queue_info_from_qids"""
        # Arbitrary, deliberately unsorted Slurm job IDs.
        qids = [11,10,2,5]
        # dry_run_level=4 means Slurm is never queried; queue_info_from_qids
        # fabricates a status table instead, so this test needs no scheduler.
        qinfo = queue.queue_info_from_qids(qids, dry_run_level=4)
        # The fabricated table must preserve the input QID order exactly.
        self.assertEqual(list(qinfo['JOBID']), qids)

def test_queue_state_cache(self):
"""Test queue state cache"""
# Query qids to get state into cache
qids = [11,10,2,5]
qinfo = queue.queue_info_from_qids(qids, dry_run=3)
qinfo = queue.queue_info_from_qids(qids, dry_run_level=4)

# check cache matches state
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3)
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4)
self.assertEqual(list(qinfo['STATE']), list(qstates.values()))
# should be ['COMPLETED', 'COMPLETED', 'COMPLETED', 'COMPLETED']

Expand All @@ -39,13 +39,13 @@ def test_queue_state_cache(self):
qinfo['STATE'][0] = 'PENDING'

queue.update_queue_state_cache_from_table(qinfo)
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3)
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4)
self.assertEqual(list(qinfo['STATE']), list(qstates.values()))
# should be ['PENDING', 'FAILED', 'FAILED', 'FAILED']

# update state of just one qid
queue.update_queue_state_cache(10, 'COMPLETED')
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3)
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4)
# should be ['PENDING', 'COMPLETED', 'FAILED', 'FAILED']
self.assertEqual(qstates[11], 'PENDING')
self.assertEqual(qstates[10], 'COMPLETED')
Expand All @@ -55,7 +55,7 @@ def test_queue_state_cache(self):
# Asking for qids not in the cache should requery sacct for all of them.
# Since this is dry run, that will also reset all back to COMPLETED.
qids.append(100)
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run=3)
qstates = queue.get_queue_states_from_qids(qids, use_cache=True, dry_run_level=4)
# should be ['COMPLETED', 'COMPLETED', 'TIMEOUT', 'COMPLETED', 'COMPLETED']
for qid, state in qstates.items():
self.assertEqual(state, 'COMPLETED', f'{qid=} {state=} not COMPLETED')
Expand Down
38 changes: 22 additions & 16 deletions py/desispec/workflow/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ def submit_batch_script(prow, dry_run=0, reservation=None, strictly_successful=F
failed_dependency = False
if len(dep_qids) > 0 and not dry_run:
non_final_states = get_non_final_states()
state_dict = get_queue_states_from_qids(dep_qids, dry_run=dry_run, use_cache=True)
state_dict = get_queue_states_from_qids(dep_qids, dry_run_level=dry_run, use_cache=True)
still_depids = []
for depid in dep_qids:
if depid in state_dict.keys():
Expand Down Expand Up @@ -1156,7 +1156,7 @@ def all_calibs_submitted(accounted_for, do_cte_flats):

def update_and_recursively_submit(proc_table, submits=0, resubmission_states=None,
no_resub_failed=False, ptab_name=None,
dry_run=0, reservation=None):
dry_run_level=0, reservation=None):
"""
Given an processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
Expand All @@ -1172,9 +1172,12 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non
no_resub_failed: bool. Set to True if you do NOT want to resubmit
jobs with Slurm status 'FAILED' by default. Default is False.
ptab_name, str, the full pathname where the processing table should be saved.
dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
for testing as though scripts are being submitted. Default is 0 (false).
dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
0 which runs the code normally.
1 writes all files but doesn't submit any jobs to Slurm.
2 writes tables but doesn't write scripts or submit anything.
3 Doesn't write or submit anything but queries Slurm normally for job status.
4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
Returns:
Expand All @@ -1193,7 +1196,7 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non
resubmission_states = get_resubmission_states(no_resub_failed=no_resub_failed)

log.info(f"Resubmitting jobs with current states in the following: {resubmission_states}")
proc_table = update_from_queue(proc_table, dry_run=dry_run)
proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
log.info("Updated processing table queue information:")
cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
Expand All @@ -1207,12 +1210,12 @@ def update_and_recursively_submit(proc_table, submits=0, resubmission_states=Non
proc_table, submits = recursive_submit_failed(rown, proc_table, submits,
id_to_row_map, ptab_name,
resubmission_states,
reservation, dry_run)
proc_table = update_from_queue(proc_table, dry_run=dry_run)
reservation, dry_run_level)
proc_table = update_from_queue(proc_table, dry_run_level=dry_run_level)
return proc_table, submits

def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
resubmission_states=None, reservation=None, dry_run=0):
resubmission_states=None, reservation=None, dry_run_level=0):
"""
Given a row of a processing table and the full processing table, this resubmits the given job.
Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
Expand All @@ -1230,9 +1233,12 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=
possible Slurm scheduler state, where you wish for jobs with that
outcome to be resubmitted
reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
dry_run, int, If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. If
dry_run=2, the scripts will not be writter or submitted. Logging will remain the same
for testing as though scripts are being submitted. Default is 0 (false).
dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
0 which runs the code normally.
1 writes all files but doesn't submit any jobs to Slurm.
2 writes tables but doesn't write scripts or submit anything.
3 Doesn't write or submit anything but queries Slurm normally for job status.
4 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
Returns:
tuple: A tuple containing:
Expand Down Expand Up @@ -1273,7 +1279,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=
+ f"fatal error."
log.critical(msg)
raise ValueError(msg)
reftab = update_from_queue(reftab, dry_run=dry_run)
reftab = update_from_queue(reftab, dry_run_level=dry_run_level)
entry = reftab[reftab['INTID'] == idep][0]
if entry['STATUS'] not in good_states:
msg = f"Internal ID: {idep} not in id_to_row_map. " \
Expand Down Expand Up @@ -1311,7 +1317,7 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=
proc_table, submits,
id_to_row_map,
reservation=reservation,
dry_run=dry_run)
dry_run_level=dry_run_level)
## Now that we've resubmitted the dependency if necessary,
## add the most recent QID to the list
qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])
Expand All @@ -1327,10 +1333,10 @@ def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=
log.error(f"number of qdeps should be 1 or more: Rown {rown}, ideps {ideps}")

proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation,
strictly_successful=True, dry_run=dry_run)
strictly_successful=True, dry_run=dry_run_level)
submits += 1

if not dry_run:
if dry_run_level < 3:
if ptab_name is None:
write_table(proc_table, tabletype='processing', overwrite=True)
else:
Expand Down
6 changes: 3 additions & 3 deletions py/desispec/workflow/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,9 @@ def update_from_queue(ptable, qtable=None, dry_run=0, ignore_scriptnames=False,
check_complete_jobs, bool. Default is False. Set to true if you want to
also check QID's that currently have a STATUS "COMPLETED".
in the ptable.
The following are only used if qtable is not provided:
dry_run, int. Whether this is a simulated run or real run. If nonzero, it is a simulation and it returns a default
table that doesn't query the Slurm scheduler.
    dry_run, int. Only used if qtable is not provided. Whether this is a
        simulated run or a real run. If nonzero, it is a simulation and a
        default table is returned without querying the Slurm scheduler.
Returns:
    ptab, Table. A copy of the same processing table as the input except that the "STATUS" column in ptable for all jobs is
Expand Down

0 comments on commit cac44c1

Please sign in to comment.