Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run methods #1244

Merged
merged 13 commits into from
Dec 30, 2024
176 changes: 84 additions & 92 deletions spinn_front_end_common/interface/abstract_spinnaker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,12 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[
:rtype: tuple(int,float) or tuple(None,None)
"""
if run_time is None:
# TODO does this make sense?
# https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(0.0)
self._data_writer.set_plan_n_timesteps(get_config_int(
"Buffers", "minimum_auto_time_steps"))
return None, None
n_machine_time_steps = self.__timesteps(run_time)
total_run_timesteps = (
Expand All @@ -503,6 +509,23 @@ def _calc_run_time(self, run_time: Optional[float]) -> Union[
total_run_time = (
total_run_timesteps *
self._data_writer.get_hardware_time_step_ms())
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(
total_run_time)

if get_config_bool("Buffers", "use_auto_pause_and_resume"):
self._data_writer.set_plan_n_timesteps(
get_config_int("Buffers", "minimum_auto_time_steps"))
else:
self._data_writer.set_plan_n_timesteps(n_machine_time_steps)

if not get_config_bool("Buffers", "use_auto_pause_and_resume"):
if (self._data_writer.get_max_run_time_steps() <
n_machine_time_steps):
raise ConfigurationException(
"The SDRAM required by one or more vertices is based on "
"the run time, so the run time is limited to "
f"{self._data_writer.get_max_run_time_steps()} time steps")

logger.info(
f"Simulating for {n_machine_time_steps} "
Expand Down Expand Up @@ -535,17 +558,7 @@ def __is_main_thread() -> bool:
"""
return threading.get_ident() == threading.main_thread().ident

def __run(self, run_time: Optional[float], sync_time: float):
"""
The main internal run function.

:param int run_time: the run duration in milliseconds.
:param int sync_time:
the time in milliseconds between synchronisations, or 0 to disable.
"""
if not self._should_run():
return

def __run_verify(self) -> None:
# verify that we can keep doing auto pause and resume
if self._data_writer.is_ran_ever():
can_keep_running = all(
Expand All @@ -557,21 +570,6 @@ def __run(self, run_time: Optional[float], sync_time: float):
"Only binaries that use the simulation interface can be"
" run more than once")

# Install the Control-C handler
if self.__is_main_thread():
signal.signal(signal.SIGINT, self.__signal_handler)
self._raise_keyboard_interrupt = True
sys.excepthook = self.__sys_excepthook

logger.info("Starting execution process")

n_machine_time_steps, total_run_time = self._calc_run_time(run_time)
if FecDataView.has_allocation_controller():
FecDataView.get_allocation_controller().extend_allocation(
total_run_time or 0.0)

n_sync_steps = self.__timesteps(sync_time)

# If we have never run before, or the graph has changed,
# start by performing mapping
if (self._data_writer.get_requires_mapping() and
Expand All @@ -581,79 +579,62 @@ def __run(self, run_time: Optional[float], sync_time: float):
"The network cannot be changed between runs without"
" resetting")

def __run_control_c_handler_on(self) -> None:
# Install the Control-C handler
if self.__is_main_thread():
signal.signal(signal.SIGINT, self.__signal_handler)
self._raise_keyboard_interrupt = True
sys.excepthook = self.__sys_excepthook

def __run_control_c_handler_off(self) -> None:
# Indicate that the signal handler needs to act
if self.__is_main_thread():
self._raise_keyboard_interrupt = False
sys.excepthook = self.exception_handler

def __run_reset_sync_signal(self) -> None:
# If we have reset and the graph has changed, stop any running
# application
if (self._data_writer.get_requires_data_generation() and
self._data_writer.has_transceiver()):
self._data_writer.get_transceiver().stop_application(
self._data_writer.get_app_id())
self._data_writer.reset_sync_signal()
# build the graphs to modify with system requirements
if self._data_writer.get_requires_mapping():
if self._data_writer.is_soft_reset():
# wipe out stuff associated with past mapping
self._hard_reset()
FecTimer.setup(self)

self._add_dependent_verts_and_edges_for_application_graph()
def __run(self, run_time: Optional[float], sync_time: float):
"""
The main internal run function.

if ((get_config_bool("Buffers", "use_auto_pause_and_resume"))
or (run_time is None)):
self._data_writer.set_plan_n_timesteps(get_config_int(
"Buffers", "minimum_auto_time_steps"))
else:
self._data_writer.set_plan_n_timesteps(n_machine_time_steps)
:param int run_time: the run duration in milliseconds.
:param int sync_time:
the time in milliseconds between synchronisations, or 0 to disable.
"""
if not self._should_run():
return
logger.info("Starting execution process")
self.__run_verify()
self.__run_control_c_handler_on()
n_machine_time_steps, total_run_time = self._calc_run_time(run_time)
n_sync_steps = self.__timesteps(sync_time)
self.__run_reset_sync_signal()

# build the graphs to modify with system requirements
if self._data_writer.get_requires_mapping():
self._do_mapping(total_run_time)

if not self._data_writer.is_ran_last():
self._do_write_metadata()

# Check if anything has per-timestep SDRAM usage
is_per_timestep_sdram = any(
placement.vertex.sdram_required.per_timestep
for placement in self._data_writer.iterate_placemements())

# Disable auto pause and resume if the binary can't do it
if not get_config_bool("Machine", "virtual_board"):
for executable_type in self._data_writer.get_executable_types():
if not executable_type.supports_auto_pause_and_resume:
set_config("Buffers", "use_auto_pause_and_resume", "False")
break

# Work out the maximum run duration given all recordings
if not self._data_writer.has_max_run_time_steps():
self._data_writer.set_max_run_time_steps(
self._deduce_data_n_timesteps())

# Work out an array of timesteps to perform
steps: Optional[Sequence[Optional[int]]] = None
if (not get_config_bool("Buffers", "use_auto_pause_and_resume")
or not is_per_timestep_sdram):
# Runs should only be in units of max_run_time_steps at most
if is_per_timestep_sdram and (
n_machine_time_steps is None
or (self._data_writer.get_max_run_time_steps()
< n_machine_time_steps)):
raise ConfigurationException(
"The SDRAM required by one or more vertices is based on "
"the run time, so the run time is limited to "
f"{self._data_writer.get_max_run_time_steps()} time steps")

steps = [n_machine_time_steps]
elif run_time is not None:
# With auto pause and resume, any time step is possible but run
# time more than the first will guarantee that run will be called
# more than once
steps = self._generate_steps(n_machine_time_steps)

# requires data_generation includes never run and requires_mapping
if self._data_writer.get_requires_data_generation():
self._do_load()

# Run for each of the given steps
if run_time is not None:
assert steps is not None
if n_machine_time_steps is not None:
if get_config_bool("Buffers", "use_auto_pause_and_resume"):
steps = self._generate_steps(n_machine_time_steps)
else:
steps = [n_machine_time_steps]
logger.info("Running for {} steps for a total of {}ms",
len(steps), run_time)
self._data_writer.set_n_run_steps(len(steps))
Expand All @@ -662,20 +643,17 @@ def __run(self, run_time: Optional[float], sync_time: float):
logger.info(f"Run {run_step} of {len(steps)}")
self._do_run(step, n_sync_steps)
self._data_writer.clear_run_steps()
elif run_time is None and self._run_until_complete:
elif self._run_until_complete:
logger.info("Running until complete")
self._do_run(None, n_sync_steps)
else:
if is_per_timestep_sdram:
if self._data_writer.get_max_run_time_steps() < sys.maxsize:
logger.warning("Due to recording this simulation "
"should not be run longer than {}ms",
self._data_writer.get_max_run_time_steps())
logger.info("Running until stop is called by another thread")
self._do_run(None, n_sync_steps)
# Indicate that the signal handler needs to act
if self.__is_main_thread():
self._raise_keyboard_interrupt = False
sys.excepthook = self.exception_handler
self.__run_control_c_handler_off()

@final
def _add_commands_to_command_sender(self, system_placements: Placements):
Expand Down Expand Up @@ -711,14 +689,13 @@ def _add_dependent_verts_and_edges_for_application_graph(self) -> None:
ApplicationEdge(v, dpt_vtx), edge_identifier)

@final
def _deduce_data_n_timesteps(self) -> int:
def _deduce_data_n_timesteps(self) -> None:
"""
Operates the auto pause and resume functionality by figuring out
how many timer ticks a simulation can run before SDRAM runs out,
and breaks simulation into chunks of that long.

:return: max time a simulation can run.
:rtype: int
"""
# Go through the placements and find how much SDRAM is used
# on each chip
Expand Down Expand Up @@ -747,9 +724,9 @@ def _deduce_data_n_timesteps(self) -> int:
max_this_chip = int((size - sdram.fixed) // sdram.per_timestep)
max_time_steps = min(max_time_steps, max_this_chip)

return max_time_steps
self._data_writer.set_max_run_time_steps(max_time_steps)

def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]:
def _generate_steps(self, n_steps: int) -> Sequence[int]:
"""
Generates the list of "timer" runs. These are usually in terms of
time steps, but need not be.
Expand All @@ -758,7 +735,7 @@ def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]:
:return: list of time step lengths
:rtype: list(int)
"""
if n_steps is None or n_steps == 0:
if n_steps == 0:
return [0]
n_steps_per_segment = self._data_writer.get_max_run_time_steps()
n_full_iterations = int(math.floor(n_steps / n_steps_per_segment))
Expand Down Expand Up @@ -1349,8 +1326,18 @@ def _execute_locate_executable_start_type(self) -> None:
May set the executable_types data.
"""
with FecTimer("Locate executable start type", TimerWork.OTHER):
self._data_writer.set_executable_types(
locate_executable_start_type())
binary_start_types = locate_executable_start_type()
self._data_writer.set_executable_types(binary_start_types)
if get_config_bool("Buffers", "use_auto_pause_and_resume"):
# Disable auto pause and resume if the binary can't do it
for executable_type in binary_start_types:
if not executable_type.supports_auto_pause_and_resume:
logger.warning(
"Disabling auto pause resume as graph includes {}",
executable_type)
set_config("Buffers",
"use_auto_pause_and_resume", "False")
break

@final
def _execute_buffer_manager_creator(self) -> None:
Expand Down Expand Up @@ -1392,6 +1379,10 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None:
:param float total_run_time:
"""
FecTimer.start_category(TimerCategory.MAPPING)
if self._data_writer.is_soft_reset():
# wipe out stuff associated with past mapping
self._hard_reset()
self._add_dependent_verts_and_edges_for_application_graph()

self._setup_java_caller()
self._do_extra_mapping_algorithms()
Expand Down Expand Up @@ -1435,6 +1426,7 @@ def _do_mapping(self, total_run_time: Optional[float]) -> None:
self._execute_locate_executable_start_type()
self._execute_buffer_manager_creator()

self._deduce_data_n_timesteps()
FecTimer.end_category(TimerCategory.MAPPING)

# Overridden by spy which adds placement_order
Expand Down