From 96c53a05145e846f4bd464b6db404bdbf628347b Mon Sep 17 00:00:00 2001 From: Toby Jennings Date: Fri, 20 Dec 2024 14:09:56 -0600 Subject: [PATCH] feat(api): Remove estimate_sleep_time as an api --- src/lsst/cmservice/cli/campaign.py | 2 - src/lsst/cmservice/cli/group.py | 2 - src/lsst/cmservice/cli/job.py | 2 - src/lsst/cmservice/cli/step.py | 2 - src/lsst/cmservice/cli/wrappers.py | 37 ------------------ src/lsst/cmservice/client/campaigns.py | 7 ---- src/lsst/cmservice/client/groups.py | 7 ---- src/lsst/cmservice/client/jobs.py | 7 ---- src/lsst/cmservice/client/steps.py | 7 ---- src/lsst/cmservice/common/daemon.py | 2 +- src/lsst/cmservice/config.py | 7 +++- src/lsst/cmservice/daemon.py | 2 +- src/lsst/cmservice/db/element.py | 19 +++++---- src/lsst/cmservice/db/node.py | 7 ++-- src/lsst/cmservice/db/row.py | 4 +- src/lsst/cmservice/models/__init__.py | 2 - src/lsst/cmservice/models/interface.py | 7 ---- src/lsst/cmservice/routers/campaigns.py | 1 - src/lsst/cmservice/routers/groups.py | 1 - src/lsst/cmservice/routers/jobs.py | 1 - src/lsst/cmservice/routers/steps.py | 1 - src/lsst/cmservice/routers/wrappers.py | 52 ------------------------- tests/cli/test_jobs.py | 4 -- tests/db/test_job.py | 2 +- tests/db/util_functions.py | 4 +- tests/routers/util_functions.py | 18 --------- 26 files changed, 24 insertions(+), 183 deletions(-) diff --git a/src/lsst/cmservice/cli/campaign.py b/src/lsst/cmservice/cli/campaign.py index 1dc0fabf2..dbf2e0a64 100644 --- a/src/lsst/cmservice/cli/campaign.py +++ b/src/lsst/cmservice/cli/campaign.py @@ -134,8 +134,6 @@ def action() -> None: action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client) -get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client) - get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client) get_tasks = wrappers.get_element_tasks_command(get_command, sub_client) diff --git a/src/lsst/cmservice/cli/group.py b/src/lsst/cmservice/cli/group.py index 4b67d8ff1..a4dc5c220 100644 --- a/src/lsst/cmservice/cli/group.py +++ b/src/lsst/cmservice/cli/group.py @@ -131,8 +131,6 @@ def action() -> None: action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client) -get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client) - get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client) get_tasks = wrappers.get_element_tasks_command(get_command, sub_client) diff --git a/src/lsst/cmservice/cli/job.py b/src/lsst/cmservice/cli/job.py index 67ee648e4..e8f516007 100644 --- a/src/lsst/cmservice/cli/job.py +++ b/src/lsst/cmservice/cli/job.py @@ -129,8 +129,6 @@ def action() -> None: action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client) -get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client) - get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client) get_tasks = wrappers.get_element_tasks_command(get_command, sub_client) diff --git a/src/lsst/cmservice/cli/step.py b/src/lsst/cmservice/cli/step.py index 438587d82..1d37db41d 100644 --- a/src/lsst/cmservice/cli/step.py +++ b/src/lsst/cmservice/cli/step.py @@ -130,8 +130,6 @@ def action() -> None: action_retry_script = wrappers.get_element_retry_script_command(action_command, sub_client) -get_sleep_time = wrappers.get_element_estimate_sleep_time_command(action_command, sub_client) - get_wms_task_reports = wrappers.get_element_wms_task_reports_command(get_command, sub_client) get_tasks = wrappers.get_element_tasks_command(get_command, sub_client) diff --git a/src/lsst/cmservice/cli/wrappers.py b/src/lsst/cmservice/cli/wrappers.py index 77dd39196..7b23e112b 100644 --- a/src/lsst/cmservice/cli/wrappers.py +++ b/src/lsst/cmservice/cli/wrappers.py @@ -1317,43 +1317,6 @@ def retry_script( return retry_script -def get_element_estimate_sleep_time_command( - group_command: Callable, - sub_client_name: str, -) -> Callable: - """Return a function estimates the sleep time before calling process - - Parameters - ---------- - group_command: Callable - CLI decorator from the CLI group to attach to - - sub_client_name: str - Name of python API sub-client to use - - Returns - ------- - the_function: Callable - Function that estimates the sleep time before calling process - """ - - @group_command(name="estimate_sleep_time") - @options.cmclient() - @options.row_id() - @options.output() - def estimate_sleep_time( - client: CMClient, - row_id: int, - output: options.OutputEnum | None, - ) -> None: - """Estimates the sleep time before calling process""" - sub_client = getattr(client, sub_client_name) - result = sub_client.estimate_sleep_time(row_id=row_id) - output_dict({"sleep_time": result}, output) - - return estimate_sleep_time - - def get_element_wms_task_reports_command( group_command: Callable, sub_client_name: str, diff --git a/src/lsst/cmservice/client/campaigns.py b/src/lsst/cmservice/client/campaigns.py index 9c97017da..6eda99655 100644 --- a/src/lsst/cmservice/client/campaigns.py +++ b/src/lsst/cmservice/client/campaigns.py @@ -200,13 +200,6 @@ def client(self) -> httpx.Client: "retry_script", ) - estimate_sleep_time = wrappers.get_node_post_query_function( - int, - models.SleepTimeQuery, - f"{router_string}/get", - "sleep_time", - ) - get_wms_task_reports = wrappers.get_node_property_function( models.MergedWmsTaskReportDict, f"{router_string}/get", diff --git a/src/lsst/cmservice/client/groups.py b/src/lsst/cmservice/client/groups.py index 0c8dc9506..373430eac 100644 --- a/src/lsst/cmservice/client/groups.py +++ b/src/lsst/cmservice/client/groups.py @@ -194,13 +194,6 @@ def client(self) -> httpx.Client: "retry_script", ) - estimate_sleep_time = wrappers.get_node_post_query_function( - int, - models.SleepTimeQuery, - f"{router_string}/get", - "sleep_time", - ) - rescue_job = wrappers.get_node_post_query_function( models.Job, models.NodeQuery, diff --git a/src/lsst/cmservice/client/jobs.py b/src/lsst/cmservice/client/jobs.py index b081b1449..e0fae7765 100644 --- a/src/lsst/cmservice/client/jobs.py +++ b/src/lsst/cmservice/client/jobs.py @@ -191,13 +191,6 @@ def client(self) -> httpx.Client: "retry_script", ) - estimate_sleep_time = wrappers.get_node_post_query_function( - int, - models.SleepTimeQuery, - f"{router_string}/get", - "sleep_time", - ) - get_wms_task_reports = wrappers.get_node_property_function( models.MergedWmsTaskReportDict, f"{router_string}/get", diff --git a/src/lsst/cmservice/client/steps.py b/src/lsst/cmservice/client/steps.py index 44a9b77cf..072c6878d 100644 --- a/src/lsst/cmservice/client/steps.py +++ b/src/lsst/cmservice/client/steps.py @@ -192,13 +192,6 @@ def client(self) -> httpx.Client: "retry_script", ) - estimate_sleep_time = wrappers.get_node_post_query_function( - int, - models.SleepTimeQuery, - f"{router_string}/get", - "sleep_time", - ) - get_wms_task_reports = wrappers.get_node_property_function( models.MergedWmsTaskReportDict, f"{router_string}/get", diff --git a/src/lsst/cmservice/common/daemon.py b/src/lsst/cmservice/common/daemon.py index b8346f47d..3a01984cd 100644 --- a/src/lsst/cmservice/common/daemon.py +++ b/src/lsst/cmservice/common/daemon.py @@ -30,6 +30,6 @@ async def daemon_iteration(session: async_scoped_session) -> None: sleep_time = await queue_entry.node_sleep_time(session) else: # Put this entry to sleep for a while - sleep_time = config.daemon.iteration_duration + sleep_time = config.daemon.processing_interval queue_entry.time_next_check = iteration_start + timedelta(seconds=sleep_time) await session.commit() diff --git a/src/lsst/cmservice/config.py b/src/lsst/cmservice/config.py index a4c82e25d..cdd6ab0dd 100644 --- a/src/lsst/cmservice/config.py +++ b/src/lsst/cmservice/config.py @@ -180,9 +180,12 @@ class DaemonConfiguration(BaseModel): Set according to DAEMON__FIELD environment variables. """ - iteration_duration: int = Field( + processing_interval: int = Field( default=300, - description="The number of seconds to wait between daemon interations.", + description=( + "The maximum wait time (seconds) between daemon processing intervals. " + "An element in a running state may shorten this time." + ), ) diff --git a/src/lsst/cmservice/daemon.py b/src/lsst/cmservice/daemon.py index 8505d2403..32d2c7fb3 100644 --- a/src/lsst/cmservice/daemon.py +++ b/src/lsst/cmservice/daemon.py @@ -63,7 +63,7 @@ async def main_loop() -> None: engine = create_database_engine(config.db.url, config.db.password) startup_time = datetime.now(UTC) - sleep_time = timedelta(seconds=config.daemon.iteration_duration) + sleep_time = timedelta(seconds=config.daemon.processing_interval) async with engine.begin(): session = await create_async_session(engine, logger) diff --git a/src/lsst/cmservice/db/element.py b/src/lsst/cmservice/db/element.py index 849dcb436..617ae5aea 100644 --- a/src/lsst/cmservice/db/element.py +++ b/src/lsst/cmservice/db/element.py @@ -7,6 +7,7 @@ from ..common.enums import LevelEnum, NodeTypeEnum, StatusEnum from ..common.errors import CMBadStateTransitionError, CMTooManyActiveScriptsError +from ..config import config from ..models.merged_product_set import MergedProductSetDict from ..models.merged_task_set import MergedTaskSetDict from ..models.merged_wms_task_report import MergedWmsTaskReportDict @@ -200,28 +201,26 @@ async def retry_script( async def estimate_sleep_time( self, session: async_scoped_session, - job_sleep: int = 150, - script_sleep: int = 15, + minimum_sleep_time: int = 10, ) -> int: - """Estimate how long to sleep before calling process again + """Estimate how long to sleep before calling process again. + + Jobs may sleep up to the configured value for the Daemon iteration + duration; scripts may sleep up to 10% of this value. Parameters ---------- session : async_scoped_session DB session manager - job_sleep: int = 150 - Time to sleep if jobs are running - - script_sleep: int = 15 - Time to sleep if scripts are running - Returns ------- sleep_time : int Time to sleep in seconds """ - sleep_time = 10 + sleep_time = minimum_sleep_time + job_sleep = config.daemon.processing_interval + script_sleep = job_sleep // 10 if self.level == LevelEnum.job: all_jobs = [] else: diff --git a/src/lsst/cmservice/db/node.py b/src/lsst/cmservice/db/node.py index 74ac32da4..e9f163b95 100644 --- a/src/lsst/cmservice/db/node.py +++ b/src/lsst/cmservice/db/node.py @@ -768,8 +768,8 @@ async def estimate_sleep_time( ) -> int: """Estimate how long to sleep before calling process again. - If a job is running, use the daemon config interval value, otherwise - sleep the minimum 10 seconds. + If a node is running, use the greater of minimum sleep time or 10% of + the configured daemon processing interval. Parameters ---------- @@ -781,8 +781,7 @@ async def estimate_sleep_time( sleep_time : int Time to sleep in seconds """ - sleep_time = minimum_sleep_time await session.refresh(self, attribute_names=["status"]) if self.status == StatusEnum.running: - sleep_time = max(config.daemon.iteration_duration, sleep_time) + sleep_time = max(config.daemon.processing_interval // 10, minimum_sleep_time) return sleep_time diff --git a/src/lsst/cmservice/db/row.py b/src/lsst/cmservice/db/row.py index 1c9ca6160..3e5f1908c 100644 --- a/src/lsst/cmservice/db/row.py +++ b/src/lsst/cmservice/db/row.py @@ -91,9 +91,9 @@ async def get_rows( if parent_class is not None: q = q.where(parent_class.id == cls.parent_id) # type: ignore if parent_name is not None: - q = q.where(parent_class.fullname == parent_name) + q = q.where(parent_class.fullname == parent_name) # type: ignore if parent_id is not None: - q = q.where(parent_class.id == parent_id) + q = q.where(parent_class.id == parent_id) # type: ignore q = q.offset(skip).limit(limit) results = await session.scalars(q) return results.all() diff --git a/src/lsst/cmservice/models/__init__.py b/src/lsst/cmservice/models/__init__.py index 565758c83..4faa1df5e 100644 --- a/src/lsst/cmservice/models/__init__.py +++ b/src/lsst/cmservice/models/__init__.py @@ -21,7 +21,6 @@ RetryScriptQuery, ScriptQuery, ScriptQueryBase, - SleepTimeQuery, UpdateNodeQuery, UpdateStatusQuery, YamlFileQuery, @@ -86,7 +85,6 @@ "ScriptTemplate", "ScriptTemplateCreate", "ScriptTemplateUpdate", - "SleepTimeQuery", "Job", "JobCreate", "JobUpdate", diff --git a/src/lsst/cmservice/models/interface.py b/src/lsst/cmservice/models/interface.py index 8ae339bcd..34336d622 100644 --- a/src/lsst/cmservice/models/interface.py +++ b/src/lsst/cmservice/models/interface.py @@ -61,13 +61,6 @@ class ProcessNodeQuery(NodeQuery): fake_status: int | None = None -class SleepTimeQuery(NodeQuery): - """Parameters ask a Node for a sleep time""" - - job_sleep: int = 150 - script_sleep: int = 15 - - class UpdateStatusQuery(NodeQuery): """Parameters needed to update the status of a Node""" diff --git a/src/lsst/cmservice/routers/campaigns.py b/src/lsst/cmservice/routers/campaigns.py index 578ad63a7..35fe4c54c 100644 --- a/src/lsst/cmservice/routers/campaigns.py +++ b/src/lsst/cmservice/routers/campaigns.py @@ -76,7 +76,6 @@ get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass) get_jobs = wrappers.get_element_get_jobs_function(router, DbClass) retry_script = wrappers.get_element_retry_script_function(router, DbClass) -estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass) get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass) get_tasks = wrappers.get_element_tasks_function(router, DbClass) diff --git a/src/lsst/cmservice/routers/groups.py b/src/lsst/cmservice/routers/groups.py index 7f9851a17..4d866f6eb 100644 --- a/src/lsst/cmservice/routers/groups.py +++ b/src/lsst/cmservice/routers/groups.py @@ -79,7 +79,6 @@ get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass) get_jobs = wrappers.get_element_get_jobs_function(router, DbClass) retry_script = wrappers.get_element_retry_script_function(router, DbClass) -estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass) get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass) get_tasks = wrappers.get_element_tasks_function(router, DbClass) diff --git a/src/lsst/cmservice/routers/jobs.py b/src/lsst/cmservice/routers/jobs.py index 10be95dda..62da83143 100644 --- a/src/lsst/cmservice/routers/jobs.py +++ b/src/lsst/cmservice/routers/jobs.py @@ -81,7 +81,6 @@ get_scripts = wrappers.get_element_get_scripts_function(router, DbClass) get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass) retry_script = wrappers.get_element_retry_script_function(router, DbClass) -estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass) get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass) get_tasks = wrappers.get_element_tasks_function(router, DbClass) diff --git a/src/lsst/cmservice/routers/steps.py b/src/lsst/cmservice/routers/steps.py index 49486307f..707faae3a 100644 --- a/src/lsst/cmservice/routers/steps.py +++ b/src/lsst/cmservice/routers/steps.py @@ -77,7 +77,6 @@ get_all_scripts = wrappers.get_element_get_all_scripts_function(router, DbClass) get_jobs = wrappers.get_element_get_jobs_function(router, DbClass) retry_script = wrappers.get_element_retry_script_function(router, DbClass) -estimate_sleep_time = wrappers.get_element_estimate_sleep_time_function(router, DbClass) get_wms_task_reports = wrappers.get_element_wms_task_reports_function(router, DbClass) get_tasks = wrappers.get_element_tasks_function(router, DbClass) diff --git a/src/lsst/cmservice/routers/wrappers.py b/src/lsst/cmservice/routers/wrappers.py index 99b41e664..0437c7e3c 100644 --- a/src/lsst/cmservice/routers/wrappers.py +++ b/src/lsst/cmservice/routers/wrappers.py @@ -1482,58 +1482,6 @@ async def element_retry_script( return element_retry_script -def get_element_estimate_sleep_time_function( - router: APIRouter, - db_class: TypeAlias = db.ElementMixin, -) -> Callable: - """Return a function that will estimate how long to sleep before - calling process again - - Parameters - ---------- - router: APIRouter - Router to attach the function to - - db_class: TypeAlias = db.ElementMixin - Underlying database class - - Returns - ------- - the_function: Callable - Function that will estimate how long to sleep before - calling process again - """ - - @router.post( - "/get/{row_id}/sleep_time", - status_code=201, - response_model=int, - summary=f"Retry a script associated to a {db_class.class_string}", - ) - async def element_estimate_sleep_time( - row_id: int, - query: models.SleepTimeQuery, - session: async_scoped_session = Depends(db_session_dependency), - ) -> int: - try: - async with session.begin(): - the_element = await db_class.get_row(session, row_id) - sleep_time = await the_element.estimate_sleep_time( - session, - job_sleep=query.job_sleep, - script_sleep=query.script_sleep, - ) - return sleep_time - except CMMissingIDError as msg: - logger.info(msg) - raise HTTPException(status_code=404, detail=str(msg)) from msg - except Exception as msg: - logger.error(msg, exc_info=True) - raise HTTPException(status_code=500, detail=str(msg)) from msg - - return element_estimate_sleep_time - - def get_element_wms_task_reports_function( router: APIRouter, db_class: TypeAlias = db.ElementMixin, diff --git a/tests/cli/test_jobs.py b/tests/cli/test_jobs.py index 4784f862e..69218d214 100644 --- a/tests/cli/test_jobs.py +++ b/tests/cli/test_jobs.py @@ -55,10 +55,6 @@ async def test_job_cli(uvicorn: UvicornProcess) -> None: result = runner.invoke(client_top, f"job get parent --row_id {entry.id} --output yaml") parent = check_and_parse_result(result, models.ElementMixin) - result = runner.invoke(client_top, f"job action estimate_sleep_time --row_id {entry.id} --output yaml") - sleep_time = check_and_parse_result(result, dict)["sleep_time"] - assert sleep_time == 10 - result = runner.invoke(client_top, f"job get errors --row_id {entry.id} --output yaml") job_errors = check_and_parse_result(result, list[models.PipetaskError]) assert len(job_errors) == 0 diff --git a/tests/db/test_job.py b/tests/db/test_job.py index 573152604..bd4737581 100644 --- a/tests/db/test_job.py +++ b/tests/db/test_job.py @@ -65,7 +65,7 @@ async def test_job_db(engine: AsyncEngine) -> None: await db.Job.update_row(session, entry.id, status=StatusEnum.running) sleep_time = await parent.estimate_sleep_time(session) - assert sleep_time == 150 + assert sleep_time == 300 await db.Job.update_row(session, entry.id, status=StatusEnum.waiting) await check_get_methods(session, entry, db.Job, db.Group) diff --git a/tests/db/util_functions.py b/tests/db/util_functions.py index 5c0a35254..8cfeb5f0d 100644 --- a/tests/db/util_functions.py +++ b/tests/db/util_functions.py @@ -312,10 +312,10 @@ async def check_scripts( await scripts[1].update_values(session, status=StatusEnum.running) sleep_time = await entry.estimate_sleep_time(session) - assert sleep_time == 15, "Wrong sleep time for element with running script" + assert sleep_time == 30, "Wrong sleep time for element with running script" sleep_time = await scripts[1].estimate_sleep_time(session) - assert sleep_time == 15, "Wrong sleep time for running script" + assert sleep_time == 30, "Wrong sleep time for running script" await scripts[1].update_values(session, status=StatusEnum.waiting, superseded=True) scripts_check = await entry.get_scripts(session) diff --git a/tests/routers/util_functions.py b/tests/routers/util_functions.py index 1679bc5b7..9d4c0fd6a 100644 --- a/tests/routers/util_functions.py +++ b/tests/routers/util_functions.py @@ -770,24 +770,6 @@ async def check_get_methods( response = await client.get(f"{config.asgi.prefix}/{entry_class_name}/get/-1/products") expect_failed_response(response, 404) - sleep_time_query = models.SleepTimeQuery( - fullname=entry.fullname, - job_sleep=150, - script_sleep=15, - ) - - response = await client.post( - f"{config.asgi.prefix}/{entry_class_name}/get/{entry.id}/sleep_time", - content=sleep_time_query.model_dump_json(), - ) - check_sleep_time = check_and_parse_response(response, int) - assert check_sleep_time == 10 - - response = await client.post( - f"{config.asgi.prefix}/{entry_class_name}/get/-1/sleep_time", - content=sleep_time_query.model_dump_json(), - ) - expect_failed_response(response, 404)