diff --git a/src/lsst/cmservice/db/step.py b/src/lsst/cmservice/db/step.py index a172807c3..2e0d762a3 100644 --- a/src/lsst/cmservice/db/step.py +++ b/src/lsst/cmservice/db/step.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import TYPE_CHECKING, Any, Iterable, List, Optional from sqlalchemy import JSON @@ -11,6 +13,7 @@ from .campaign import Campaign from .dbid import DbId from .element import ElementMixin +from .job import Job from .node import NodeMixin from .specification import SpecBlock @@ -61,6 +64,13 @@ class Step(Base, ElementMixin): foreign_keys="StepDependency.depend_id", viewonly=True, ) + jobs_: Mapped[List["Job"]] = relationship( + "Job", + primaryjoin="Group.parent_id==Step.id", + secondary="join(Script, Group).join(Job)", + secondaryjoin="and_(Script.g_id==Group.id, Job.script_id==Script.id)", + viewonly=True, + ) @hybrid_property def db_id(self) -> DbId: diff --git a/src/lsst/cmservice/handlers/elements.py b/src/lsst/cmservice/handlers/elements.py index 3a435c124..2ba67e74c 100644 --- a/src/lsst/cmservice/handlers/elements.py +++ b/src/lsst/cmservice/handlers/elements.py @@ -350,7 +350,7 @@ async def _do_prepare( splitter = SPLIT_CLASSES[split_method] i = 0 - async for group_dict_ in await splitter.split( + async for group_dict_ in splitter.split( session, script, parent, fake_status=fake_status, **child_config ): new_group = await Group.create_row( diff --git a/src/lsst/cmservice/handlers/scripts.py b/src/lsst/cmservice/handlers/scripts.py index 89e28d0b2..39605f194 100644 --- a/src/lsst/cmservice/handlers/scripts.py +++ b/src/lsst/cmservice/handlers/scripts.py @@ -1,5 +1,7 @@ from __future__ import annotations +from typing import TYPE_CHECKING + from sqlalchemy.ext.asyncio import async_scoped_session from lsst.cmservice.db.element import ElementMixin @@ -8,6 +10,9 @@ from ..common.enums import StatusEnum +if TYPE_CHECKING: + from lsst.cmservice.db.step import Step + class ChainCreateScriptHandler(ScriptHandler): """Write a script to chain together collections @@ -42,7 +47,7 @@ async def _write_script( class ChainPrependScriptHandler(ScriptHandler): - """Write a script to chain together collections + """Write a script to prepend a collection to a chain This will take @@ -61,16 +66,11 @@ async def _write_script( ) -> StatusEnum: resolved_cols = await script.resolve_collections(session) output_coll = resolved_cols["output"] - input_colls = resolved_cols["inputs"] - prereq_colls: list["str"] = [] # FIXME + input_coll = resolved_cols["input"] data_dict = await script.data_dict(session) script_url = await self._set_script_files(session, script, data_dict["prod_area"]) butler_repo = data_dict["butler_repo"] - command = f"butler collection-chain {butler_repo} {output_coll} --flatten" - for prereq_coll_ in prereq_colls: - command += f" {prereq_coll_}" - for input_coll_ in input_colls: - command += f" {input_coll_}" + command = f"butler collection-chain {butler_repo} {output_coll} --mode prepend {input_coll}" await self._write_bash_script(script_url, command, **data_dict) await script.update_values(session, script_url=script_url, status=StatusEnum.prepared) return StatusEnum.prepared @@ -82,6 +82,10 @@ class ChainCollectScriptHandler(ScriptHandler): This will create: `script.collections['output']` + and collect all of the output collections at a given level to it + and then append + + `script.collections['inputs']` to it """ async def _write_script( @@ -93,8 +97,21 @@ async def _write_script( resolved_cols = await script.resolve_collections(session) output_coll = resolved_cols["output"] input_colls = resolved_cols["inputs"] - collect_colls: list[str] = [] # FIXME data_dict = await script.data_dict(session) + to_collect = data_dict["collect"] + collect_colls = [] + if to_collect == "jobs": + jobs = await parent.get_jobs(session) + for job_ in jobs: + job_colls = await job_.resolve_collections(session) + collect_colls.append(job_colls["run"]) + elif to_collect == "steps": + for step_ in await parent.children(session): + step_colls = await step_.resolve_collections(session) + collect_colls.append(step_colls["step_output"]) + collect_colls = collect_colls[::-1] + else: + raise ValueError("Must specify what to collect in ChainCollectScriptHandler, jobs or steps") script_url = await self._set_script_files(session, script, data_dict["prod_area"]) butler_repo = data_dict["butler_repo"] command = f"butler collection-chain {butler_repo} {output_coll}" @@ -198,11 +215,18 @@ async def _write_script( class PrepareStepScriptHandler(ScriptHandler): """Make the input collection for a step - This will add datasets from + This will create a chained collection - `script.collections['input']` - to - the output collections from all prerequiste steps + `script.collections['output']` + + by taking the output collections of all the + prerequisite steps, or + + `script.collections['campaign_input'] if the + step has no inputs + + it will then append + `script.collections['output']` to the output collection """ async def _write_script( @@ -214,14 +238,27 @@ async def _write_script( resolved_cols = await script.resolve_collections(session) input_colls = resolved_cols["inputs"] output_coll = resolved_cols["output"] - prereqs: list[str] = [] # FIX + prereq_colls: list[str] = [] + if TYPE_CHECKING: + assert isinstance(parent, Step) + + async with session.begin_nested(): + await session.refresh(parent, attribute_names=["prereqs_"]) + for prereq_ in parent.prereqs_: + await session.refresh(prereq_, attribute_names=["prereq_"]) + prereq_step = prereq_.prereq_ + prereq_step_colls = await prereq_step.resolve_collections(session) + prereq_colls.append(prereq_step_colls["step_output"]) + if not prereq_colls: + prereq_colls += resolved_cols["global_inputs"] + data_dict = await script.data_dict(session) script_url = await self._set_script_files(session, script, data_dict["prod_area"]) butler_repo = data_dict["butler_repo"] command = f"butler collection-chain {butler_repo} {output_coll} --collections" - if prereqs: - for prereq_ in prereqs: - command += f"{prereq_}" + if prereq_colls: + for prereq_coll_ in prereq_colls: + command += f"{prereq_coll_}" else: for input_coll_ in input_colls: command += f"{input_coll_}"