Skip to content

Commit

Permalink
Amend after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Dec 14, 2024
1 parent 1599391 commit c0a8bbd
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 166 deletions.
40 changes: 40 additions & 0 deletions src/plumpy/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

INTENT_KEY = 'intent'
MESSAGE_KEY = 'message'
FORCE_KILL_KEY = 'force_kill'


class Intent:
Expand Down Expand Up @@ -60,6 +61,45 @@ class Intent:

LOGGER = logging.getLogger(__name__)

MessageType = Dict[str, Any]


class MessageBuilder:
"""MessageBuilder will construct different messages that can passing over communicator."""

@classmethod
def play(cls, text: str | None = None) -> MessageType:
"""The play message send over communicator."""
return {
INTENT_KEY: Intent.PLAY,
MESSAGE_KEY: text,
}

@classmethod
def pause(cls, text: str | None = None) -> MessageType:
"""The pause message send over communicator."""
return {
INTENT_KEY: Intent.PAUSE,
MESSAGE_KEY: text,
}

@classmethod
def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType:
"""The kill message send over communicator."""
return {
INTENT_KEY: Intent.KILL,
MESSAGE_KEY: text,
FORCE_KILL_KEY: force_kill,
}

@classmethod
def status(cls, text: str | None = None) -> MessageType:
"""The status message send over communicator."""
return {
INTENT_KEY: Intent.STATUS,
MESSAGE_KEY: text,
}


def create_launch_body(
process_class: str,
Expand Down
2 changes: 1 addition & 1 deletion src/plumpy/process_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import yaml
from yaml.loader import Loader

from plumpy.process_comms import MessageBuilder, MessageType
from plumpy.message import MessageBuilder, MessageType

try:
import tblib
Expand Down
2 changes: 1 addition & 1 deletion src/plumpy/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event
from .base.utils import call_with_super_check, super_check
from .event_helper import EventHelper
from .process_comms import MESSAGE_KEY, MessageBuilder, MessageType
from .futures import CancellableAction, capture_exceptions
from .message import MESSAGE_KEY, MessageBuilder, MessageType
from .process_listener import ProcessListener
from .process_spec import ProcessSpec
from .utils import PID_TYPE, SAVED_STATE_TYPE, protected
Expand Down
165 changes: 3 additions & 162 deletions src/plumpy/rmq/process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,188 +4,29 @@
from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast
from typing import Any, Dict, Optional, Sequence, Union

import kiwipy

from plumpy import loaders
from plumpy.message import (
KILL_MSG,
MESSAGE_KEY,
PAUSE_MSG,
PLAY_MSG,
STATUS_MSG,
Intent,
MessageBuilder,
MessageType,
create_continue_body,
create_create_body,
create_launch_body,
)
from plumpy.utils import PID_TYPE

__all__ = [
'MessageBuilder',
'ProcessLauncher',
'RemoteProcessController',
'RemoteProcessThreadController',
]

ProcessResult = Any
ProcessStatus = Any

INTENT_KEY = 'intent'
MESSAGE_KEY = 'message'
FORCE_KILL_KEY = 'force_kill'


class Intent:
"""Intent constants for a process message"""

PLAY: str = 'play'
PAUSE: str = 'pause'
KILL: str = 'kill'
STATUS: str = 'status'


MessageType = Dict[str, Any]


class MessageBuilder:
"""MessageBuilder will construct different messages that can passing over communicator."""

@classmethod
def play(cls, text: str | None = None) -> MessageType:
"""The play message send over communicator."""
return {
INTENT_KEY: Intent.PLAY,
MESSAGE_KEY: text,
}

@classmethod
def pause(cls, text: str | None = None) -> MessageType:
"""The pause message send over communicator."""
return {
INTENT_KEY: Intent.PAUSE,
MESSAGE_KEY: text,
}

@classmethod
def kill(cls, text: str | None = None, force_kill: bool = False) -> MessageType:
"""The kill message send over communicator."""
return {
INTENT_KEY: Intent.KILL,
MESSAGE_KEY: text,
FORCE_KILL_KEY: force_kill,
}

@classmethod
def status(cls, text: str | None = None) -> MessageType:
"""The status message send over communicator."""
return {
INTENT_KEY: Intent.STATUS,
MESSAGE_KEY: text,
}


TASK_KEY = 'task'
TASK_ARGS = 'args'
PERSIST_KEY = 'persist'
# Launch
PROCESS_CLASS_KEY = 'process_class'
ARGS_KEY = 'init_args'
KWARGS_KEY = 'init_kwargs'
NOWAIT_KEY = 'nowait'
# Continue
PID_KEY = 'pid'
TAG_KEY = 'tag'
# Task types
LAUNCH_TASK = 'launch'
CONTINUE_TASK = 'continue'
CREATE_TASK = 'create'

LOGGER = logging.getLogger(__name__)


def create_launch_body(
process_class: str,
init_args: Optional[Sequence[Any]] = None,
init_kwargs: Optional[Dict[str, Any]] = None,
persist: bool = False,
loader: Optional[loaders.ObjectLoader] = None,
nowait: bool = True,
) -> Dict[str, Any]:
"""
Create a message body for the launch action
:param process_class: the class of the process to launch
:param init_args: any initialisation positional arguments
:param init_kwargs: any initialisation keyword arguments
:param persist: persist this process if True, otherwise don't
:param loader: the loader to use to load the persisted process
:param nowait: wait for the process to finish before completing the task, otherwise just return the PID
:return: a dictionary with the body of the message to launch the process
:rtype: dict
"""
if loader is None:
loader = loaders.get_object_loader()

msg_body = {
TASK_KEY: LAUNCH_TASK,
TASK_ARGS: {
PROCESS_CLASS_KEY: loader.identify_object(process_class),
PERSIST_KEY: persist,
NOWAIT_KEY: nowait,
ARGS_KEY: init_args,
KWARGS_KEY: init_kwargs,
},
}
return msg_body


def create_continue_body(pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False) -> Dict[str, Any]:
"""
Create a message body to continue an existing process
:param pid: the pid of the existing process
:param tag: the optional persistence tag
:param nowait: wait for the process to finish before completing the task, otherwise just return the PID
:return: a dictionary with the body of the message to continue the process
"""
msg_body = {TASK_KEY: CONTINUE_TASK, TASK_ARGS: {PID_KEY: pid, NOWAIT_KEY: nowait, TAG_KEY: tag}}
return msg_body


def create_create_body(
process_class: str,
init_args: Optional[Sequence[Any]] = None,
init_kwargs: Optional[Dict[str, Any]] = None,
persist: bool = False,
loader: Optional[loaders.ObjectLoader] = None,
) -> Dict[str, Any]:
"""
Create a message body to create a new process
:param process_class: the class of the process to launch
:param init_args: any initialisation positional arguments
:param init_kwargs: any initialisation keyword arguments
:param persist: persist this process if True, otherwise don't
:param loader: the loader to use to load the persisted process
:return: a dictionary with the body of the message to launch the process
"""
if loader is None:
loader = loaders.get_object_loader()

msg_body = {
TASK_KEY: CREATE_TASK,
TASK_ARGS: {
PROCESS_CLASS_KEY: loader.identify_object(process_class),
PERSIST_KEY: persist,
ARGS_KEY: init_args,
KWARGS_KEY: init_kwargs,
},
}
return msg_body


class RemoteProcessController:
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import plumpy
from plumpy import BundleKeys, Process, ProcessState
from plumpy.process_comms import MessageBuilder
from plumpy.message import MessageBuilder
from plumpy.utils import AttributesFrozendict
from tests import utils

Expand Down
2 changes: 1 addition & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import plumpy
from plumpy import persistence, process_states, processes, utils
from plumpy.process_comms import MessageBuilder
from plumpy.message import MessageBuilder

Snapshot = collections.namedtuple('Snapshot', ['state', 'bundle', 'outputs'])

Expand Down

0 comments on commit c0a8bbd

Please sign in to comment.