diff --git a/src/lsst/cmservice/cli/commands.py b/src/lsst/cmservice/cli/commands.py index 4036d1ef4..801a49e3d 100644 --- a/src/lsst/cmservice/cli/commands.py +++ b/src/lsst/cmservice/cli/commands.py @@ -398,7 +398,7 @@ def job_errors( fullname: str, output: options.OutputEnum | None, ) -> None: - """Get the ErrorInstances for a particular Job""" + """Get the PipetaskErrors for a particular Job""" result = client.get_job_errors(fullname) _output_pydantic_list(result, output) @@ -591,7 +591,7 @@ def error_types( output: options.OutputEnum | None, **kwargs: Any, ) -> None: - """Load a ErrorTypes from a yaml file""" + """Load PipetaskErrorTypes from a yaml file""" result = client.load_error_types(**kwargs) _output_pydantic_list(result, output) @@ -601,14 +601,14 @@ def error_types( @options.output() @options.fullname() @options.yaml_file() -def error_instances( +def manifest_report( client: CMClient, output: options.OutputEnum | None, **kwargs: Any, ) -> None: - """Load a ErrorInstances from a yaml file""" - result = client.load_error_instances(**kwargs) - _output_pydantic_list(result, output) + """Load a manifest report from a yaml file""" + result = client.load_manifest_report(**kwargs) + _output_pydantic_object(result, output) @main.group() diff --git a/src/lsst/cmservice/client.py b/src/lsst/cmservice/client.py index f183ba347..6e6f782b3 100644 --- a/src/lsst/cmservice/client.py +++ b/src/lsst/cmservice/client.py @@ -228,14 +228,14 @@ def get_job_product_sets( def get_job_errors( self, fullname: str, - ) -> list[models.ErrorInstance]: + ) -> list[models.PipetaskError]: params = models.FullnameQuery( fullname=fullname, ) query = "get/job/errors" results = self._client.get(f"{query}", params=params.dict()).json() try: - return parse_obj_as(list[models.ErrorInstance], results) + return parse_obj_as(list[models.PipetaskError], results) except ValidationError as msg: raise ValueError(f"Bad response: {results}") from msg @@ -354,24 +354,24 @@ def load_campaign( def load_error_types( self, **kwargs: Any, - ) -> list[models.ErrorType]: + ) -> list[models.PipetaskErrorType]: query = "load/error_types" params = models.YamlFileQuery(**kwargs) results = self._client.post(f"{query}", data=params.dict()).json() try: - return parse_obj_as(list[models.ErrorType], results) + return parse_obj_as(list[models.PipetaskErrorType], results) except ValidationError as msg: raise ValueError(f"Bad response: {results}") from msg - def load_error_instances( + def load_manifest_report( self, **kwargs: Any, - ) -> list[models.ErrorInstance]: - query = "load/error_instances" + ) -> models.Job: + query = "load/manifest_report" params = models.YamlFileQuery(**kwargs) results = self._client.post(f"{query}", params=params.dict()).json() try: - return parse_obj_as(list[models.ErrorInstance], results) + return parse_obj_as(models.Job, results) except ValidationError as msg: raise ValueError(f"Bad response: {results}") from msg diff --git a/src/lsst/cmservice/common/enums.py b/src/lsst/cmservice/common/enums.py index 1b7a1499e..3d6a046c7 100644 --- a/src/lsst/cmservice/common/enums.py +++ b/src/lsst/cmservice/common/enums.py @@ -14,12 +14,13 @@ class TableEnum(enum.Enum): job = 5 step_dependency = 6 script_dependency = 7 - error_type = 8 - error_instance = 9 - task_set = 10 - product_set = 11 - specification = 12 - spec_block = 13 + pipetask_error_type = 8 + pipetask_error = 9 + script_error = 10 + task_set = 11 + product_set = 12 + specification = 13 + spec_block = 14 def is_node(self) -> bool: """Is this a subclass of NodeMixin""" diff --git a/src/lsst/cmservice/db/__init__.py b/src/lsst/cmservice/db/__init__.py index 46371f062..747101092 100644 --- a/src/lsst/cmservice/db/__init__.py +++ b/src/lsst/cmservice/db/__init__.py @@ -2,16 +2,17 @@ from .campaign import Campaign from .dbid import DbId from .element import ElementMixin -from .error_instance import ErrorInstance -from .error_type import ErrorType from .group import Group from .job import Job from .node import NodeMixin +from .pipetask_error import PipetaskError +from .pipetask_error_type import PipetaskErrorType from .product_set import ProductSet from .production import Production from .row import RowMixin from .script import Script from .script_dependency import ScriptDependency +from .script_error import ScriptError from .specification import SpecBlock, Specification from .step import Step from .step_dependency import StepDependency @@ -22,15 +23,16 @@ "Campaign", "DbId", "ElementMixin", - "ErrorInstance", - "ErrorType", "Group", "Job", "Production", "ProductSet", "NodeMixin", "RowMixin", + "PipetaskError", + "PipetaskErrorType", "Script", + "ScriptError", "ScriptDependency", "Step", "StepDependency", diff --git a/src/lsst/cmservice/db/error_instance.py b/src/lsst/cmservice/db/error_instance.py deleted file mode 100644 index e687a628d..000000000 --- a/src/lsst/cmservice/db/error_instance.py +++ /dev/null @@ -1,72 +0,0 @@ -from typing import TYPE_CHECKING, Any, Optional - -from sqlalchemy import JSON -from sqlalchemy.ext.asyncio import async_scoped_session -from sqlalchemy.orm import Mapped, mapped_column, relationship -from sqlalchemy.schema import ForeignKey - -from ..common.enums import ErrorSource -from .base import Base -from .job import Job -from .row import RowMixin -from .script import Script - -if TYPE_CHECKING: - from .error_type import ErrorType - - -class ErrorInstance(Base, RowMixin): - """Database table to keep track of individual errors.""" - - __tablename__ = "error_instance" - - id: Mapped[int] = mapped_column(primary_key=True) - error_type_id: Mapped[int] = mapped_column(ForeignKey("error_type.id", ondelete="CASCADE"), index=True) - job_id: Mapped[int | None] = mapped_column(ForeignKey("job.id", ondelete="CASCADE"), index=True) - script_id: Mapped[int | None] = mapped_column(ForeignKey("script.id", ondelete="CASCADE"), index=True) - source: Mapped[ErrorSource] = mapped_column() - diagnostic_message: Mapped[str] = mapped_column() - data: Mapped[Optional[dict | list]] = mapped_column(type_=JSON) - - job_: Mapped["Job"] = relationship("Job", viewonly=True) - script_: Mapped["Script"] = relationship("Script", viewonly=True) - error_type_: Mapped["ErrorType"] = relationship("ErrorType", viewonly=True) - - def __repr__(self) -> str: - s = f"Id={self.id} {self.job_id} {self.script_id}\n" - if len(self.diagnostic_message) > 150: - diag_message = self.diagnostic_message[0:150] - else: - diag_message = self.diagnostic_message - s += f" {diag_message}" - return s - - @classmethod - async def get_create_kwargs( - cls, - session: async_scoped_session, - **kwargs: Any, - ) -> dict: - job_name = kwargs.get("job_name") - script_name = kwargs.get("script_name") - if job_name: - job = await Job.get_row_by_fullname(session, job_name) - job_id = job.id - else: - job_id = None - - if script_name: - script = await Script.get_row_by_fullname(session, script_name) - script_id = script.id - else: - script_id = None - return { - "error_type_id": kwargs["error_type_id"], - "job_id": job_id, - "script_id": script_id, - "source": kwargs["source"], - "flavor": kwargs["flavor"], - "action": kwargs["action"], - "diagnostic_message": kwargs["diagnostic_message"], - "data": kwargs.get("data"), - } diff --git a/src/lsst/cmservice/db/functions.py b/src/lsst/cmservice/db/functions.py index 2f795ed61..947b50384 100644 --- a/src/lsst/cmservice/db/functions.py +++ b/src/lsst/cmservice/db/functions.py @@ -6,9 +6,14 @@ from .campaign import Campaign from .group import Group +from .job import Job +from .pipetask_error import PipetaskError +from .pipetask_error_type import PipetaskErrorType +from .product_set import ProductSet from .specification import SpecBlock, Specification from .step import Step from .step_dependency import StepDependency +from .task_set import TaskSet async def load_specification( @@ -138,7 +143,7 @@ async def add_groups( assert specification current_groups = await step.children(session) - n_groups = len(current_groups) + n_groups = len(list(current_groups)) i = n_groups for _child_name_, child_config_ in child_configs.items(): spec_block_name = child_config_.pop("spec_block", None) @@ -157,3 +162,85 @@ async def add_groups( async with session.begin_nested(): await session.refresh(step) return step + + +async def match_pipetask_error( + session: async_scoped_session, + task_name: str, + diagnostic_message: str, +) -> PipetaskErrorType | None: + for pipetask_error_type_ in await PipetaskErrorType.get_rows(session): + if TYPE_CHECKING: + assert isinstance(pipetask_error_type_, PipetaskErrorType) + if pipetask_error_type_.match(task_name, diagnostic_message): + return pipetask_error_type_ + return None + + +async def load_manifest_report( + session: async_scoped_session, + job_name: str, + yaml_file: str, +) -> Job: + with open(yaml_file, "rt", encoding="utf-8") as fin: + manifest_data = yaml.safe_load(fin) + + job = await Job.get_row_by_fullname(session, job_name) + if TYPE_CHECKING: + assert isinstance(job, Job) + + for task_name_, task_data_ in manifest_data.items(): + failed_quanta = task_data_.get("failed_quanta", {}) + outputs = task_data_.get("outputs", {}) + n_expected = task_data_.get("n_expected", 0) + n_failed = len(failed_quanta) + n_failed_upstream = task_data_.get("n_quanta_blocked", 0) + n_done = n_expected - n_failed - n_failed_upstream + + new_task_set = await TaskSet.create_row( + session, + job_id=job.id, + name=task_name_, + fullname=f"{job_name}/{task_name_}", + n_expected=n_expected, + n_done=n_done, + n_failed=n_failed, + n_failed_upstream=n_failed_upstream, + ) + if TYPE_CHECKING: + assert isinstance(new_task_set, TaskSet) + + for data_type_, counts_ in outputs.items(): + new_product_set = await ProductSet.create_row( + session, + job_id=job.id, + task_id=new_task_set.id, + name=data_type_, + fullname=f"{new_task_set.fullname}/{data_type_}", + n_expected=counts_.get("expected", 0), + n_done=counts_.get("produced", 0), + n_failed=counts_.get("missing_failed", 0), + n_failed_upstream=counts_.get("missing_upsteam_failed", 0), + n_missing=counts_.get("missing_not_produced", 0), + ) + if TYPE_CHECKING: + assert isinstance(new_product_set, ProductSet) + + for failed_quanta_uuid_, failed_quanta_data_ in failed_quanta.items(): + diagnostic_message = failed_quanta_data_["error"][-1] + error_type_id = await match_pipetask_error( + session, + task_name_, + diagnostic_message, + ) + new_pipetask_error = await PipetaskError.create_row( + session, + error_type_id=error_type_id, + task_id=new_task_set.id, + quanta=failed_quanta_uuid_, + data_id=failed_quanta_data_["data_id"], + diagnostic_message=diagnostic_message, + ) + if TYPE_CHECKING: + assert isinstance(new_pipetask_error, PipetaskError) + return job diff --git a/src/lsst/cmservice/db/interface.py b/src/lsst/cmservice/db/interface.py index 2fa4e8c9d..0c811aaae 100644 --- a/src/lsst/cmservice/db/interface.py +++ b/src/lsst/cmservice/db/interface.py @@ -17,8 +17,9 @@ TableEnum.job: db.Job, TableEnum.step_dependency: db.StepDependency, TableEnum.script_dependency: db.ScriptDependency, - TableEnum.error_type: db.ErrorType, - TableEnum.error_instance: db.ErrorInstance, + TableEnum.pipetask_error_type: db.PipetaskErrorType, + TableEnum.pipetask_error: db.PipetaskError, + TableEnum.script_error: db.ScriptError, TableEnum.task_set: db.TaskSet, TableEnum.product_set: db.ProductSet, TableEnum.specification: db.Specification, @@ -1010,8 +1011,8 @@ async def get_product_sets_for_job( async def get_errors_for_job( session: async_scoped_session, fullname: str, -) -> List[db.ErrorInstance]: - """Get `ErrorInstance`s associated to a `Job` +) -> List[db.PipetaskError]: + """Get `PipetaskError`s associated to a `Job` Parameters ---------- @@ -1023,8 +1024,8 @@ async def get_errors_for_job( Returns ------- - error_instances : List[ErrorInstance] - Requested ErrorInstances + error_instances : List[PipetaskError] + Requested PipetaskErrors """ job = await db.Job.get_row_by_fullname(session, fullname) if TYPE_CHECKING: @@ -1229,8 +1230,8 @@ async def load_and_create_campaign( async def load_error_types( session: async_scoped_session, yaml_file: str, -) -> List[db.ErrorType]: - """Load a set of `ErrorType`s from a yaml file +) -> List[db.PipetaskErrorType]: + """Load a set of `PipetaskErrorType`s from a yaml file Parameters ---------- @@ -1242,18 +1243,18 @@ async def load_error_types( Returns ------- - error_types : List[ErrorType] - New created ErrorTypes + error_types : List[PipetaskErrorType] + New created PipetaskErrorTypes """ return [] -async def load_error_instances( +async def load_manifest_report( session: async_scoped_session, yaml_file: str, fullname: str, -) -> List[db.ErrorInstance]: - """Load a set of `ErrorInstance`s from a yaml file +) -> db.Job: + """Load a manifest checker yaml file Parameters ---------- @@ -1264,21 +1265,23 @@ async def load_error_instances( Path to the yaml file fullname: str - Fullname of the `Job` to associate with these Errors + Fullname of the `Job` to associate with this report Returns ------- - error_instances : List[ErrorInstance] - New created ErrorInstances + job: Job + Newly updated job """ - return [] + result = await functions.load_manifest_report(session, fullname, yaml_file) + await session.commit() + return result -async def match_error_instances( +async def match_pipetask_errors( session: async_scoped_session, rematch: bool = False, -) -> List[db.ErrorInstance]: - """Match ErrorInstances to ErrorTypes +) -> List[db.PipetaskError]: + """Match PipetaskErrors to PipetaskErrorTypes Parameters ---------- @@ -1286,12 +1289,12 @@ async def match_error_instances( DB session manager rematch: bool - Rematch already matched ErrorInstances + Rematch already matched PipetaskErrors Returns ------- - error_instances : List[ErrorInstance] - Newly matched (or rematched) ErrorInstances + error_instances : List[PipetaskError] + Newly matched (or rematched) PipetaskErrors """ return [] @@ -1299,8 +1302,8 @@ async def match_error_instances( async def create_error_type( session: async_scoped_session, **kwargs: Any, -) -> db.ErrorType: - """Add an ErrorType to DB +) -> db.PipetaskErrorType: + """Add an PipetaskErrorType to DB Parameters ---------- @@ -1312,11 +1315,11 @@ async def create_error_type( Returns ------- - error_type : ErrorType - Newly created ErrorType + error_type : PipetaskErrorType + Newly created PipetaskErrorType """ - result = await db.ErrorType.create_row(session, **kwargs) + result = await db.PipetaskErrorType.create_row(session, **kwargs) if TYPE_CHECKING: - assert isinstance(result, db.ErrorType) + assert isinstance(result, db.PipetaskErrorType) await session.commit() return result diff --git a/src/lsst/cmservice/db/job.py b/src/lsst/cmservice/db/job.py index d0dceab47..84918e891 100644 --- a/src/lsst/cmservice/db/job.py +++ b/src/lsst/cmservice/db/job.py @@ -16,7 +16,7 @@ from .specification import SpecBlock if TYPE_CHECKING: - from .error_instance import ErrorInstance + from .pipetask_error import PipetaskError from .product_set import ProductSet from .task_set import TaskSet @@ -50,7 +50,13 @@ class Job(Base, NodeMixin): ) tasks_: Mapped[List["TaskSet"]] = relationship("TaskSet", viewonly=True) products_: Mapped[List["ProductSet"]] = relationship("ProductSet", viewonly=True) - errors_: Mapped[List["ErrorInstance"]] = relationship("ErrorInstance", viewonly=True) + errors_: Mapped[List["PipetaskError"]] = relationship( + "PipetaskError", + primaryjoin="Job.id==TaskSet.job_id", + secondary="join(TaskSet, PipetaskError)", + secondaryjoin="PipetaskError.task_id==TaskSet.id", + viewonly=True, + ) @hybrid_property def db_id(self) -> DbId: diff --git a/src/lsst/cmservice/db/job_handler.py b/src/lsst/cmservice/db/job_handler.py index 6831268c7..b2cb2d66e 100644 --- a/src/lsst/cmservice/db/job_handler.py +++ b/src/lsst/cmservice/db/job_handler.py @@ -4,9 +4,6 @@ from sqlalchemy.ext.asyncio import async_scoped_session -from lsst.daf.butler import Butler -from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport - from ..common.enums import JobMethod, StatusEnum from .handler import Handler @@ -15,15 +12,6 @@ from .job import Job -def make_gq_report( - butler_repo: str, - graph_file: str, -) -> QuantumGraphExecutionReport: - butler = Butler(butler_repo) - report = QuantumGraphExecutionReport.make_reports(butler, graph_file) - return report - - class JobHandler(Handler): """SubClass of Handler to deal with job operations""" diff --git a/src/lsst/cmservice/db/pipetask_error.py b/src/lsst/cmservice/db/pipetask_error.py new file mode 100644 index 000000000..6d4b3289c --- /dev/null +++ b/src/lsst/cmservice/db/pipetask_error.py @@ -0,0 +1,33 @@ +from typing import TYPE_CHECKING, Optional + +from sqlalchemy import JSON +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.schema import ForeignKey + +from .base import Base +from .job import Job +from .row import RowMixin + +if TYPE_CHECKING: + from .pipetask_error_type import PipetaskErrorType + from .task_set import TaskSet + + +class PipetaskError(Base, RowMixin): + """Database table to keep track of individual errors.""" + + __tablename__ = "pipetask_error" + + id: Mapped[int] = mapped_column(primary_key=True) + error_type_id: Mapped[int | None] = mapped_column( + ForeignKey("error_type.id", ondelete="CASCADE"), + index=True, + ) + task_id: Mapped[int] = mapped_column(ForeignKey("task_set.id", ondelete="CASCADE"), index=True) + quanta: Mapped[str] = mapped_column() + diagnostic_message: Mapped[str] = mapped_column() + data_id = Mapped[Optional[dict | list]] = mapped_column(type_=JSON) + + job_: Mapped["Job"] = relationship("Job", viewonly=True) + task_: Mapped["TaskSet"] = relationship("TaskSet", viewonly=True) + error_type_: Mapped["PipetaskErrorType"] = relationship("PipetaskErrorType", viewonly=True) diff --git a/src/lsst/cmservice/db/error_type.py b/src/lsst/cmservice/db/pipetask_error_type.py similarity index 58% rename from src/lsst/cmservice/db/error_type.py rename to src/lsst/cmservice/db/pipetask_error_type.py index f8932ba0b..390e53a83 100644 --- a/src/lsst/cmservice/db/error_type.py +++ b/src/lsst/cmservice/db/pipetask_error_type.py @@ -1,6 +1,6 @@ -from typing import TYPE_CHECKING, List, Optional +import re +from typing import TYPE_CHECKING, List -from sqlalchemy import JSON from sqlalchemy.orm import Mapped, mapped_column, relationship from ..common.enums import ErrorAction, ErrorFlavor, ErrorSource @@ -8,10 +8,10 @@ from .row import RowMixin if TYPE_CHECKING: - from .error_instance import ErrorInstance + from .pipetask_error import PipetaskError -class ErrorType(Base, RowMixin): +class PipetaskErrorType(Base, RowMixin): """Database table to keep track of types of errors.""" __tablename__ = "error_type" @@ -20,9 +20,10 @@ class ErrorType(Base, RowMixin): source: Mapped[ErrorSource] = mapped_column() flavor: Mapped[ErrorFlavor] = mapped_column() action: Mapped[ErrorAction] = mapped_column() + task_name: Mapped[str] = mapped_column() diagnostic_message: Mapped[str] = mapped_column(unique=True) - data: Mapped[Optional[dict | list]] = mapped_column(type_=JSON) - instances_: Mapped[List["ErrorInstance"]] = relationship("ErrorInstance", viewonly=True) + + errors_: Mapped[List["PipetaskError"]] = relationship("PipetaskError", viewonly=True) def __repr__(self) -> str: s = f"Id={self.id}\n" @@ -32,3 +33,14 @@ def __repr__(self) -> str: diag_message = self.diagnostic_message s += f" {diag_message}" return s + + def match( + self, + task_name: str, + diagnostic_message: str, + ) -> bool: + if not re.match(self.task_name.strip(), task_name.strip()): + return False + if not re.match(self.diagnostic_message.strip(), diagnostic_message.strip()): + return False + return True diff --git a/src/lsst/cmservice/db/product_set.py b/src/lsst/cmservice/db/product_set.py index d69fb6b72..1a71eb3da 100644 --- a/src/lsst/cmservice/db/product_set.py +++ b/src/lsst/cmservice/db/product_set.py @@ -17,6 +17,8 @@ class ProductSet(Base, RowMixin): id: Mapped[int] = mapped_column(primary_key=True) job_id: Mapped[int] = mapped_column(ForeignKey("job.id", ondelete="CASCADE"), index=True) task_id: Mapped[int] = mapped_column(ForeignKey("task_set.id", ondelete="CASCADE"), index=True) + name: Mapped[str] = mapped_column() + fullname: Mapped[str] = mapped_column(unique=True) n_expected: Mapped[int] = mapped_column() n_done: Mapped[int] = mapped_column(default=0) diff --git a/src/lsst/cmservice/db/script.py b/src/lsst/cmservice/db/script.py index b358bb536..e7f7a246a 100644 --- a/src/lsst/cmservice/db/script.py +++ b/src/lsst/cmservice/db/script.py @@ -20,8 +20,8 @@ if TYPE_CHECKING: from .dependency import ScriptDependency - from .error_instance import ErrorInstance from .job import Job + from .script_error import ScriptError from .script_handler import ScriptHandler @@ -59,7 +59,7 @@ class Script(Base, NodeMixin): s_: Mapped["Step"] = relationship("Step", viewonly=True) g_: Mapped["Group"] = relationship("Group", viewonly=True) jobs_: Mapped[List["Job"]] = relationship("Job", viewonly=True) - errors_: Mapped[List["ErrorInstance"]] = relationship("ErrorInstance", viewonly=True) + errors_: Mapped[List["ScriptError"]] = relationship("ScriptError", viewonly=True) prereqs_: Mapped[List["ScriptDependency"]] = relationship( "ScriptDependency", foreign_keys="ScriptDependency.depend_id", diff --git a/src/lsst/cmservice/db/script_error.py b/src/lsst/cmservice/db/script_error.py new file mode 100644 index 000000000..14ebb173d --- /dev/null +++ b/src/lsst/cmservice/db/script_error.py @@ -0,0 +1,29 @@ +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.schema import ForeignKey + +from ..common.enums import ErrorSource +from .base import Base +from .row import RowMixin +from .script import Script + + +class ScriptError(Base, RowMixin): + """Database table to keep track of individual errors.""" + + __tablename__ = "script_error" + + id: Mapped[int] = mapped_column(primary_key=True) + script_id: Mapped[int | None] = mapped_column(ForeignKey("script.id", ondelete="CASCADE"), index=True) + source: Mapped[ErrorSource] = mapped_column() + diagnostic_message: Mapped[str] = mapped_column() + + script_: Mapped["Script"] = relationship("Script", viewonly=True) + + def __repr__(self) -> str: + s = f"Id={self.id} {self.script_id}\n" + if len(self.diagnostic_message) > 150: + diag_message = self.diagnostic_message[0:150] + else: + diag_message = self.diagnostic_message + s += f" {diag_message}" + return s diff --git a/src/lsst/cmservice/db/task_set.py b/src/lsst/cmservice/db/task_set.py index 717038b12..dbb0c9ba4 100644 --- a/src/lsst/cmservice/db/task_set.py +++ b/src/lsst/cmservice/db/task_set.py @@ -16,12 +16,13 @@ class TaskSet(Base, RowMixin): id: Mapped[int] = mapped_column(primary_key=True) job_id: Mapped[int] = mapped_column(ForeignKey("job.id", ondelete="CASCADE"), index=True) + name: Mapped[str] = mapped_column() + fullname: Mapped[str] = mapped_column(unique=True) n_expected: Mapped[int] = mapped_column() n_done: Mapped[int] = mapped_column(default=0) n_failed: Mapped[int] = mapped_column(default=0) n_failed_upstream: Mapped[int] = mapped_column(default=0) - n_missing: Mapped[int] = mapped_column(default=0) job_: Mapped["Job"] = relationship("Job", viewonly=True) products_: Mapped[List["ProductSet"]] = relationship("ProductSet", viewonly=True) diff --git a/src/lsst/cmservice/handlers/jobs.py b/src/lsst/cmservice/handlers/jobs.py index 6641ab85a..32d168dc7 100644 --- a/src/lsst/cmservice/handlers/jobs.py +++ b/src/lsst/cmservice/handlers/jobs.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import types from typing import Any import numpy as np @@ -51,16 +52,16 @@ class BpsJobHandler(JobHandler): wms_svc_class_name: str | None = None @staticmethod - def _count_tasks(summary_dict: dict): + def _count_tasks(summary_dict: dict) -> dict: out_dict = {key: np.sum([count_ for count_ in val.values()]) for key, val in summary_dict.items()} return out_dict def __init__(self, spec_block_id: int, **kwargs: dict) -> None: JobHandler.__init__(self, spec_block_id, **kwargs) - self._wms_svc_class: type[BaseWmsService] | None = None + self._wms_svc_class: types.ModuleType | type | None = None self._wms_svc: BaseWmsService | None = None - def _get_wms_svc(self, **kwargs): + def _get_wms_svc(self, **kwargs: Any) -> BaseWmsService: if self._wms_svc is None: assert self.wms_svc_class_name self._wms_svc_class = doImport(self.wms_svc_class_name) @@ -69,13 +70,13 @@ def _get_wms_svc(self, **kwargs): def _get_job_status_from_job_id( self, - job_id: int, + job_id: int | None, ) -> StatusEnum | None: """Get the job processing status from job id Paramters --------- - job_id : int + job_id : int | None job_id Returns @@ -92,21 +93,21 @@ def _get_job_status_from_job_id( def _get_task_report_from_job_id( self, - job_id: int, - ) -> StatusEnum | None: + job_id: int | None, + ) -> dict[Any, dict[TaskStatusEnum, int]]: """Get the job processing status from job id Paramters --------- - job_id : int + job_id : int | None job_id """ if job_id is None: - return None + return {} wms_svc = self._get_wms_svc() wms_run_report = wms_svc.report(wms_workflow_id=job_id)[0][0] - out_dict = {} + out_dict: dict[Any, dict[TaskStatusEnum, int]] = {} for key, val in wms_run_report.job_summary.items(): task_dict = {state_: 0 for state_ in TaskStatusEnum.__members__.values()} for wms_state, count in val.items(): diff --git a/src/lsst/cmservice/main.py b/src/lsst/cmservice/main.py index fd01a7fce..6bc8a8f3d 100644 --- a/src/lsst/cmservice/main.py +++ b/src/lsst/cmservice/main.py @@ -12,11 +12,11 @@ actions, adders, campaigns, - error_types, groups, index, jobs, loaders, + pipetask_error_types, productions, queries, scripts, @@ -89,7 +89,7 @@ "description": "Operations with `jobs`. A `job` runs a single `workflow`: keeps a count" "of the results data products and keeps track of associated errors.", }, - {"name": "ErrorTypes", "description": "Operations with `error_types`."}, + {"name": "PipetaskErrorTypes", "description": "Operations with `pipetask_error_types`."}, ] @@ -118,7 +118,7 @@ app.include_router(groups.router, prefix=config.prefix) app.include_router(scripts.router, prefix=config.prefix) app.include_router(jobs.router, prefix=config.prefix) -app.include_router(error_types.router, prefix=config.prefix) +app.include_router(pipetask_error_types.router, prefix=config.prefix) app.include_router(spec_blocks.router, prefix=config.prefix) diff --git a/src/lsst/cmservice/main_debug.py b/src/lsst/cmservice/main_debug.py index 99f91be30..2278e0f6b 100644 --- a/src/lsst/cmservice/main_debug.py +++ b/src/lsst/cmservice/main_debug.py @@ -10,14 +10,15 @@ from .config import config from .routers import ( expert_campaigns, - expert_error_instances, - expert_error_types, expert_groups, expert_jobs, + expert_pipetask_error_types, + expert_pipetask_errors, expert_product_sets, expert_productions, expert_row, expert_script_dependencies, + expert_script_errors, expert_scripts, expert_spec_blocks, expert_specifications, @@ -83,8 +84,9 @@ app.include_router(expert_jobs.router, prefix=config.prefix) app.include_router(expert_step_dependencies.router, prefix=config.prefix) app.include_router(expert_script_dependencies.router, prefix=config.prefix) -app.include_router(expert_error_types.router, prefix=config.prefix) -app.include_router(expert_error_instances.router, prefix=config.prefix) +app.include_router(expert_pipetask_error_types.router, prefix=config.prefix) +app.include_router(expert_pipetask_errors.router, prefix=config.prefix) +app.include_router(expert_script_errors.router, prefix=config.prefix) app.include_router(expert_task_sets.router, prefix=config.prefix) app.include_router(expert_product_sets.router, prefix=config.prefix) app.include_router(expert_row.router, prefix=config.prefix) diff --git a/src/lsst/cmservice/models/__init__.py b/src/lsst/cmservice/models/__init__.py index faaae8db6..84f822325 100644 --- a/src/lsst/cmservice/models/__init__.py +++ b/src/lsst/cmservice/models/__init__.py @@ -1,8 +1,6 @@ from .campaign import Campaign, CampaignCreate from .dependency import Dependency, DependencyCreate from .element import Element -from .error_instance import ErrorInstance, ErrorInstanceCreate -from .error_type import ErrorType, ErrorTypeCreate from .group import Group, GroupCreate from .index import Index from .interface import ( @@ -11,7 +9,7 @@ FullnameQuery, JobQuery, LoadAndCreateCampaign, - LoadErrorInstances, + LoadManifestReport, NodeQuery, ProcessNodeQuery, ProcessQuery, @@ -22,10 +20,13 @@ YamlFileQuery, ) from .job import Job, JobCreate +from .pipetask_error import PipetaskError, PipetaskErrorCreate +from .pipetask_error_type import PipetaskErrorType, PipetaskErrorTypeCreate from .product_set import ProductSet, ProductSetCreate from .production import Production, ProductionCreate from .row import RowData, RowQuery from .script import Script, ScriptCreate +from .script_error import ScriptError, ScriptErrorCreate from .specification import SpecBlock, SpecBlockCreate, Specification, SpecificationCreate, SpecificationLoad from .step import Step, StepCreate from .task_set import TaskSet, TaskSetCreate @@ -46,10 +47,12 @@ "SpecBlock", "SpecBlockCreate", "Element", - "ErrorType", - "ErrorTypeCreate", - "ErrorInstance", - "ErrorInstanceCreate", + "PipetaskErrorType", + "PipetaskErrorTypeCreate", + "PipetaskError", + "PipetaskErrorCreate", + "ScriptError", + "ScriptErrorCreate", "Job", "JobCreate", "TaskSet", @@ -75,5 +78,5 @@ "AddSteps", "LoadAndCreateCampaign", "YamlFileQuery", - "LoadErrorInstances", + "LoadManifestReport", ] diff --git a/src/lsst/cmservice/models/error_instance.py b/src/lsst/cmservice/models/error_instance.py deleted file mode 100644 index 0dfd99fe6..000000000 --- a/src/lsst/cmservice/models/error_instance.py +++ /dev/null @@ -1,23 +0,0 @@ -from pydantic import BaseModel - -from ..common.enums import ErrorSource - - -class ErrorInstanceBase(BaseModel): - error_type_id: int | None - source: ErrorSource - diagnostic_message: str - data: dict - - -class ErrorInstanceCreate(ErrorInstanceBase): - job_name: str | None = None - script_name: str | None = None - - -class ErrorInstance(ErrorInstanceBase): - job_id: int | None = None - script_id: int | None = None - - class Config: - from_attributes = True diff --git a/src/lsst/cmservice/models/error_type.py b/src/lsst/cmservice/models/error_type.py deleted file mode 100644 index 20fc08591..000000000 --- a/src/lsst/cmservice/models/error_type.py +++ /dev/null @@ -1,22 +0,0 @@ -from pydantic import BaseModel - -from ..common.enums import ErrorAction, ErrorFlavor, ErrorSource - - -class ErrorTypeBase(BaseModel): - source: ErrorSource - flavor: ErrorFlavor - action: ErrorAction - diagnostic_message: str - data: dict - - -class ErrorTypeCreate(ErrorTypeBase): - pass - - -class ErrorType(ErrorTypeBase): - id: int - - class Config: - from_attributes = True diff --git a/src/lsst/cmservice/models/interface.py b/src/lsst/cmservice/models/interface.py index 1076e1589..aa820e201 100644 --- a/src/lsst/cmservice/models/interface.py +++ b/src/lsst/cmservice/models/interface.py @@ -64,5 +64,5 @@ class LoadAndCreateCampaign(YamlFileQuery): handler: str | None = None -class LoadErrorInstances(YamlFileQuery, FullnameQuery): +class LoadManifestReport(YamlFileQuery, FullnameQuery): pass diff --git a/src/lsst/cmservice/models/pipetask_error.py b/src/lsst/cmservice/models/pipetask_error.py new file mode 100644 index 000000000..3e87b036e --- /dev/null +++ b/src/lsst/cmservice/models/pipetask_error.py @@ -0,0 +1,20 @@ +from pydantic import BaseModel + + +class PipetaskErrorBase(BaseModel): + error_type_id: int | None = None + task_id: int + quanta: str + diagnostic_message: str + data_id: dict + + +class PipetaskErrorCreate(PipetaskErrorBase): + pass + + +class PipetaskError(PipetaskErrorBase): + id: int + + class Config: + from_attributes = True diff --git a/src/lsst/cmservice/models/pipetask_error_type.py b/src/lsst/cmservice/models/pipetask_error_type.py new file mode 100644 index 000000000..b676a0b04 --- /dev/null +++ b/src/lsst/cmservice/models/pipetask_error_type.py @@ -0,0 +1,20 @@ +from pydantic import BaseModel + + +class PipetaskErrorTypeBase(BaseModel): + source: int + flavor: int + action: int + task_name: str + diagnostic_message: str + + +class PipetaskErrorTypeCreate(PipetaskErrorTypeBase): + pass + + +class PipetaskErrorType(PipetaskErrorTypeBase): + id: int + + class Config: + from_attributes = True diff --git a/src/lsst/cmservice/models/script_error.py b/src/lsst/cmservice/models/script_error.py new file mode 100644 index 000000000..30b900446 --- /dev/null +++ b/src/lsst/cmservice/models/script_error.py @@ -0,0 +1,18 @@ +from pydantic import BaseModel + + +class ScriptErrorBase(BaseModel): + script_id: int + source: int + diagnostic_message: str + + +class ScriptErrorCreate(ScriptErrorBase): + pass + + +class ScriptError(ScriptErrorBase): + id: int + + class Config: + from_attributes = True diff --git a/src/lsst/cmservice/routers/expert_error_instances.py b/src/lsst/cmservice/routers/expert_pipetask_error_types.py similarity index 91% rename from src/lsst/cmservice/routers/expert_error_instances.py rename to src/lsst/cmservice/routers/expert_pipetask_error_types.py index e22da63dc..106bd4e6c 100644 --- a/src/lsst/cmservice/routers/expert_error_instances.py +++ b/src/lsst/cmservice/routers/expert_pipetask_error_types.py @@ -6,11 +6,12 @@ from .. import db, models -response_model_class = models.ErrorInstance -create_model_class = models.ErrorInstanceCreate -db_class = db.ErrorInstance -class_string = "error_instance" -tag_string = "ErrorInstances" +response_model_class = models.PipetaskErrorType +create_model_class = models.PipetaskErrorTypeCreate +db_class = db.PipetaskErrorType +class_string = "pipetask_error_type" +tag_string = "Pipetask Error Types" + router = APIRouter( prefix=f"/{class_string}s", diff --git a/src/lsst/cmservice/routers/error_types.py b/src/lsst/cmservice/routers/expert_pipetask_errors.py similarity index 92% rename from src/lsst/cmservice/routers/error_types.py rename to src/lsst/cmservice/routers/expert_pipetask_errors.py index 436683594..4ea8d423a 100644 --- a/src/lsst/cmservice/routers/error_types.py +++ b/src/lsst/cmservice/routers/expert_pipetask_errors.py @@ -6,12 +6,11 @@ from .. import db, models -response_model_class = models.ErrorType -create_model_class = models.ErrorTypeCreate -db_class = db.ErrorType -class_string = "error_type" -tag_string = "Error Types" - +response_model_class = models.PipetaskError +create_model_class = models.PipetaskErrorCreate +db_class = db.PipetaskError +class_string = "pipetask_error" +tag_string = "PipetaskErrors" router = APIRouter( prefix=f"/{class_string}s", diff --git a/src/lsst/cmservice/routers/expert_error_types.py b/src/lsst/cmservice/routers/expert_script_errors.py similarity index 92% rename from src/lsst/cmservice/routers/expert_error_types.py rename to src/lsst/cmservice/routers/expert_script_errors.py index 436683594..62a00761b 100644 --- a/src/lsst/cmservice/routers/expert_error_types.py +++ b/src/lsst/cmservice/routers/expert_script_errors.py @@ -6,12 +6,11 @@ from .. import db, models -response_model_class = models.ErrorType -create_model_class = models.ErrorTypeCreate -db_class = db.ErrorType -class_string = "error_type" -tag_string = "Error Types" - +response_model_class = models.ScriptError +create_model_class = models.ScriptErrorCreate +db_class = db.ScriptError +class_string = "script_error" +tag_string = "ScriptErrors" router = APIRouter( prefix=f"/{class_string}s", diff --git a/src/lsst/cmservice/routers/loaders.py b/src/lsst/cmservice/routers/loaders.py index 78a0ad93b..54b9794d2 100644 --- a/src/lsst/cmservice/routers/loaders.py +++ b/src/lsst/cmservice/routers/loaders.py @@ -45,26 +45,26 @@ async def load_and_create_campaign( @router.post( "/error_types", status_code=201, - response_model=list[models.ErrorType], - summary="Load a set of `ErrorType`s from a yaml file", + response_model=list[models.PipetaskErrorType], + summary="Load a set of `PipetaskErrorType`s from a yaml file", ) async def load_error_types( query: models.YamlFileQuery, session: async_scoped_session = Depends(db_session_dependency), -) -> List[db.ErrorType]: +) -> List[db.PipetaskErrorType]: result = await interface.load_error_types(session, **query.dict()) return result @router.post( - "/error_instances", + "/manifest_report", status_code=201, - response_model=list[models.ErrorInstance], - summary="Load a set of `ErrorInstance`s from a yaml file", + response_model=models.Job, + summary="Load a manifest report yaml file", ) -async def load_error_instances( - query: models.LoadErrorInstances, +async def load_manifest_report( + query: models.LoadManifestReport, session: async_scoped_session = Depends(db_session_dependency), -) -> List[db.ErrorInstance]: - result = await interface.load_error_instances(session, **query.dict()) +) -> List[db.Job]: + result = await interface.load_manifest_report(session, **query.dict()) return result diff --git a/src/lsst/cmservice/routers/pipetask_error_types.py b/src/lsst/cmservice/routers/pipetask_error_types.py new file mode 100644 index 000000000..106bd4e6c --- /dev/null +++ b/src/lsst/cmservice/routers/pipetask_error_types.py @@ -0,0 +1,91 @@ +from typing import Sequence + +from fastapi import APIRouter, Depends +from safir.dependencies.db_session import db_session_dependency +from sqlalchemy.ext.asyncio import async_scoped_session + +from .. import db, models + +response_model_class = models.PipetaskErrorType +create_model_class = models.PipetaskErrorTypeCreate +db_class = db.PipetaskErrorType +class_string = "pipetask_error_type" +tag_string = "Pipetask Error Types" + + +router = APIRouter( + prefix=f"/{class_string}s", + tags=[tag_string], +) + + +@router.get( + "", + response_model=list[response_model_class], + summary=f"List {class_string}s", +) +async def get_rows( + skip: int = 0, + limit: int = 100, + session: async_scoped_session = Depends(db_session_dependency), +) -> Sequence[db_class]: + result = await db_class.get_rows(session, skip=skip, limit=limit) + return result + + +@router.get( + "/{row_id}", + response_model=response_model_class, + summary=f"Retrieve a {class_string}", +) +async def get_row( + row_id: int, + session: async_scoped_session = Depends(db_session_dependency), +) -> db_class: + result = await db_class.get_row(session, row_id) + assert isinstance(result, db_class) + return result + + +@router.post( + "", + status_code=201, + response_model=response_model_class, + summary=f"Create a {class_string}", +) +async def post_row( + row_create: create_model_class, + session: async_scoped_session = Depends(db_session_dependency), +) -> db_class: + result = await db_class.create_row(session, **row_create.dict()) + assert isinstance(result, db_class) + await session.commit() + return result + + +@router.delete( + "/{row_id}", + status_code=204, + summary=f"Delete a {class_string}", +) +async def delete_row( + row_id: int, + session: async_scoped_session = Depends(db_session_dependency), +) -> None: + await db_class.delete_row(session, row_id) + + +@router.put( + "/{row_id}", + response_model=response_model_class, + summary=f"Update a {class_string}", +) +async def update_row( + row_id: int, + row_update: response_model_class, + session: async_scoped_session = Depends(db_session_dependency), +) -> db_class: + result = await db_class.update_row(session, row_id, **row_update.dict()) + assert isinstance(result, db_class) + await session.commit() + return result diff --git a/src/lsst/cmservice/routers/queries.py b/src/lsst/cmservice/routers/queries.py index 786ec8fbf..33d68ac42 100644 --- a/src/lsst/cmservice/routers/queries.py +++ b/src/lsst/cmservice/routers/queries.py @@ -254,13 +254,13 @@ async def get_job_product_sets( @router.get( "/job/errors", - response_model=list[models.ErrorInstance], - summary="Get `ErrorInstance`s associated to a `Job`", + response_model=list[models.PipetaskError], + summary="Get `PipetaskErrors`s associated to a `Job`", ) async def get_job_errors( fullname: str, session: async_scoped_session = Depends(db_session_dependency), -) -> List[db.ErrorInstance]: +) -> List[db.PipetaskError]: result = await interface.get_errors_for_job( session, fullname=fullname,