Skip to content

Commit

Permalink
commit messages are fun
Browse files Browse the repository at this point in the history
  • Loading branch information
eacharles committed Oct 10, 2023
1 parent ad3ebfc commit 749d77c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
10 changes: 10 additions & 0 deletions src/lsst/cmservice/db/step.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Iterable, List, Optional

from sqlalchemy import JSON
Expand All @@ -11,6 +13,7 @@
from .campaign import Campaign
from .dbid import DbId
from .element import ElementMixin
from .job import Job
from .node import NodeMixin
from .specification import SpecBlock

Expand Down Expand Up @@ -61,6 +64,13 @@ class Step(Base, ElementMixin):
foreign_keys="StepDependency.depend_id",
viewonly=True,
)
jobs_: Mapped[List["Job"]] = relationship(
"Job",
primaryjoin="Group.parent_id==Step.id",
secondary="join(Script, Group).join(Job)",
secondaryjoin="and_(Script.g_id==Group.id, Job.script_id==Script.id)",
viewonly=True,
)

@hybrid_property
def db_id(self) -> DbId:
Expand Down
2 changes: 1 addition & 1 deletion src/lsst/cmservice/handlers/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ async def _do_prepare(
splitter = SPLIT_CLASSES[split_method]

i = 0
async for group_dict_ in await splitter.split(
async for group_dict_ in splitter.split(
session, script, parent, fake_status=fake_status, **child_config
):
new_group = await Group.create_row(
Expand Down
71 changes: 54 additions & 17 deletions src/lsst/cmservice/handlers/scripts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy.ext.asyncio import async_scoped_session

from lsst.cmservice.db.element import ElementMixin
Expand All @@ -8,6 +10,9 @@

from ..common.enums import StatusEnum

if TYPE_CHECKING:
from lsst.cmservice.db.step import Step


class ChainCreateScriptHandler(ScriptHandler):
"""Write a script to chain together collections
Expand Down Expand Up @@ -42,7 +47,7 @@ async def _write_script(


class ChainPrependScriptHandler(ScriptHandler):
"""Write a script to chain together collections
"""Write a script to prepend a collection to a chain
This will take
Expand All @@ -61,16 +66,11 @@ async def _write_script(
) -> StatusEnum:
resolved_cols = await script.resolve_collections(session)
output_coll = resolved_cols["output"]
input_colls = resolved_cols["inputs"]
prereq_colls: list["str"] = [] # FIXME
input_coll = resolved_cols["input"]
data_dict = await script.data_dict(session)
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
butler_repo = data_dict["butler_repo"]
command = f"butler collection-chain {butler_repo} {output_coll} --flatten"
for prereq_coll_ in prereq_colls:
command += f" {prereq_coll_}"
for input_coll_ in input_colls:
command += f" {input_coll_}"
command = f"butler collection-chain {butler_repo} {output_coll} --mode prepend {input_coll}"
await self._write_bash_script(script_url, command, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared
Expand All @@ -82,6 +82,10 @@ class ChainCollectScriptHandler(ScriptHandler):
This will create:
`script.collections['output']`
and collect all of the output collections at a given level to it
and then append
`script.collections['inputs']` to it
"""

async def _write_script(
Expand All @@ -93,8 +97,21 @@ async def _write_script(
resolved_cols = await script.resolve_collections(session)
output_coll = resolved_cols["output"]
input_colls = resolved_cols["inputs"]
collect_colls: list[str] = [] # FIXME
data_dict = await script.data_dict(session)
to_collect = data_dict["collect"]
collect_colls = []
if to_collect == "jobs":
jobs = await parent.get_jobs(session)
for job_ in jobs:
job_colls = await job_.resolve_collections(session)
collect_colls.append(job_colls["run"])
elif to_collect == "steps":
for step_ in await parent.children(session):
step_colls = await step_.resolve_collections(session)
collect_colls.append(step_colls["step_output"])
collect_colls = collect_colls[::-1]
else:
raise ValueError("Must specify what to collect in ChainCollectScriptHandler, jobs or steps")
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
butler_repo = data_dict["butler_repo"]
command = f"butler collection-chain {butler_repo} {output_coll}"
Expand Down Expand Up @@ -198,11 +215,18 @@ async def _write_script(
class PrepareStepScriptHandler(ScriptHandler):
"""Make the input collection for a step
This will add datasets from
This will create a chained collection
`script.collections['input']`
to
the output collections from all prerequisite steps
`script.collections['output']`
by taking the output collections of all the
prerequisite steps, or
`script.collections['campaign_input']` if the
step has no inputs
it will then append
`script.collections['output']` to the output collection
"""

async def _write_script(
Expand All @@ -214,14 +238,27 @@ async def _write_script(
resolved_cols = await script.resolve_collections(session)
input_colls = resolved_cols["inputs"]
output_coll = resolved_cols["output"]
prereqs: list[str] = [] # FIX
prereq_colls: list[str] = []
if TYPE_CHECKING:
assert isinstance(parent, Step)

async with session.begin_nested():
await session.refresh(parent, attribute_names=["prereqs_"])
for prereq_ in parent.prereqs_:
await session.refresh(prereq_, attribute_names=["prereq_"])
prereq_step = prereq_.prereq_
prereq_step_colls = await prereq_step.resolve_collections(session)
prereq_colls.append(prereq_step_colls["step_output"])
if not prereq_colls:
prereq_colls += resolved_cols["global_inputs"]

data_dict = await script.data_dict(session)
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
butler_repo = data_dict["butler_repo"]
command = f"butler collection-chain {butler_repo} {output_coll} --collections"
if prereqs:
for prereq_ in prereqs:
command += f"{prereq_}"
if prereq_colls:
for prereq_coll_ in prereq_colls:
command += f"{prereq_coll_}"
else:
for input_coll_ in input_colls:
command += f"{input_coll_}"
Expand Down

0 comments on commit 749d77c

Please sign in to comment.