From 009bbdcbc38bf4d5dcd766dcdd070adf72c936b1 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Thu, 19 Oct 2023 14:20:09 +0100 Subject: [PATCH 1/7] make optional param required to be named --- .../test_front_end_common_dsg_region_reloader.py | 2 +- .../interface_functions/test_load_data_specification.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py b/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py index e2784ef7c4..06115a4bcf 100644 --- a/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py +++ b/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py @@ -114,7 +114,7 @@ def __init__(self): @overrides(MockableTransceiver.write_memory) def write_memory( - self, x, y, base_address, data, n_bytes=None, offset=0, + self, x, y, base_address, data, *, n_bytes=None, offset=0, cpu=0, is_filename=False, get_sum=False): self._regions_rewritten.append((base_address, data)) diff --git a/unittests/interface/interface_functions/test_load_data_specification.py b/unittests/interface/interface_functions/test_load_data_specification.py index bd212dc07c..b72f9a2e74 100644 --- a/unittests/interface/interface_functions/test_load_data_specification.py +++ b/unittests/interface/interface_functions/test_load_data_specification.py @@ -57,7 +57,7 @@ def malloc_sdram(self, x, y, size, app_id, tag=None): @overrides(Version5Transceiver.write_memory) def write_memory( - self, x, y, base_address, data, n_bytes=None, offset=0, + self, x, y, base_address, data, *, n_bytes=None, offset=0, cpu=0, is_filename=False, get_sum=False): if isinstance(data, int): data = struct.pack(" Date: Thu, 19 Oct 2023 14:33:35 +0100 Subject: [PATCH 2/7] make optional param required to be named --- .../test_front_end_common_load_executable_images.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py b/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py index a20bce567f..453321c07c 100644 --- a/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py +++ b/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py @@ -35,7 +35,7 @@ def __init__(self, test_case): @overrides(MockableTransceiver.execute_flood) def execute_flood( - self, core_subsets, executable, app_id, + self, core_subsets, executable, app_id, *, n_bytes=None, wait=False, is_filename=False): # @UnusedVariable for core_subset in core_subsets.core_subsets: x = core_subset.x From d056620053fb789b1fe6d2c3a1bc47a4711a9c8b Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Thu, 19 Oct 2023 14:58:32 +0100 Subject: [PATCH 3/7] is_filename is no longer a param --- spinn_front_end_common/utilities/helpful_functions.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spinn_front_end_common/utilities/helpful_functions.py b/spinn_front_end_common/utilities/helpful_functions.py index 5dff8da70a..6482bf304b 100644 --- a/spinn_front_end_common/utilities/helpful_functions.py +++ b/spinn_front_end_common/utilities/helpful_functions.py @@ -123,8 +123,7 @@ def flood_fill_binary_to_spinnaker(binary): executable_targets = FecDataView.get_executable_targets() core_subset = executable_targets.get_cores_for_binary(binary) FecDataView.get_transceiver().execute_flood( - core_subset, binary, FecDataView.get_app_id(), wait=True, - is_filename=True) + core_subset, binary, FecDataView.get_app_id(), wait=True) return len(core_subset) From 0276baa2521176ec7df74ee51311b4b657f396f2 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Thu, 19 Oct 2023 15:22:58 +0100 Subject: [PATCH 4/7] is_filename is no longer a param --- .../test_front_end_common_load_executable_images.py | 5 ++--- spinn_front_end_common/utilities/system_control_logic.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py b/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py index 453321c07c..48ce0bd816 100644 --- a/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py +++ b/fec_integration_tests/interface/interface_functions/test_front_end_common_load_executable_images.py @@ -34,9 +34,8 @@ def __init__(self, test_case): self._executable_on_core = dict() @overrides(MockableTransceiver.execute_flood) - def execute_flood( - self, core_subsets, executable, app_id, *, - n_bytes=None, wait=False, is_filename=False): # @UnusedVariable + def execute_flood(self, core_subsets, executable, app_id, *, + n_bytes=None, wait=False): # @UnusedVariable for core_subset in core_subsets.core_subsets: x = core_subset.x y = core_subset.y diff --git a/spinn_front_end_common/utilities/system_control_logic.py b/spinn_front_end_common/utilities/system_control_logic.py index 9e950d40e5..0857ac230d 100644 --- a/spinn_front_end_common/utilities/system_control_logic.py +++ b/spinn_front_end_common/utilities/system_control_logic.py @@ -154,8 +154,7 @@ def _load_application(executable_targets, app_id): transceiver = FecDataView.get_transceiver() for binary in executable_targets.binaries: core_subsets = executable_targets.get_cores_for_binary(binary) - transceiver.execute_flood( - core_subsets, binary, app_id, wait=True, is_filename=True) + transceiver.execute_flood(core_subsets, binary, app_id, wait=True) # Sleep to allow cores to get going time.sleep(0.5) From 761d21d94f0cd798f625af002207177ef3e49ac4 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Thu, 19 Oct 2023 16:35:43 +0100 Subject: [PATCH 5/7] write_memory parma fix --- .../machine_bit_field_router_compressor.py | 8 ++++---- .../test_front_end_common_dsg_region_reloader.py | 5 ++--- .../interface_functions/test_load_data_specification.py | 5 ++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/spinn_front_end_common/interface/interface_functions/machine_bit_field_router_compressor.py b/spinn_front_end_common/interface/interface_functions/machine_bit_field_router_compressor.py index 44f17ade08..a01ea5a29c 100644 --- a/spinn_front_end_common/interface/interface_functions/machine_bit_field_router_compressor.py +++ b/spinn_front_end_common/interface/interface_functions/machine_bit_field_router_compressor.py @@ -475,8 +475,8 @@ def _load_usable_sdram( matrix_addresses_and_size) # write sdram - transceiver.write_memory( - chip_x, chip_y, sdram_address, address_data, len(address_data)) + transceiver.write_memory(chip_x, chip_y, sdram_address, address_data, + n_bytes=len(address_data)) # get the only processor on the chip processor_id = list(cores.all_core_subsets.get_core_subset_for_chip( @@ -547,8 +547,8 @@ def _load_address_data( matrix_addresses_and_size, len(address_data)) # write sdram - transceiver.write_memory( - chip_x, chip_y, sdram_address, address_data, len(address_data)) + transceiver.write_memory(chip_x, chip_y, sdram_address, address_data, + n_bytes=len(address_data)) # get the only processor on the chip sorter_cores = cores.get_cores_for_binary( diff --git a/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py b/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py index 06115a4bcf..f6b7888eb8 100644 --- a/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py +++ b/unittests/interface/interface_functions/test_front_end_common_dsg_region_reloader.py @@ -113,9 +113,8 @@ def __init__(self): self._regions_rewritten = list() @overrides(MockableTransceiver.write_memory) - def write_memory( - self, x, y, base_address, data, *, n_bytes=None, offset=0, - cpu=0, is_filename=False, get_sum=False): + def write_memory(self, x, y, base_address, data, *, n_bytes=None, + offset=0, cpu=0, get_sum=False): self._regions_rewritten.append((base_address, data)) diff --git a/unittests/interface/interface_functions/test_load_data_specification.py b/unittests/interface/interface_functions/test_load_data_specification.py index b72f9a2e74..fe3dc3578f 100644 --- a/unittests/interface/interface_functions/test_load_data_specification.py +++ b/unittests/interface/interface_functions/test_load_data_specification.py @@ -56,9 +56,8 @@ def malloc_sdram(self, x, y, size, app_id, tag=None): return address @overrides(Version5Transceiver.write_memory) - def write_memory( - self, x, y, base_address, data, *, n_bytes=None, offset=0, - cpu=0, is_filename=False, get_sum=False): + def write_memory(self, x, y, base_address, data, *, n_bytes=None, + offset=0, cpu=0, get_sum=False): if isinstance(data, int): data = struct.pack(" Date: Fri, 20 Oct 2023 10:26:04 +0100 Subject: [PATCH 6/7] typing changes and now closer works --- .../interface_functions/spalloc_allocator.py | 130 +++++++++++------- 1 file changed, 81 insertions(+), 49 deletions(-) diff --git a/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py b/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py index 982e650078..c6a1b7c49b 100644 --- a/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py +++ b/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py @@ -11,18 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from contextlib import ExitStack +from contextlib import AbstractContextManager, ExitStack import logging import math -from typing import Dict, Tuple +from typing import ContextManager, Dict, Tuple, Optional, Union, cast from spinn_utilities.config_holder import ( - get_config_bool, get_config_int, get_config_str, get_config_str_or_none, - get_config_str_list) + get_config_bool, get_config_str_or_none, get_config_str_list) from spinn_utilities.log import FormatAdapter from spinn_utilities.overrides import overrides -from spalloc_client import Job -from spalloc_client.states import JobState -from spinn_utilities.abstract_context_manager import AbstractContextManager +from spinn_utilities.typing.coords import XY +from spinn_utilities.config_holder import get_config_int, get_config_str +from spalloc_client import Job # type: ignore[import] +from spalloc_client.states import JobState # type: ignore[import] from spinnman.constants import SCP_SCAMP_PORT from spinnman.spalloc import ( is_server_address, SpallocClient, SpallocJob, SpallocState) @@ -33,6 +33,9 @@ from spinn_front_end_common.data import FecDataView from spinn_front_end_common.interface.provenance import ProvenanceWriter from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc +from spinnman.transceiver import Transceiver +from spinnman.connections.udp_packet_connections import ( + SCAMPConnection, EIEIOConnection) logger = FormatAdapter(logging.getLogger(__name__)) _MACHINE_VERSION = 5 # Spalloc only ever works with v5 boards @@ -70,28 +73,35 @@ def __init__( super().__init__("SpallocJobController") @overrides(AbstractMachineAllocationController.extend_allocation) - def extend_allocation(self, new_total_run_time): + def extend_allocation(self, new_total_run_time: float): # Does Nothing in this allocator - machines are held until exit pass - @overrides(AbstractMachineAllocationController.close) - def close(self): - super().close() - self.__closer.close() + def __stop(self) -> None: + self.__closer.__exit__(None, None, None) self._job.destroy() self.__client.close() + @overrides(AbstractMachineAllocationController.close) + def close(self) -> None: + super().close() + self.__stop() + @overrides(AbstractMachineAllocationController.where_is_machine) - def where_is_machine(self, chip_x, chip_y): + def where_is_machine( + self, chip_x: int, chip_y: int) -> Tuple[int, int, int]: """ :param int chip_x: :param int chip_y: :rtype: tuple(int,int,int) """ - return self._job.where_is_machine(x=chip_x, y=chip_y) + result = self._job.where_is_machine(x=chip_x, y=chip_y) + if result is None: + raise ValueError("coordinates lie outside machine") + return result @overrides(MachineAllocationController._wait) - def _wait(self): + def _wait(self) -> bool: try: if self._state != SpallocState.DESTROYED: self._state = self._job.wait_for_state_change(self._state) @@ -103,48 +113,67 @@ def _wait(self): return self._state != SpallocState.DESTROYED @overrides(MachineAllocationController._teardown) - def _teardown(self): + def _teardown(self) -> None: if not self._exited: - self.__closer.close() - self._job.destroy() - self.__client.close() + self.__stop() super()._teardown() - @overrides(AbstractMachineAllocationController.create_transceiver) - def create_transceiver(self): + @overrides(AbstractMachineAllocationController.create_transceiver, + extend_doc=True) + def create_transceiver(self) -> Optional[Transceiver]: """ .. note:: This allocation controller proxies the transceiver's connections via Spalloc. This allows it to work even outside the UNIMAN firewall. - """ if not self.__use_proxy: - return super(SpallocJobController, self).create_transceiver() + return super().create_transceiver() txrx = self._job.create_transceiver() return txrx - @overrides(AbstractMachineAllocationController.open_sdp_connection) - def open_sdp_connection(self, chip_x, chip_y, udp_port=SCP_SCAMP_PORT): + @overrides(AbstractMachineAllocationController.open_sdp_connection, + extend_doc=True) + def open_sdp_connection( + self, chip_x: int, chip_y: int, + udp_port: int = SCP_SCAMP_PORT) -> Optional[SCAMPConnection]: """ .. note:: This allocation controller proxies connections via Spalloc. This allows it to work even outside the UNIMAN firewall. - """ + if not self.__use_proxy: + return super().open_sdp_connection(chip_x, chip_y, udp_port) return self._job.connect_to_board(chip_x, chip_y, udp_port) - @overrides(AbstractMachineAllocationController.open_eieio_connection) - def open_eieio_connection(self, chip_x, chip_y): - return self._job.open_eieio_connection(chip_x, chip_y, SCP_SCAMP_PORT) + @overrides(AbstractMachineAllocationController.open_eieio_connection, + extend_doc=True) + def open_eieio_connection( + self, chip_x: int, chip_y: int) -> Optional[EIEIOConnection]: + """ + .. note:: + This allocation controller proxies connections via Spalloc. This + allows it to work even outside the UNIMAN firewall. + """ + if not self.__use_proxy: + return super().open_eieio_connection(chip_x, chip_y) + return self._job.open_eieio_connection(chip_x, chip_y) - @overrides(AbstractMachineAllocationController.open_eieio_listener) - def open_eieio_listener(self): + @overrides(AbstractMachineAllocationController.open_eieio_listener, + extend_doc=True) + def open_eieio_listener(self) -> EIEIOConnection: + """ + .. note:: + This allocation controller proxies connections via Spalloc. This + allows it to work even outside the UNIMAN firewall. + """ + if not self.__use_proxy: + return super().open_eieio_listener() return self._job.open_eieio_listener_connection() @property @overrides(AbstractMachineAllocationController.proxying) - def proxying(self): + def proxying(self) -> bool: return self.__use_proxy @overrides(MachineAllocationController.make_report) @@ -172,12 +201,12 @@ def __init__(self, job: Job, host: str): super().__init__("SpallocJobController", host) @overrides(AbstractMachineAllocationController.extend_allocation) - def extend_allocation(self, new_total_run_time): + def extend_allocation(self, new_total_run_time: float): # Does Nothing in this allocator - machines are held until exit pass @overrides(AbstractMachineAllocationController.close) - def close(self): + def close(self) -> None: super().close() self._job.destroy() @@ -223,9 +252,10 @@ def _teardown(self): def spalloc_allocator( - bearer_token: str = None, group: str = None, collab: str = None, - nmpi_job: int = None, nmpi_user: str = None) -> Tuple[ - str, int, None, bool, bool, Dict[Tuple[int, int], str], None, + bearer_token: Optional[str] = None, group: Optional[str] = None, + collab: Optional[str] = None, nmpi_job: Union[int, str, None] = None, + nmpi_user: Optional[str] = None) -> Tuple[ + str, int, None, bool, bool, Dict[XY, str], MachineAllocationController]: """ Request a machine from a SPALLOC server that will fit the given @@ -247,7 +277,6 @@ def spalloc_allocator( :rtype: tuple(str, int, object, bool, bool, dict(tuple(int,int),str), MachineAllocationController) """ - spalloc_server = get_config_str("Machine", "spalloc_server") # Work out how many boards are needed @@ -269,7 +298,8 @@ def spalloc_allocator( if is_server_address(spalloc_server): host, connections, mac = _allocate_job_new( - spalloc_server, n_boards, bearer_token, group, collab, nmpi_job, + spalloc_server, n_boards, bearer_token, group, collab, + int(nmpi_job) if nmpi_job is not None else None, nmpi_user) else: host, connections, mac = _allocate_job_old(spalloc_server, n_boards) @@ -278,9 +308,10 @@ def spalloc_allocator( def _allocate_job_new( spalloc_server: str, n_boards: int, - bearer_token: str = None, group: str = None, collab: str = None, - nmpi_job: int = None, nmpi_user: str = None) -> Tuple[ - str, Dict[Tuple[int, int], str], MachineAllocationController]: + bearer_token: Optional[str] = None, group: Optional[str] = None, + collab: Optional[str] = None, nmpi_job: Optional[int] = None, + nmpi_user: Optional[str] = None) -> Tuple[ + str, Dict[XY, str], MachineAllocationController]: """ Request a machine from an new-style spalloc server that will fit the given number of boards. @@ -295,7 +326,7 @@ def _allocate_job_new( :param collab: The collab to associate with or None for no collab :type collab: str or None :param nmpi_job: The NMPI Job to associate with or None for no job - :type nmpi_job: str or None + :type nmpi_job: int or None :param nmpi_user: The NMPI username to associate with or None for no user :type nmpi_user: str or None @@ -308,7 +339,7 @@ def _allocate_job_new( client = SpallocClient( spalloc_server, bearer_token=bearer_token, group=group, collab=collab, nmpi_job=nmpi_job, nmpi_user=nmpi_user) - stack.enter_context(client) + stack.enter_context(cast(ContextManager[SpallocClient], client)) job = client.create_job(n_boards, spalloc_machine) stack.enter_context(job) task = job.launch_keepalive_task() @@ -323,15 +354,16 @@ def _allocate_job_new( "boards: {}", str(connections).replace("{", "[").replace("}", "]")) allocation_controller = SpallocJobController( - client, job, task, use_proxy) + client, job, task, use_proxy or False) # Success! We don't want to close the client, job or task now; # the allocation controller now owns them. stack.pop_all() - return (root, connections, allocation_controller) + assert root is not None, "no root of ready board" + return (root, connections, allocation_controller) def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[ - str, Dict[Tuple[int, int], str], MachineAllocationController]: + str, Dict[XY, str], MachineAllocationController]: """ Request a machine from an old-style spalloc server that will fit the requested number of boards. @@ -361,7 +393,7 @@ def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[ def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[ - Job, str, Dict[Tuple[int, int], str]]: + Job, str, Dict[XY, str]]: """ :rtype: tuple(~.Job, str, dict(tuple(int,int),str)) """ @@ -402,4 +434,4 @@ def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[ del connections[key] for avoid_job in avoid_jobs: avoid_job.destroy("Asked to avoid by cfg") - return job, hostname, connections + return job, hostname, connections \ No newline at end of file From 0ce4b3d6eea4aa0c456f1fd0e417be1bd72aa25e Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Fri, 20 Oct 2023 10:32:54 +0100 Subject: [PATCH 7/7] flake8 --- .../interface/interface_functions/spalloc_allocator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py b/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py index c6a1b7c49b..0dd1d75e81 100644 --- a/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py +++ b/spinn_front_end_common/interface/interface_functions/spalloc_allocator.py @@ -434,4 +434,4 @@ def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[ del connections[key] for avoid_job in avoid_jobs: avoid_job.destroy("Asked to avoid by cfg") - return job, hostname, connections \ No newline at end of file + return job, hostname, connections