Skip to content

Commit

Permalink
feat(api): Remove estimate_sleep_time as an api
Browse files Browse the repository at this point in the history
  • Loading branch information
tcjennings committed Dec 20, 2024
1 parent 1fbe8b2 commit 96c53a0
Show file tree
Hide file tree
Showing 26 changed files with 24 additions and 183 deletions.
2 changes: 0 additions & 2 deletions src/lsst/cmservice/cli/campaign.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ def action() -> None:

action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client)

get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client)

get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client)

get_tasks = wrappers.get_element_tasks_command(get_command, sub_client)
Expand Down
2 changes: 0 additions & 2 deletions src/lsst/cmservice/cli/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ def action() -> None:

action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client)

get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client)

get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client)

get_tasks = wrappers.get_element_tasks_command(get_command, sub_client)
Expand Down
2 changes: 0 additions & 2 deletions src/lsst/cmservice/cli/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ def action() -> None:

action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client)

get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client)

get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client)

get_tasks = wrappers.get_element_tasks_command(get_command, sub_client)
Expand Down
2 changes: 0 additions & 2 deletions src/lsst/cmservice/cli/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ def action() -> None:

action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client)

get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client)

get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client)

get_tasks = wrappers.get_element_tasks_command(get_command, sub_client)
Expand Down
37 changes: 0 additions & 37 deletions src/lsst/cmservice/cli/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,43 +1317,6 @@ def retry_script(
return retry_script


def get_element_estimate_sleep_time_command(
group_command: Callable,
sub_client_name: str,
) -> Callable:
"""Return a function that estimates the sleep time before calling process
Parameters
----------
group_command: Callable
CLI decorator from the CLI group to attach to
sub_client_name: str
Name of python API sub-client to use
Returns
-------
the_function: Callable
Function that estimates the sleep time before calling process
"""

@group_command(name="estimate_sleep_time")
@options.cmclient()
@options.row_id()
@options.output()
def estimate_sleep_time(
client: CMClient,
row_id: int,
output: options.OutputEnum | None,
) -> None:
"""Estimates the sleep time before calling process"""
sub_client = getattr(client, sub_client_name)
result = sub_client.estimate_sleep_time(row_id=row_id)
output_dict({"sleep_time": result}, output)

return estimate_sleep_time


def get_element_wms_task_reports_command(
group_command: Callable,
sub_client_name: str,
Expand Down
7 changes: 0 additions & 7 deletions src/lsst/cmservice/client/campaigns.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,6 @@ def client(self) -> httpx.Client:
"retry_script",
)

estimate_sleep_time = wrappers.get_node_post_query_function(
int,
models.SleepTimeQuery,
f"{router_string}/get",
"sleep_time",
)

get_wms_task_reports = wrappers.get_node_property_function(
models.MergedWmsTaskReportDict,
f"{router_string}/get",
Expand Down
7 changes: 0 additions & 7 deletions src/lsst/cmservice/client/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,6 @@ def client(self) -> httpx.Client:
"retry_script",
)

estimate_sleep_time = wrappers.get_node_post_query_function(
int,
models.SleepTimeQuery,
f"{router_string}/get",
"sleep_time",
)

rescue_job = wrappers.get_node_post_query_function(
models.Job,
models.NodeQuery,
Expand Down
7 changes: 0 additions & 7 deletions src/lsst/cmservice/client/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,6 @@ def client(self) -> httpx.Client:
"retry_script",
)

estimate_sleep_time = wrappers.get_node_post_query_function(
int,
models.SleepTimeQuery,
f"{router_string}/get",
"sleep_time",
)

get_wms_task_reports = wrappers.get_node_property_function(
models.MergedWmsTaskReportDict,
f"{router_string}/get",
Expand Down
7 changes: 0 additions & 7 deletions src/lsst/cmservice/client/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,6 @@ def client(self) -> httpx.Client:
"retry_script",
)

estimate_sleep_time = wrappers.get_node_post_query_function(
int,
models.SleepTimeQuery,
f"{router_string}/get",
"sleep_time",
)

get_wms_task_reports = wrappers.get_node_property_function(
models.MergedWmsTaskReportDict,
f"{router_string}/get",
Expand Down
2 changes: 1 addition & 1 deletion src/lsst/cmservice/common/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ async def daemon_iteration(session: async_scoped_session) -> None:
sleep_time = await queue_entry.node_sleep_time(session)
else:
# Put this entry to sleep for a while
sleep_time = config.daemon.iteration_duration
sleep_time = config.daemon.processing_interval
queue_entry.time_next_check = iteration_start + timedelta(seconds=sleep_time)
await session.commit()
7 changes: 5 additions & 2 deletions src/lsst/cmservice/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,12 @@ class DaemonConfiguration(BaseModel):
Set according to DAEMON__FIELD environment variables.
"""

iteration_duration: int = Field(
processing_interval: int = Field(
default=300,
description="The number of seconds to wait between daemon interations.",
description=(
"The maximum wait time (seconds) between daemon processing intervals. "
"An element in a running state may shorten this time."
),
)


Expand Down
2 changes: 1 addition & 1 deletion src/lsst/cmservice/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async def main_loop() -> None:
engine = create_database_engine(config.db.url, config.db.password)

startup_time = datetime.now(UTC)
sleep_time = timedelta(seconds=config.daemon.iteration_duration)
sleep_time = timedelta(seconds=config.daemon.processing_interval)

async with engine.begin():
session = await create_async_session(engine, logger)
Expand Down
19 changes: 9 additions & 10 deletions src/lsst/cmservice/db/element.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ..common.enums import LevelEnum, NodeTypeEnum, StatusEnum
from ..common.errors import CMBadStateTransitionError, CMTooManyActiveScriptsError
from ..config import config
from ..models.merged_product_set import MergedProductSetDict
from ..models.merged_task_set import MergedTaskSetDict
from ..models.merged_wms_task_report import MergedWmsTaskReportDict
Expand Down Expand Up @@ -200,28 +201,26 @@ async def retry_script(
async def estimate_sleep_time(
self,
session: async_scoped_session,
job_sleep: int = 150,
script_sleep: int = 15,
minimum_sleep_time: int = 10,
) -> int:
"""Estimate how long to sleep before calling process again
"""Estimate how long to sleep before calling process again.
Jobs may sleep up to the configured daemon processing interval;
scripts may sleep up to 10% of this value.
Parameters
----------
session : async_scoped_session
DB session manager
job_sleep: int = 150
Time to sleep if jobs are running
script_sleep: int = 15
Time to sleep if scripts are running
Returns
-------
sleep_time : int
Time to sleep in seconds
"""
sleep_time = 10
sleep_time = minimum_sleep_time
job_sleep = config.daemon.processing_interval
script_sleep = job_sleep // 10
if self.level == LevelEnum.job:
all_jobs = []
else:
Expand Down
7 changes: 3 additions & 4 deletions src/lsst/cmservice/db/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ async def estimate_sleep_time(
) -> int:
"""Estimate how long to sleep before calling process again.
If a job is running, use the daemon config interval value, otherwise
sleep the minimum 10 seconds.
If a node is running, use the greater of minimum sleep time or 10% of
the configured daemon processing interval.
Parameters
----------
Expand All @@ -781,8 +781,7 @@ async def estimate_sleep_time(
sleep_time : int
Time to sleep in seconds
"""
sleep_time = minimum_sleep_time
await session.refresh(self, attribute_names=["status"])
if self.status == StatusEnum.running:
sleep_time = max(config.daemon.iteration_duration, sleep_time)
sleep_time = max(config.daemon.processing_interval // 10, minimum_sleep_time)
return sleep_time
4 changes: 2 additions & 2 deletions src/lsst/cmservice/db/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ async def get_rows(
if parent_class is not None:
q = q.where(parent_class.id == cls.parent_id) # type: ignore
if parent_name is not None:
q = q.where(parent_class.fullname == parent_name)
q = q.where(parent_class.fullname == parent_name) # type: ignore
if parent_id is not None:
q = q.where(parent_class.id == parent_id)
q = q.where(parent_class.id == parent_id) # type: ignore
q = q.offset(skip).limit(limit)
results = await session.scalars(q)
return results.all()
Expand Down
2 changes: 0 additions & 2 deletions src/lsst/cmservice/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
RetryScriptQuery,
ScriptQuery,
ScriptQueryBase,
SleepTimeQuery,
UpdateNodeQuery,
UpdateStatusQuery,
YamlFileQuery,
Expand Down Expand Up @@ -86,7 +85,6 @@
"ScriptTemplate",
"ScriptTemplateCreate",
"ScriptTemplateUpdate",
"SleepTimeQuery",
"Job",
"JobCreate",
"JobUpdate",
Expand Down
7 changes: 0 additions & 7 deletions src/lsst/cmservice/models/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,6 @@ class ProcessNodeQuery(NodeQuery):
fake_status: int | None = None


class SleepTimeQuery(NodeQuery):
"""Parameters needed to ask a Node for a sleep time"""

job_sleep: int = 150
script_sleep: int = 15


class UpdateStatusQuery(NodeQuery):
"""Parameters needed to update the status of a Node"""

Expand Down
1 change: 0 additions & 1 deletion src/lsst/cmservice/routers/campaigns.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass)
get_jobs = wrappers.get_element_get_jobs_function(router, DbClass)
retry_script = wrappers.get_element_retry_script_function(router, DbClass)
estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass)

get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass)
get_tasks = wrappers.get_element_tasks_function(router, DbClass)
Expand Down
1 change: 0 additions & 1 deletion src/lsst/cmservice/routers/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass)
get_jobs = wrappers.get_element_get_jobs_function(router, DbClass)
retry_script = wrappers.get_element_retry_script_function(router, DbClass)
estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass)

get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass)
get_tasks = wrappers.get_element_tasks_function(router, DbClass)
Expand Down
1 change: 0 additions & 1 deletion src/lsst/cmservice/routers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
get_scripts = wrappers.get_element_get_scripts_function(router, DbClass)
get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass)
retry_script = wrappers.get_element_retry_script_function(router, DbClass)
estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass)

get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass)
get_tasks = wrappers.get_element_tasks_function(router, DbClass)
Expand Down
1 change: 0 additions & 1 deletion src/lsst/cmservice/routers/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass)
get_jobs = wrappers.get_element_get_jobs_function(router, DbClass)
retry_script = wrappers.get_element_retry_script_function(router, DbClass)
estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass)

get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass)
get_tasks = wrappers.get_element_tasks_function(router, DbClass)
Expand Down
52 changes: 0 additions & 52 deletions src/lsst/cmservice/routers/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1482,58 +1482,6 @@ async def element_retry_script(
return element_retry_script


def get_element_estimate_sleep_time_function(
router: APIRouter,
db_class: TypeAlias = db.ElementMixin,
) -> Callable:
"""Return a function that will estimate how long to sleep before
calling process again
Parameters
----------
router: APIRouter
Router to attach the function to
db_class: TypeAlias = db.ElementMixin
Underlying database class
Returns
-------
the_function: Callable
Function that will estimate how long to sleep before
calling process again
"""

@router.post(
"/get/{row_id}/sleep_time",
status_code=201,
response_model=int,
summary=f"Retry a script associated to a {db_class.class_string}",
)
async def element_estimate_sleep_time(
row_id: int,
query: models.SleepTimeQuery,
session: async_scoped_session = Depends(db_session_dependency),
) -> int:
try:
async with session.begin():
the_element = await db_class.get_row(session, row_id)
sleep_time = await the_element.estimate_sleep_time(
session,
job_sleep=query.job_sleep,
script_sleep=query.script_sleep,
)
return sleep_time
except CMMissingIDError as msg:
logger.info(msg)
raise HTTPException(status_code=404, detail=str(msg)) from msg
except Exception as msg:
logger.error(msg, exc_info=True)
raise HTTPException(status_code=500, detail=str(msg)) from msg

return element_estimate_sleep_time


def get_element_wms_task_reports_function(
router: APIRouter,
db_class: TypeAlias = db.ElementMixin,
Expand Down
4 changes: 0 additions & 4 deletions tests/cli/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ async def test_job_cli(uvicorn: UvicornProcess) -> None:
result = runner.invoke(client_top, f"job get parent --row_id {entry.id} --output yaml")
parent = check_and_parse_result(result, models.ElementMixin)

result = runner.invoke(client_top, f"job action estimate_sleep_time --row_id {entry.id} --output yaml")
sleep_time = check_and_parse_result(result, dict)["sleep_time"]
assert sleep_time == 10

result = runner.invoke(client_top, f"job get errors --row_id {entry.id} --output yaml")
job_errors = check_and_parse_result(result, list[models.PipetaskError])
assert len(job_errors) == 0
Expand Down
2 changes: 1 addition & 1 deletion tests/db/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def test_job_db(engine: AsyncEngine) -> None:

await db.Job.update_row(session, entry.id, status=StatusEnum.running)
sleep_time = await parent.estimate_sleep_time(session)
assert sleep_time == 150
assert sleep_time == 300
await db.Job.update_row(session, entry.id, status=StatusEnum.waiting)

await check_get_methods(session, entry, db.Job, db.Group)
Expand Down
4 changes: 2 additions & 2 deletions tests/db/util_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,10 @@ async def check_scripts(

await scripts[1].update_values(session, status=StatusEnum.running)
sleep_time = await entry.estimate_sleep_time(session)
assert sleep_time == 15, "Wrong sleep time for element with running script"
assert sleep_time == 30, "Wrong sleep time for element with running script"

sleep_time = await scripts[1].estimate_sleep_time(session)
assert sleep_time == 15, "Wrong sleep time for running script"
assert sleep_time == 30, "Wrong sleep time for running script"

await scripts[1].update_values(session, status=StatusEnum.waiting, superseded=True)
scripts_check = await entry.get_scripts(session)
Expand Down
Loading

0 comments on commit 96c53a0

Please sign in to comment.