Skip to content

Commit

Permalink
Merge pull request #2385 from desihub/update_processing_table
Browse files Browse the repository at this point in the history
Add desi_update_processing_table_statuses and update dry_run_level's
  • Loading branch information
sbailey authored Oct 10, 2024
2 parents 41da70a + 5cb164d commit bd7fd13
Show file tree
Hide file tree
Showing 9 changed files with 401 additions and 184 deletions.
15 changes: 8 additions & 7 deletions bin/desi_proc_night
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ def parse_args():
parser.add_argument("--dry-run", action="store_true",
help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+
"dry-run-level is defined as nonzero.")
parser.add_argument("--dry-run-level", type=int, default=0, required=False,
help="""If nonzero, this is a simulated run.
If level=1 the scripts will be written but not submitted.
If level=2, scripts will not be written or submitted but processing_table is created.
if level=3, no output files are written at all.
Logging will remain the same for testing as though scripts are being submitted.
Default is 0 (false).""")
parser.add_argument("--dry-run-level", type=int, default=0,
help="What level of dry_run to perform, if any. Default is 0. "
+ "0 which runs the code normally. "
+ "1 writes all files but doesn't submit any jobs to Slurm. "
+ "2 writes tables but doesn't write scripts or submit anything. "
+ "3 Doesn't write or submit anything but queries Slurm normally for job status. "
+ "4 Doesn't write, submit jobs, or query Slurm."
+ "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.")
parser.add_argument("--no-redshifts", action="store_true",
help="Whether to submit redshifts or not. If set, redshifts are not submitted.")
parser.add_argument("--ignore-proc-table-failures", action="store_true",
Expand Down
29 changes: 16 additions & 13 deletions bin/desi_resubmit_queue_failures
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,14 @@ def parse_args(): # options=None):
help="File format and extension for the exp and proc tables.")
parser.add_argument("-r", "--reservation", type=str, required=False, default=None,
help="The reservation to submit jobs to. If None, it is not submitted to a reservation.")
parser.add_argument("--dry-run", action="store_true",
help="Perform a dry run where no jobs are actually created or submitted. Overwritten if "+
"dry-run-level is defined as nonzero.")
parser.add_argument("--dry-run-level", type=int, default=0,
help="What level of dry_run to perform, if any. Default is 0. "
+ "0 which runs the code normally. "
+ "1 writes all files but doesn't submit any jobs to Slurm. "
+ "2 writes tables but doesn't write scripts or submit anything. "
+ "3 Doesn't write or submit anything but queries Slurm normally for job status. "
+ "4 Doesn't write, submit jobs, or query Slurm."
+ "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.")
parser.add_argument("--resub-states", type=str, default=None, required=False,
help="The SLURM queue states that should be resubmitted. " +
"E.g. UNSUBMITTED, BOOT_FAIL, DEADLINE, NODE_FAIL, " +
Expand Down Expand Up @@ -74,24 +79,22 @@ if __name__ == '__main__':
if not os.path.exists(ptable_pathname):
ValueError(f"Processing table: {ptable_pathname} doesn't exist.")

if args.dry_run > 0 and args.dry_run < 3:
log.warning(f"{args.dry_run=} will be run with limited simulation "
f"because we don't want to write out incorrect queue information.")

## Combine the table names and types for easier passing to io functions
table_type = 'proctable'
if args.dry_run_level > 0:
log.info(f"{args.dry_run_level=}, so will be simulating some features."
+ f" See parser for what each level limits.")

## Load in the files defined above
ptable = load_table(tablename=ptable_pathname, tabletype=table_type)
ptable = load_table(tablename=ptable_pathname, tabletype='proctable')
log.info(f"Identified ptable with {len(ptable)} entries.")
ptable, nsubmits = update_and_recursively_submit(ptable, submits=0,
resubmission_states=args.resub_states,
no_resub_failed=args.no_resub_failed,
ptab_name=ptable_pathname, dry_run=args.dry_run,
ptab_name=ptable_pathname,
dry_run_level=args.dry_run_level,
reservation=args.reservation)

if not args.dry_run:
if args.dry_run_level < 3:
write_table(ptable, tablename=ptable_pathname)

log.info("Completed all necessary queue resubmissions from processing "
+ f"table: {ptable_pathname}")
+ f"table: {ptable_pathname}")
88 changes: 88 additions & 0 deletions bin/desi_update_proctable_statuses
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python
# coding: utf-8

import argparse
import os
import numpy as np

from desiutil.log import get_logger
from desispec.workflow.tableio import load_table, write_table
from desispec.workflow.proctable import get_processing_table_pathname
from desispec.workflow.queue import update_from_queue


def parse_args(): # options=None):
    """
    Get command line arguments for desi_update_proctable_statuses.

    Returns:
        argparse.Namespace: the parsed command line arguments. Notable
        attributes are night (str or None), outfile (str or None),
        proc_table_pathname (str or None), tab_filetype (str, default 'csv'),
        dry_run_level (int, default 0), check_complete_jobs (bool), and
        show_updated_table (bool).
    """
    parser = argparse.ArgumentParser(description="Update the STATUS of all jobs "
                                                 + "in a DESI processing table by "
                                                 + "querying Slurm.")

    parser.add_argument("-n","--night", type=str, default=None,
                        required=False, help="The night you want processed.")
    parser.add_argument("-o","--outfile", type=str, default=None,
                        required=False, help="Output filename, if different from default.")
    parser.add_argument("--proc-table-pathname", type=str, required=False, default=None,
                        help="Directory name where the output processing table should be saved.")
    parser.add_argument("--tab-filetype", type=str, required=False, default='csv',
                        help="File format and extension for the exp and proc tables.")
    ## NOTE: trailing spaces inside each fragment below are required so the
    ## concatenated help text has spaces between sentences.
    parser.add_argument("--dry-run-level", type=int, default=0,
                        help="What level of dry_run to perform, if any. Default is 0. "
                             + "0 which runs the code normally. "
                             + "1 writes all files but doesn't submit any jobs to Slurm. "
                             + "2 writes tables but doesn't write scripts or submit anything. "
                             + "3 Doesn't write or submit anything but queries Slurm normally for job status. "
                             + "4 Doesn't write, submit jobs, or query Slurm. "
                             + "5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.")
    parser.add_argument("--check-complete-jobs", action="store_true",
                        help="Query NERSC about jobs with STATUS 'COMPLETED' "
                             + "in addition to all other jobs. Default is False, "
                             + "which skips COMPLETED jobs.")
    parser.add_argument("--show-updated-table", action="store_true",
                        help="Print a subset of the columns from the ptable with updated statuses.")

    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    log = get_logger()

    ## Determine the processing table location: either given explicitly or
    ## derived from the night using the standard per-production pathname.
    ptable_pathname = args.proc_table_pathname
    if ptable_pathname is None:
        if args.night is None:
            ## Must `raise` (not merely construct) the exception so that a
            ## missing argument actually aborts the script.
            raise ValueError("Either night or --proc-table-pathname must be specified")
        ## Determine where the processing table will be written
        ptable_pathname = get_processing_table_pathname(prodmod=args.night,
                                                        extension=args.tab_filetype)

    if not os.path.exists(ptable_pathname):
        raise ValueError(f"Processing table: {ptable_pathname} doesn't exist.")

    if args.dry_run_level > 0:
        log.info(f"{args.dry_run_level=}, so will be simulating some features."
                 + f" See parser for what each level limits.")

    ## Load in the files defined above
    ptable = load_table(tablename=ptable_pathname, tabletype='proctable')
    log.info(f"Identified ptable with {len(ptable)} entries.")
    ## Refresh STATUS from Slurm (or simulate, per dry_run_level);
    ## optionally re-check jobs already marked COMPLETED.
    ptable = update_from_queue(ptable, dry_run_level=args.dry_run_level,
                               check_complete_jobs=args.check_complete_jobs)

    ## dry_run_level >= 3 suppresses all output writing
    if args.dry_run_level < 3:
        if args.outfile is not None:
            outfile = args.outfile
        else:
            outfile = ptable_pathname
        write_table(ptable, tablename=outfile)

    if args.show_updated_table:
        log.info("Updated processing table:")
        cols = ['INTID', 'INT_DEP_IDS', 'EXPID', 'TILEID',
                'OBSTYPE', 'JOBDESC', 'LATEST_QID', 'STATUS']
        log.info(np.array(cols))
        for row in ptable:
            log.info(np.array(row[cols]))
        log.info("\n")

    log.info(f"Done updating STATUS column for processing table: {ptable_pathname}")
6 changes: 3 additions & 3 deletions py/desispec/scripts/daily_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,9 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path
dry_run=(dry_run and ()))

if len(ptable) > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
# ptable, nsubmits = update_and_recursively_submit(ptable,
# ptab_name=proc_table_pathname, dry_run=dry_run_level)
# ptab_name=proc_table_pathname, dry_run_level=dry_run_level)

## Exposure table doesn't change in the interim, so no need to re-write it to disk
if dry_run_level < 3:
Expand Down Expand Up @@ -481,7 +481,7 @@ def daily_processing_manager(specprod=None, exp_table_path=None, proc_table_path
resubmit_partial_complete=resubmit_partial_complete,
z_submit_types=z_submit_types)
## All jobs now submitted, update information from job queue and save
ptable = update_from_queue(ptable, dry_run=dry_run_level)
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname)

Expand Down
34 changes: 18 additions & 16 deletions py/desispec/scripts/proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
override_pathname (str): Full path to the override file.
update_exptable (bool): If true then the exposure table is updated.
The default is False.
dry_run_level (int, optional): If nonzero, this is a simulated run.
If dry_run_level=1 the scripts will be written but not submitted.
If dry_run_level=2, the scripts will not be written nor submitted
but the processing_table is still created.
If dry_run_level=3, no output files are written.
Logging will remain the same for testing as though scripts are
being submitted. Default is 0 (false).
dry_run_level (int, optional): If nonzero, this is a simulated run. Default is 0.
0 which runs the code normally.
1 writes all files but doesn't submit any jobs to Slurm.
2 writes tables but doesn't write scripts or submit anything.
3 Doesn't write or submit anything but queries Slurm normally for job status.
4 Doesn't write, submit jobs, or query Slurm.
5 Doesn't write, submit jobs, or query Slurm; instead it makes up the status of the jobs.
dry_run (bool, optional): When to run without submitting scripts or
not. If dry_run_level is defined, then it over-rides this flag.
dry_run_level not set and dry_run=True, dry_run_level is set to 2
dry_run_level not set and dry_run=True, dry_run_level is set to 3
(no scripts generated or run). Default for dry_run is False.
no_redshifts (bool, optional): Whether to submit redshifts or not.
If True, redshifts are not submitted.
Expand Down Expand Up @@ -191,7 +191,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,

## Reconcile the dry_run and dry_run_level
if dry_run and dry_run_level == 0:
dry_run_level = 2
dry_run_level = 3
elif dry_run_level > 0:
dry_run = True

Expand Down Expand Up @@ -247,6 +247,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
resubmit_partial_complete = (not dont_resubmit_partial_jobs)
require_cals = (not dont_require_cals)
do_cte_flats = (not no_cte_flats)
## False if not submitting or simulating
update_proctable = (dry_run_level == 0 or dry_run_level > 3)

## cte flats weren't available before 20211130 so hardcode that in
if do_cte_flats and night < 20211130:
Expand Down Expand Up @@ -358,7 +360,8 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
## Update processing table
tableng = len(ptable)
if tableng > 0:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
if update_proctable:
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
if any_jobs_failed(ptable['STATUS']):
Expand All @@ -373,7 +376,7 @@ def proc_night(night=None, proc_obstypes=None, z_submit_types=None,
ptable, nsubmits = update_and_recursively_submit(ptable,
no_resub_failed=no_resub_failed,
ptab_name=proc_table_pathname,
dry_run=dry_run,
dry_run_level=dry_run_level,
reservation=reservation)
elif not ignore_proc_table_failures:
err = "Some jobs have an incomplete job status. This script " \
Expand Down Expand Up @@ -588,11 +591,10 @@ def create_submit_add_and_save(prow, proctable, check_outputs=check_for_outputs,
################### Wrap things up ###################
unproc_table = None
if len(ptable) > 0:
## All jobs now submitted, update information from job queue and save
## But only if actually submitting or fully simulating, don't simulate
## outputs that will be written to disk (levels 1 and 2)
if dry_run_level < 1 or dry_run_level > 2:
ptable = update_from_queue(ptable, dry_run=dry_run_level)
## All jobs now submitted, update information from job queue
## If dry_run_level > 3, then Slurm isn't queried
if update_proctable:
ptable = update_from_queue(ptable, dry_run_level=dry_run_level)
if dry_run_level < 3:
write_table(ptable, tablename=proc_table_pathname, tabletype='proctable')
## Now that processing is complete, lets identify what we didn't process
Expand Down
19 changes: 13 additions & 6 deletions py/desispec/test/test_proc_night.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@ def test_proc_night_dryrun3(self):
self.assertEqual(len(prodfiles), 1)
self.assertTrue(prodfiles[0].endswith('exposure_tables'))

def test_proc_night_dryrun3(self):
"""Test that dry_run_level=4 doesn't produce any output"""
proctable, unproctable = proc_night(self.night, z_submit_types=['cumulative',],
dry_run_level=4, sub_wait_time=0.0)

prodfiles = glob.glob(self.proddir+'/*')
self.assertEqual(len(prodfiles), 1)
self.assertTrue(prodfiles[0].endswith('exposure_tables'))

def test_proc_night_noz(self):
"""Test that z_submit_types=None doesn't submit any redshift jobs"""

Expand Down Expand Up @@ -203,7 +212,7 @@ def test_proc_night_resubmit_queue_failures(self):
desispec.workflow.proctable.reset_tilenight_ptab_cache()

## test that the code runs
updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, dry_run=3)
updatedtable2, nsubmits = update_and_recursively_submit(proctable2, submits=0, dry_run_level=4)
self.assertFalse(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])),
msg='No TIMEOUTs in nominal resubmission')

Expand All @@ -214,7 +223,7 @@ def test_proc_night_resubmit_queue_failures(self):
proctable2['STATUS'][proctable2['INTID']==cumulative2['INTID']] = 'TIMEOUT'
updatedtable2, nsubmits = update_and_recursively_submit(proctable2,
submits=0,
dry_run=1)
dry_run_level=4)
self.assertFalse(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])),
msg='Cross night resubmission should leave no TIMEOUTs')

Expand All @@ -237,11 +246,9 @@ def test_proc_night_resubmit_queue_failures(self):
## Run resubmission code
updatedtable2, nsubmits = update_and_recursively_submit(proctable2,
submits=0,
dry_run=1)
self.assertTrue(np.any(np.in1d(updatedtable2['STATUS'], [b'DEP_NOT_SUBD', b'TIMEOUT'])),
msg='Cross night resubmission should leave two TIMEOUTs')
dry_run_level=4)
self.assertTrue(np.sum(updatedtable2['STATUS'] == 'DEP_NOT_SUBD')==2,
msg='Cross night resubmission should have 2 TIMEOUTs' \
msg='Cross night resubmission should have 2 DEP_NOT_SUBDs' \
+ ' after forcing failed previous night jobs.')


Expand Down
Loading

0 comments on commit bd7fd13

Please sign in to comment.