Skip to content

Commit

Permalink
add multiproc to dashboards
Browse files Browse the repository at this point in the history
  • Loading branch information
akremin committed Oct 2, 2024
1 parent b1aec4e commit c2dea61
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 124 deletions.
112 changes: 65 additions & 47 deletions py/desispec/scripts/procdashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
get_exposure_table_column_defaults
from desispec.workflow.proc_dashboard_funcs import get_skipped_ids, \
return_color_profile, find_new_exps, _hyperlink, _str_frac, \
get_output_dir, get_nights_dict, make_html_page, read_json, write_json, \
get_terminal_steps, get_tables
get_output_dir, make_html_page, read_json, write_json, \
get_terminal_steps, get_tables, populate_monthly_tables, get_nights
from desispec.workflow.proctable import get_processing_table_pathname, \
table_row_to_dict
from desispec.workflow.queue import update_from_queue, get_non_final_states
Expand All @@ -31,6 +31,7 @@
from desispec.io.util import decode_camword, camword_to_spectros, \
difference_camwords, parse_badamps, create_camword, camword_intersection, \
erow_to_goodcamword
from desiutil.log import get_logger


def parse(options):
Expand Down Expand Up @@ -72,6 +73,9 @@ def parse(options):
"Default is the earliest night available.")
parser.add_argument('--end-night', type=str, default=None, required=False,
help="This specifies the last night (inclusive) to include in the dashboard. Default is today.")
parser.add_argument('--nproc', type=int, default=1, required=False,
help="The number of processors to use with multiprocessing. " +
"Default is 1.")
parser.add_argument('--check-on-disk', action="store_true",
help="Check raw data directory for additional unaccounted for exposures on disk " +
"beyond the exposure table.")
Expand All @@ -84,17 +88,18 @@ def parse(options):

return args


######################
### Main Functions ###
######################
def main(args=None):
""" Code to generate a webpage for monitoring of desi_dailyproc production status
Usage:
-n can be 'all' or series of nights separated by comma or blank like 20200101,20200102 or 20200101 20200102
Normal Mode:
desi_proc_dashboard -n 3 --output-dir /global/cfs/cdirs/desi/www/collab/dailyproc/
desi_proc_dashboard -n 20200101,20200102 --output-dir /global/cfs/cdirs/desi/www/collab/dailyproc/
""" Code to generate a webpage for monitoring the spectra processing in a production.
Args:
args (argparse.Namespace): The arguments generated from
desispec.scripts.procdashboard.parse()
"""
log = get_logger()
if not isinstance(args, argparse.Namespace):
args = parse(args)

Expand All @@ -112,58 +117,71 @@ def main(args=None):
else:
skipd_expids = None

nights_dict, nights = get_nights_dict(args.nights, args.start_night,
args.end_night, prod_dir)
def populate_night_info_wrapper(night_to_submit):
## Load previous info if any
filename_json = os.path.join(output_dir, 'expjsons',
f'expinfo_{os.environ["SPECPROD"]}'
+ f'_{night_to_submit}.json')
night_json_info = None
if not args.ignore_json_archive:
night_json_info = read_json(filename_json=filename_json)

## get the per exposure info for a night
night_info = populate_night_info(night_to_submit,
check_on_disk=args.check_on_disk,
night_json_info=night_json_info,
skipd_expids=skipd_expids)

if len(night_info) == 0:
return {}

print(f'Searching {prod_dir} for: {nights}')
## write out the night_info to json file
write_json(output_data=night_info, filename_json=filename_json)

monthly_tables = {}
for month, nights_in_month in nights_dict.items():
print("Month: {}, nights: {}".format(month, nights_in_month))
nightly_tables = {}
for night in nights_in_month:
## Load previous info if any
filename_json = os.path.join(output_dir, 'expjsons',
f'expinfo_{os.environ["SPECPROD"]}'
+ f'_{night}.json')
night_json_info = None
if not args.ignore_json_archive:
night_json_info = read_json(filename_json=filename_json)
return night_info.copy()

## get the per exposure info for a night
night_info = populate_night_info(night, check_on_disk=args.check_on_disk,
night_json_info=night_json_info,
skipd_expids=skipd_expids)
nightly_tables[night] = night_info.copy()

## write out the night_info to json file
write_json(output_data=night_info, filename_json=filename_json)
nights = get_nights(args.nights, args.start_night, args.end_night, prod_dir)
log.info(f'Searching {prod_dir} for: {nights}')

monthly_tables[month] = nightly_tables.copy()
monthly_tables = populate_monthly_tables(pernight_info_wrapper=populate_night_info_wrapper,
nights=nights, nproc=args.nproc)

outfile = os.path.abspath(os.path.join(output_dir, args.output_name))
make_html_page(monthly_tables, outfile, titlefill='Exp. Processing',
show_null=args.show_null)



def populate_night_info(night, check_on_disk=False,
night_json_info=None, skipd_expids=None):
def populate_night_info(night, night_json_info=None, check_on_disk=False, skipd_expids=None):
"""
For a given night, return the file counts and other other information for each exposure taken on that night
input: night
output: a dictionary containing the statistics with expid as key name
FLAVOR: FLAVOR of this exposure
OBSTYPE: OBSTYPE of this exposure
EXPTIME: Exposure time
SPECTROGRAPHS: a list of spectrographs used
n_spectrographs: number of spectrographs
n_psf: number of PSF files
n_ff: number of fiberflat files
n_frame: number of frame files
n_sframe: number of sframe files
n_cframe: number of cframe files
n_sky: number of sky files
Use all available information in the SPECPROD to determine whether specific
jobs and exposures have been successfully processed or not based on the existence
of files on disk.
Args:
night (int): the night to check the status of the processing for.
night_json_info (dict of dicts): Dictionary of dictionarys. See output
definition for format.
check_on_disk (bool, optional): True if you want to submit
other jobs even the loaded processing table has incomplete jobs in
it. Use with caution. Default is False.
skipd_expids (bool, optional): Default is False. If False,
the code checks for the existence of the expected final data
products for the script being submitted. If all files exist and
this is False, then the script will not be submitted. If some
files exist and this is False, only the subset of the cameras
without the final data products will be generated and submitted.
Returns:
output (dict of dicts): keys are generally JOBDESC_EXPID. Each value
is a dict with keys of the column names and values as the elements
of the row in the table for each column. The one exception is COLOR
which is used to define the coloring of the row in the dashboard.
Current keys are: "COLOR", "EXPID", "TILEID", "OBSTYPE", "FA SURV",
"FA PRGRM", "LAST STEP", "EXP TIME" ,"PROC CAMWORD", "PSF", "FFLAT",
"FRAME", "SFRAME", "SKY", "STD", "CFRAME", "SLURM FILE", "LOG FILE",
"COMMENTS", and "STATUS".
"""
if skipd_expids is None:
skipd_expids = []
Expand Down
124 changes: 54 additions & 70 deletions py/desispec/scripts/zprocdashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
get_exposure_table_column_defaults, read_minimal_science_exptab_cols
from desispec.workflow.proc_dashboard_funcs import get_skipped_ids, \
return_color_profile, find_new_exps, _hyperlink, _str_frac, \
get_output_dir, get_nights_dict, make_html_page, read_json, write_json, \
get_terminal_steps, get_tables
get_output_dir, make_html_page, read_json, write_json, \
get_terminal_steps, get_tables, get_nights, populate_monthly_tables
from desispec.workflow.proctable import get_processing_table_pathname, \
erow_to_prow, instantiate_processing_table
from desispec.workflow.tableio import load_table
Expand Down Expand Up @@ -73,6 +73,9 @@ def parse(options):
"Default is the earliest night available.")
parser.add_argument('--end-night', type=str, default=None, required=False,
help="This specifies the last night (inclusive) to include in the dashboard. Default is today.")
parser.add_argument('--nproc', type=int, default=1, required=False,
help="The number of processors to use with multiprocessing. " +
"Default is 1.")
parser.add_argument('--check-on-disk', action="store_true",
help="Check raw data directory for additional unaccounted for exposures on disk " +
"beyond the exposure table.")
Expand All @@ -97,12 +100,11 @@ def parse(options):
### Main Functions ###
######################
def main(args=None):
""" Code to generate a webpage for monitoring of desi_dailyproc production status
Usage:
-n can be 'all' or series of nights separated by comma or blank like 20200101,20200102 or 20200101 20200102
Normal Mode:
desi_proc_dashboard -n 3 --output-dir /global/cfs/cdirs/desi/www/collab/dailyproc/
desi_proc_dashboard -n 20200101,20200102 --output-dir /global/cfs/cdirs/desi/www/collab/dailyproc/
""" Code to generate a webpage for monitoring the redshift jobs in a production.
Args:
args (argparse.Namespace): The arguments generated from
desispec.scripts.zprocdashboard.parse()
"""
log = get_logger()
if not isinstance(args, argparse.Namespace):
Expand All @@ -119,70 +121,52 @@ def main(args=None):
## Input ###
############
if args.skip_tileid_file is not None:
skipd_tileids = set(
get_skipped_ids(args.skip_tileid_file, skip_ids=True))
skipd_tileids = list(set(
get_skipped_ids(args.skip_tileid_file, skip_ids=True)))
else:
skipd_tileids = None

nights_dict, nights = get_nights_dict(args.nights, args.start_night,
args.end_night, prod_dir)

log.info(f'Searching {prod_dir} for: {nights}')

## Get all the exposure tables for cross-night dependencies
## Get all the exposure tables read in and cached for more efficient
## access later cross-night dependencies
all_exptabs = read_minimal_science_exptab_cols(nights=None)

## We don't want future days mixing in
all_exptabs = all_exptabs[all_exptabs['NIGHT'] <= np.max(nights)]
## Restrict to only the exptabs relevant to the current dashboard
night_selection = np.isin(all_exptabs['NIGHT'],nights)
tiles = all_exptabs['TILEID'][night_selection]
subset_exptabs = all_exptabs[np.isin(all_exptabs['TILEID'], tiles)]

monthly_tables = {}
for month, nights_in_month in nights_dict.items():
log.info("Month: {}, nights: {}".format(month, nights_in_month))
nightly_tables = {}
for night in nights_in_month:
## Load previous info if any
filename_json = os.path.join(output_dir, 'zjsons',
f'zinfo_{os.environ["SPECPROD"]}'
+ f'_{night}.json')
night_json_zinfo = None
if not args.ignore_json_archive:
night_json_zinfo = read_json(filename_json=filename_json)

## only send table for tiles on the given night
tiles = all_exptabs['TILEID'][all_exptabs['NIGHT']==night]
subset_exptabs = all_exptabs[np.isin(all_exptabs['TILEID'], tiles)]

## get the per exposure info for a night
night_zinfo = populate_night_zinfo(night, doem=doem, doqso=doqso,
dotileqa=dotileqa, dozmtl=dozmtl,
check_on_disk=args.check_on_disk,
night_json_zinfo=night_json_zinfo,
skipd_tileids=skipd_tileids,
all_exptabs=subset_exptabs)

if len(night_zinfo) == 0:
continue

nightly_tables[night] = night_zinfo.copy()

def populate_night_zinfo_wrapper(night_to_submit):
## Load previous info if any
filename_json = os.path.join(output_dir, 'zjsons',
f'zinfo_{os.environ["SPECPROD"]}'
+ f'_{night_to_submit}.json')
night_json_zinfo = None
if not args.ignore_json_archive:
night_json_zinfo = read_json(filename_json=filename_json)

## get the per exposure info for a night
night_zinfo = populate_night_zinfo(night_to_submit, doem=doem, doqso=doqso,
dotileqa=dotileqa, dozmtl=dozmtl,
check_on_disk=args.check_on_disk,
night_json_zinfo=night_json_zinfo,
skipd_tileids=skipd_tileids)

if len(night_zinfo) > 0:
## write out the night_info to json file
write_json(output_data=night_zinfo, filename_json=filename_json)

monthly_tables[month] = nightly_tables.copy()
return night_zinfo

nights = get_nights(args.nights, args.start_night, args.end_night, prod_dir)
log.info(f'Searching {prod_dir} for: {nights}')

monthly_tables = populate_monthly_tables(pernight_info_wrapper=populate_night_zinfo_wrapper,
nights=nights, nproc=args.nproc)

outfile = os.path.abspath(os.path.join(output_dir, args.output_name))
make_html_page(monthly_tables, outfile, titlefill='z Processing',
show_null=args.show_null)



def populate_night_zinfo(night, doem=True, doqso=True, dotileqa=True, dozmtl=True,
check_on_disk=False, night_json_zinfo=None,
skipd_tileids=None, all_exptabs=None):
def populate_night_zinfo(night, night_json_zinfo=None, doem=True, doqso=True,
dotileqa=True, dozmtl=True, check_on_disk=False,
skipd_tileids=None):
"""
For a given night, return the file counts and other information
for each zproc job (either per-exposure, per-night, or cumulative for
Expand All @@ -191,32 +175,30 @@ def populate_night_zinfo(night, doem=True, doqso=True, dotileqa=True, dozmtl=Tru
Args:
night (int): the night to check the status of the processing for.
night_json_zinfo (dict of dicts): Dictionary of dictionarys. See output
definition for format.
doem (bool): true if it should expect emline files. Default is True.
doqso (bool): true if it should expect qso_qn and qso_mgii files.
Default is True.
dotileqa (bool): true if it should expect tileqa files. Default is True.
dozmtl (bool): true if it should expect zmtl files. Default is True.
check_on_disk (bool): true if it should check on disk for missing
exposures and tiles that aren't represented in the exposure tables.
night_json_zinfo (dict): A dictionary of dicts where each key is a unique
identifier to the row. Each value is a dictionary container the
column information in addition to other metadata. Meant to be a
way of passing cached values from a previous run of this function.
skipd_tileids (list): List of tileids that should be skipped and not
listed in the output dashboard.
all_exptabs (astropy.table.Table): A stacked exposure table with minimal
columns returned from read_minimal_science_exptab_cols(). Used for
cumulative redshifts jobs to identify tile data from previous nights.
Returns dict:
A dictionary of dicts. Each item is information for a row of the output
dashboard for a redshift job on the requested night. Each key is a
unique identifier to the row. Each value is a dictionary container
the column information in addition to other metadata about the state
of the processing and file counts.
output (dict of dicts): keys are generally JOBDESC_EXPID or
JOBDESC_TILEID. Each value is a dict with keys of the column names
and values as the elements of the row in the table for each column.
The one exception is COLOR which is used to define the coloring
of the row in the dashboard. Current keys are: "COLOR",
"TILEID", "ZTYPE", "EXPIDS", "FA SURV", "FA PRGRM", "PROC CAMWORD",
"SPECTRA", "COADD", "REDROCK", "RRDETAILS", "TILEQA", "ZMTL", "QN",
"MGII", "EMLINE", "SLURM FILE", "LOG FILE", "COMMENTS", and "STATUS".
"""
log = get_logger()

if skipd_tileids is None:
skipd_tileids = []
## Note that the following list should be in order of processing. I.e. the first filetype given should be the
Expand Down Expand Up @@ -271,6 +253,8 @@ def populate_night_zinfo(night, doem=True, doqso=True, dotileqa=True, dozmtl=Tru
exptab = orig_exptab[((orig_exptab['OBSTYPE'] == 'science')
& (orig_exptab['LASTSTEP'] == 'all'))]

all_exptabs = read_minimal_science_exptab_cols(tileids=np.unique(list(exptab['TILEID'])).astype(int))

if proctab is None:
if len(exptab) == 0:
log.warning(f"No redshiftable exposures found on night {night}. Skipping")
Expand Down
Loading

0 comments on commit c2dea61

Please sign in to comment.