From c0a8bbd32cb466becf1d6f6dfb9fe853122db9b7 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Sun, 15 Dec 2024 00:47:55 +0100 Subject: [PATCH] Amend after rebase --- src/plumpy/message.py | 40 ++++++++ src/plumpy/process_states.py | 2 +- src/plumpy/processes.py | 2 +- src/plumpy/rmq/process_comms.py | 165 +------------------------------- tests/test_processes.py | 2 +- tests/utils.py | 2 +- 6 files changed, 47 insertions(+), 166 deletions(-) diff --git a/src/plumpy/message.py b/src/plumpy/message.py index 47586d21..65800648 100644 --- a/src/plumpy/message.py +++ b/src/plumpy/message.py @@ -26,6 +26,7 @@ INTENT_KEY = 'intent' MESSAGE_KEY = 'message' +FORCE_KILL_KEY = 'force_kill' class Intent: @@ -60,6 +61,45 @@ class Intent: LOGGER = logging.getLogger(__name__) +MessageType = Dict[str, Any] + + +class MessageBuilder: + """MessageBuilder will construct different messages that can passing over communicator.""" + + @classmethod + def play(cls, text: str | None = None) -> MessageType: + """The play message send over communicator.""" + return { + INTENT_KEY: Intent.PLAY, + MESSAGE_KEY: text, + } + + @classmethod + def pause(cls, text: str | None = None) -> MessageType: + """The pause message send over communicator.""" + return { + INTENT_KEY: Intent.PAUSE, + MESSAGE_KEY: text, + } + + @classmethod + def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType: + """The kill message send over communicator.""" + return { + INTENT_KEY: Intent.KILL, + MESSAGE_KEY: text, + FORCE_KILL_KEY: force_kill, + } + + @classmethod + def status(cls, text: str | None = None) -> MessageType: + """The status message send over communicator.""" + return { + INTENT_KEY: Intent.STATUS, + MESSAGE_KEY: text, + } + def create_launch_body( process_class: str, diff --git a/src/plumpy/process_states.py b/src/plumpy/process_states.py index d369a1e9..48f73f77 100644 --- a/src/plumpy/process_states.py +++ b/src/plumpy/process_states.py @@ -10,7 +10,7 @@ import yaml from yaml.loader import Loader -from plumpy.process_comms import MessageBuilder, MessageType +from plumpy.message import MessageBuilder, MessageType try: import tblib diff --git a/src/plumpy/processes.py b/src/plumpy/processes.py index 9d12f885..9cf3302b 100644 --- a/src/plumpy/processes.py +++ b/src/plumpy/processes.py @@ -46,8 +46,8 @@ from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event from .base.utils import call_with_super_check, super_check from .event_helper import EventHelper -from .process_comms import MESSAGE_KEY, MessageBuilder, MessageType from .futures import CancellableAction, capture_exceptions +from .message import MESSAGE_KEY, MessageBuilder, MessageType from .process_listener import ProcessListener from .process_spec import ProcessSpec from .utils import PID_TYPE, SAVED_STATE_TYPE, protected diff --git a/src/plumpy/rmq/process_comms.py b/src/plumpy/rmq/process_comms.py index d18e0a6e..bf823625 100644 --- a/src/plumpy/rmq/process_comms.py +++ b/src/plumpy/rmq/process_comms.py @@ -4,19 +4,15 @@ from __future__ import annotations import asyncio -import logging -from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast +from typing import Any, Dict, Optional, Sequence, Union import kiwipy from plumpy import loaders from plumpy.message import ( - KILL_MSG, - MESSAGE_KEY, - PAUSE_MSG, - PLAY_MSG, - STATUS_MSG, Intent, + MessageBuilder, + MessageType, create_continue_body, create_create_body, create_launch_body, @@ -24,8 +20,6 @@ from plumpy.utils import PID_TYPE __all__ = [ - 'MessageBuilder', - 'ProcessLauncher', 'RemoteProcessController', 'RemoteProcessThreadController', ] @@ -33,159 +27,6 @@ ProcessResult = Any ProcessStatus = Any -INTENT_KEY = 'intent' -MESSAGE_KEY = 'message' -FORCE_KILL_KEY = 'force_kill' - - -class Intent: - """Intent constants for a process message""" - - PLAY: str = 'play' - PAUSE: str = 'pause' - KILL: str = 'kill' - STATUS: str = 'status' - - -MessageType = Dict[str, Any] - - -class MessageBuilder: - """MessageBuilder will construct different messages that can passing over communicator.""" - - @classmethod - def play(cls, text: str | None = None) -> MessageType: - """The play message send over communicator.""" - return { - INTENT_KEY: Intent.PLAY, - MESSAGE_KEY: text, - } - - @classmethod - def pause(cls, text: str | None = None) -> MessageType: - """The pause message send over communicator.""" - return { - INTENT_KEY: Intent.PAUSE, - MESSAGE_KEY: text, - } - - @classmethod - def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType: - """The kill message send over communicator.""" - return { - INTENT_KEY: Intent.KILL, - MESSAGE_KEY: text, - FORCE_KILL_KEY: force_kill, - } - - @classmethod - def status(cls, text: str | None = None) -> MessageType: - """The status message send over communicator.""" - return { - INTENT_KEY: Intent.STATUS, - MESSAGE_KEY: text, - } - - -TASK_KEY = 'task' -TASK_ARGS = 'args' -PERSIST_KEY = 'persist' -# Launch -PROCESS_CLASS_KEY = 'process_class' -ARGS_KEY = 'init_args' -KWARGS_KEY = 'init_kwargs' -NOWAIT_KEY = 'nowait' -# Continue -PID_KEY = 'pid' -TAG_KEY = 'tag' -# Task types -LAUNCH_TASK = 'launch' -CONTINUE_TASK = 'continue' -CREATE_TASK = 'create' - -LOGGER = logging.getLogger(__name__) - - -def create_launch_body( - process_class: str, - init_args: Optional[Sequence[Any]] = None, - init_kwargs: Optional[Dict[str, Any]] = None, - persist: bool = False, - loader: Optional[loaders.ObjectLoader] = None, - nowait: bool = True, -) -> Dict[str, Any]: - """ - Create a message body for the launch action - - :param process_class: the class of the process to launch - :param init_args: any initialisation positional arguments - :param init_kwargs: any initialisation keyword arguments - :param persist: persist this process if True, otherwise don't - :param loader: the loader to use to load the persisted process - :param nowait: wait for the process to finish before completing the task, otherwise just return the PID - :return: a dictionary with the body of the message to launch the process - :rtype: dict - """ - if loader is None: - loader = loaders.get_object_loader() - - msg_body = { - TASK_KEY: LAUNCH_TASK, - TASK_ARGS: { - PROCESS_CLASS_KEY: loader.identify_object(process_class), - PERSIST_KEY: persist, - NOWAIT_KEY: nowait, - ARGS_KEY: init_args, - KWARGS_KEY: init_kwargs, - }, - } - return msg_body - - -def create_continue_body(pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False) -> Dict[str, Any]: - """ - Create a message body to continue an existing process - :param pid: the pid of the existing process - :param tag: the optional persistence tag - :param nowait: wait for the process to finish before completing the task, otherwise just return the PID - :return: a dictionary with the body of the message to continue the process - - """ - msg_body = {TASK_KEY: CONTINUE_TASK, TASK_ARGS: {PID_KEY: pid, NOWAIT_KEY: nowait, TAG_KEY: tag}} - return msg_body - - -def create_create_body( - process_class: str, - init_args: Optional[Sequence[Any]] = None, - init_kwargs: Optional[Dict[str, Any]] = None, - persist: bool = False, - loader: Optional[loaders.ObjectLoader] = None, -) -> Dict[str, Any]: - """ - Create a message body to create a new process - :param process_class: the class of the process to launch - :param init_args: any initialisation positional arguments - :param init_kwargs: any initialisation keyword arguments - :param persist: persist this process if True, otherwise don't - :param loader: the loader to use to load the persisted process - :return: a dictionary with the body of the message to launch the process - - """ - if loader is None: - loader = loaders.get_object_loader() - - msg_body = { - TASK_KEY: CREATE_TASK, - TASK_ARGS: { - PROCESS_CLASS_KEY: loader.identify_object(process_class), - PERSIST_KEY: persist, - ARGS_KEY: init_args, - KWARGS_KEY: init_kwargs, - }, - } - return msg_body - class RemoteProcessController: """ diff --git a/tests/test_processes.py b/tests/test_processes.py index 26651b57..7b232689 100644 --- a/tests/test_processes.py +++ b/tests/test_processes.py @@ -12,7 +12,7 @@ import plumpy from plumpy import BundleKeys, Process, ProcessState -from plumpy.process_comms import MessageBuilder +from plumpy.message import MessageBuilder from plumpy.utils import AttributesFrozendict from tests import utils diff --git a/tests/utils.py b/tests/utils.py index 05290990..123d6e72 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -8,7 +8,7 @@ import plumpy from plumpy import persistence, process_states, processes, utils -from plumpy.process_comms import MessageBuilder +from plumpy.message import MessageBuilder Snapshot = collections.namedtuple('Snapshot', ['state', 'bundle', 'outputs'])