diff --git a/examples/error_types.yaml b/examples/error_types.yaml new file mode 100644 index 000000000..5a4cb46e6 --- /dev/null +++ b/examples/error_types.yaml @@ -0,0 +1,8 @@ +- PipetaskErrorType: + source: manifest + flavor: configuration + action: review + task_name: skyObjectMean + diagnostic_message: 'Execution of task ''skyObjectMean'' on quantum.*failed. Exception ValueError: Failure + from formatter ''lsst.daf.butler.formatters.parquet.ParquetFormatter'' for dataset.*: + Column d_pixelFlags_edge specified in parameters not available in parquet file.' diff --git a/examples/example_config.yaml b/examples/example_config.yaml index 72e468924..16cc56c92 100644 --- a/examples/example_config.yaml +++ b/examples/example_config.yaml @@ -5,8 +5,15 @@ name: chain_prepend_script handler: lsst.cmservice.handlers.scripts.ChainPrependScriptHandler - SpecBlock: - name: chain_collect_script + name: chain_collect_jobs_script handler: lsst.cmservice.handlers.scripts.ChainCollectScriptHandler + data: + collect: jobs +- SpecBlock: + name: chain_collect_steps_script + handler: lsst.cmservice.handlers.scripts.ChainCollectScriptHandler + data: + collect: steps - SpecBlock: name: tag_inputs_script handler: lsst.cmservice.handlers.scripts.TagInputsScriptHandler @@ -19,17 +26,20 @@ - SpecBlock: name: prepare_step_script handler: lsst.cmservice.handlers.scripts.PrepareStepScriptHandler + collections: + global_inputs: "{campaign_input}" - SpecBlock: name: validate_script handler: lsst.cmservice.handlers.scripts.ValidateScriptHandler - SpecBlock: - name: run_wms_job - handler: lsst.cmservice.handlers.scripts.PandaScriptHandler - data: - rescue: false + name: panda_script + handler: lsst.cmservice.handlers.jobs.PandaScriptHandler +- SpecBlock: + name: panda_report_script + handler: lsst.cmservice.handlers.jobs.PandaReportHandler - SpecBlock: - name: run_manifest_checker - handler: lsst.cmservice.handlers.scripts.ManifestCheckerHandler + name: manifest_report_script + handler: lsst.cmservice.handlers.jobs.ManifestReportScriptHandler - SpecBlock: name: run_jobs handler: lsst.cmservice.handlers.elements.RunJobsScriptHandler @@ -43,23 +53,29 @@ name: job handler: lsst.cmservice.handlers.jobs.PandaJobHandler collections: - job_output_run: "{root}/{campaign}/{step}/{group}/{job}" - job_output: "{group_output}" - data: - rescue: false + job_run: "{root}/{campaign}/{step}/{group}/{job}" scripts: - Script: - name: wms_job - spec_block: run_wms_job + name: bps + spec_block: panda_script collections: - input: "{step_input}" - run: "{job_output_run}" + run: "{job_run}" + inputs: ["{step_input}", "{campaign_input}", "{campaign_ancillary}"] - Script: - name: manifest_checker - prerequisites: ['wms_job'] - spec_block: run_manifest_checker + name: bps_report + spec_block: panda_report_script + prerequisites: ['bps'] collections: - run: "{job_output_run}" + run: "{job_run}" + inputs: ["{step_input}", "{campaign_input}", "{campaign_ancillary}"] + - Script: + name: manifest_report + spec_block: manifest_report_script + prerequisites: ['bps_report'] + collections: + run: "{job_run}" + data: + rescue: false - SpecBlock: name: group collections: @@ -69,22 +85,8 @@ - Script: name: run spec_block: run_jobs - child_config: - spec_block: job - - Script: - name: validate - prerequisites: ['run'] - spec_block: validate_script - collections: - input: "{group_output}" - output: "{group_validation}" - - Script: - add_to_output: - prerequisites: ['validate'] - spec_block: tag_associate_script - collections: - input: "{group_output}" - output: 
"{campaign_tagged_output}" + child_config: + spec_block: job - SpecBlock: name: step collections: @@ -98,7 +100,7 @@ spec_block: prepare_step_script collections: output: "{step_input}" - inputs: ["{campaign_input}", "{campaign_ancilllary}"] + inputs: ["{campaign_input}", "{campaign_ancillary}"] - Script: name: run prerequisites: ['prepare'] @@ -106,7 +108,7 @@ - Script: name: collect_groups prerequisites: ['run'] - spec_block: chain_collect_script + spec_block: chain_collect_jobs_script collections: inputs: [] output: "{step_output}" @@ -115,15 +117,132 @@ prerequisites: ['collect_groups'] spec_block: chain_create_script collections: - inputs: ["{step_output}", "{campaign_input}", "{campaign_ancilllary}"] + inputs: ["{step_output}", "{campaign_input}", "{campaign_ancillary}"] output: "{step_public_output}" - - Script: - name: validate - prerequisites: ['make_step_public_output'] - spec_block: validate_script - collections: - input: "{step_public_output}" - output: "{step_validation}" +- SpecBlock: + name: dc2_step1 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step1" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: split_by_query + split_dataset: raw + split_field: exposure + split_min_groups: 3 +- SpecBlock: + name: dc2_step2 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step2" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: no_split +- SpecBlock: + name: dc2_step3 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step3" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: split_by_vals + split_field: tract + split_vals: + - 3828 + - 3829 +- SpecBlock: + name: dc2_step4 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step4" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: split_by_query + split_dataset: calexp + split_field: visit + split_min_groups: 4 +- SpecBlock: + name: dc2_step5 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step5" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: split_by_vals + split_field: tract + split_vals: + - 3828 + - 3829 +- SpecBlock: + name: dc2_step6 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step6" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: split_by_query + split_dataset: calexp + split_field: visit + split_min_groups: 4 +- SpecBlock: + name: dc2_step7 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step7" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: no_split +- SpecBlock: + name: dc2_step8 + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#step8" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: no_split +- SpecBlock: + name: dc2_faro_visit + includes: ['step'] + data: + pipeline_yaml: 
"${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#faro_visit" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: no_split +- SpecBlock: + name: dc2_faro_matched + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#faro_matched" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: no_split +- SpecBlock: + name: dc2_faro_tract + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#faro_tract" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" +- SpecBlock: + name: dc2_plots + includes: ['step'] + data: + pipeline_yaml: "${DRP_PIPE_DIR}/pipelines/LSSTCam-imSim/DRP-test-med-1.yaml#analysis_coadd_plots" + child_config: + spec_block: group + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + split_method: no_split - SpecBlock: name: campaign collections: @@ -131,7 +250,6 @@ campaign_source: /prod/raw/all campaign_input: "{root}/{campaign}/input" campaign_output: "{root}/{campaign}" - campaign_tagged_output: "{root}/{campaign}/_output" campaign_ancillary: "{root}/{campaign}/ancillary" campaign_validation: "{root}/{campaign}/validate" scripts: @@ -150,70 +268,82 @@ - other_calib_input output: - "{campaign_ancillary}" - - Script: - name: create_empty_output - spec_block: tag_create_script - collections: - output: "{campaign_tagged_output}" - - Script: - name: create_chained_output - spec_block: chain_create_script - prerequisites: ['tag_inputs', 'ancillary', 'create_empty_output'] - collections: - inputs: - - "{campaign_ancillary}" - - "{campaign_input}" - - "{campaign_tagged_output}" - output: - - "{campaign_output}" - Script: name: run spec_block: run_steps - prerequisites: ['create_chained_output'] + prerequisites: ['tag_inputs', 'ancillary'] - Script: - name: validate - spec_block: validate_script + name: collect_steps prerequisites: ['run'] + spec_block: chain_collect_steps_script collections: - input: "{campaign_output}" - output: "{campaign_validation}" + inputs: [] + output: "{campaign_output}" child_config: step1: - spec_block: step - data: - pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step1" + spec_block: dc2_step1 child_config: - spec_block: group - split_methd: by_field_value - base_query: "a == 1" - split_field: "b" - split_vals: ["1", "2", "3"] + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" step2: - spec_block: step + spec_block: dc2_step2 prerequisites: ['step1'] - data: - pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step2" child_config: - spec_block: group - split_methd: by_field_value - base_query: "a == 1" - split_field: "b" - split_vals: ["1", "2", "3"] + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" step3: - spec_block: step + spec_block: dc2_step3 prerequisites: ['step2'] - data: - pipeline_yaml: "${OBS_LSST_DIR}/pipelines/imsim/DRP.yaml#step3" child_config: - spec_block: group - split_methd: by_field_value - base_query: "a == 1" - split_field: "b" - split_vals: ["1", "2", "3"] + base_query: "instrument='LSSTCam-imSim' and skymap='DC2'" + step4: + spec_block: dc2_step4 + prerequisites: ['step3'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + step5: + spec_block: dc2_step5 + prerequisites: ['step4'] + child_config: + base_query: 
"instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + step6: + spec_block: dc2_step6 + prerequisites: ['step4'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + step7: + spec_block: dc2_step7 + prerequisites: ['step3'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + step8: + spec_block: dc2_step8 + prerequisites: ['step3'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + faro_visit: + spec_block: dc2_faro_visit + prerequisites: ['step6'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + faro_matched: + spec_block: dc2_faro_matched + prerequisites: ['step6'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + faro_tract: + spec_block: dc2_faro_tract + prerequisites: ['step3'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" + plots: + spec_block: dc2_plots + prerequisites: ['step3'] + child_config: + base_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" data: - butler_repo: 'dummy' + butler_repo: '/repo/dc2' prod_area: 'output/archive' - data_query: "instrument = 'HSC' and exposure < 500" + data_query: "instrument='LSSTCam-imSim' and skymap='DC2' and tract in (3828, 3829)" bps_yaml_template: "${CM_CONFIGS}/example_template.yaml" bps_script_template: "${CM_CONFIGS}/example_bps_template.sh" + manifest_script_template: "${CM_CONFIGS}/example_manifest_template.sh" lsst_version: "${WEEKLY}" diff --git a/src/lsst/cmservice/db/element_handler.py b/src/lsst/cmservice/db/element_handler.py index 2bb088b1d..bfba78892 100644 --- a/src/lsst/cmservice/db/element_handler.py +++ b/src/lsst/cmservice/db/element_handler.py @@ -90,6 +90,8 @@ async def process( status = await self.check(session, element, **kwargs) if status == StatusEnum.running: status = await self.continue_processing(session, element, **kwargs) + if status == StatusEnum.reviewable: + status = await self.review(session, element, *kwargs) if status != element.status: await element.update_values(session, status=status) return status @@ -201,6 +203,35 @@ async def continue_processing( await session.commit() return StatusEnum.running + async def review( + self, + session: async_scoped_session, + element: ElementMixin, + **kwargs: Any, + ) -> StatusEnum: + """Review a `Element` processing + + By default this does nothing, but + can be used to automate checking + that the element is ok + + Parameters + ---------- + session : async_scoped_session + DB session manager + + element: ElementMixin + Element in question + + Returns + ------- + status : StatusEnum + The status of the processing + """ + assert session + assert element + return element.status + async def _run_script_checks( self, session: async_scoped_session, @@ -264,7 +295,7 @@ async def _run_job_checks( else: await job_.run_check( session, - job_.group_, + job_.parent_, ) async def check( diff --git a/src/lsst/cmservice/db/functions.py b/src/lsst/cmservice/db/functions.py index b6ab80d86..3b8a6ed71 100644 --- a/src/lsst/cmservice/db/functions.py +++ b/src/lsst/cmservice/db/functions.py @@ -294,6 +294,8 @@ async def load_error_types( raise KeyError(f"Expecting PipetaskErrorType items not: {error_type_.keys()})") from msg new_error_type = await PipetaskErrorType.create_row(session, **val) + if TYPE_CHECKING: + assert 
isinstance(new_error_type, PipetaskErrorType) ret_list.append(new_error_type) return ret_list diff --git a/src/lsst/cmservice/db/group.py b/src/lsst/cmservice/db/group.py index f56179bfe..fb3c429cc 100644 --- a/src/lsst/cmservice/db/group.py +++ b/src/lsst/cmservice/db/group.py @@ -59,13 +59,7 @@ class Group(Base, ElementMixin): parent_: Mapped["Step"] = relationship("Step", viewonly=True) scripts_: Mapped[List["Script"]] = relationship("Script", viewonly=True) - jobs_: Mapped[List["Job"]] = relationship( - "Job", - primaryjoin="Job.script_id==Script.id", - secondary="join(Script, Group)", - secondaryjoin="Script.g_id==Group.id", - viewonly=True, - ) + jobs_: Mapped[List["Job"]] = relationship("Job", viewonly=True) @hybrid_property def db_id(self) -> DbId: diff --git a/src/lsst/cmservice/db/interface.py b/src/lsst/cmservice/db/interface.py index 5e5ba1b17..d9c96ff00 100644 --- a/src/lsst/cmservice/db/interface.py +++ b/src/lsst/cmservice/db/interface.py @@ -155,8 +155,6 @@ def get_node_type_by_fullname( """ if fullname.find("script:") == 0: return NodeTypeEnum.script - if fullname.find("job:") == 0: - return NodeTypeEnum.job return NodeTypeEnum.element @@ -232,8 +230,6 @@ async def get_node_by_fullname( return await get_element_by_fullname(session, fullname) if node_type == NodeTypeEnum.script: result = await db.Script.get_row_by_fullname(session, fullname[7:]) - if node_type == NodeTypeEnum.job: - result = await db.Job.get_row_by_fullname(session, fullname[4:]) if TYPE_CHECKING: assert isinstance(result, db.NodeMixin) return result @@ -811,8 +807,6 @@ async def process( return await process_element(session, fullname, fake_status=fake_status) if node_type == NodeTypeEnum.script: return await process_script(session, fullname[7:], fake_status=fake_status) - if node_type == NodeTypeEnum.job: - return await process_job(session, fullname[4:], fake_status=fake_status) raise ValueError(f"Tried to process an row from a table of type {node_type}") diff --git a/src/lsst/cmservice/db/job.py b/src/lsst/cmservice/db/job.py index d2ccbabbe..4d95e1e5e 100644 --- a/src/lsst/cmservice/db/job.py +++ b/src/lsst/cmservice/db/job.py @@ -45,7 +45,7 @@ class Job(Base, ElementMixin): secondaryjoin="Group.parent_id==Step.id", viewonly=True, ) - parent_: Mapped["Step"] = relationship("Step", viewonly=True) + parent_: Mapped["Group"] = relationship("Group", viewonly=True) scripts_: Mapped[List["Script"]] = relationship("Script", viewonly=True) tasks_: Mapped[List["TaskSet"]] = relationship("TaskSet", viewonly=True) products_: Mapped[List["ProductSet"]] = relationship("ProductSet", viewonly=True) diff --git a/src/lsst/cmservice/db/job_handler.py b/src/lsst/cmservice/db/job_handler.py index 570acd7c3..f9d4d0008 100644 --- a/src/lsst/cmservice/db/job_handler.py +++ b/src/lsst/cmservice/db/job_handler.py @@ -1,502 +1,7 @@ from __future__ import annotations -import os -from typing import TYPE_CHECKING, Any +from .element_handler import ElementHandler -from sqlalchemy.ext.asyncio import async_scoped_session -from ..common.bash import check_stamp_file, run_bash_job, write_bash_script -from ..common.enums import JobMethod, ScriptMethod, StatusEnum -from ..common.slurm import check_slurm_job, submit_slurm_job -from .handler import Handler - -if TYPE_CHECKING: - from .element import ElementMixin - from .job import Job - - -class JobHandler(Handler): - """SubClass of Handler to deal with job operations""" - - default_wms_method = JobMethod.panda - default_manifest_method = ScriptMethod.slurm - - wms_svc_class_name: str | None 
= None - - async def process( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Process a `Job` as much as possible - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - status = job.status - if status == StatusEnum.waiting: - status = StatusEnum.ready - if status == StatusEnum.ready: - status = await self.prepare(session, job, parent, **kwargs) - if status == StatusEnum.running: - status = await self.check(session, job, parent, **kwargs) - if status != job.status: - await job.update_values(session, status=status) - return status - - async def prepare( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Prepare `Job` for processing - - This means writing (but not running) the bps script and - configuraiton file - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - if job.wms_status in [StatusEnum.waiting, StatusEnum.ready]: - job_wms_method = job.wms_method - if job_wms_method == JobMethod.default: - job_wms_method = self.default_wms_method - - if job_wms_method == JobMethod.panda: # pragma: no cover - wms_status = await self._write_wms_config(session, job, parent, **kwargs) - else: - raise NotImplementedError(f"Method {job_wms_method} not implemented for {job}") - - if wms_status != job.wms_status: - await job.update_values(session, wms_status=wms_status) - - if job.manifest_status in [StatusEnum.waiting, StatusEnum.ready]: - job_manifest_method = job.manifest_method - if job_manifest_method == ScriptMethod.default: - job_manifest_method = self.default_manifest_method - if job_manifest_method == ScriptMethod.no_script: - raise ValueError("ScriptMethod.no_script can not be set for Job.manifest_method") - if job_manifest_method == ScriptMethod.bash: - manifest_status = await self._write_manifest_script(session, job, parent, **kwargs) - elif job_manifest_method == ScriptMethod.slurm: # pragma: no cover - manifest_status = await self._write_manifest_script(session, job, parent, **kwargs) - else: - raise NotImplementedError(f"Method {job_manifest_method} not implemented for {job}") - if manifest_status != job.manifest_status: - await job.update_values(session, manifest_status=manifest_status) - await session.commit() - return manifest_status - - async def check( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Check on a `Job` processing - - Generally this means asking the HPC system - (panda or condor) about the job - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - Returns - ------- - status : StatusEnum - The status of the processing - """ - status = job.status - wms_status = await self._process_wms(session, job, parent, **kwargs) - if wms_status.value < 0: - status = StatusEnum.failed - elif wms_status.accepted: - 
manifest_status = await self._process_manifest(session, job, parent, **kwargs) - if manifest_status.value < 0: - status = StatusEnum.failed - elif manifest_status.value >= StatusEnum.reviewable.value: - status = manifest_status - if status != job.status: - await job.update_values(session, status=status) - - await session.commit() - return status - - async def _process_wms( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Process the wms script checking - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - job_wms_method = job.wms_method - if job_wms_method == JobMethod.default: - job_wms_method = self.default_wms_method - - fake_status = kwargs.get("fake_status") - if fake_status is not None: - wms_status = fake_status - elif job_wms_method == JobMethod.panda: # pragma: no cover - wms_status = await self._check_wms_url(session, job.wms_stamp_url, job, parent, **kwargs) - else: - raise NotImplementedError(f"Method {job_wms_method} not implemented for {job}") - - if wms_status != job.wms_status: - await job.update_values(session, wms_status=wms_status) - return wms_status - - async def _process_manifest( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Process the manifest checking - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - status = job.manifest_status - if status in [StatusEnum.waiting, StatusEnum.ready]: - status = StatusEnum.prepared - if status == StatusEnum.prepared: - status = await self._launch_manifest(session, job, parent, **kwargs) - if status == StatusEnum.running: - status = await self._check_manifest(session, job, parent, **kwargs) - if status == StatusEnum.reviewable: - status = await self._review_manifest(session, job, parent, **kwargs) - if status != job.manifest_status: - await job.update_values(session, manifest_status=status) - return status - - async def _write_wms_config( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Hook for subclasses to write a bps job for processing - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - raise NotImplementedError() - - async def _check_wms_url( - self, - session: async_scoped_session, - url: str | None, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Hook for subclasses to check on `Script` processing - - Parameters - ---------- - session : async_scoped_session - DB session manager - - url: str | None - The url to use to get infomation about the job - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to 
override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - raise NotImplementedError() - - async def _launch_manifest( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Run the manifest checking - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - assert parent - manifest_method = job.manifest_method - if manifest_method == ScriptMethod.default: - manifest_method = self.default_manifest_method - - fake_status = kwargs.get("fake_status", None) - if manifest_method == ScriptMethod.no_script: # pragma: no cover - raise ValueError("ScriptMethod.no_script can not be set for Job.manifest_method") - if fake_status is not None: - status = fake_status - elif manifest_method == ScriptMethod.bash: - assert job.manifest_script_url - assert job.manifest_log_url - await run_bash_job(job.manifest_script_url, job.manifest_log_url) - status = StatusEnum.running - elif manifest_method == ScriptMethod.slurm: # pragma: no cover - assert job.manifest_script_url - assert job.manifest_log_url - job_id = await submit_slurm_job(job.manifest_script_url, job.manifest_log_url) - status = StatusEnum.running - await job.update_values(session, manifest_stamp_url=job_id, manifest_status=status) - if status != job.manifest_status: - await job.update_values(session, manifest_status=status) - await session.commit() - return status - - async def _check_manifest( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Check that the manifest checking has completed - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - assert parent - job_manifest_method = job.manifest_method - if job_manifest_method == ScriptMethod.default: - job_manifest_method = self.default_manifest_method - - fake_status = kwargs.get("fake_status") - if fake_status is not None: - manifest_status = fake_status - elif job_manifest_method == ScriptMethod.bash: - manifest_status = await check_stamp_file(str(job.manifest_stamp_url)) - elif job_manifest_method == ScriptMethod.slurm: - manifest_status = await check_slurm_job(job.manifest_stamp_url) - else: - raise NotImplementedError(f"Method {job_manifest_method} not implemented for {job}") - if manifest_status is None: - manifest_status = job.manifest_status - if manifest_status != job.manifest_status: - await job.update_values(session, manifest_status=manifest_status) - return manifest_status - - async def _review_manifest( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Review the manifest checker outupt - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the 
processing - """ - # FIXME - raise NotImplementedError() - - async def _write_manifest_script( - self, - session: async_scoped_session, - job: Job, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - """Hook for subclasses to write a script to check the manifest - - Parameters - ---------- - session : async_scoped_session - DB session manager - - job: Job - The `Job` in question - - parent: ElementMixin - Parent Element of the `Job` in question - - kwargs: Any - Used to override processing configuration - - Returns - ------- - status : StatusEnum - The status of the processing - """ - data_dict = await job.data_dict(session) - prod_area = os.path.expandvars(data_dict["prod_area"]) - manifest_script_template = os.path.expandvars(data_dict["manifest_script_template"]) - lsst_version = os.path.expandvars(data_dict["lsst_version"]) - butler_repo = os.path.expandvars(data_dict["butler_repo"]) - - manifest_script_url = f"{prod_area}/{job.fullname}_manifest.sh" - manifest_log_url = f"{prod_area}/{job.fullname}_manifest.log" - manifest_report_url = f"{prod_area}/{job.fullname}_manifest_report.yaml" - - with open(manifest_script_template, "r") as fin: - prepend = fin.read().replace("{lsst_version}", lsst_version) - - graph = os.path.abspath(os.path.expandvars(f"{prod_area}/{job.fullname}/submit/run.graph")) - command = f"cm-manifest --butler_repo {butler_repo} --graph {graph} > {manifest_report_url}" - await write_bash_script(manifest_script_url, command, prepend=prepend) - await job.update_values( - session, - manifest_script_url=manifest_script_url, - manifest_log_url=manifest_log_url, - manifest_report_url=manifest_report_url, - ) - return StatusEnum.prepared +class JobHandler(ElementHandler): + """SubClass of ElementHandler to deal with job operations""" diff --git a/src/lsst/cmservice/db/node.py b/src/lsst/cmservice/db/node.py index 5822c7b1e..a98e34410 100644 --- a/src/lsst/cmservice/db/node.py +++ b/src/lsst/cmservice/db/node.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any +from typing import TYPE_CHECKING, Any from sqlalchemy.ext.asyncio import async_scoped_session @@ -9,6 +9,9 @@ from .row import RowMixin from .specification import SpecBlock, Specification +if TYPE_CHECKING: + from .element import ElementMixin + class NodeMixin(RowMixin): """Mixin class to define common features of database rows @@ -25,6 +28,7 @@ class NodeMixin(RowMixin): spec_block_id: Any # Foriegn key into spec-block status: Any # Current status of associated processing parent_id: Any # Id of the parent row + parent_: Any # Parent of the current row collections: Any # Definition of collection names child_config: Any # Definition of child elements data: Any # Generic configuraiton parameters @@ -76,6 +80,27 @@ async def get_specification( await session.refresh(spec_block, attribute_names=["spec_"]) return spec_block.spec_ + async def get_parent( + self, + session: async_scoped_session, + ) -> ElementMixin: + """Get the parent `Element` + + Parameters + ---------- + session : async_scoped_session + DB session manager + + Returns + ------- + element : ElementMixin + The Parent Element + """ + assert self.level.value < LevelEnum.campaign.value + async with session.begin_nested(): + await session.refresh(self, attribute_names=["parent_"]) + return self.parent_ + @classmethod async def get_handler( cls, @@ -129,9 +154,7 @@ def _split_fullname(self, fullname: str) -> dict: fields = {} tokens = fullname.split("/") - if self.node_type == NodeTypeEnum.job: - fields["job"] = tokens.pop() - 
if self.node_type in [NodeTypeEnum.job, NodeTypeEnum.script]: + if self.node_type == NodeTypeEnum.script: fields["script"] = tokens.pop() for i, token in enumerate(tokens): if i == 0: @@ -142,6 +165,8 @@ def _split_fullname(self, fullname: str) -> dict: fields["step"] = token elif i == 3: fields["group"] = token + elif i == 4: + fields["job"] = token else: raise ValueError(f"Too many fields in {fullname}") return fields @@ -225,10 +250,10 @@ async def get_collections( parent_ = await self.get_parent(session) parent_colls = await parent_.get_collections(session) ret_dict.update(parent_colls) - elif self.level >= LevelEnum.campaign.value: + elif self.level.value > LevelEnum.campaign.value: await session.refresh(self, attribute_names=["parent_"]) parent_colls = await self.parent_.get_collections(session) - ret_dict.upate(parent_colls) + ret_dict.update(parent_colls) await session.refresh(self, attribute_names=["spec_block_"]) if self.spec_block_.collections: ret_dict.update(self.spec_block_.collections) @@ -297,10 +322,10 @@ async def data_dict( parent_ = await self.get_parent(session) parent_data = await parent_.data_dict(session) ret_dict.update(parent_data) - elif self.level >= LevelEnum.campaign.value: + elif self.level.value > LevelEnum.campaign.value: await session.refresh(self, attribute_names=["parent_"]) parent_data = await self.parent_.data_dict(session) - ret_dict.upate(parent_data) + ret_dict.update(parent_data) await session.refresh(self, attribute_names=["spec_block_"]) if self.spec_block_.data: ret_dict.update(self.spec_block_.data) diff --git a/src/lsst/cmservice/db/script.py b/src/lsst/cmservice/db/script.py index 40bddad20..ccecf046e 100644 --- a/src/lsst/cmservice/db/script.py +++ b/src/lsst/cmservice/db/script.py @@ -74,7 +74,7 @@ def db_id(self) -> DbId: @property def level(self) -> LevelEnum: - return LevelEnum.group + return LevelEnum.script @property def node_type(self) -> NodeTypeEnum: diff --git a/src/lsst/cmservice/db/script_handler.py b/src/lsst/cmservice/db/script_handler.py index 2179dcbd7..c3c2824e6 100644 --- a/src/lsst/cmservice/db/script_handler.py +++ b/src/lsst/cmservice/db/script_handler.py @@ -226,6 +226,7 @@ async def _check_stamp_file( session: async_scoped_session, stamp_file: str, script: Script, + parent: ElementMixin, ) -> StatusEnum: """Get `Script` status from a stamp file @@ -240,11 +241,15 @@ async def _check_stamp_file( script: Script The `Script` in question + parent: ElementMixin + Parent Element of the `Script` in question + Returns ------- status : StatusEnum The status of the processing """ + assert parent status = await check_stamp_file(stamp_file) if status is None: return script.status @@ -257,6 +262,7 @@ async def _check_slurm_job( session: async_scoped_session, slurm_id: str | None, script: Script, + parent: ElementMixin, ) -> StatusEnum: """Check the status of a `Script` sent to slurm @@ -271,14 +277,18 @@ async def _check_slurm_job( script: Script The `Script` in question + parent: ElementMixin + Parent Element of the `Script` in question + Returns ------- status : StatusEnum The status of the processing """ + assert parent status = await check_slurm_job(slurm_id) if status is None: - status = StatusEnum.prepared + status = StatusEnum.running if status != script.status: await script.update_values(session, status=status) return status @@ -355,10 +365,10 @@ async def check( raise ValueError("ScriptMethod.no_script can not be set for ScriptHandler") elif script_method == ScriptMethod.bash: assert script.stamp_url - status = await 
self._check_stamp_file(session, script.stamp_url, script) + status = await self._check_stamp_file(session, script.stamp_url, script, parent) elif script_method == ScriptMethod.slurm: # pragma: no cover assert script.stamp_url - status = await self._check_slurm_job(session, script.stamp_url, script) + status = await self._check_slurm_job(session, script.stamp_url, script, parent) if status != script.status: await script.update_values(session, status=status) await session.commit() @@ -487,7 +497,10 @@ async def _do_prepare( status : StatusEnum The status of the processing """ - raise NotImplementedError(f"{type(self)}.do_prepare()") + assert session + assert script + assert parent + return StatusEnum.prepared async def _do_run( self, @@ -514,7 +527,10 @@ async def _do_run( status : StatusEnum The status of the processing """ - raise NotImplementedError(f"{type(self)}.do_run()") + assert session + assert script + assert parent + return StatusEnum.running async def _do_check( self, @@ -541,4 +557,7 @@ async def _do_check( status : StatusEnum The status of the processing """ - raise NotImplementedError(f"{type(self)}.do_check()") + assert session + assert script + assert parent + return StatusEnum.accepted diff --git a/src/lsst/cmservice/db/step.py b/src/lsst/cmservice/db/step.py index 07297d085..9b4cb1e54 100644 --- a/src/lsst/cmservice/db/step.py +++ b/src/lsst/cmservice/db/step.py @@ -65,8 +65,8 @@ class Step(Base, ElementMixin): jobs_: Mapped[List["Job"]] = relationship( "Job", primaryjoin="Group.parent_id==Step.id", - secondary="join(Script, Group).join(Job)", - secondaryjoin="and_(Script.g_id==Group.id, Job.script_id==Script.id)", + secondary="join(Group, Job)", + secondaryjoin="Job.parent_id==Group.id", viewonly=True, ) diff --git a/src/lsst/cmservice/handlers/elements.py b/src/lsst/cmservice/handlers/elements.py index 0e3a13da3..e5b46a99f 100644 --- a/src/lsst/cmservice/handlers/elements.py +++ b/src/lsst/cmservice/handlers/elements.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os from typing import Any, AsyncGenerator import numpy as np @@ -12,11 +11,11 @@ from lsst.cmservice.db.group import Group from lsst.cmservice.db.job import Job from lsst.cmservice.db.script import Script -from lsst.cmservice.db.script_handler import FunctionHandler, ScriptHandler +from lsst.cmservice.db.script_handler import FunctionHandler from lsst.daf.butler import Butler -from ..common.bash import write_bash_script from ..common.enums import StatusEnum +from ..common.slurm import check_slurm_job def parse_bps_stdout(url: str) -> dict[str, str]: @@ -34,34 +33,69 @@ def parse_bps_stdout(url: str) -> dict[str, str]: return out_dict -class RunJobsScriptHandler(ScriptHandler): - """Create a `Job` in the DB and write a batch script to launch it +class RunElementScriptHandler(FunctionHandler): + """Shared base class to handling running and + checking of Scripts that mangage the children + of elements - This will use the `SpecBlock` given by - `script.spec_block.child_config['spec_block']` - To configure the Job. 
+ E.g., RunGroupsScriptHandler and RunStepsScriptHandler + """ - This will write the script to - `script.script_url = script.data_dict['prod_area']/script.fullname.sh` + async def _do_run( + self, + session: async_scoped_session, + script: Script, + parent: ElementMixin, + **kwargs: Any, + ) -> StatusEnum: + min_val = StatusEnum.accepted.value + for child_ in await parent.children(session): + child_status = await child_.process(session, **kwargs) + min_val = min(min_val, child_status.value) - Once the slurm job has completed this will get the - panda_id from from output of the slurm job and update - `Job` row accordingly + if min_val >= StatusEnum.accepted.value: + status = StatusEnum.accepted + else: + status = StatusEnum.running - While the panda job is running this will be in `reviewable` state. + await script.update_values(session, status=status) + return status - The `review` function will invoke the `JobHandler` to review the - panda jobs + async def _do_check( + self, + session: async_scoped_session, + script: Script, + parent: ElementMixin, + **kwargs: Any, + ) -> StatusEnum: + min_val = StatusEnum.accepted.value + for child_ in await parent.children(session): + child_status = await child_.process(session, **kwargs) + min_val = min(min_val, child_status.value) + + if min_val >= StatusEnum.accepted.value: + status = StatusEnum.accepted + else: + status = StatusEnum.running + + await script.update_values(session, status=status) + return status + + +class RunJobsScriptHandler(RunElementScriptHandler): + """Create a `Job` in the DB + + FIXME """ - async def _write_script( + async def _do_run( self, session: async_scoped_session, script: Script, parent: ElementMixin, **kwargs: Any, ) -> StatusEnum: - child_config = await script.get_child_config(session) + child_config = await parent.get_child_config(session) specification = await parent.get_specification(session) assert specification spec_block_name = child_config.pop("spec_block", None) @@ -69,84 +103,35 @@ async def _write_script( spec_block = await specification.get_block(session, spec_block_name) assert spec_block - job_attempt = 0 + attempt = 0 new_job = await Job.create_row( session, - name="job", - attempt=job_attempt, - script_name=script.fullname, + name=f"job_{attempt:03}", + parent_name=parent.fullname, spec_block_name=spec_block.fullname, **child_config, ) assert new_job - await new_job.process(session, parent, **kwargs) - data_dict = await new_job.data_dict(session) - - script_url = await self._write_job_bash_script(script, new_job, **data_dict) - await script.update_values(session, script_url=script_url, status=StatusEnum.prepared) - return StatusEnum.prepared - - async def _write_job_bash_script(self, script: Script, job: Job, **kwargs: Any) -> str: - """Hook to write the bash script to run bps - - Parameters - ---------- - session : async_scoped_session - DB session manager - - script: Script - The `Script` in question - - job: Job - The `Job` in question - - Keywords - -------- - prod_area: str - Top level directory for scripts and logs - - bps_script_template: str - Template for bps script to set up environment - - lsst_version: str - Use to call out version of lsst software stack - - Returns - ------- - script_url : str - Path to the newly written script - """ - prod_area = os.path.expandvars(kwargs["prod_area"]) - bps_script_template = os.path.expandvars(kwargs["bps_script_template"]) - lsst_version = os.path.expandvars(kwargs["lsst_version"]) - - script_url = f"{prod_area}/{script.fullname}.sh" - json_url = 
f"{prod_area}/{script.fullname}_log.json" - config_url = f"{prod_area}/{job.fullname}_bps_config.yaml" - log_url = f"{prod_area}/{script.fullname}.log" - - command = f"bps --log-file {json_url} --no-log-tty submit {os.path.abspath(config_url)} > {log_url}" - - with open(bps_script_template, "r") as fin: - prepend = fin.read().replace("{lsst_version}", lsst_version) - await write_bash_script(script_url, command, prepend=prepend) - return script_url + await script.update_values(session, status=StatusEnum.prepared) + return StatusEnum.running async def _check_slurm_job( self, session: async_scoped_session, slurm_id: str, script: Script, + parent: ElementMixin, + **kwargs: Any, ) -> StatusEnum: - slurm_status = ScriptHandler._check_slurm_job(self, session, slurm_id, script) + slurm_status = await check_slurm_job(slurm_id) + if slurm_status is None: + slurm_status = StatusEnum.running if slurm_status == StatusEnum.accepted: await script.update_values(session, status=StatusEnum.reviewable) bps_dict = parse_bps_stdout(script.log_url) panda_url = bps_dict["Run Id"] async with session.begin_nested(): - await session.refresh(script, attribute_names=["jobs_"]) - job = script.jobs_[-1] - await job.update_values(session, wms_stamp_url=panda_url, status=StatusEnum.running) + await parent.update_values(session, wms_stamp_url=panda_url) return StatusEnum.reviewable return slurm_status @@ -169,55 +154,6 @@ async def review( return status -class RunElementScriptHandler(FunctionHandler): - """Shared base class to handling running and - checking of Scripts that mangage the children - of elements - - E.g., RunGroupsScriptHandler and RunStepsScriptHandler - """ - - async def _do_run( - self, - session: async_scoped_session, - script: Script, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - min_val = StatusEnum.accepted.value - for child_ in await parent.children(session): - child_status = await child_.process(session, **kwargs) - min_val = min(min_val, child_status.value) - - if min_val >= StatusEnum.accepted.value: - status = StatusEnum.accepted - else: - status = StatusEnum.running - - await script.update_values(session, status=status) - return status - - async def _do_check( - self, - session: async_scoped_session, - script: Script, - parent: ElementMixin, - **kwargs: Any, - ) -> StatusEnum: - min_val = StatusEnum.accepted.value - for child_ in await parent.children(session): - child_status = await child_.process(session, **kwargs) - min_val = min(min_val, child_status.value) - - if min_val >= StatusEnum.accepted.value: - status = StatusEnum.accepted - else: - status = StatusEnum.running - - await script.update_values(session, status=status) - return status - - class Splitter: @classmethod async def split( diff --git a/src/lsst/cmservice/handlers/jobs.py b/src/lsst/cmservice/handlers/jobs.py index d725aeab1..44a84b680 100644 --- a/src/lsst/cmservice/handlers/jobs.py +++ b/src/lsst/cmservice/handlers/jobs.py @@ -4,14 +4,15 @@ import types from typing import Any -import numpy as np import yaml from sqlalchemy.ext.asyncio import async_scoped_session +from lsst.cmservice.common.bash import write_bash_script from lsst.cmservice.db.element import ElementMixin from lsst.cmservice.db.functions import load_wms_reports from lsst.cmservice.db.job import Job -from lsst.cmservice.db.job_handler import JobHandler +from lsst.cmservice.db.script import Script +from lsst.cmservice.db.script_handler import FunctionHandler, ScriptHandler from lsst.ctrl.bps import BaseWmsService, WmsRunReport, WmsStates from 
lsst.utils import doImport
 
@@ -47,18 +48,135 @@
 }
 
-class BpsJobHandler(JobHandler):
-    """SubClass of JobHandler to deal with jobs controlled by bps"""
+def parse_bps_stdout(url: str) -> dict[str, str]:
+    """Parse the stdout from a bps submit job"""
+    out_dict = {}
+    with open(url, "r", encoding="utf8") as fin:
+        line = fin.readline()
+        while line:
+            tokens = line.split(":")
+            if len(tokens) != 2:  # pragma: no cover
+                line = fin.readline()
+                continue
+            out_dict[tokens[0]] = tokens[1]
+            line = fin.readline()
+    return out_dict
 
-    wms_svc_class_name: str | None = None
 
-    @staticmethod
-    def _count_tasks(summary_dict: dict) -> dict:
-        out_dict = {key: np.sum(list(val.values())) for key, val in summary_dict.items()}
-        return out_dict
 
+class BpsScriptHandler(ScriptHandler):
+    """Write a script to run bps jobs
+
+    This will create:
+    `parent.collections['run']`
+    """
+
+    async def _write_script(
+        self,
+        session: async_scoped_session,
+        script: Script,
+        parent: ElementMixin,
+        **kwargs: Any,
+    ) -> StatusEnum:
+        resolved_cols = await script.resolve_collections(session)
+        run_coll = resolved_cols["run"]
+        input_colls = resolved_cols["inputs"]
+        data_dict = await script.data_dict(session)
+        prod_area = os.path.expandvars(data_dict["prod_area"])
+        script_url = await self._set_script_files(session, script, prod_area)
+        butler_repo = data_dict["butler_repo"]
+        lsst_version = data_dict["lsst_version"]
+
+        json_url = os.path.abspath(os.path.expandvars(f"{prod_area}/{script.fullname}_log.json"))
+        config_url = os.path.abspath(os.path.expandvars(f"{prod_area}/{script.fullname}_bps_config.yaml"))
+        log_url = os.path.abspath(os.path.expandvars(f"{prod_area}/{script.fullname}.log"))
+
+        bps_script_template = os.path.expandvars(data_dict["bps_script_template"])
+        bps_yaml_template = os.path.expandvars(data_dict["bps_yaml_template"])
+
+        command = f"bps --log-file {json_url} --no-log-tty submit {os.path.abspath(config_url)} > {log_url}"
+
+        with open(bps_script_template, "r") as fin:
+            prepend = fin.read().replace("{lsst_version}", lsst_version)
+        await write_bash_script(script_url, command, prepend=prepend)
+
+        with open(bps_yaml_template, "rt", encoding="utf-8") as fin:
+            workflow_config = yaml.safe_load(fin)
+
+        async with session.begin_nested():
+            await session.refresh(parent, attribute_names=["c_", "p_"])
+            workflow_config["project"] = parent.p_.name
+            workflow_config["campaign"] = parent.c_.name
+
+        data_query = data_dict.get("data_query", None)
+        workflow_config["submitPath"] = os.path.abspath(
+            os.path.expandvars(f"{prod_area}/{parent.fullname}/submit")
+        )
+
+        workflow_config["LSST_VERSION"] = os.path.expandvars(data_dict["lsst_version"])
+        if "custom_lsst_setup" in data_dict:
+            workflow_config["custom_lsst_setup"] = data_dict["custom_lsst_setup"]
+        workflow_config["pipelineYaml"] = os.path.expandvars(data_dict["pipeline_yaml"])
+
+        inCollection = ".".join(input_colls)
+
+        payload = {
+            "payloadName": parent.c_.name,
+            "butlerConfig": butler_repo,
+            "outputRun": run_coll,
+            "inCollection": inCollection,
+        }
+        if data_query:
+            payload["dataQuery"] = data_query
+
+        workflow_config["payload"] = payload
+        try:
+            os.makedirs(os.path.dirname(script_url))
+        except OSError:
+            pass
+
+        with open(config_url, "wt", encoding="utf-8") as fout:
+            yaml.dump(workflow_config, fout)
+        return StatusEnum.prepared
+
+    async def _check_slurm_job(
+        self,
+        session: async_scoped_session,
+        slurm_id: str,
+        script: Script,
+        parent: ElementMixin,
+    ) ->
StatusEnum: + slurm_status = ScriptHandler._check_slurm_job(self, session, slurm_id, script, parent) + if slurm_status == StatusEnum.accepted: + await script.update_values(session, status=StatusEnum.accepted) + bps_dict = parse_bps_stdout(script.log_url) + panda_url = bps_dict["Run Id"] + await parent.update_values(session, wms_id=panda_url) + return slurm_status + + async def launch( + self, + session: async_scoped_session, + script: Script, + parent: ElementMixin, + **kwargs: Any, + ) -> StatusEnum: + status = await ScriptHandler.launch(self, session, script, parent, **kwargs) + + if status == StatusEnum.running: + assert isinstance(parent, Job) + await parent.update_values(session, job_url=script.stamp_url) + await session.commit() + return status + + +class BpsReportHandler(FunctionHandler): + """Class to handle running BpsReport""" + + wms_svc_class_name: str | None = None def __init__(self, spec_block_id: int, **kwargs: dict) -> None: - JobHandler.__init__(self, spec_block_id, **kwargs) + FunctionHandler.__init__(self, spec_block_id, **kwargs) self._wms_svc_class: types.ModuleType | type | None = None self._wms_svc: BaseWmsService | None = None @@ -119,79 +237,80 @@ async def _load_wms_reports( assert job return status - async def _write_wms_config( + async def _do_prepare( self, session: async_scoped_session, - job: Job, + script: Script, parent: ElementMixin, - **kwargs: Any, # pylint: disable=unused-argument + **kwargs: Any, ) -> StatusEnum: - data_dict = await job.data_dict(session) - # collections = await job.get_collections(session, job) - prod_area = os.path.expandvars(data_dict["prod_area"]) - script_url = os.path.abspath(os.path.expandvars(f"{prod_area}/{job.fullname}_config.yaml")) + await script.update_values( + session, + stamp_url=parent.job_id, + ) + return StatusEnum.prepared - bps_yaml_template = os.path.expandvars(data_dict["bps_yaml_template"]) + async def _do_run( + self, + session: async_scoped_session, + script: Script, + parent: ElementMixin, + **kwargs: Any, + ) -> StatusEnum: + return StatusEnum.running - with open(bps_yaml_template, "rt", encoding="utf-8") as fin: - workflow_config = yaml.safe_load(fin) + async def _do_check( + self, + session: async_scoped_session, + script: Script, + parent: ElementMixin, + **kwargs: Any, + ) -> StatusEnum: + status = await self._load_wms_reports(session, parent, script.stamp_url) + if status is None: + status = script.status + if status != script.status: + await script.update_values(session, status=status) + await session.commit() + return status - async with session.begin_nested(): - await session.refresh(parent, attribute_names=["c_", "p_"]) - workflow_config["project"] = parent.p_.name - workflow_config["campaign"] = parent.c_.name - data_query = data_dict.get("data_query", None) - butler_repo = os.path.expandvars(data_dict["butler_repo"]) - workflow_config["submitPath"] = os.path.abspath( - os.path.expandvars(f"{prod_area}/{job.fullname}/submit") - ) +class PandaScriptHandler(BpsScriptHandler): + """Class to handle running Bps for panda jobs""" - workflow_config["LSST_VERSION"] = os.path.expandvars(data_dict["lsst_version"]) - if "custom_lsst_setup" in data_dict: - workflow_config["custom_lsst_setup"] = data_dict["lsst_custom_setup"] - workflow_config["pipelineYaml"] = os.path.expandvars(data_dict["pipeline_yaml"]) + default_method = JobMethod.panda - payload = { - "payloadName": parent.c_.name, - "butlerConfig": butler_repo, - # "outputRun":job.coll_out, - # "inCollection":inCollection, - } - if data_query: - 
payload["dataQuery"] = data_query - workflow_config["payload"] = payload - try: - os.makedirs(os.path.dirname(script_url)) - except OSError: - pass +class PandaReportHandler(BpsReportHandler): + """Class to handle running BpsReport for panda jobs""" - with open(script_url, "wt", encoding="utf-8") as fout: - yaml.dump(workflow_config, fout) - await job.update_values(session, wms_config_url=script_url, status=StatusEnum.prepared) - return StatusEnum.prepared + wms_svc_class_name = "lsst.ctrl.bps.panda.PanDAService" + + +class ManifestReportScriptHandler(ScriptHandler): + """Write a script to run manifest checker jobs""" - async def _check_wms_url( + async def _write_script( self, session: async_scoped_session, - url: str | None, - job: Job, + script: Script, parent: ElementMixin, - **kwargs: Any, # pylint: disable=unused-argument + **kwargs: Any, ) -> StatusEnum: - if url is None: - return StatusEnum.prepared + data_dict = await script.data_dict(session) + prod_area = os.path.expandvars(data_dict["prod_area"]) + script_url = await self._set_script_files(session, script, prod_area) + butler_repo = data_dict["butler_repo"] + lsst_version = data_dict["lsst_version"] + graph_url = os.path.expandvars(f"{prod_area}/{script.fullname}/submit/qg.out") + report_url = os.path.expandvars(f"{prod_area}/{script.fullname}/submit/manifest_report.yaml") - wms_workflow_id = int(url) - status = await self._load_wms_reports(session, job, wms_workflow_id) - if status is None: - return job.status - return status + manifest_script_template = os.path.expandvars(data_dict["manifest_script_template"]) + command = f"cm-manifest --butler_repo {butler_repo} --graph {graph_url} > {report_url}" -class PandaJobHandler(BpsJobHandler): - """SubClass of JobHandler to deal with panda jobs""" + with open(manifest_script_template, "r") as fin: + prepend = fin.read().replace("{lsst_version}", lsst_version) + await write_bash_script(script_url, command, prepend=prepend) - default_method = JobMethod.panda - wms_svc_class_name = "lsst.ctrl.bps.panda.PanDAService" + return StatusEnum.prepared