Skip to content

Commit

Permalink
I made a new table
Browse files Browse the repository at this point in the history
  • Loading branch information
eacharles committed Oct 9, 2023
1 parent 6c38dab commit 6d3e880
Show file tree
Hide file tree
Showing 31 changed files with 451 additions and 250 deletions.
12 changes: 6 additions & 6 deletions src/lsst/cmservice/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def job_errors(
fullname: str,
output: options.OutputEnum | None,
) -> None:
"""Get the ErrorInstances for a particular Job"""
"""Get the PipetaskErrors for a particular Job"""
result = client.get_job_errors(fullname)
_output_pydantic_list(result, output)

Expand Down Expand Up @@ -591,7 +591,7 @@ def error_types(
output: options.OutputEnum | None,
**kwargs: Any,
) -> None:
"""Load a ErrorTypes from a yaml file"""
"""Load PipetaskErrorTypes from a yaml file"""
result = client.load_error_types(**kwargs)
_output_pydantic_list(result, output)

Expand All @@ -601,14 +601,14 @@ def error_types(
@options.output()
@options.fullname()
@options.yaml_file()
def error_instances(
def manifest_report(
client: CMClient,
output: options.OutputEnum | None,
**kwargs: Any,
) -> None:
"""Load a ErrorInstances from a yaml file"""
result = client.load_error_instances(**kwargs)
_output_pydantic_list(result, output)
"""Load a manifest report from a yaml file"""
result = client.load_manifest_report(**kwargs)
_output_pydantic_object(result, output)


@main.group()
Expand Down
16 changes: 8 additions & 8 deletions src/lsst/cmservice/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,14 +228,14 @@ def get_job_product_sets(
def get_job_errors(
self,
fullname: str,
) -> list[models.ErrorInstance]:
) -> list[models.PipetaskError]:
params = models.FullnameQuery(
fullname=fullname,
)
query = "get/job/errors"
results = self._client.get(f"{query}", params=params.dict()).json()
try:
return parse_obj_as(list[models.ErrorInstance], results)
return parse_obj_as(list[models.PipetaskError], results)
except ValidationError as msg:
raise ValueError(f"Bad response: {results}") from msg

Expand Down Expand Up @@ -354,24 +354,24 @@ def load_campaign(
def load_error_types(
self,
**kwargs: Any,
) -> list[models.ErrorType]:
) -> list[models.PipetaskErrorType]:
query = "load/error_types"
params = models.YamlFileQuery(**kwargs)
results = self._client.post(f"{query}", data=params.dict()).json()
try:
return parse_obj_as(list[models.ErrorType], results)
return parse_obj_as(list[models.PipetaskErrorType], results)
except ValidationError as msg:
raise ValueError(f"Bad response: {results}") from msg

def load_error_instances(
def load_manifest_report(
self,
**kwargs: Any,
) -> list[models.ErrorInstance]:
query = "load/error_instances"
) -> models.Job:
query = "load/manifest_report"
params = models.YamlFileQuery(**kwargs)
results = self._client.post(f"{query}", params=params.dict()).json()
try:
return parse_obj_as(list[models.ErrorInstance], results)
return parse_obj_as(models.Job, results)
except ValidationError as msg:
raise ValueError(f"Bad response: {results}") from msg

Expand Down
13 changes: 7 additions & 6 deletions src/lsst/cmservice/common/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ class TableEnum(enum.Enum):
job = 5
step_dependency = 6
script_dependency = 7
error_type = 8
error_instance = 9
task_set = 10
product_set = 11
specification = 12
spec_block = 13
pipetask_error_type = 8
pipetask_error = 9
script_error = 10
task_set = 11
product_set = 12
specification = 13
spec_block = 14

def is_node(self) -> bool:
"""Is this a subclass of NodeMixin"""
Expand Down
10 changes: 6 additions & 4 deletions src/lsst/cmservice/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
from .campaign import Campaign
from .dbid import DbId
from .element import ElementMixin
from .error_instance import ErrorInstance
from .error_type import ErrorType
from .group import Group
from .job import Job
from .node import NodeMixin
from .pipetask_error import PipetaskError
from .pipetask_error_type import PipetaskErrorType
from .product_set import ProductSet
from .production import Production
from .row import RowMixin
from .script import Script
from .script_dependency import ScriptDependency
from .script_error import ScriptError
from .specification import SpecBlock, Specification
from .step import Step
from .step_dependency import StepDependency
Expand All @@ -22,15 +23,16 @@
"Campaign",
"DbId",
"ElementMixin",
"ErrorInstance",
"ErrorType",
"Group",
"Job",
"Production",
"ProductSet",
"NodeMixin",
"RowMixin",
"PipetaskError",
"PipetaskErrorType",
"Script",
"ScriptError",
"ScriptDependency",
"Step",
"StepDependency",
Expand Down
72 changes: 0 additions & 72 deletions src/lsst/cmservice/db/error_instance.py

This file was deleted.

89 changes: 88 additions & 1 deletion src/lsst/cmservice/db/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@

from .campaign import Campaign
from .group import Group
from .job import Job
from .pipetask_error import PipetaskError
from .pipetask_error_type import PipetaskErrorType
from .product_set import ProductSet
from .specification import SpecBlock, Specification
from .step import Step
from .step_dependency import StepDependency
from .task_set import TaskSet


async def load_specification(
Expand Down Expand Up @@ -138,7 +143,7 @@ async def add_groups(
assert specification

current_groups = await step.children(session)
n_groups = len(current_groups)
n_groups = len(list(current_groups))
i = n_groups
for _child_name_, child_config_ in child_configs.items():
spec_block_name = child_config_.pop("spec_block", None)
Expand All @@ -157,3 +162,85 @@ async def add_groups(
async with session.begin_nested():
await session.refresh(step)
return step


async def match_pipetask_error(
session: async_scoped_session,
task_name: str,
diagnostic_message: str,
) -> PipetaskErrorType | None:
for pipetask_error_type_ in await PipetaskErrorType.get_rows(session):
if TYPE_CHECKING:
assert isinstance(pipetask_error_type_, PipetaskErrorType)
if pipetask_error_type_.match(task_name, diagnostic_message):
return pipetask_error_type_
return None


async def load_manifest_report(
session: async_scoped_session,
job_name: str,
yaml_file: str,
) -> Job:
with open(yaml_file, "rt", encoding="utf-8") as fin:
manifest_data = yaml.safe_load(fin)

job = await Job.get_row_by_fullname(session, job_name)
if TYPE_CHECKING:
assert isinstance(job, Job)

for task_name_, task_data_ in manifest_data.items():
failed_quanta = task_data_.get("failed_quanta", {})
outputs = task_data_.get("outputs", {})
n_expected = task_data_.get("n_expected", 0)
n_failed = len(failed_quanta)
n_failed_upstream = task_data_.get("n_quanta_blocked", 0)
n_done = n_expected - n_failed - n_failed_upstream

new_task_set = await TaskSet.create_row(
session,
job_id=job.id,
name=task_name_,
fullname=f"{job_name}/{task_name_}",
n_expected=n_expected,
n_done=n_done,
n_failed=n_failed,
n_failed_upstream=n_failed_upstream,
)
if TYPE_CHECKING:
assert isinstance(new_task_set, TaskSet)

for data_type_, counts_ in outputs.items():
new_product_set = await ProductSet.create_row(
session,
job_id=job.id,
task_id=new_task_set.id,
name=data_type_,
fullname=f"{new_task_set.fullname}/{data_type_}",
n_expected=counts_.get("expected", 0),
n_done=counts_.get("produced", 0),
n_failed=counts_.get("missing_failed", 0),
n_failed_upstream=counts_.get("missing_upsteam_failed", 0),
n_missing=counts_.get("missing_not_produced", 0),
)
if TYPE_CHECKING:
assert isinstance(new_product_set, ProductSet)

for failed_quanta_uuid_, failed_quanta_data_ in failed_quanta.items():
diagnostic_message = failed_quanta_data_["error"][-1]
error_type_id = await match_pipetask_error(
session,
task_name_,
diagnostic_message,
)
new_pipetask_error = await PipetaskError.create_row(
session,
error_type_id=error_type_id,
task_id=new_task_set.id,
quanta=failed_quanta_uuid_,
data_id=failed_quanta_data_["data_id"],
diagnostic_message=diagnostic_message,
)
if TYPE_CHECKING:
assert isinstance(new_pipetask_error, PipetaskError)
return job
Loading

0 comments on commit 6d3e880

Please sign in to comment.