
Commit

merged in master
Christian-B committed Oct 24, 2023
2 parents e0ff03d + 5b143e5
commit f8846e6
Showing 7 changed files with 92 additions and 65 deletions.
@@ -34,9 +34,8 @@ def __init__(self, test_case):
         self._executable_on_core = dict()

     @overrides(MockableTransceiver.execute_flood)
-    def execute_flood(
-            self, core_subsets, executable, app_id,
-            n_bytes=None, wait=False, is_filename=False):  # @UnusedVariable
+    def execute_flood(self, core_subsets, executable, app_id, *,
+                      n_bytes=None, wait=False):  # @UnusedVariable
         for core_subset in core_subsets.core_subsets:
             x = core_subset.x
             y = core_subset.y
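The bare * added to the signature makes the remaining parameters keyword-only, so stale call sites fail loudly instead of silently binding a positional value to the removed is_filename flag. A minimal sketch of the effect, using a toy stand-in rather than the real transceiver API:

    def execute_flood(core_subsets, executable, app_id, *,
                      n_bytes=None, wait=False):
        # Toy function mirroring only the new signature shape.
        return (core_subsets, executable, app_id, n_bytes, wait)

    execute_flood("cores", "app.aplx", 30, wait=True)       # fine: keyword
    try:
        execute_flood("cores", "app.aplx", 30, None, True)  # old positional style
    except TypeError as e:
        print(e)  # takes 3 positional arguments but 5 were given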
@@ -475,8 +475,8 @@ def _load_usable_sdram(
             matrix_addresses_and_size)

         # write sdram
-        transceiver.write_memory(
-            chip_x, chip_y, sdram_address, address_data, len(address_data))
+        transceiver.write_memory(chip_x, chip_y, sdram_address, address_data,
+                                 n_bytes=len(address_data))

         # get the only processor on the chip
         processor_id = list(cores.all_core_subsets.get_core_subset_for_chip(
@@ -547,8 +547,8 @@ def _load_address_data(
             matrix_addresses_and_size, len(address_data))

         # write sdram
-        transceiver.write_memory(
-            chip_x, chip_y, sdram_address, address_data, len(address_data))
+        transceiver.write_memory(chip_x, chip_y, sdram_address, address_data,
+                                 n_bytes=len(address_data))

         # get the only processor on the chip
         sorter_cores = cores.get_cores_for_binary(
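Both call sites now pass n_bytes by keyword, matching the keyword-only signature shown further down for the mock transceivers. Assuming the usual spinnman semantics, where n_bytes=None means "write all of data", a toy model of the parameter looks like this (stand-in only, not the real method):

    def write_memory(x, y, base_address, data, *, n_bytes=None, offset=0,
                     cpu=0, get_sum=False):
        # Toy stand-in: n_bytes=None is taken to mean "all of data".
        end = len(data) if n_bytes is None else offset + n_bytes
        return data[offset:end]

    payload = bytes(range(16))
    assert write_memory(0, 0, 0x60000000, payload, n_bytes=8) == payload[:8]
    assert write_memory(0, 0, 0x60000000, payload) == payload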
@@ -11,18 +11,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from contextlib import ExitStack
+from contextlib import AbstractContextManager, ExitStack
 import logging
 import math
-from typing import Dict, Tuple
+from typing import ContextManager, Dict, Tuple, Optional, Union, cast
 from spinn_utilities.config_holder import (
-    get_config_bool, get_config_int, get_config_str, get_config_str_or_none,
-    get_config_str_list)
+    get_config_bool, get_config_str_or_none, get_config_str_list)
 from spinn_utilities.log import FormatAdapter
 from spinn_utilities.overrides import overrides
-from spalloc_client import Job
-from spalloc_client.states import JobState
-from spinn_utilities.abstract_context_manager import AbstractContextManager
+from spinn_utilities.typing.coords import XY
+from spinn_utilities.config_holder import get_config_int, get_config_str
+from spalloc_client import Job  # type: ignore[import]
+from spalloc_client.states import JobState  # type: ignore[import]
 from spinnman.constants import SCP_SCAMP_PORT
 from spinnman.spalloc import (
     is_server_address, SpallocClient, SpallocJob, SpallocState)
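AbstractContextManager now comes from the standard library's contextlib instead of the spinn_utilities shim, and typing.ContextManager is pulled in for the cast used later in this file. A self-contained reminder of what contextlib.AbstractContextManager provides (it supplies a default __enter__ that returns self; subclasses implement __exit__):

    from contextlib import AbstractContextManager

    class Closer(AbstractContextManager):
        # Minimal context manager: only __exit__ needs to be written.
        def __exit__(self, exc_type, exc_value, traceback):
            print("cleaned up")
            return None  # do not suppress exceptions

    with Closer():
        print("working")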
@@ -33,6 +33,9 @@
 from spinn_front_end_common.data import FecDataView
 from spinn_front_end_common.interface.provenance import ProvenanceWriter
 from spinn_front_end_common.utilities.utility_calls import parse_old_spalloc
+from spinnman.transceiver import Transceiver
+from spinnman.connections.udp_packet_connections import (
+    SCAMPConnection, EIEIOConnection)

 logger = FormatAdapter(logging.getLogger(__name__))
 _MACHINE_VERSION = 5  # Spalloc only ever works with v5 boards
@@ -70,28 +73,35 @@ def __init__(
         super().__init__("SpallocJobController")

     @overrides(AbstractMachineAllocationController.extend_allocation)
-    def extend_allocation(self, new_total_run_time):
+    def extend_allocation(self, new_total_run_time: float):
         # Does Nothing in this allocator - machines are held until exit
         pass

-    @overrides(AbstractMachineAllocationController.close)
-    def close(self):
-        super().close()
-        self.__closer.close()
+    def __stop(self) -> None:
+        self.__closer.__exit__(None, None, None)
         self._job.destroy()
         self.__client.close()

+    @overrides(AbstractMachineAllocationController.close)
+    def close(self) -> None:
+        super().close()
+        self.__stop()
+
     @overrides(AbstractMachineAllocationController.where_is_machine)
-    def where_is_machine(self, chip_x, chip_y):
+    def where_is_machine(
+            self, chip_x: int, chip_y: int) -> Tuple[int, int, int]:
         """
         :param int chip_x:
         :param int chip_y:
         :rtype: tuple(int,int,int)
         """
-        return self._job.where_is_machine(x=chip_x, y=chip_y)
+        result = self._job.where_is_machine(x=chip_x, y=chip_y)
+        if result is None:
+            raise ValueError("coordinates lie outside machine")
+        return result

     @overrides(MachineAllocationController._wait)
-    def _wait(self):
+    def _wait(self) -> bool:
         try:
             if self._state != SpallocState.DESTROYED:
                 self._state = self._job.wait_for_state_change(self._state)
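Two of these changes are typing-driven. The new __stop helper closes the held closer through the protocol method __exit__(None, None, None), presumably because the attribute is now typed as the more general AbstractContextManager, which guarantees __exit__ but not a close() method. And where_is_machine narrows an Optional result by raising instead of leaking None. A small sketch of that narrowing pattern, with a toy lookup standing in for Job.where_is_machine:

    from typing import Optional, Tuple

    def _lookup(x: int, y: int) -> Optional[Tuple[int, int, int]]:
        # Toy stand-in; None means the coordinates are off the machine.
        return (0, 0, 0) if (x, y) == (0, 0) else None

    def where_is_machine(x: int, y: int) -> Tuple[int, int, int]:
        result = _lookup(x, y)
        if result is None:
            raise ValueError("coordinates lie outside machine")
        return result  # type checkers now see a non-Optional value

    print(where_is_machine(0, 0))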
@@ -103,48 +113,67 @@ def _wait(self):
         return self._state != SpallocState.DESTROYED

     @overrides(MachineAllocationController._teardown)
-    def _teardown(self):
+    def _teardown(self) -> None:
         if not self._exited:
-            self.__closer.close()
-            self._job.destroy()
-            self.__client.close()
+            self.__stop()
         super()._teardown()

-    @overrides(AbstractMachineAllocationController.create_transceiver)
-    def create_transceiver(self):
+    @overrides(AbstractMachineAllocationController.create_transceiver,
+               extend_doc=True)
+    def create_transceiver(self) -> Optional[Transceiver]:
         """
         .. note::
             This allocation controller proxies the transceiver's connections
             via Spalloc. This allows it to work even outside the UNIMAN
             firewall.
         """
         if not self.__use_proxy:
-            return super(SpallocJobController, self).create_transceiver()
+            return super().create_transceiver()
         txrx = self._job.create_transceiver()
         return txrx

-    @overrides(AbstractMachineAllocationController.open_sdp_connection)
-    def open_sdp_connection(self, chip_x, chip_y, udp_port=SCP_SCAMP_PORT):
+    @overrides(AbstractMachineAllocationController.open_sdp_connection,
+               extend_doc=True)
+    def open_sdp_connection(
+            self, chip_x: int, chip_y: int,
+            udp_port: int = SCP_SCAMP_PORT) -> Optional[SCAMPConnection]:
         """
         .. note::
             This allocation controller proxies connections via Spalloc. This
             allows it to work even outside the UNIMAN firewall.
         """
         if not self.__use_proxy:
             return super().open_sdp_connection(chip_x, chip_y, udp_port)
         return self._job.connect_to_board(chip_x, chip_y, udp_port)

-    @overrides(AbstractMachineAllocationController.open_eieio_connection)
-    def open_eieio_connection(self, chip_x, chip_y):
-        return self._job.open_eieio_connection(chip_x, chip_y, SCP_SCAMP_PORT)
+    @overrides(AbstractMachineAllocationController.open_eieio_connection,
+               extend_doc=True)
+    def open_eieio_connection(
+            self, chip_x: int, chip_y: int) -> Optional[EIEIOConnection]:
+        """
+        .. note::
+            This allocation controller proxies connections via Spalloc. This
+            allows it to work even outside the UNIMAN firewall.
+        """
+        if not self.__use_proxy:
+            return super().open_eieio_connection(chip_x, chip_y)
+        return self._job.open_eieio_connection(chip_x, chip_y)

-    @overrides(AbstractMachineAllocationController.open_eieio_listener)
-    def open_eieio_listener(self):
+    @overrides(AbstractMachineAllocationController.open_eieio_listener,
+               extend_doc=True)
+    def open_eieio_listener(self) -> EIEIOConnection:
+        """
+        .. note::
+            This allocation controller proxies connections via Spalloc. This
+            allows it to work even outside the UNIMAN firewall.
+        """
+        if not self.__use_proxy:
+            return super().open_eieio_listener()
         return self._job.open_eieio_listener_connection()

     @property
     @overrides(AbstractMachineAllocationController.proxying)
-    def proxying(self):
+    def proxying(self) -> bool:
         return self.__use_proxy

     @overrides(MachineAllocationController.make_report)
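All of the connection-openers above now share one dispatch shape: defer to the direct superclass implementation unless proxying is enabled, in which case tunnel through the Spalloc job. A schematic of that pattern, with hypothetical names rather than the real class hierarchy:

    class ProxyingController:
        # Schematic only; the real class defers to its allocation-controller base.
        def __init__(self, job, use_proxy: bool):
            self._job = job
            self._use_proxy = use_proxy

        def open_connection(self, *args):
            if not self._use_proxy:
                return self._open_direct(*args)       # plain UDP; needs firewall access
            return self._job.open_connection(*args)   # tunnelled via the Spalloc proxy

        def _open_direct(self, *args):
            raise NotImplementedError("stand-in for the superclass implementation")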
@@ -172,12 +201,12 @@ def __init__(self, job: Job, host: str):
         super().__init__("SpallocJobController", host)

     @overrides(AbstractMachineAllocationController.extend_allocation)
-    def extend_allocation(self, new_total_run_time):
+    def extend_allocation(self, new_total_run_time: float):
         # Does Nothing in this allocator - machines are held until exit
         pass

     @overrides(AbstractMachineAllocationController.close)
-    def close(self):
+    def close(self) -> None:
         super().close()
         self._job.destroy()
@@ -223,9 +252,10 @@ def _teardown(self):


 def spalloc_allocator(
-        bearer_token: str = None, group: str = None, collab: str = None,
-        nmpi_job: int = None, nmpi_user: str = None) -> Tuple[
-            str, int, None, bool, bool, Dict[Tuple[int, int], str], None,
+        bearer_token: Optional[str] = None, group: Optional[str] = None,
+        collab: Optional[str] = None, nmpi_job: Union[int, str, None] = None,
+        nmpi_user: Optional[str] = None) -> Tuple[
+            str, int, None, bool, bool, Dict[XY, str],
             MachineAllocationController]:
     """
     Request a machine from a SPALLOC server that will fit the given
@@ -247,7 +277,6 @@ def spalloc_allocator(
     :rtype: tuple(str, int, object, bool, bool, dict(tuple(int,int),str),
         MachineAllocationController)
     """
-
     spalloc_server = get_config_str("Machine", "spalloc_server")

     # Work out how many boards are needed
@@ -269,7 +298,8 @@

     if is_server_address(spalloc_server):
         host, connections, mac = _allocate_job_new(
-            spalloc_server, n_boards, bearer_token, group, collab, nmpi_job,
+            spalloc_server, n_boards, bearer_token, group, collab,
+            int(nmpi_job) if nmpi_job is not None else None,
             nmpi_user)
     else:
         host, connections, mac = _allocate_job_old(spalloc_server, n_boards)
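spalloc_allocator now accepts the NMPI job as Union[int, str, None] (job identifiers often arrive as strings from config or a command line) and normalises the value before handing _allocate_job_new the Optional[int] it expects. The inline conditional behaves like this hypothetical helper:

    from typing import Optional, Union

    def as_job_id(nmpi_job: Union[int, str, None]) -> Optional[int]:
        # int("12345") and int(12345) both yield 12345; None passes through.
        return int(nmpi_job) if nmpi_job is not None else None

    assert as_job_id("12345") == 12345
    assert as_job_id(12345) == 12345
    assert as_job_id(None) is None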
@@ -278,9 +308,10 @@


 def _allocate_job_new(
         spalloc_server: str, n_boards: int,
-        bearer_token: str = None, group: str = None, collab: str = None,
-        nmpi_job: int = None, nmpi_user: str = None) -> Tuple[
-            str, Dict[Tuple[int, int], str], MachineAllocationController]:
+        bearer_token: Optional[str] = None, group: Optional[str] = None,
+        collab: Optional[str] = None, nmpi_job: Optional[int] = None,
+        nmpi_user: Optional[str] = None) -> Tuple[
+            str, Dict[XY, str], MachineAllocationController]:
     """
     Request a machine from a new-style spalloc server that will fit the
     given number of boards.
@@ -295,7 +326,7 @@ def _allocate_job_new(
     :param collab: The collab to associate with or None for no collab
     :type collab: str or None
     :param nmpi_job: The NMPI Job to associate with or None for no job
-    :type nmpi_job: str or None
+    :type nmpi_job: int or None
     :param nmpi_user: The NMPI username to associate with or None for no user
     :type nmpi_user: str or None
@@ -308,7 +339,7 @@
     client = SpallocClient(
         spalloc_server, bearer_token=bearer_token, group=group,
         collab=collab, nmpi_job=nmpi_job, nmpi_user=nmpi_user)
-    stack.enter_context(client)
+    stack.enter_context(cast(ContextManager[SpallocClient], client))
     job = client.create_job(n_boards, spalloc_machine)
     stack.enter_context(job)
     task = job.launch_keepalive_task()
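typing.cast changes nothing at runtime; it returns its second argument unchanged while telling the type checker to treat it as the stated type. It is presumably needed here because the checker cannot prove that SpallocClient satisfies the context-manager protocol ExitStack.enter_context requires. A self-contained illustration:

    from typing import ContextManager, cast

    handle = open("example.txt", "w")
    managed = cast(ContextManager[object], handle)
    assert managed is handle  # cast() is an identity function at runtime
    with managed:
        handle.write("hello")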
@@ -323,15 +354,16 @@
         "boards: {}",
         str(connections).replace("{", "[").replace("}", "]"))
     allocation_controller = SpallocJobController(
-        client, job, task, use_proxy)
+        client, job, task, use_proxy or False)
     # Success! We don't want to close the client, job or task now;
     # the allocation controller now owns them.
     stack.pop_all()
-    return (root, connections, allocation_controller)
+    assert root is not None, "no root of ready board"
+    return (root, connections, allocation_controller)


 def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[
-        str, Dict[Tuple[int, int], str], MachineAllocationController]:
+        str, Dict[XY, str], MachineAllocationController]:
     """
     Request a machine from an old-style spalloc server that will fit the
     requested number of boards.
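The stack.pop_all() call in the hunk above is the standard ExitStack ownership-transfer idiom: if any earlier step raises, the stack unwinds everything already entered; on success, pop_all() detaches the pending cleanups so the client, job, and keepalive task outlive the function (here they are then owned by the allocation controller). A minimal sketch of the idiom:

    from contextlib import ExitStack
    import tempfile

    def acquire_pair():
        with ExitStack() as stack:
            a = stack.enter_context(tempfile.TemporaryFile())
            b = stack.enter_context(tempfile.TemporaryFile())
            # Success: hand cleanup duty to the caller. Had enter_context
            # raised above, the with-block would have closed anything opened.
            return a, b, stack.pop_all()

    a, b, closer = acquire_pair()
    with closer:  # the caller eventually closes both files
        a.write(b"data")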
@@ -361,7 +393,7 @@ def _allocate_job_old(spalloc_server: str, n_boards: int) -> Tuple[


 def _launch_checked_job_old(n_boards: int, spalloc_kwargs: dict) -> Tuple[
-        Job, str, Dict[Tuple[int, int], str]]:
+        Job, str, Dict[XY, str]]:
     """
     :rtype: tuple(~.Job, str, dict(tuple(int,int),str))
     """
3 changes: 1 addition & 2 deletions spinn_front_end_common/utilities/helpful_functions.py
@@ -123,8 +123,7 @@ def flood_fill_binary_to_spinnaker(binary):
     executable_targets = FecDataView.get_executable_targets()
     core_subset = executable_targets.get_cores_for_binary(binary)
     FecDataView.get_transceiver().execute_flood(
-        core_subset, binary, FecDataView.get_app_id(), wait=True,
-        is_filename=True)
+        core_subset, binary, FecDataView.get_app_id(), wait=True)
     return len(core_subset)

3 changes: 1 addition & 2 deletions spinn_front_end_common/utilities/system_control_logic.py
@@ -154,8 +154,7 @@ def _load_application(executable_targets, app_id):
     transceiver = FecDataView.get_transceiver()
     for binary in executable_targets.binaries:
         core_subsets = executable_targets.get_cores_for_binary(binary)
-        transceiver.execute_flood(
-            core_subsets, binary, app_id, wait=True, is_filename=True)
+        transceiver.execute_flood(core_subsets, binary, app_id, wait=True)

     # Sleep to allow cores to get going
     time.sleep(0.5)
@@ -113,9 +113,8 @@ def __init__(self):
         self._regions_rewritten = list()

     @overrides(MockableTransceiver.write_memory)
-    def write_memory(
-            self, x, y, base_address, data, n_bytes=None, offset=0,
-            cpu=0, is_filename=False, get_sum=False):
+    def write_memory(self, x, y, base_address, data, *, n_bytes=None,
+                     offset=0, cpu=0, get_sum=False):
         self._regions_rewritten.append((base_address, data))

@@ -56,9 +56,8 @@ def malloc_sdram(self, x, y, size, app_id, tag=None):
         return address

     @overrides(Version5Transceiver.write_memory)
-    def write_memory(
-            self, x, y, base_address, data, n_bytes=None, offset=0,
-            cpu=0, is_filename=False, get_sum=False):
+    def write_memory(self, x, y, base_address, data, *, n_bytes=None,
+                     offset=0, cpu=0, get_sum=False):
         if isinstance(data, int):
             data = struct.pack("<I", data)
         self._regions_written.append((base_address, data))
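Updating the test mocks to the same keyword-only signature (and dropping is_filename there too) means any test still passing the removed flag fails immediately with a TypeError instead of exercising dead code. For example, with a toy mock whose names are hypothetical:

    class MockTransceiver:
        def write_memory(self, x, y, base_address, data, *, n_bytes=None,
                         offset=0, cpu=0, get_sum=False):
            return len(data) if n_bytes is None else n_bytes

    mock = MockTransceiver()
    assert mock.write_memory(0, 0, 0x1000, b"abcd") == 4
    try:
        # Old-style call using the removed flag:
        mock.write_memory(0, 0, 0x1000, b"abcd", is_filename=True)
    except TypeError as e:
        print(e)  # unexpected keyword argument 'is_filename'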
