diff --git a/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py b/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py index 982e650078..c6a1b7c49b 100644 --- a/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py +++ b/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py @@ -11,18 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from contextlib import ExitStack +from contextlib import AbstractContextManager, ExitStack import logging import math -from typing import Dict, Tuple +from typing import ContextManager, Dict, Tuple, Optional, Union, cast from spinn_utilities.config_holder import ( - get_config_bool, get_config_int, get_config_str, get_config_str_or_none, - get_config_str_list) + get_config_bool, get_config_str_or_none, get_config_str_list) from spinn_utilities.log import FormatAdapter from spinn_utilities.overrides import overrides -from spalloc_client import Job -from spalloc_client.states import JobState -from spinn_utilities.abstract_context_manager import AbstractContextManager +from spinn_utilities.typing.coords import XY +from spinn_utilities.config_holder import get_config_int, get_config_str +from spalloc_client import Job # type: ignore[import] +from spalloc_client.states import JobState # type: ignore[import] from spinnman.constants import SCP_SCAMP_PORT from spinnman.spalloc import ( is_server_address, SpallocClient, SpallocJob, SpallocState) @@ -33,6 +33,9 @@ from spinn_front_end_common.data import FecDataView from spinn_front_end_common.interface.provenance import ProvenanceWriter from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc +from spinnman.transceiver import Transceiver +from spinnman.connections.udp_packet_connections import ( + SCAMPConnection, EIEIOConnection) logger = FormatAdapter(logging.getLogger(__name__)) _MACHINE_VERSION = 5 # Spalloc only ever works with v5 boards @@ -70,28 +73,35 @@ def __init__( super().__init__("SpallocJobController") @overrides(AbstractMachineAllocationController.extend_allocation) - def extend_allocation(self, new_total_run_time): + def extend_allocation(self, new_total_run_time: float): # Does Nothing in this allocator - machines are held until exit pass - @overrides(AbstractMachineAllocationController.close) - def close(self): - super().close() - self.__closer.close() + def __stop(self) -> None: + self.__closer.__exit__(None, None, None) self._job.destroy() self.__client.close() + @overrides(AbstractMachineAllocationController.close) + def close(self) -> None: + super().close() + self.__stop() + @overrides(AbstractMachineAllocationController.where_is_machine) - def where_is_machine(self, chip_x, chip_y): + def where_is_machine( + self, chip_x: int, chip_y: int) -> Tuple[int, int, int]: """ :param int chip_x: :param int chip_y: :rtype: tuple(int,int,int) """ - return self._job.where_is_machine(x=chip_x, y=chip_y) + result = self._job.where_is_machine(x=chip_x, y=chip_y) + if result is None: + raise ValueError("coordinates lie outside machine") + return result @overrides(MachineAllocationController._wait) - def _wait(self): + def _wait(self) -> bool: try: if self._state != SpallocState.DESTROYED: self._state = self._job.wait_for_state_change(self._state) @@ -103,48 +113,67 @@ def _wait(self): return self._state != SpallocState.DESTROYED @overrides(MachineAllocationController._teardown) - def _teardown(self): + def _teardown(self) -> None: if not self._exited: - self.__closer.close() - self._job.destroy() - self.__client.close() + self.__stop() super()._teardown() - @overrides(AbstractMachineAllocationController.create_transceiver) - def create_transceiver(self): + @overrides(AbstractMachineAllocationController.create_transceiver, + extend_doc=True) + def create_transceiver(self) -> Optional[Transceiver]: """ .. note:: This allocation controller proxies the transceiver's connections via Spalloc. This allows it to work even outside the UNIMAN firewall. - """ if not self.__use_proxy: - return super(SpallocJobController, self).create_transceiver() + return super().create_transceiver() txrx = self._job.create_transceiver() return txrx - @overrides(AbstractMachineAllocationController.open_sdp_connection) - def open_sdp_connection(self, chip_x, chip_y, udp_port=SCP_SCAMP_PORT): + @overrides(AbstractMachineAllocationController.open_sdp_connection, + extend_doc=True) + def open_sdp_connection( + self, chip_x: int, chip_y: int, + udp_port: int = SCP_SCAMP_PORT) -> Optional[SCAMPConnection]: """ .. note:: This allocation controller proxies connections via Spalloc. This allows it to work even outside the UNIMAN firewall. - """ + if not self.__use_proxy: + return super().open_sdp_connection(chip_x, chip_y, udp_port) return self._job.connect_to_board(chip_x, chip_y, udp_port) - @overrides(AbstractMachineAllocationController.open_eieio_connection) - def open_eieio_connection(self, chip_x, chip_y): - return self._job.open_eieio_connection(chip_x, chip_y, SCP_SCAMP_PORT) + @overrides(AbstractMachineAllocationController.open_eieio_connection, + extend_doc=True) + def open_eieio_connection( + self, chip_x: int, chip_y: int) -> Optional[EIEIOConnection]: + """ + .. note:: + This allocation controller proxies connections via Spalloc. This + allows it to work even outside the UNIMAN firewall. + """ + if not self.__use_proxy: + return super().open_eieio_connection(chip_x, chip_y) + return self._job.open_eieio_connection(chip_x, chip_y) - @overrides(AbstractMachineAllocationController.open_eieio_listener) - def open_eieio_listener(self): + @overrides(AbstractMachineAllocationController.open_eieio_listener, + extend_doc=True) + def open_eieio_listener(self) -> EIEIOConnection: + """ + .. note:: + This allocation controller proxies connections via Spalloc. This + allows it to work even outside the UNIMAN firewall. + """ + if not self.__use_proxy: + return super().open_eieio_listener() return self._job.open_eieio_listener_connection() @property @overrides(AbstractMachineAllocationController.proxying) - def proxying(self): + def proxying(self) -> bool: return self.__use_proxy @overrides(MachineAllocationController.make_report) @@ -172,12 +201,12 @@ def __init__(self, job: Job, host: str): super().__init__("SpallocJobController", host) @overrides(AbstractMachineAllocationController.extend_allocation) - def extend_allocation(self, new_total_run_time): + def extend_allocation(self, new_total_run_time: float): # Does Nothing in this allocator - machines are held until exit pass @overrides(AbstractMachineAllocationController.close) - def close(self): + def close(self) -> None: super().close() self._job.destroy() @@ -223,9 +252,10 @@ def _teardown(self): def spalloc_allocator( - bearer_token: str = None, group: str = None, collab: str = None, - nmpi_job: int = None, nmpi_user: str = None) -> Tuple[ - str, int, None, bool, bool, Dict[Tuple[int, int], str], None, + bearer_token: Optional[str] = None, group: Optional[str] = None, + collab: Optional[str] = None, nmpi_job: Union[int, str, None] = None, + nmpi_user: Optional[str] = None) -> Tuple[ + str, int, None, bool, bool, Dict[XY, str], MachineAllocationController]: """ Request a machine from a SPALLOC server that will fit the given @@ -247,7 +277,6 @@ def spalloc_allocator( :rtype: tuple(str, int, object, bool, bool, dict(tuple(int,int),str), MachineAllocationController) """ - spalloc_server = get_config_str("Machine", "spalloc_server") # Work out how many boards are needed @@ -269,7 +298,8 @@ def spalloc_allocator( if is_server_address(spalloc_server): host, connections, mac = _allocate_job_new( - spalloc_server, n_boards, bearer_token, group, collab, nmpi_job, + spalloc_server, n_boards, bearer_token, group, collab, + int(nmpi_job) if nmpi_job is not None else None, nmpi_user) else: host, connections, mac = _allocate_job_old(spalloc_server, n_boards) @@ -278,9 +308,10 @@ def spalloc_allocator( def _allocate_job_new( spalloc_server: str, n_boards: int, - bearer_token: str = None, group: str = None, collab: str = None, - nmpi_job: int = None, nmpi_user: str = None) -> Tuple[ - str, Dict[Tuple[int, int], str], MachineAllocationController]: + bearer_token: Optional[str] = None, group: Optional[str] = None, + collab: Optional[str] = None, nmpi_job: Optional[int] = None, + nmpi_user: Optional[str] = None) -> Tuple[ + str, Dict[XY, str], MachineAllocationController]: """ Request a machine from an new-style spalloc server that will fit the given number of boards. @@ -295,7 +326,7 @@ def _allocate_job_new( :param collab: The collab to associate with or None for no collab :type collab: str or None :param nmpi_job: The NMPI Job to associate with or None for no job - :type nmpi_job: str or None + :type nmpi_job: int or None :param nmpi_user: The NMPI username to associate with or None for no user :type nmpi_user: str or None @@ -308,7 +339,7 @@ def _allocate_job_new( client = SpallocClient( spalloc_server, bearer_token=bearer_token, group=group, collab=collab, nmpi_job=nmpi_job, nmpi_user=nmpi_user) - stack.enter_context(client) + stack.enter_context(cast(ContextManager[SpallocClient], client)) job = client.create_job(n_boards, spalloc_machine) stack.enter_context(job) task = job.launch_keepalive_task() @@ -323,15 +354,16 @@ def _allocate_job_new( "boards: {}", str(connections).replace("{", "[").replace("}", "]")) allocation_controller = SpallocJobController( - client, job, task, use_proxy) + client, job, task, use_proxy or False) # Success! We don't want to close the client, job or task now; # the allocation controller now owns them. stack.pop_all() - return (root, connections, allocation_controller) + assert root is not None, "no root of ready board" + return (root, connections, allocation_controller) def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[ - str, Dict[Tuple[int, int], str], MachineAllocationController]: + str, Dict[XY, str], MachineAllocationController]: """ Request a machine from an old-style spalloc server that will fit the requested number of boards. @@ -361,7 +393,7 @@ def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[ def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[ - Job, str, Dict[Tuple[int, int], str]]: + Job, str, Dict[XY, str]]: """ :rtype: tuple(~.Job, str, dict(tuple(int,int),str)) """ @@ -402,4 +434,4 @@ def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[ del connections[key] for avoid_job in avoid_jobs: avoid_job.destroy("Asked to avoid by cfg") - return job, hostname, connections + return job, hostname, connections \ No newline at end of file