# Copyright (c) 2016 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
main interface for the SpiNNaker tools
"""
from __future__ import annotations
import logging
import math
import os
import re
import signal
import sys
import threading
import types
from threading import Condition
from typing import (
Dict, Iterable, Optional, Sequence, Tuple, Type,
TypeVar, Union, cast, final)
from types import FrameType
from numpy import __version__ as numpy_version
from spinn_utilities import __version__ as spinn_utils_version
from spinn_utilities.config_holder import (
check_user_cfg, config_options, config_sections,
get_config_bool, get_config_int, get_config_str, get_config_str_or_none,
get_report_path, get_timestamp_path, is_config_none, set_config)
from spinn_utilities.exceptions import DataNotYetAvialable
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.typing.coords import XY
from spinn_utilities.progress_bar import ProgressBar
from spinn_machine import __version__ as spinn_machine_version
from spinn_machine import CoreSubsets, Machine
from spinnman import __version__ as spinnman_version
from spinnman.exceptions import (
SpiNNManCoresNotInStateException)
from spinnman.model.cpu_infos import CPUInfos
from spinnman.model.enums import CPUState, ExecutableType
from spinnman.spalloc import is_server_address
from spinnman.transceiver import (
create_transceiver_from_hostname, Transceiver, transceiver_generator)
from spalloc_client import ( # type: ignore[import]
__version__ as spalloc_version)
from pacman import __version__ as pacman_version
from pacman.exceptions import PacmanPlaceException
from pacman.model.graphs.application import ApplicationEdge, ApplicationVertex
from pacman.model.graphs import AbstractVirtual
from pacman.model.resources import AbstractSDRAM
from pacman.model.partitioner_splitters.splitter_reset import splitter_reset
from pacman.model.placements import Placements
from pacman.model.routing_tables import MulticastRoutingTables
from pacman.operations.fixed_route_router import fixed_route_router
from pacman.operations.multi_cast_router_check_functionality.\
valid_routes_checker import validate_routes
from pacman.operations.partition_algorithms import splitter_partitioner
from pacman.operations.placer_algorithms import place_application_graph
from pacman.operations.router_algorithms import route_application_graph
from pacman.operations.router_compressors import (
pair_compressor, range_compressor)
from pacman.operations.router_compressors.ordered_covering_router_compressor \
import ordered_covering_compressor
from pacman.operations.routing_info_allocator_algorithms.\
zoned_routing_info_allocator import ZonedRoutingInfoAllocator
from pacman.operations.routing_table_generators import (
basic_routing_table_generator, merged_routing_table_generator)
from pacman.operations.tag_allocator_algorithms import basic_tag_allocator
from spinn_front_end_common import __version__ as fec_version
from spinn_front_end_common import common_model_binaries
from spinn_front_end_common.abstract_models import (
AbstractVertexWithEdgeToDependentVertices)
from spinn_front_end_common.data.fec_data_view import FecDataView
from spinn_front_end_common.interface.buffer_management import BufferManager
from spinn_front_end_common.interface.buffer_management.storage_objects \
import BufferDatabase
from spinn_front_end_common.interface.config_handler import (
ConfigHandler)
from spinn_front_end_common.interface.interface_functions import (
application_finisher, application_runner,
chip_io_buf_clearer, chip_io_buf_extractor,
chip_provenance_updater, chip_runtime_updater, compute_energy_used,
create_notification_protocol, database_interface,
reload_dsg_regions, energy_provenance_reporter,
load_application_data_specs, load_system_data_specs,
graph_binary_gatherer, graph_data_specification_writer,
hbp_allocator, insert_chip_power_monitors,
insert_extra_monitor_vertices, split_lpg_vertices,
load_app_images, load_fixed_routes, load_sys_images,
locate_executable_start_type,
placements_provenance_gatherer, profile_data_gatherer,
read_routing_tables_from_machine, router_provenance_gatherer,
routing_table_loader, sdram_outgoing_partition_allocator,
spalloc_allocate_job_old,
system_multicast_routing_generator,
tags_loader, add_command_senders)
from spinn_front_end_common.interface.interface_functions.\
host_no_bitfield_router_compression import (
ordered_covering_compression, pair_compression)
from spinn_front_end_common.interface.provenance import (
FecTimer, GlobalProvenance, ProvenanceWriter, TimerCategory, TimerWork)
from spinn_front_end_common.interface.splitter_selectors import (
splitter_selector)
from spinn_front_end_common.interface.java_caller import JavaCaller
from spinn_front_end_common.utilities.database import DatabaseUpdater
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.utilities.report_functions import (
board_chip_report, EnergyReport,
fixed_route_from_machine_report,
generate_routing_compression_checker_report, memory_map_on_host_report,
memory_map_on_host_chip_report, network_specification,
tags_from_machine_report,
write_json_machine, write_json_placements,
write_json_routing_tables, drift_report)
from spinn_front_end_common.utilities.iobuf_extractor import IOBufExtractor
from spinn_front_end_common.utility_models import (
DataSpeedUpPacketGatherMachineVertex)
from spinn_front_end_common.utilities.report_functions.reports import (
generate_binaries_report, generate_comparison_router_report,
partitioner_report, placer_reports_with_application_graph,
router_compressed_summary_report, routing_info_report,
router_report_from_compressed_router_tables,
router_report_from_paths,
router_report_from_router_tables, router_summary_report,
sdram_usage_report_per_chip,
tag_allocator_report)
try:
from scipy import __version__ as scipy_version
except ImportError:
scipy_version = "scipy not installed"
# Module-level logger; wrapped in FormatAdapter, and used in this file with
# "{}" style placeholders in the messages
logger = FormatAdapter(logging.getLogger(__name__))
# Generic type variable for use in annotations
_T = TypeVar("_T")
# Matches a path containing a "/shared/<name>" component
SHARED_PATH = re.compile(r".*\/shared\/([^\/]+)")
# Index of the capture group in SHARED_PATH that holds <name>
SHARED_GROUP = 1
# Matches a path containing a "/Shared with all|groups|me/<name>" component
SHARED_WITH_PATH = re.compile(r".*\/Shared with (all|groups|me)\/([^\/]+)")
# Index of the capture group in SHARED_WITH_PATH that holds <name>
SHARED_WITH_GROUP = 2
# pylint: disable=abstract-method
[docs]
class AbstractSpinnakerBase(ConfigHandler):
"""
Main interface into the tools logic flow.
"""
__slots__ = (
    # Condition object used for waiting for stop.
    # Created once during init; the same object is then used throughout
    # and never replaced
    "_state_condition",
    # Set when run_until_complete is specified by the user
    "_run_until_complete",
    # Flag read by the signal handler: when True a Control-C raises
    # KeyboardInterrupt, when False it logs and shuts the simulation down
    "_raise_keyboard_interrupt",
    # original value which is used in exception handling and control c
    "__sys_excepthook",
    # All beyond this point new for no extractor
    # The data is not new but now it is held direct and not via inputs
    # Flag to say if compressed routing tables are on the machine
    # TODO remove this when the data change only algorithms are done
    "_multicast_routes_loaded")
def __init__(
        self, *, n_boards_required: Optional[int] = None,
        n_chips_required: Optional[int] = None,
        timestep: Optional[float] = None,
        time_scale_factor: Optional[float] = None):
    """
    Prepare the simulator: timers, report folders, provenance,
    binary search paths and the simulation timings.

    :param n_boards_required:
        `None` or the number of boards requested by the user
    :param n_chips_required:
        `None` or the number of chips requested by the user
    :param timestep:
        An explicitly specified time step for the simulation in ms.
        If `None`, the value is read from the configuration
    :param time_scale_factor:
        An explicitly specified time scale factor for the simulation.
        If `None`, the value is read from the configuration
    """
    super().__init__(n_boards_required, n_chips_required)

    # Timing starts here; SETTING_UP is closed at the end of this method
    FecTimer.setup()
    FecTimer.start_category(TimerCategory.WAITING)
    FecTimer.start_category(TimerCategory.SETTING_UP)

    # Tell the end user where binaries will be looked for
    finder = self._data_writer.get_executable_finder()
    logger.info(
        "Will search these locations for binaries: {}",
        finder.binary_paths)

    # State flags and the condition used while waiting for stop
    self._multicast_routes_loaded = False
    self._run_until_complete = False
    self._state_condition = Condition()

    # Report folders
    self._set_up_report_specifics()

    # Signal handling state; the current excepthook is remembered so it
    # can be put back and chained to later
    self._raise_keyboard_interrupt = False
    self._create_version_provenance()
    self.__sys_excepthook = sys.excepthook

    # The binaries shipped with this package are always searchable
    common_binaries_dir = os.path.dirname(common_model_binaries.__file__)
    self._data_writer.register_binary_search_path(common_binaries_dir)
    self._data_writer.set_up_timings(timestep, time_scale_factor)

    # Optional extra search path taken from the cfg
    external_binaries = get_config_str_or_none(
        "Mapping", "external_binaries")
    if external_binaries is not None:
        self._data_writer.register_binary_search_path(external_binaries)

    FecTimer.end_category(TimerCategory.SETTING_UP)
def _hard_reset(self) -> None:
    """
    Clear all data that is no longer valid after a hard reset.

    If there is a transceiver the application is stopped and the
    allocation controller closed, before report data is removed and the
    data writer is told about the hard reset.
    """
    if self._data_writer.has_transceiver():
        transceiver = self._data_writer.get_transceiver()
        app_id = self._data_writer.get_app_id()
        transceiver.stop_application(app_id)
        self._close_allocation_controller()
    self._reset_remove_data()
    self._data_writer.hard_reset()
    # Nothing is loaded on the machine any more
    self._multicast_routes_loaded = False
def _reset_remove_data(self) -> None:
    """
    Delete the report files the cfg does not ask to keep, timing the
    cleanup as report work.
    """
    cleanup_timer = FecTimer(
        "Cleanup reports folder based on cfg", TimerWork.REPORT)
    with cleanup_timer:
        self.__reset_remove_data()
def __reset_remove_data(self) -> None:
    """
    Delete the json files, dataspec database, input output database
    and java log unless the matching "keep" cfg option is set.
    """
    if not get_config_bool("Reports", "keep_json_files"):
        # Any cfg option starting with "pathjson" names a json report
        for section in config_sections():
            for option in config_options(section):
                if not option.startswith("pathjson"):
                    continue
                json_path = get_report_path(option, section=section)
                if os.path.exists(json_path):
                    os.remove(json_path)
                    # Tidy away the folder too once it is empty
                    parent = os.path.dirname(json_path)
                    if not os.listdir(parent):
                        os.removedirs(parent)

    # (keep option, path option) for each single file handled the same way
    single_files = (
        ("keep_dataspec_database", "path_dataspec_database"),
        ("keep_input_output_database", "path_input_output_database"),
        ("keep_java_log", "path_java_log"))
    for keep_option, path_option in single_files:
        if not get_config_bool("Reports", keep_option):
            file_path = get_report_path(path_option)
            if os.path.exists(file_path):
                os.remove(file_path)
def _stop_remove_data(self) -> None:
    """
    Remove the report data that the cfg does not ask to keep at stop.

    Does everything the reset cleanup does and additionally removes the
    data database and the stack trace file, each unless the matching
    "keep" cfg option is set. Files that do not exist are skipped.
    """
    with FecTimer("Cleanup reports folder based on cfg", TimerWork.REPORT):
        self.__reset_remove_data()
        if not get_config_bool("Reports", "keep_data_database"):
            path = get_report_path("path_data_database")
            if os.path.exists(path):
                os.remove(path)
        if not get_config_bool("Reports", "keep_stack_trace"):
            path = get_timestamp_path("tpath_stack_trace")
            # Guard like every other removal here, so a missing stack
            # trace file does not make stop fail with FileNotFoundError
            if os.path.exists(path):
                os.remove(path)
def _setup_java_caller(self) -> None:
    """
    Create and register a JavaCaller when the cfg has use_java set.
    """
    if not get_config_bool("Java", "use_java"):
        return
    self._data_writer.set_java_caller(JavaCaller())
def __signal_handler(
        self, _signal: int, _frame: Optional[FrameType]) -> None:
    """
    React to a keyboard interrupt signal.

    Either raises KeyboardInterrupt (when the flag asks for that) or
    logs the cancellation and shuts down.

    :param _signal: the signal received (ignored)
    :param _frame: frame executed in (ignored)
    """
    if not self._raise_keyboard_interrupt:
        logger.error("User has cancelled simulation")
        self._shutdown()
        return
    # We have been asked to pass the interrupt on to the caller
    raise KeyboardInterrupt
[docs]
def exception_handler(
        self, exc_type: Type[BaseException], value: BaseException,
        traceback_obj: Optional[types.TracebackType]) -> None:
    """
    Exception hook: shut down, then hand the exception to the hook
    that was installed when this object was created.

    :param exc_type: the type of exception received
    :param value: the value of the exception
    :param traceback_obj: the trace back stuff
    """
    logger.error("Shutdown on exception")
    self._shutdown()
    original_hook = self.__sys_excepthook
    return original_hook(exc_type, value, traceback_obj)
def _should_run(self) -> bool:
    """
    Check if there is anything to run.

    Logs a warning when the graph is empty.

    :return: True if and only if the graph has vertices in it
    """
    if self._data_writer.get_n_vertices() <= 0:
        logger.warning(
            "Your graph has no vertices in it. "
            "Therefore the run call will exit immediately.")
        return False
    return True
[docs]
def run_until_complete(self, n_steps: Optional[int] = None) -> None:
    """
    Run a simulation until it completes.

    :param n_steps:
        If not `None`, this specifies that the simulation should be
        requested to run for the given number of steps. The host will
        still wait until the simulation itself says it has completed.
    """
    category = TimerCategory.RUN_OTHER
    FecTimer.start_category(category)
    # Remembered so that a later plain run() call can be rejected
    self._run_until_complete = True
    self._run(n_steps, sync_time=0.0)
    FecTimer.end_category(category)
[docs]
def run(self, run_time: Optional[float], sync_time: float = 0) -> None:
    """
    Run a simulation for a fixed amount of time.

    :param run_time: the run duration in milliseconds.
    :param sync_time:
        If not 0, this specifies that the simulation should pause after
        this duration. The continue_simulation() method must then be
        called for the simulation to continue.
    :raises NotImplementedError:
        If run_until_complete has already been used
    """
    category = TimerCategory.RUN_OTHER
    FecTimer.start_category(category)
    if self._run_until_complete:
        raise NotImplementedError("run after run_until_complete")
    self._run(run_time, sync_time)
    FecTimer.end_category(category)
def __timesteps(self, time_in_ms: float) -> int:
"""
Get a number of timesteps for a given time in milliseconds.
:return: The number of timesteps
"""
time_step_ms = self._data_writer.get_simulation_time_step_ms()
n_time_steps = int(math.ceil(time_in_ms / time_step_ms))
calc_time = n_time_steps * time_step_ms
# Allow for minor float errors
if abs(time_in_ms - calc_time) > 0.00001:
logger.warning(
"Time of {}ms "
"is not a multiple of the machine time step of {}ms "
"and has therefore been rounded up to {}ms",
time_in_ms, time_step_ms, calc_time)
return n_time_steps
def _calc_run_time(self, run_time: Optional[float]) -> Union[
        Tuple[int, float], Tuple[None, None]]:
    """
    Work out the number of timesteps and total run time for a run.

    The run is rounded up to the next timestep as discussed in
    https://github.com/SpiNNakerManchester/sPyNNaker/issues/149

    As side effects the allocation (if there is a controller) is
    extended and the planned number of timesteps is set.

    :param run_time: time user requested to run for in milliseconds,
        or `None` to run forever
    :return: n_machine_time_steps as a whole int and
        total_run_time in milliseconds, or `None, None` if run_time
        is `None`
    """
    if run_time is not None:
        n_machine_time_steps = self.__timesteps(run_time)
        steps_so_far = self._data_writer.get_current_run_timesteps() or 0
        total_run_time = (
            (steps_so_far + n_machine_time_steps) *
            self._data_writer.get_hardware_time_step_ms())
        if FecDataView.has_allocation_controller():
            controller = FecDataView.get_allocation_controller()
            controller.extend_allocation(total_run_time)

        # With auto pause and resume the plan is the cfg minimum,
        # otherwise it is the requested steps
        if get_config_bool("Buffers", "use_auto_pause_and_resume"):
            planned_steps = get_config_int(
                "Buffers", "minimum_auto_time_steps")
        else:
            planned_steps = n_machine_time_steps
        self._data_writer.set_plan_n_timesteps(planned_steps)

        logger.info(
            f"Simulating for {n_machine_time_steps} "
            f"{self._data_writer.get_simulation_time_step_ms()} ms timesteps "
            f"using a hardware timestep of "
            f"{self._data_writer.get_hardware_time_step_us()} us")
        return n_machine_time_steps, total_run_time

    # Run forever
    # TODO does this make sense?
    # https://github.com/SpiNNakerManchester/SpiNNFrontEndCommon/issues/1243
    if FecDataView.has_allocation_controller():
        FecDataView.get_allocation_controller().extend_allocation(0.0)
    self._data_writer.set_plan_n_timesteps(get_config_int(
        "Buffers", "minimum_auto_time_steps"))
    return None, None
def _run(self, run_time: Optional[float], sync_time: float) -> None:
    """
    Run wrapped in the data writer's start/finish state changes.

    On any exception the data writer is shut down and the exception is
    raised again; unless the cfg mode is "Debug" a stop is attempted
    first.

    :param run_time: the run duration in milliseconds.
    :param sync_time:
        the time in milliseconds between synchronisations, or 0 to disable.
    """
    self._data_writer.start_run()
    try:
        self.__run(run_time, sync_time)
        self._data_writer.finish_run()
    except Exception:
        # if in debug mode, do not shut down machine
        if get_config_str("Mode", "mode") != "Debug":
            try:
                self.stop()
            except Exception as stop_e:
                # A failing stop must not hide the original exception
                logger.exception(f"Error {stop_e} when attempting to stop")
        self._data_writer.shut_down()
        raise
@staticmethod
def __is_main_thread() -> bool:
"""
:return: Whether this is the main thread.
"""
return threading.get_ident() == threading.main_thread().ident
def __run_verify(self) -> None:
    """
    Check that another run is allowed in the current state.

    :raises NotImplementedError:
        If this is not the first run and a binary does not support auto
        pause and resume, or if the graph needs mapping directly after
        a run (in which case stop is called first)
    """
    writer = self._data_writer

    # Second and later runs need every binary to support resuming
    if writer.is_ran_ever():
        if any(not exe_type.supports_auto_pause_and_resume
               for exe_type in writer.get_executable_types()):
            raise NotImplementedError(
                "Only binaries that use the simulation interface can be"
                " run more than once")

    # A changed graph straight after a run is not supported
    if writer.get_requires_mapping() and writer.is_ran_last():
        self.stop()
        raise NotImplementedError(
            "The network cannot be changed between runs without"
            " resetting")
def __run_control_c_handler_on(self) -> None:
    """
    On the main thread install the Control-C handler, set it to raise
    KeyboardInterrupt and put the original excepthook back.
    """
    if not self.__is_main_thread():
        return
    signal.signal(signal.SIGINT, self.__signal_handler)
    self._raise_keyboard_interrupt = True
    sys.excepthook = self.__sys_excepthook
def __run_control_c_handler_off(self) -> None:
    """
    On the main thread make the signal handler act itself (shut down
    rather than raise) and install this object's exception handler.
    """
    if not self.__is_main_thread():
        return
    self._raise_keyboard_interrupt = False
    sys.excepthook = self.exception_handler
def __run_reset_sync_signal(self) -> None:
    """
    Stop the application and reset the sync signal when data needs to
    be generated again while a transceiver already exists.
    """
    # If we have reset and the graph has changed, stop any running
    # application
    if (self._data_writer.get_requires_data_generation() and
            self._data_writer.has_transceiver()):
        self._data_writer.get_transceiver().stop_application(
            self._data_writer.get_app_id())
        self._data_writer.reset_sync_signal()
def __run(self, run_time: Optional[float], sync_time: float) -> None:
    """
    The main internal run function.

    Returns at once if there is nothing to run. Otherwise verifies the
    state, works out the run time and then does the mapping and data
    generation stages if they are required, followed by the run stage.

    :param run_time: the run duration in milliseconds.
    :param sync_time:
        the time in milliseconds between synchronisations, or 0 to disable.
    """
    if not self._should_run():
        return

    logger.info("Starting execution process")

    self.__run_verify()

    self.__run_control_c_handler_on()

    # Both values are None when run_time is None
    n_machine_time_steps, total_run_time = self._calc_run_time(run_time)

    self.__run_reset_sync_signal()

    # build the graphs to modify with system requirements
    if self._data_writer.get_requires_mapping():
        self._stage_mapping(total_run_time, n_machine_time_steps)

    # requires data_generation includes never run and requires_mapping
    if self._data_writer.get_requires_data_generation():
        self._stage_data_generation()

    self._stage_run(n_machine_time_steps, run_time, sync_time)

    self.__run_control_c_handler_off()
@final
def _add_commands_to_command_sender(
        self, system_placements: Placements) -> None:
    """
    Runs and times the adding of command senders, then adds each
    command sender and its edges to the graph.

    :param system_placements:
        The placements passed on to the command sender adder
    """
    with FecTimer("Command Sender Adder", TimerWork.OTHER):
        senders = add_command_senders(system_placements)
        for sender in senders:
            self._data_writer.add_vertex(sender)
            # Each edge comes paired with the partition to add it to
            edges, partition_ids = sender.edges_and_partitions()
            for an_edge, a_partition_id in zip(edges, partition_ids):
                self._data_writer.add_edge(an_edge, a_partition_id)
@final
def _add_dependent_verts_and_edges_for_application_graph(self) -> None:
    """
    Add the dependent vertices not yet in the graph, each with an edge
    from the vertex it depends on for every partition identifier.
    """
    # Take a copy, as the graph is added to while we work through it
    sources = list(self._data_writer.get_vertices_by_type(
        AbstractVertexWithEdgeToDependentVertices))
    for source in sources:
        app_vertex = cast(ApplicationVertex, source)
        for dependent in source.dependent_vertices():
            if not dependent.has_been_added_to_graph():
                self._data_writer.add_vertex(dependent)
                partition_ids = source.\
                    edge_partition_identifiers_for_dependent_vertex(
                        dependent)
                for partition_id in partition_ids:
                    self._data_writer.add_edge(
                        ApplicationEdge(app_vertex, dependent),
                        partition_id)
@final
def _deduce_data_n_timesteps(
        self, n_machine_time_steps: Optional[int]) -> None:
    """
    Work out how many timesteps can be run before SDRAM runs out on
    some chip, and record that as the maximum run time steps.

    :param n_machine_time_steps:
        The number of timesteps requested, or `None`
    :raises PacmanPlaceException:
        If the fixed SDRAM on a chip is more than the chip has
    :raises ConfigurationException:
        If auto pause and resume is off and the request does not fit
    """
    # Total up the SDRAM needed on each chip, ignoring virtual vertices
    usage_by_chip: Dict[XY, AbstractSDRAM] = dict()
    for placement in self._data_writer.iterate_placemements():
        if not isinstance(placement.vertex, AbstractVirtual):
            needed = placement.vertex.sdram_required
            chip_xy = (placement.x, placement.y)
            if chip_xy in usage_by_chip:
                usage_by_chip[chip_xy] += needed
            else:
                usage_by_chip[chip_xy] = needed

    # The chip that can run for the fewest timesteps sets the limit
    max_time_steps = sys.maxsize
    for (x, y), sdram in usage_by_chip.items():
        size = self._data_writer.get_chip_at(x, y).sdram
        if sdram.fixed > size:
            raise PacmanPlaceException(
                f"Too much SDRAM has been allocated on chip {x}, {y}: "
                f"{sdram.fixed} of {size}")
        if sdram.per_timestep:
            max_this_chip = int((size - sdram.fixed) // sdram.per_timestep)
            max_time_steps = min(max_time_steps, max_this_chip)

    # Without auto pause and resume the whole run has to fit
    if (not get_config_bool("Buffers", "use_auto_pause_and_resume") and
            (n_machine_time_steps is not None) and
            (max_time_steps < n_machine_time_steps)):
        raise ConfigurationException(
            "The SDRAM required by one or more vertices is based on "
            "the run time, so the run time is limited to "
            f"{max_time_steps} time steps")

    self._data_writer.set_max_run_time_steps(max_time_steps)
def _generate_steps(self, n_steps: int) -> Sequence[int]:
"""
Generates the list of "timer" runs. These are usually in terms of
time steps, but need not be.
:param n_steps: the total runtime in machine time steps
:return: list of time step lengths
"""
if n_steps == 0:
return [0]
n_steps_per_segment = self._data_writer.get_max_run_time_steps()
n_full_iterations = int(math.floor(n_steps / n_steps_per_segment))
left_over_steps = n_steps - n_full_iterations * n_steps_per_segment
steps = [int(n_steps_per_segment)] * n_full_iterations
if left_over_steps:
steps.append(int(left_over_steps))
return steps
@overrides(ConfigHandler._execute_get_virtual_machine, extend_doc=False)
def _execute_get_virtual_machine(self) -> Machine:
    # Same as the super class method, but timed
    with FecTimer("Virtual machine generator", TimerWork.OTHER):
        virtual_machine = super()._execute_get_virtual_machine()
        return virtual_machine
@overrides(ConfigHandler._do_transceiver_by_remote)
def _do_transceiver_by_remote(
        self, total_run_time: Optional[float],
        ensure_board_is_ready: bool) -> Transceiver:
    # Spalloc (new or old style server) is tried before the HBP url
    spalloc_server = get_config_str_or_none("Machine", "spalloc_server")
    if spalloc_server:
        if not is_server_address(spalloc_server):
            # Old style spalloc always makes sure the board is ready
            assert ensure_board_is_ready
            return self._execute_transceiver_by_spalloc_old()
        transceiver, _ = self._execute_transceiver_by_spalloc(
            ensure_board_is_ready)
        return transceiver

    if is_config_none("Machine", "remote_spinnaker_url"):
        # Nothing that tells us where to get a machine from
        check_user_cfg()
        raise ConfigurationException(
            "None of cfg machineName, spalloc_server, virtual_board "
            "or remote_spinnaker_url set")
    return self._execute_transceiver_by_hbp(total_run_time)
@overrides(ConfigHandler._execute_transceiver_by_spalloc)
def _execute_transceiver_by_spalloc(
        self, ensure_board_is_ready: bool
        ) -> Tuple[Transceiver, Dict[XY, str]]:
    # Same as the super class method, but timed and with the board
    # connections recorded as provenance
    with FecTimer("Transceiver by Spalloc", TimerWork.OTHER):
        transceiver, connections = super()._execute_transceiver_by_spalloc(
            ensure_board_is_ready)
        with ProvenanceWriter() as provenance_db:
            provenance_db.insert_board_provenance(connections)
        return transceiver, connections
def _execute_transceiver_by_spalloc_old(self) -> Transceiver:
    """
    Allocate a job using old style spalloc and create a transceiver
    for it, timing the work.

    The ipaddress, allocation controller and transceiver are stored in
    the data writer, and the connections recorded as provenance.

    :return: The transceiver created
    """
    with FecTimer("Transceiver by Spalloc Old", TimerWork.OTHER):
        ipaddress, connections, controller = spalloc_allocate_job_old()
        self._data_writer.set_ipaddress(ipaddress)
        with ProvenanceWriter() as provenance_db:
            provenance_db.insert_board_provenance(connections)
        self._data_writer.set_allocation_controller(controller)

        new_transceiver = create_transceiver_from_hostname(
            ipaddress, ensure_board_is_ready=True)
        new_transceiver.discover_scamp_connections()
        self._data_writer.set_transceiver(new_transceiver)
        return new_transceiver
def _execute_transceiver_by_hbp(
        self, total_run_time: Optional[float]) -> Transceiver:
    """
    Allocate a machine using the HBP allocator and create a
    transceiver for it, timing the work.

    The ipaddress, allocation controller and transceiver are stored in
    the data writer.

    :param total_run_time: The run time passed to the HBP allocator
    :return: The transceiver created
    """
    with FecTimer("HBPAllocator", TimerWork.OTHER):
        # TODO: Would passing the bearer token to this ever make sense?
        ipaddress, bmp_details, controller = hbp_allocator(total_run_time)
        self._data_writer.set_ipaddress(ipaddress)
        self._data_writer.set_allocation_controller(controller)

        new_transceiver = transceiver_generator(
            bmp_details, auto_detect_bmp=False,
            scamp_connection_data=None, reset_machine_on_start_up=False,
            ensure_board_is_ready=True)
        self._data_writer.set_transceiver(new_transceiver)
        return new_transceiver
@overrides(ConfigHandler._execute_tranceiver_by_name)
def _execute_tranceiver_by_name(
        self, ensure_board_is_ready: bool = True) -> Transceiver:
    # Same as the super class method, but timed.
    # The caller's ensure_board_is_ready is passed on rather than a
    # hard-coded True, so an explicit False is no longer ignored
    with FecTimer("Machine generator", TimerWork.GET_MACHINE):
        return super()._execute_tranceiver_by_name(
            ensure_board_is_ready=ensure_board_is_ready)
[docs]
def get_machine(self) -> Machine:
    """
    Get the Machine, creating it if necessary.

    The access is recorded with the data writer. If called in user
    mode after a soft reset, the reset is made hard first, so the
    Machine returned will be a different one to the one from the
    previous run.

    :returns: The Machine now stored in the DataView
    """
    self._data_writer.set_user_accessed_machine()
    after_soft_reset = (
        self._data_writer.is_user_mode() and
        self._data_writer.is_soft_reset())
    if after_soft_reset:
        # Make the reset hard
        logger.warning(
            "Calling Get machine after a reset force a hard reset and "
            "therefore generate a new machine")
        self._hard_reset()
    return self._get_known_machine()
@overrides(ConfigHandler._get_known_machine)
def _get_known_machine(
        self, total_run_time: Optional[float] = 0.0) -> Machine:
    # Same as the super class method, but timed as MACHINE_ON.
    # total_run_time is passed on, as otherwise a value given by the
    # caller was dropped and the super class default used instead
    FecTimer.start_category(TimerCategory.MACHINE_ON)
    machine = super()._get_known_machine(total_run_time=total_run_time)
    FecTimer.end_category(TimerCategory.MACHINE_ON)
    return machine
def _create_version_provenance(self) -> None:
    """
    Record the versions of the tools and of numpy and scipy in the
    global provenance.
    """
    with GlobalProvenance() as db:
        versions = (
            ("spinn_utilities_version", spinn_utils_version),
            ("spinn_machine_version", spinn_machine_version),
            ("spalloc_version", spalloc_version),
            ("spinnman_version", spinnman_version),
            ("pacman_version", pacman_version),
            ("front_end_common_version", fec_version),
            ("numpy_version", numpy_version),
            ("scipy_version", scipy_version))
        for description, version in versions:
            db.insert_version(description, version)
def _do_extra_mapping_algorithms(self) -> None:
    """
    Runs, times and logs any extra mapping algorithms.

    Does nothing here; presumably a hook for subclasses to override
    (to be confirmed against the subclasses).
    """
def _json_machine(self) -> None:
    """
    Writes the machine as JSON, timed, unless the cfg turns it off.
    """
    with FecTimer("Json machine", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false("Reports", "write_json_machine"):
            write_json_machine()
def _report_network_specification(self) -> None:
    """
    Writes the Network Specification report, timed, unless the cfg
    turns it off.
    """
    with FecTimer(
            "Network Specification report", TimerWork.REPORT) as timer:
        wanted = not timer.skip_if_cfg_false(
            "Reports", "write_network_specification_report")
        if wanted:
            network_specification()
def _execute_split_lpg_vertices(
        self, system_placements: Placements) -> None:
    """
    Runs and times the splitting of the live gather vertices.

    :param system_placements: The placements passed to the splitter
    """
    split_timer = FecTimer("Split Live Gather Vertices", TimerWork.OTHER)
    with split_timer:
        split_lpg_vertices(system_placements)
def _report_board_chip(self) -> None:
    """
    Writes the board chip report, timed, unless the cfg turns it off.

    If there is an allocation controller, its string form is appended
    to the report.
    """
    with FecTimer("Board chip report", TimerWork.REPORT) as timer:
        if timer.skip_if_cfg_false("Reports", "write_board_chip_report"):
            return
        board_chip_report()
        if not FecDataView.has_allocation_controller():
            return
        report_path = get_report_path("path_board_chip_report")
        with open(report_path, "a", encoding="utf-8") as report:
            report.write(
                f"{FecDataView.get_allocation_controller()}\n")
def _execute_splitter_reset(self) -> None:
    """
    Runs and times the splitter_reset.
    """
    reset_timer = FecTimer("Splitter reset", TimerWork.OTHER)
    with reset_timer:
        splitter_reset()
# Overridden by sPyNNaker to choose an extended algorithm
def _execute_splitter_selector(self) -> None:
    """
    Runs and times the splitter_selector.
    """
    selector_timer = FecTimer("Splitter selector", TimerWork.OTHER)
    with selector_timer:
        splitter_selector()
def _execute_delay_support_adder(self) -> None:
    """
    Stub to allow sPyNNaker to add delay supports.

    Does nothing in this class.
    """
# Overridden by sPyNNaker to choose a different algorithm
def _execute_splitter_partitioner(self) -> None:
    """
    Runs and times the splitter_partitioner, unless the graph is empty.

    The value it returns is stored as the number of chips in the graph.
    """
    if self._data_writer.get_n_vertices() != 0:
        with FecTimer("Splitter partitioner", TimerWork.OTHER):
            n_chips = splitter_partitioner()
            self._data_writer.set_n_chips_in_graph(n_chips)
def _execute_insert_chip_power_monitors(
        self, system_placements: Placements) -> None:
    """
    Inserts the chip power monitors, timed, unless the cfg has the
    energy report turned off.

    :param system_placements: The placements passed to the inserter
    """
    with FecTimer("Insert chip power monitors", TimerWork.OTHER) as timer:
        if not timer.skip_if_cfg_false("Reports", "write_energy_report"):
            insert_chip_power_monitors(system_placements)
@final
def _execute_insert_extra_monitor_vertices(
        self, system_placements: Placements) -> None:
    """
    Inserts the extra monitor vertices, timed, unless the cfg has both
    advanced monitor support and reinjection turned off.

    The gatherer and monitor maps created are stored in the data writer.

    :param system_placements: The placements passed to the inserter
    """
    with FecTimer(
            "Insert extra monitor vertices", TimerWork.OTHER) as timer:
        skipped = timer.skip_if_cfgs_false(
            "Machine", "enable_advanced_monitor_support",
            "enable_reinjection")
        if skipped:
            return
        # inserter checks for None app graph not an empty one
        maps = insert_extra_monitor_vertices(system_placements)
        gather_map, monitor_map = maps
        self._data_writer.set_gatherer_map(gather_map)
        self._data_writer.set_monitor_map(monitor_map)
def _report_partitioner(self) -> None:
    """
    Writes the partitioner report, timed, unless the cfg turns it off.
    """
    with FecTimer("Partitioner report", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_partitioner_reports"):
            partitioner_report()
@property
def get_number_of_available_cores_on_machine(self) -> int:
    """
    The number of available cores on the machine after taking
    into account preallocated resources.

    This is the total available user cores less the monitor cores
    on each chip, where ethernet connected chips use the ethernet
    monitor cores count and all other chips the all monitor cores count.
    """
    machine = self._get_known_machine()

    user_cores = machine.total_available_user_cores
    n_ethernet_chips = machine.n_ethernet_connected_chips
    n_other_chips = machine.n_chips - n_ethernet_chips

    monitors_on_others = (
        n_other_chips * self._data_writer.get_all_monitor_cores())
    monitors_on_ethernets = (
        n_ethernet_chips * self._data_writer.get_ethernet_monitor_cores())
    return user_cores - monitors_on_others - monitors_on_ethernets
def _execute_application_placer(
        self, system_placements: Placements) -> None:
    """
    Runs and times the Application Placer.

    Sets the "placements" data

    .. note::
        Calling of this method is based on the configuration placer value

    :param system_placements: The placements passed to the placer
    """
    with FecTimer("Application Placer", TimerWork.OTHER):
        placements = place_application_graph(system_placements)
        self._data_writer.set_placements(placements)
def _do_placer(self, system_placements: Placements) -> None:
    """
    Runs the placer named by the configuration placer value.

    Sets the "placements" data

    This method is the entry point for adding a new Placer

    :param system_placements: The placements passed to the placer
    :raise ConfigurationException:
        if the configuration place value is unexpected
    """
    name = get_config_str("Mapping", "placer")
    if name != "ApplicationPlacer":
        # A comma suggests a list of algorithms, which gets its own error
        if "," in name:
            raise ConfigurationException(
                "Only a single algorithm is supported for placer")
        raise ConfigurationException(
            f"Unexpected cfg setting placer: {name}")
    return self._execute_application_placer(system_placements)
def _do_write_metadata(self) -> None:
    """
    Stores the vertex labels in the buffer database, timed as report
    work.
    """
    with FecTimer("Record vertex labels to database", TimerWork.REPORT), \
            BufferDatabase() as buffer_db:
        buffer_db.store_vertex_labels()
@final
def _execute_system_multicast_routing_generator(self) -> None:
    """
    Runs the system multicast routing generator, timed, unless the cfg
    has both advanced monitor support and reinjection turned off.

    What the generator returns is stored as the system multicast
    routing data.
    """
    with FecTimer("System multicast routing generator",
                  TimerWork.OTHER) as timer:
        skipped = timer.skip_if_cfgs_false(
            "Machine", "enable_advanced_monitor_support",
            "enable_reinjection")
        if not skipped:
            routing_data = system_multicast_routing_generator()
            self._data_writer.set_system_multicast_routing_data(
                routing_data)
def _report_placements_with_application_graph(self) -> None:
    """
    Time and log the application graph placer report if requested.

    Nothing is done (not even timing) when there are no vertices.
    """
    n_vertices = self._data_writer.get_n_vertices()
    if n_vertices == 0:
        return
    with FecTimer("Placements with application graph report",
                  TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_application_graph_placer_report"):
            placer_reports_with_application_graph()
def _json_placements(self) -> None:
    """
    Time and log writing the placements as JSON, if requested by the
    ``[Reports] write_json_placements`` configuration.
    """
    with FecTimer("Json placements", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_json_placements"):
            write_json_placements()
@final
def _execute_application_router(self) -> None:
    """
    Time and log a run of the ApplicationRouter.

    The router's result is stored as the "routing_table_by_partition"
    data.

    .. note::
        Whether this method is called depends on the configuration
        router value.
    """
    with FecTimer("Application Router", TimerWork.RUNNING):
        tables_by_partition = route_application_graph()
        self._data_writer.set_routing_table_by_partition(
            tables_by_partition)
@final
def _do_routing(self) -> None:
    """
    Run the router chosen by the ``[Mapping] router`` configuration.

    Only "ApplicationRouter" is recognised here; it is delegated to
    :py:meth:`_execute_application_router`, which sets the
    "routing_table_by_partition" data.

    This method is the entry point for adding a new Router.

    :raise ConfigurationException:
        if the configuration router value is unexpected
    """
    name = get_config_str("Mapping", "router")
    if name != "ApplicationRouter":
        # A comma looks like a list of algorithms: give a clearer message
        if "," in name:
            raise ConfigurationException(
                "Only a single algorithm is supported for router")
        raise ConfigurationException(
            f"Unexpected cfg setting router: {name}")
    return self._execute_application_router()
def _execute_basic_tag_allocator(self) -> None:
    """
    Time and log a run of the basic tag allocator.

    The allocator's result is stored as the "tag" data.
    """
    with FecTimer("Basic tag allocator", TimerWork.OTHER):
        tags = basic_tag_allocator()
        self._data_writer.set_tags(tags)
def _report_tag_allocations(self) -> None:
    """
    Time and log the tag allocator report, if requested by the
    ``[Reports] write_tag_allocation_reports`` configuration.
    """
    with FecTimer("Tag allocator report", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_tag_allocation_reports"):
            tag_allocator_report()
@final
def _execute_info_allocator(self) -> None:
    """
    Run the info allocator chosen by ``[Mapping] info_allocator``.

    Sets the "routing_info" data.

    The deprecated name "GlobalZonedRoutingInfoAllocator" is accepted
    with a warning and treated as "ZonedRoutingInfoAllocator".

    This method is the entry point for adding a new Info Allocator.

    :raise ConfigurationException:
        if the configuration info_allocator value is unexpected
    """
    name = get_config_str("Mapping", "info_allocator")
    if name == "GlobalZonedRoutingInfoAllocator":
        logger.warning(
            "GlobalZonedRoutingInfoAllocator is deprecated. Please "
            "change cfg Mapping info_allocator to "
            "ZonedRoutingInfoAllocator")
        name = "ZonedRoutingInfoAllocator"

    # Reject anything that is not the one supported allocator
    if name != "ZonedRoutingInfoAllocator":
        if "," in name:
            raise ConfigurationException(
                "Only a single algorithm is supported for info_allocator")
        raise ConfigurationException(
            f"Unexpected cfg setting info_allocator: {name}")

    with FecTimer("Zoned routing info allocator", TimerWork.OTHER):
        routing_infos = ZonedRoutingInfoAllocator().allocate()
        self._data_writer.set_routing_infos(routing_infos)
def _report_router_info(self) -> None:
    """
    Time and log the router info report, if requested by the
    ``[Reports] write_router_info_report`` configuration.
    """
    with FecTimer("Router info report", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_router_info_report"):
            # The report is always given an empty list here
            routing_info_report([])
@final
def _execute_basic_routing_table_generator(self) -> None:
    """
    Time and log a run of the basic routing table generator.

    The generated tables are stored as the uncompressed routing tables.

    .. note::
        Selected via :py:meth:`_do_routing_table_generator`.
    """
    with FecTimer("Basic routing table generator", TimerWork.OTHER):
        uncompressed = basic_routing_table_generator()
        self._data_writer.set_uncompressed(uncompressed)
@final
def _execute_merged_routing_table_generator(self) -> None:
    """
    Time and log a run of the merged routing table generator.

    The generated tables are stored as the uncompressed routing tables.

    .. note::
        Selected via :py:meth:`_do_routing_table_generator`.
    """
    with FecTimer("Merged routing table generator", TimerWork.OTHER):
        uncompressed = merged_routing_table_generator()
        self._data_writer.set_uncompressed(uncompressed)
@final
def _do_routing_table_generator(self) -> None:
    """
    Run the routing table generator chosen by the configuration's
    ``[Mapping] routing_table_generator`` value.

    Both supported generators store their result as the uncompressed
    routing tables.

    This method is the entry point for adding a new routing table
    generator.

    :raise ConfigurationException:
        if the configuration's `routing_table_generator` value is
        unexpected
    """
    name = get_config_str("Mapping", "routing_table_generator")
    # Configuration name -> method that runs that generator
    generators = {
        "BasicRoutingTableGenerator":
            self._execute_basic_routing_table_generator,
        "MergedRoutingTableGenerator":
            self._execute_merged_routing_table_generator,
    }
    generate = generators.get(name)
    if generate is not None:
        return generate()
    if "," in name:
        raise ConfigurationException(
            "Only a single algorithm is supported for"
            " routing_table_generator")
    raise ConfigurationException(
        f"Unexpected cfg setting routing_table_generator: {name}")
def _report_routers(self) -> None:
    """
    Time and log the router report, if requested by the
    ``[Reports] write_router_reports`` configuration.
    """
    with FecTimer("Router report", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_router_reports"):
            router_report_from_paths()
def _report_router_summary(self) -> None:
    """
    Time and log the router summary report, if requested by the
    ``[Reports] write_router_summary_report`` configuration.
    """
    with FecTimer("Router summary report", TimerWork.REPORT) as timer:
        wanted = not timer.skip_if_cfg_false(
            "Reports", "write_router_summary_report")
        if wanted:
            router_summary_report()
def _json_routing_tables(self) -> None:
    """
    Time and log writing the uncompressed routing tables as JSON, if
    requested by the ``[Reports] write_json_routing_tables``
    configuration.
    """
    with FecTimer("Json routing tables", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_json_routing_tables"):
            uncompressed = self._data_writer.get_uncompressed()
            # Output ignored as never used
            write_json_routing_tables(uncompressed)
def _report_drift(self, start: bool) -> None:
    """
    Time and log the inter-board timer drift report.

    Skipped on a virtual board, and when the matching ``[Reports]``
    option (``write_drift_report_start`` or ``write_drift_report_end``)
    says not to write it.

    :param start: Is this the start or the end
    """
    with FecTimer("Drift report", TimerWork.REPORT) as timer:
        if timer.skip_if_virtual_board():
            return
        # Only the option for the requested end of the run is consulted
        if start:
            option = "write_drift_report_start"
        else:
            option = "write_drift_report_end"
        if timer.skip_if_cfg_false("Reports", option):
            return
        drift_report()
@final
def _execute_locate_executable_start_type(self) -> None:
    """
    Time and log a run of LocateExecutableStartType.

    Sets the executable_types data.  If auto pause and resume is
    configured on but any located executable type does not support it,
    a warning is logged and the ``[Buffers] use_auto_pause_and_resume``
    configuration is switched to "False".
    """
    with FecTimer("Locate executable start type", TimerWork.OTHER):
        binary_start_types = locate_executable_start_type()
        self._data_writer.set_executable_types(binary_start_types)
        if not get_config_bool("Buffers", "use_auto_pause_and_resume"):
            return
        # Disable auto pause and resume if the binary can't do it;
        # only the first offending type is reported
        unsupported = next(
            (candidate for candidate in binary_start_types
             if not candidate.supports_auto_pause_and_resume),
            None)
        if unsupported is not None:
            logger.warning(
                "Disabling auto pause resume as graph includes {}",
                unsupported)
            set_config("Buffers", "use_auto_pause_and_resume", "False")
@final
def _execute_buffer_manager_creator(self) -> None:
    """
    Time and log the creation of the buffer manager if required.

    Nothing is done when a buffer manager already exists; on a virtual
    board the step is recorded as skipped.
    """
    if not self._data_writer.has_buffer_manager():
        with FecTimer("Buffer manager creator", TimerWork.OTHER) as timer:
            if not timer.skip_if_virtual_board():
                self._data_writer.set_buffer_manager(BufferManager())
def _execute_sdram_outgoing_partition_allocator(self) -> None:
    """
    Time and log a run of the SDRAM outgoing partition allocator.
    """
    timer_name = "SDRAM outgoing partition allocator"
    with FecTimer(timer_name, TimerWork.OTHER):
        sdram_outgoing_partition_allocator()
def _execute_control_sync(self, do_sync: bool) -> None:
    """
    Time and log switching synchronisation on the board.

    Skipped on a virtual board.

    :param do_sync: Whether to enable synchronisation
    """
    with FecTimer("Control Sync", TimerWork.CONTROL) as timer:
        if not timer.skip_if_virtual_board():
            transceiver = self._data_writer.get_transceiver()
            transceiver.control_sync(do_sync)
def _stage_mapping(self, total_run_time: Optional[float],
                   n_machine_time_steps: Optional[int]) -> None:
    """
    Runs, times and logs all the algorithms in the mapping stage.

    The steps run in a fixed order inside the ``MAPPING`` timer
    category; later steps use the data set by earlier ones.

    :param total_run_time:
        Passed on to the step that gets the machine
    :param n_machine_time_steps:
        Passed on to the step that deduces the data timesteps
    """
    FecTimer.start_category(TimerCategory.MAPPING)
    if self._data_writer.is_soft_reset():
        # wipe out stuff associated with past mapping
        self._hard_reset()
    self._add_dependent_verts_and_edges_for_application_graph()
    self._setup_java_caller()
    self._do_extra_mapping_algorithms()
    self._report_network_specification()

    # Splitting and partitioning of the graph
    self._execute_splitter_reset()
    self._execute_splitter_selector()
    self._execute_delay_support_adder()
    self._execute_splitter_partitioner()

    # Machine discovery and its reports
    self._get_known_machine(total_run_time)
    self._json_machine()
    self._report_board_chip()

    # One Placements object is filled by the system-vertex steps and
    # then handed to the placer
    system_placements = Placements()
    self._add_commands_to_command_sender(system_placements)
    self._execute_split_lpg_vertices(system_placements)
    self._execute_insert_chip_power_monitors(system_placements)
    self._execute_insert_extra_monitor_vertices(system_placements)
    self._report_partitioner()
    self._do_placer(system_placements)
    self._report_placements_with_application_graph()
    self._json_placements()

    # Routing, tags, keys and routing tables
    self._execute_system_multicast_routing_generator()
    self._do_routing()
    self._execute_basic_tag_allocator()
    self._report_tag_allocations()
    self._execute_info_allocator()
    self._report_router_info()
    self._do_routing_table_generator()
    self._report_uncompressed_routing_table()
    self._check_uncompressed_routing_table()
    self._report_routers()
    self._report_router_summary()
    self._json_routing_tables()

    # Executables, buffers and remaining setup
    self._execute_locate_executable_start_type()
    self._execute_buffer_manager_creator()
    self._deduce_data_n_timesteps(n_machine_time_steps)
    self._report_sdram_usage_per_chip()
    self._execute_reset_routing()
    self._execute_graph_binary_gatherer()
    self._execute_binary_report()
    self._execute_create_database_interface()
    FecTimer.end_category(TimerCategory.MAPPING)
def _execute_graph_data_specification_writer(self) -> None:
    """
    Time and log a run of the GraphDataSpecificationWriter.

    The path returned by the writer is stored as the data spec
    database path.
    """
    with FecTimer("Graph data specification writer", TimerWork.OTHER):
        ds_database_path = graph_data_specification_writer()
        self._data_writer.set_ds_database_path(ds_database_path)
def _execute_reset_routing(self) -> None:
    """
    Time and log the resetting of routing if required.

    Nothing is done when the multicast routes are flagged as loaded;
    on a virtual board the step is recorded as skipped.
    """
    if not self._multicast_routes_loaded:
        with FecTimer("Reset routing", TimerWork.LOADING) as timer:
            if timer.skip_if_virtual_board():
                return
            # Single-step progress bar around the one transceiver call
            progress = ProgressBar(1, "Resetting Routing")
            transceiver = FecDataView().get_transceiver()
            transceiver.reset_routing()
            progress.update()
            progress.end()
def _execute_graph_binary_gatherer(self) -> None:
    """
    Time and log a run of the GraphBinaryGatherer.

    The gathered targets are stored as the executable targets.  A
    `KeyError` is tolerated (logged and recorded on the timer) only
    when using a virtual board; otherwise it is re-raised.
    """
    with FecTimer("Graph binary gatherer", TimerWork.OTHER) as timer:
        try:
            targets = graph_binary_gatherer()
            self._data_writer.set_executable_targets(targets)
        except KeyError:
            if not get_config_bool("Machine", "virtual_board"):
                raise
            # Github actions have no binaries
            logger.warning(
                "Ignoring executable not found as using virtual")
            timer.error("executable not found and virtual board")
def _execute_binary_report(self) -> None:
    """
    Time and log the binaries report, if requested by the
    ``[Reports] write_binaries_report`` configuration.
    """
    with FecTimer("Binaries report", TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_binaries_report"):
            generate_binaries_report()
@final
def _execute_ordered_covering_compressor(self) -> MulticastRoutingTables:
    """
    Time and log a run of the OrderedCoveringCompressor.

    The multicast routes are flagged as not loaded.  When the
    precompressed tables need no further compression they are returned
    as they are.

    .. note::
        Whether this method is called depends on the configuration
        compressor or virtual_compressor value.

    :return: Compressed routing tables
    """
    with FecTimer("Ordered covering compressor", TimerWork.OTHER) as timer:
        self._multicast_routes_loaded = False
        precompressed = self._data_writer.get_precompressed()
        if not self._compression_skipable(precompressed):
            return ordered_covering_compressor()
        timer.skip("Tables already small enough")
        return precompressed
@final
def _execute_ordered_covering_compression(self) -> Optional[
        MulticastRoutingTables]:
    """
    Time and log a run of the ordered covering compressor on machine.

    .. note::
        Whether this method is called depends on the configuration
        compressor or virtual_compressor value.

    :return:
        The precompressed tables when compression was skipped as not
        needed (routes flagged as not loaded); `None` on a virtual
        board or after compression ran (routes flagged as loaded)
    """
    with FecTimer("Ordered covering compressor",
                  TimerWork.COMPRESSING) as timer:
        if timer.skip_if_virtual_board():
            return None
        precompressed = self._data_writer.get_precompressed()
        if not self._compression_skipable(precompressed):
            ordered_covering_compression()
            self._multicast_routes_loaded = True
            return None
        timer.skip("Tables already small enough")
        self._multicast_routes_loaded = False
        return precompressed
@final
def _execute_pair_compressor(self) -> MulticastRoutingTables:
    """
    Time and log a run of the PairCompressor.

    The multicast routes are flagged as not loaded.  When the
    precompressed tables need no further compression they are returned
    as they are.

    .. note::
        Whether this method is called depends on the configuration
        compressor or virtual_compressor value.

    :return: Compressed routing table
    """
    with FecTimer("Pair compressor", TimerWork.OTHER) as timer:
        precompressed = self._data_writer.get_precompressed()
        self._multicast_routes_loaded = False
        if not self._compression_skipable(precompressed):
            return pair_compressor()
        timer.skip("Tables already small enough")
        return precompressed
@final
def _execute_pair_compression(self) -> Optional[MulticastRoutingTables]:
    """
    Time and log a run of the pair compressor on machine.

    .. note::
        Whether this method is called depends on the configuration
        compressor or virtual_compressor value.

    :return:
        The precompressed tables when compression was skipped as not
        needed (routes flagged as not loaded); `None` on a virtual
        board or after compression ran (routes flagged as loaded)
    """
    with FecTimer("Pair on chip router compression",
                  TimerWork.COMPRESSING) as timer:
        if timer.skip_if_virtual_board():
            return None
        precompressed = self._data_writer.get_precompressed()
        if not self._compression_skipable(precompressed):
            pair_compression()
            self._multicast_routes_loaded = True
            return None
        timer.skip("Tables already small enough")
        self._multicast_routes_loaded = False
        return precompressed
@final
def _execute_pair_unordered_compressor(self) -> MulticastRoutingTables:
    """
    Time and log a run of the pair compressor with ``ordered=False``.

    The multicast routes are flagged as not loaded.  When the
    precompressed tables need no further compression they are returned
    as they are.

    .. note::
        Whether this method is called depends on the configuration
        compressor or virtual_compressor value.

    :return: compressed routing tables
    """
    with FecTimer("Pair unordered compressor", TimerWork.OTHER) as timer:
        self._multicast_routes_loaded = False
        precompressed = self._data_writer.get_precompressed()
        if not self._compression_skipable(precompressed):
            return pair_compressor(ordered=False)
        timer.skip("Tables already small enough")
        return precompressed
def _compression_skipable(self, tables: MulticastRoutingTables) -> bool:
    """
    Check if compressing the given tables can be skipped.

    :param tables: The routing tables that would be compressed
    :return:
        False whenever ``[Mapping]
        router_table_compress_as_far_as_possible`` is on; otherwise
        whether the largest table has no more entries than the
        machine's ``min_n_router_enteries``
    """
    compress_fully = get_config_bool(
        "Mapping", "router_table_compress_as_far_as_possible")
    if compress_fully:
        return False
    machine = self._data_writer.get_machine()
    largest_table = tables.get_max_number_of_entries()
    return largest_table <= machine.min_n_router_enteries
def _execute_pre_compression(self) -> None:
    """
    Set the precompressed routing tables.

    With no ``[Mapping] precompressor`` configured, or when the
    uncompressed tables are already small enough, the uncompressed
    tables are declared to be the precompressed ones.  Otherwise the
    "Ranged" compressor is run and timed.

    :raise ConfigurationException:
        if the precompressor value is neither unset nor "Ranged"
    """
    name = get_config_str_or_none("Mapping", "precompressor")
    writer = self._data_writer
    if name is None:
        # Declare the precompressed data to be the uncompressed data
        writer.set_precompressed(writer.get_uncompressed())
        return
    if name != "Ranged":
        raise ConfigurationException(
            f"Unexpected cfg setting precompressor: {name}")

    with FecTimer("Ranged Compressor", TimerWork.OTHER) as timer:
        if not self._compression_skipable(writer.get_uncompressed()):
            writer.set_precompressed(range_compressor())
            return
        timer.skip("Tables already small enough")
        writer.set_precompressed(writer.get_uncompressed())
def _do_compression(self) -> Optional[MulticastRoutingTables]:
    """
    Call the compressor named in the configuration.

    On a virtual board ``[Mapping] virtual_compressor`` is used when
    set; in every other case ``[Mapping] compressor`` is used.

    Returns the tables if compression was not needed or
    if the compression is on host (by python).
    Returns None if on chip compression was run.

    .. note::
        This method is the entry point for adding a new compressor.

    :return: Routing Tables not yet loaded onto the machine
    :raise ConfigurationException: if the compressor name is not expected
    """
    name = None
    if get_config_bool("Machine", "virtual_board"):
        name = get_config_str_or_none("Mapping", "virtual_compressor")
        if name is None:
            logger.info(
                "As no virtual_compressor specified using compressor "
                "setting")
    if name is None:
        name = get_config_str("Mapping", "compressor")

    # Configuration name -> method that runs that compressor
    compressors = {
        "OrderedCoveringCompressor":
            self._execute_ordered_covering_compressor,
        "OrderedCoveringOnChipRouterCompression":
            self._execute_ordered_covering_compression,
        "PairCompressor": self._execute_pair_compressor,
        "PairOnChipRouterCompression": self._execute_pair_compression,
        "PairUnorderedCompressor":
            self._execute_pair_unordered_compressor,
    }
    compress = compressors.get(name)
    if compress is None:
        raise ConfigurationException(f"Unknown compressor: {name}")
    return compress()
@final
def _execute_load_routing_tables(
        self, compressed: Optional[MulticastRoutingTables]) -> None:
    """
    Time and log a run of the RoutingTableLoader if required.

    Nothing is done for falsy ``compressed``.  Otherwise the multicast
    routes are flagged as loaded (even on a virtual board, where the
    actual load is skipped).

    :param compressed: The routing tables to load, if any
    """
    if compressed:
        with FecTimer("Routing table loader", TimerWork.LOADING) as timer:
            self._multicast_routes_loaded = True
            if not timer.skip_if_virtual_board():
                routing_table_loader(compressed)
def _report_uncompressed_routing_table(self) -> None:
    """
    Time and log the router report from router tables, if requested by
    the ``[Reports] write_uncompressed`` configuration.
    """
    with FecTimer("Uncompressed routing table report",
                  TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false("Reports", "write_uncompressed"):
            router_report_from_router_tables()
def _check_uncompressed_routing_table(self) -> None:
    """
    Time and log the validation of the uncompressed routing tables, if
    requested by the ``[Mapping] validate_routes_uncompressed``
    configuration.
    """
    with FecTimer("Validating Uncompressed routing table",
                  TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Mapping", "validate_routes_uncompressed"):
            uncompressed = self._data_writer.get_uncompressed()
            validate_routes(uncompressed)
def _execute_fixed_routes(self) -> None:
    """
    Runs, times and logs Load Fixed Routes if required.

    Skipped unless ``[Machine] enable_advanced_monitor_support`` is on.
    The fixed routes are computed once (only if not already held), then
    loaded unless a virtual board is used.
    """
    with FecTimer("Fixed routes", TimerWork.LOADING) as timer:
        if timer.skip_if_cfg_false(
                "Machine", "enable_advanced_monitor_support"):
            return
        # Reuse previously computed fixed routes when they exist
        if not self._data_writer.has_fixed_routes():
            self._data_writer.set_fixed_routes(fixed_route_router(
                DataSpeedUpPacketGatherMachineVertex))
        if not get_config_bool("Machine", "virtual_board"):
            load_fixed_routes()
            # NOTE(review): presumably this report reads back from the
            # machine, so it sits with the real-board load - confirm
            if get_config_bool("Reports", "write_fixed_routes_report"):
                fixed_route_from_machine_report()
def _execute_load_system_data_specification(self) -> None:
    """
    Time and log the loading of the system data specs.

    Skipped on a virtual board.
    """
    with FecTimer(
            "Load system data specification", TimerWork.OTHER) as timer:
        if not timer.skip_if_virtual_board():
            load_system_data_specs()
def _execute_load_system_executable_images(self) -> None:
    """
    Time and log the loading of the system executable images.

    Skipped on a virtual board.
    """
    with FecTimer(
            "Load executable system Images", TimerWork.LOADING) as timer:
        if not timer.skip_if_virtual_board():
            load_sys_images()
def _execute_load_application_data_specification(self) -> None:
    """
    Runs, times and logs :py:meth:`load_application_data_specs`
    if required.

    Skipped on a virtual board.  The load itself runs inside the
    ``DATA_SPEC_LOAD`` timer category, which is always ended again
    even if the load raises.
    """
    with FecTimer("Load Application data specification",
                  TimerWork.LOADING_DATA) as timer:
        if timer.skip_if_virtual_board():
            return
        FecTimer.start_category(TimerCategory.DATA_SPEC_LOAD)
        try:
            load_application_data_specs()
        finally:
            # Keep start/end balanced on the error path too, so an
            # enclosing category can still be ended by its owner
            FecTimer.end_category(TimerCategory.DATA_SPEC_LOAD)
def _execute_tags_from_machine_report(self) -> None:
    """
    Time and log the TagsFromMachineReport if requested.

    Skipped on a virtual board, or when ``[Reports]
    write_tag_allocation_reports`` is off.
    """
    with FecTimer(
            "Tags from machine report", TimerWork.EXTRACTING) as timer:
        if (timer.skip_if_virtual_board() or
                timer.skip_if_cfg_false(
                    "Reports", "write_tag_allocation_reports")):
            return
        tags_from_machine_report()
def _execute_load_tags(self) -> None:
    """
    Time and log a run of the Tags Loader.

    Skipped on a virtual board.
    """
    with FecTimer("Tags Loader", TimerWork.LOADING) as timer:
        if not timer.skip_if_virtual_board():
            tags_loader()
def _do_extra_load_algorithms(self) -> None:
    """
    Runs, times and logs any extra load algorithms.

    This implementation does nothing; presumably a hook for subclasses
    to override.
    """
def _report_memory_on_host(self) -> None:
    """
    Time and log the MemoryMapOnHostReport if requested.

    Skipped on a virtual board, or when ``[Reports]
    write_memory_map_report`` is off.
    """
    with FecTimer("Memory report", TimerWork.REPORT) as timer:
        if (timer.skip_if_virtual_board() or
                timer.skip_if_cfg_false(
                    "Reports", "write_memory_map_report")):
            return
        memory_map_on_host_report()
def _report_memory_on_chip(self) -> None:
    """
    Time and log the MemoryMapOnHostChipReport if requested.

    Skipped on a virtual board, or when ``[Reports]
    write_memory_map_report`` is off.
    """
    with FecTimer("Memory report", TimerWork.REPORT) as timer:
        if (timer.skip_if_virtual_board() or
                timer.skip_if_cfg_false(
                    "Reports", "write_memory_map_report")):
            return
        memory_map_on_host_chip_report()
    # TODO consider different cfg flags
def _report_compressed(self, compressed: Optional[
        MulticastRoutingTables]) -> None:
    """
    Time and log the compressor reports if requested.

    When no compressed tables are supplied they are read back from the
    machine (the step is skipped on a virtual board in that case).

    :param compressed:
        The compressed routing tables, or `None` if they are only
        available on the machine
    """
    with FecTimer("Compressor report", TimerWork.REPORT) as timer:
        if timer.skip_all_cfgs_false(
                [("Reports", "write_compressed"),
                 ("Reports", "write_compression_comparison"),
                 ("Reports", "write_compression_summary"),
                 ("Mapping", "run_compression_checker")],
                "No reports need compressed routing tables"):
            return

        if compressed is None:
            if timer.skip_if_virtual_board():
                return
            compressed = read_routing_tables_from_machine()

        # [Reports] option -> report that takes the compressed tables
        reports = (
            ("write_compressed",
             router_report_from_compressed_router_tables),
            ("write_compression_comparison",
             generate_comparison_router_report),
            ("write_compression_summary",
             router_compressed_summary_report),
        )
        for option, write_report in reports:
            if get_config_bool("Reports", option):
                write_report(compressed)

        # The checker compares against the uncompressed tables
        if get_config_bool("Mapping", "run_compression_checker"):
            uncompressed = self._data_writer.get_uncompressed()
            generate_routing_compression_checker_report(
                uncompressed, compressed)
def _execute_application_load_executables(self) -> None:
    """
    Time and log the loading of the application images.

    Skipped on a virtual board.
    """
    with FecTimer("Load executable app images",
                  TimerWork.LOADING) as timer:
        if not timer.skip_if_virtual_board():
            load_app_images()
def _stage_data_generation(self) -> None:
    """
    Runs, times and logs the data generation and load algorithms.

    The steps run in a fixed order inside the ``DATA_SPEC_OTHER`` timer
    category.  Steps guarded by ``get_requires_mapping()`` only run
    when mapping was required.
    """
    FecTimer.start_category(TimerCategory.DATA_SPEC_OTHER)
    self._execute_pre_compression()
    # Holds tables still to be loaded; None if compressed on chip
    compressed = self._do_compression()
    self._execute_sdram_outgoing_partition_allocator()
    self._execute_graph_data_specification_writer()
    self._execute_control_sync(False)
    if self._data_writer.get_requires_mapping():
        self._execute_fixed_routes()
    self._execute_load_system_data_specification()
    self._execute_load_system_executable_images()
    self._execute_load_tags()
    self._execute_load_application_data_specification()
    self._do_extra_load_algorithms()
    # Need to reload routing tables to reset weights
    self._execute_load_routing_tables(compressed)
    if self._data_writer.get_requires_mapping():
        self._report_compressed(compressed)
    self._execute_tags_from_machine_report()
    if self._data_writer.get_requires_mapping():
        # Can only be done after the data is loaded.
        # But as the location do not change does not need rerunning
        self._report_memory_on_host()
        self._report_memory_on_chip()
    self._execute_application_load_executables()
    self._execute_router_provenance_gatherer("Load", TimerWork.LOADING)
    FecTimer.end_category(TimerCategory.DATA_SPEC_OTHER)
def _report_sdram_usage_per_chip(self) -> None:
    """
    Time and log the SDRAM usage per chip report, if requested by the
    ``[Reports] write_sdram_usage_report_per_chip`` configuration.
    """
    with FecTimer("Sdram usage per chip report",
                  TimerWork.REPORT) as timer:
        if not timer.skip_if_cfg_false(
                "Reports", "write_sdram_usage_report_per_chip"):
            sdram_usage_report_per_chip()
def _execute_dsg_region_reloader(self) -> None:
    """
    Time and log a run of the DSGRegionReloader if required.

    Reloads any parameters over the loaded data, but only if there has
    already been a run, this is not a hard reset (so the data has not
    already been regenerated), and a virtual board is not being used.
    """
    writer = self._data_writer
    if not writer.is_ran_ever() or writer.is_hard_reset():
        return
    with FecTimer("DSG region reloader", TimerWork.LOADING) as timer:
        if not timer.skip_if_virtual_board():
            reload_dsg_regions()
def _execute_graph_provenance(self) -> None:
    """
    Runs, times and logs any graph provenance if needed.

    This implementation does nothing.
    """
    # Will be overridden by sPyNNaker
def _execute_placements_provenance_gatherer(self) -> None:
    """
    Time and log the PlacementsProvenanceGatherer if requested.

    Skipped when ``[Reports] read_placements_provenance_data`` is off,
    on a virtual board, or when the needed data is not yet available.
    """
    with FecTimer(
            "Placements provenance gatherer", TimerWork.OTHER) as timer:
        if (timer.skip_if_cfg_false(
                "Reports", "read_placements_provenance_data") or
                timer.skip_if_virtual_board()):
            return
        try:
            # Also used in recover from error
            n_placements = self._data_writer.get_n_placements()
            placements = self._data_writer.iterate_placemements()
            placements_provenance_gatherer(n_placements, placements)
        except DataNotYetAvialable as ex:
            timer.skip(str(ex))
def _execute_router_provenance_gatherer(
        self, prefix: str, phase: TimerWork) -> None:
    """
    Time and log the RouterProvenanceGatherer if requested.

    Skipped when ``[Reports] read_router_provenance_data`` is off, on a
    virtual board, or when the needed data is not yet available.

    :param prefix: Passed on to the gatherer
    :param phase: The type of work the timer records this step as
    """
    with FecTimer("Router provenance gatherer", phase) as timer:
        if (timer.skip_if_cfg_false(
                "Reports", "read_router_provenance_data") or
                timer.skip_if_virtual_board()):
            return
        try:
            router_provenance_gatherer(prefix)
        except DataNotYetAvialable as ex:
            timer.skip(str(ex))
def _execute_profile_data_gatherer(self) -> None:
    """
    Time and log the ProfileDataGatherer if requested.

    Skipped when ``[Reports] read_profile_data`` is off, on a virtual
    board, or when the needed data is not yet available.
    """
    with FecTimer("Profile data gatherer", TimerWork.EXTRACTING) as timer:
        if (timer.skip_if_cfg_false("Reports", "read_profile_data") or
                timer.skip_if_virtual_board()):
            return
        try:
            profile_data_gatherer()
        except DataNotYetAvialable as ex:
            timer.skip(str(ex))
def _do_read_provenance(self) -> None:
"""
Runs, times and log the methods that gather provenance.
"""
self._execute_graph_provenance()
self._execute_placements_provenance_gatherer()
self._execute_profile_data_gatherer()
def _report_energy(self) -> None:
    """
    Time and log the energy report if requested.

    Skipped when ``[Reports] write_energy_report`` is off or on a
    virtual board.  The computed energy use is both recorded as
    provenance and written out as a report.
    """
    with FecTimer("Energy report", TimerWork.REPORT) as timer:
        if (timer.skip_if_cfg_false("Reports", "write_energy_report") or
                timer.skip_if_virtual_board()):
            return
        reset_number = self._data_writer.get_reset_number()
        power_used = compute_energy_used(n_reset=reset_number)
        energy_provenance_reporter(power_used)
        # create the energy reporter and run the report
        EnergyReport().write_energy_report(power_used)
def _do_provenance_reports(self) -> None:
    """
    Runs, times and logs any reports based on provenance.

    This implementation does nothing; presumably a hook for subclasses
    to override.
    """
def _execute_clear_router_diagnostic_counters(self) -> None:
    """
    Time and log clearing the router diagnostic counters.

    Skipped on a virtual board.
    """
    with FecTimer("Clear Router Diagnostic Counters",
                  TimerWork.CONTROL) as timer:
        if not timer.skip_if_virtual_board():
            FecDataView().get_transceiver(
                ).clear_router_diagnostic_counters()
def _execute_clear_io_buf(self) -> None:
    """
    Time and log a run of the ChipIOBufClearer if required.

    Nothing is done when the current run timesteps value is `None`.
    Skipped on a virtual board, or when ``[Reports]
    clear_iobuf_during_run`` is off.
    """
    current_timesteps = self._data_writer.get_current_run_timesteps()
    if current_timesteps is None:
        return
    with FecTimer("Clear IO buffer", TimerWork.CONTROL) as timer:
        if (timer.skip_if_virtual_board() or
                timer.skip_if_cfg_false(
                    "Reports", "clear_iobuf_during_run")):
            return
        chip_io_buf_clearer()
def _execute_runtime_update(self, n_sync_steps: int) -> None:
    """
    Time and log a run of the runtime updater if required.

    Skipped on a virtual board, or when no executable uses the
    simulation interface.

    :param n_sync_steps:
        The number of timesteps between synchronisations
    """
    with FecTimer("Runtime Update", TimerWork.LOADING) as timer:
        if timer.skip_if_virtual_board():
            return
        executable_types = self._data_writer.get_executable_types()
        if ExecutableType.USES_SIMULATION_INTERFACE not in \
                executable_types:
            timer.skip("No Simulation Interface used")
            return
        chip_runtime_updater(n_sync_steps)
def _execute_create_database_interface(self) -> None:
    """
    Time and log a run of the Database Interface Creator.

    The path it returns is stored as the database file path.
    """
    with FecTimer("Create database interface", TimerWork.OTHER):
        # Used to used compressed routing tables if available on host
        # TODO consider not saving router tables.
        database_path = database_interface()
        self._data_writer.set_database_file_path(database_path)
def _execute_update_database_interface(
        self, run_time: Optional[float]) -> None:
    """
    Time and log a run of the Database Interface Updater.

    Skipped when there is no database file path to update.

    :param run_time: the run duration in milliseconds.
    """
    with FecTimer("Update database interface", TimerWork.OTHER) as timer:
        database_path = self._data_writer.get_database_file_path()
        if database_path is not None:
            with DatabaseUpdater(database_path) as updater:
                updater.update_system_params(run_time)
            return
        timer.skip("No database to update")
def _execute_create_notifiaction_protocol(self) -> None:
    """
    Time and log the creation of the Notification Protocol.

    The created object is stored as the notification protocol data.
    """
    with FecTimer("Create notification protocol", TimerWork.OTHER):
        protocol = create_notification_protocol()
        self._data_writer.set_notification_protocol(protocol)
def _execute_runner(
        self, n_sync_steps: int, run_time: Optional[float]) -> None:
    """
    Time and log a run of the ApplicationRunner.

    Skipped on a virtual board.  If the runner reports a latest
    runtime, it is stored as the current run timesteps.

    :param n_sync_steps:
        The number of timesteps between synchronisations
    :param run_time: the run duration in milliseconds.
    """
    with FecTimer(FecTimer.APPLICATION_RUNNER, TimerWork.RUNNING) as timer:
        if timer.skip_if_virtual_board():
            return
        # Don't timeout if a stepped mode is in operation
        time_threshold = None if n_sync_steps else get_config_int(
            "Machine", "post_simulation_overrun_before_error")
        latest_runtime = application_runner(
            run_time, time_threshold, self._run_until_complete,
            self._state_condition)
        if latest_runtime is None:
            return
        self._data_writer.set_current_run_timesteps(latest_runtime)
def _execute_extract_iobuff(self) -> None:
    """
    Time and log a run of the ChipIOBufExtractor if required.

    Skipped on a virtual board, or when ``[Reports] extract_iobuf`` is
    off.
    """
    with FecTimer("Extract IO buff", TimerWork.EXTRACTING) as timer:
        if (timer.skip_if_virtual_board() or
                timer.skip_if_cfg_false("Reports", "extract_iobuf")):
            return
        # ErrorMessages, WarnMessages output ignored as never used!
        chip_io_buf_extractor()
def _execute_buffer_extractor(self) -> None:
    """
    Runs, times and logs the BufferExtractor if required.

    Skipped on a virtual board.  The extraction itself runs inside the
    ``EXTRACT_DATA`` timer category, which is always ended again even
    if the extraction raises.
    """
    with FecTimer("Buffer extractor", TimerWork.EXTRACT_DATA) as timer:
        if timer.skip_if_virtual_board():
            return
        bm = self._data_writer.get_buffer_manager()
        FecTimer.start_category(TimerCategory.EXTRACT_DATA)
        try:
            bm.extract_data()
        finally:
            # Keep start/end balanced on the error path too, so an
            # enclosing category can still be ended by its owner
            FecTimer.end_category(TimerCategory.EXTRACT_DATA)
def _do_extract_from_machine(self) -> None:
    """
    Runs, times and logs the steps to extract data from the machine.

    Router provenance is gathered twice: with the "Run" prefix before
    the counters are cleared and data is extracted, and with the
    "Extract" prefix afterwards.
    """
    self._execute_router_provenance_gatherer("Run", TimerWork.EXTRACTING)
    self._execute_clear_router_diagnostic_counters()
    self._execute_extract_iobuff()
    self._execute_buffer_extractor()
    self._execute_clear_io_buf()
    self._execute_router_provenance_gatherer(
        "Extract", TimerWork.EXTRACTING)
    self._do_read_provenance()
    self._do_provenance_reports()
def _do_run(
        self, n_machine_time_steps: Optional[int],
        n_sync_steps: int) -> None:
    """
    Runs, times and logs the do run steps.

    :param n_machine_time_steps:
        Number of timesteps run, or `None` when no fixed number of
        steps is given
    :param n_sync_steps:
        The number of timesteps between synchronisations
    """
    # TODO virtual board
    # run_time stays None when no number of steps was given
    run_time = None
    if n_machine_time_steps is not None:
        run_time = (n_machine_time_steps *
                    self._data_writer.get_simulation_time_step_ms())
    self._data_writer.increment_current_run_timesteps(
        n_machine_time_steps)

    self._report_drift(start=True)
    self._execute_update_database_interface(run_time)
    self._execute_create_notifiaction_protocol()
    self._execute_runtime_update(n_sync_steps)
    self._execute_runner(n_sync_steps, run_time)
    self._do_extract_from_machine()
    # reset at the end of each do_run cycle
    self._report_drift(start=False)
    self._execute_control_sync(True)
def _stage_run(self, n_machine_time_steps: Optional[int],
               run_time: Optional[float], sync_time: float) -> None:
    """
    Runs, times and logs the do run steps.

    With a number of timesteps the run is done as one or more steps
    (several only when auto pause and resume is on).  Without one, a
    single open-ended run is done.

    A `KeyboardInterrupt` shuts down and exits the process with
    status 1; any other exception triggers error recovery and is then
    re-raised.  The ``RUN_LOOP`` timer category is always ended.

    :param n_machine_time_steps:
        Total number of timesteps to run, or `None` for an open-ended
        run
    :param run_time: The total run time; only used for logging here
    :param sync_time:
        Converted to the number of timesteps between synchronisations
    """
    FecTimer.start_category(TimerCategory.RUN_LOOP)
    # On a re-run without new data generation the loaded data is
    # refreshed via the region reloader
    if (self._data_writer.is_ran_ever() and
            not self._data_writer.get_requires_data_generation()):
        self._execute_dsg_region_reloader()
    if not self._data_writer.is_ran_last():
        self._do_write_metadata()

    try:
        n_sync_steps = self.__timesteps(sync_time)

        # Run for each of the given steps
        if n_machine_time_steps is not None:
            if get_config_bool("Buffers", "use_auto_pause_and_resume"):
                steps = self._generate_steps(n_machine_time_steps)
            else:
                # Without auto pause and resume everything is one step
                steps = [n_machine_time_steps]
            logger.info("Running for {} steps for a total of {}ms",
                        len(steps), run_time)
            self._data_writer.set_n_run_steps(len(steps))
            for step in steps:
                run_step = self._data_writer.next_run_step()
                logger.info(f"Run {run_step} of {len(steps)}")
                self._do_run(step, n_sync_steps)
            self._data_writer.clear_run_steps()
        elif self._run_until_complete:
            logger.info("Running until complete")
            self._do_run(None, n_sync_steps)
        else:
            # A maximum below sys.maxsize means there is a real limit
            if self._data_writer.get_max_run_time_steps() < sys.maxsize:
                logger.warning("Due to recording this simulation "
                               "should not be run longer than {}ms",
                               self._data_writer.get_max_run_time_steps())
            logger.info("Running until stop is called by another thread")
            self._do_run(None, n_sync_steps)
    except KeyboardInterrupt:
        logger.error("User has aborted the simulation")
        self._shutdown()
        sys.exit(1)
    except Exception as run_e:
        self._recover_from_error(run_e)

        # re-raise exception
        raise run_e
    finally:
        FecTimer.end_category(TimerCategory.RUN_LOOP)
def _recover_from_error(self, exception: Exception) -> None:
    """
    Best-effort wrapper around the error recovery.

    Any `Exception` raised while recovering is logged (with traceback)
    and then dropped, so that it does not replace the error the caller is
    already handling.

    :param exception: The error that triggered the recovery.
    """
    try:
        self.__recover_from_error(exception)
    except Exception as recovery_error:
        # Already handling an error; report this one and carry on
        logger.exception(
            f"Error {recovery_error} when attempting to recover from error")
def __recover_from_error(self, exception: Exception) -> None:
    """
    Log a failure and try to pull diagnostic data off the machine.

    In order this logs the exception, gathers router provenance, works
    out which cores are in an unexpected state, tries to extract
    provenance from the cores that are not in `RUN_TIME_EXCEPTION` or
    `WATCHDOG`, and finally reads and logs the IOBUF messages.

    The provenance and IOBUF stages log their own failures and carry on;
    errors raised elsewhere in this method propagate to the caller.

    :param exception: The error that triggered the recovery.
    """
    # Report that something went wrong before trying anything else
    logger.error("An error has occurred during simulation")
    # Log the exception itself
    logger.error(exception)

    logger.info("\n\nAttempting to extract data\n\n")

    # Extract router provenance
    try:
        router_provenance_gatherer()
    except Exception:
        logger.exception("Error reading router provenance")

    # Find the cores that are not in an expected state
    unsuccessful_cores = CPUInfos()
    if isinstance(exception, SpiNNManCoresNotInStateException):
        unsuccessful_cores = exception.failed_core_states()

    # If there are no cores in a bad state, find those not yet in
    # their finished state
    transceiver = self._data_writer.get_transceiver()
    if not unsuccessful_cores:
        # NOTE(review): unsuccessful_cores is reassigned on every pass, so
        # only the result for the last executable type appears to be kept
        # - confirm whether the results were meant to be merged
        for executable_type, core_subsets in \
                self._data_writer.get_executable_types().items():
            unsuccessful_cores = transceiver.get_cpu_infos(
                core_subsets, executable_type.end_state, False)

    # Print the details of error cores
    logger.error(unsuccessful_cores.get_status_string())

    # Find the cores that are not in RTE i.e. that can still be read
    non_rte_cores = unsuccessful_cores.infos_not_in_states(
        [CPUState.RUN_TIME_EXCEPTION, CPUState.WATCHDOG])

    # If there are any cores that are not in RTE, extract data from them
    if (non_rte_cores and
            ExecutableType.USES_SIMULATION_INTERFACE in
            self._data_writer.get_executable_types()):
        non_rte_core_subsets = CoreSubsets()
        for (x, y, p) in non_rte_cores:
            non_rte_core_subsets.add_processor(x, y, p)

        # Attempt to force the cores to write provenance and exit
        try:
            chip_provenance_updater(non_rte_core_subsets)
        except Exception:
            logger.exception("Could not update provenance on chip")

        # Extract any written provenance data
        try:
            # NOTE(review): the transceiver was already fetched above;
            # this second fetch looks redundant
            transceiver = self._data_writer.get_transceiver()
            finished_cores = transceiver.get_cpu_infos(
                non_rte_core_subsets, CPUState.FINISHED, True)
            finished_placements = Placements()
            for (x, y, p) in finished_cores:
                try:
                    placement = self._data_writer.\
                        get_placement_on_processor(x, y, p)
                    finished_placements.add_placement(placement)
                except Exception:   # pylint: disable=broad-except
                    pass  # already recovering from error
            placements_provenance_gatherer(
                finished_placements.n_placements,
                finished_placements.placements)
        except Exception as pro_e:
            logger.exception(f"Could not read provenance due to {pro_e}")

    # Read IOBUF where possible (that should be everywhere)
    iobuf = IOBufExtractor()
    try:
        errors, warnings = iobuf.extract_iobuf()
    except Exception:
        logger.exception("Could not get iobuf")
        # Fall back to nothing to print so the final step still runs
        errors, warnings = [], []

    # Print the IOBUFs
    self._print_iobuf(errors, warnings)
@staticmethod
def _print_iobuf(errors: Iterable[str], warnings: Iterable[str]) -> None:
    """
    Log the given IOBUF messages, all warnings first and then all errors.

    :param errors: Messages to log at error level.
    :param warnings: Messages to log at warning level.
    """
    for emit, messages in ((logger.warning, warnings),
                           (logger.error, errors)):
        for message in messages:
            emit(message)
[docs]
def reset(self) -> None:
    """
    Puts the simulation back at time zero.

    A reset requested before any run, or repeated without a run in
    between, is logged as an error and otherwise ignored.

    If the user has accessed the machine the reset is a hard reset,
    otherwise the data writer does a soft reset. Any buffer manager is
    reset as well.
    """
    FecTimer.start_category(TimerCategory.RESETTING)
    if not self._data_writer.is_ran_last():
        if not self._data_writer.is_ran_ever():
            logger.error("Ignoring the reset before the run")
        else:
            logger.error("Ignoring the repeated reset call")
        # Close the category opened above; returning without this would
        # leave RESETTING started but never ended on the ignored path
        FecTimer.end_category(TimerCategory.RESETTING)
        return

    self._report_energy()
    logger.info("Resetting")

    if self._data_writer.get_user_accessed_machine():
        logger.warning(
            "A reset after a get machine call is always hard and "
            "therefore the previous machine is no longer valid")
        self._hard_reset()
    else:
        self._data_writer.soft_reset()

    # rewind the buffers from the buffer manager, to start at the beginning
    # of the simulation again and clear buffered out
    if self._data_writer.has_buffer_manager():
        self._data_writer.get_buffer_manager().reset()

    FecTimer.end_category(TimerCategory.RESETTING)
def __repr__(self) -> str:
    """
    Describe this front end, naming the machine's IP address if one is set.
    """
    writer = self._data_writer
    if not writer.has_ipaddress():
        return "general front end instance no machine set"
    return (f"general front end instance for machine "
            f"{writer.get_ipaddress()}")
def _shutdown(self) -> None:
    """
    Shut down, clearing machine state and stopping the category timing.

    The order is: clear tags and routing tables (as configured), run the
    superclass shutdown, clear the notification protocol and then stop
    the category timing.
    """
    # if stopping on machine, clear IP tags and routing table
    self.__clear()
    super()._shutdown()
    self._data_writer.clear_notification_protocol()
    # Stops the timing outright rather than ending a single category
    FecTimer.stop_category_timing()
def __clear(self) -> None:
    """
    Clear IP tags, reverse IP tags and multicast routes from the machine.

    Does nothing when there is no transceiver. Which parts are cleared is
    controlled by the ``clear_tags`` and ``clear_routing_tables`` options
    of the ``Machine`` config section.
    """
    writer = self._data_writer
    if not writer.has_transceiver():
        return
    txrx = writer.get_transceiver()

    # if clearing tags, clear the forward ones and then the reverse ones
    if get_config_bool("Machine", "clear_tags"):
        for forward_tag in writer.get_tags().ip_tags:
            txrx.clear_ip_tag(
                forward_tag.tag, board_address=forward_tag.board_address)
        for reverse_tag in writer.get_tags().reverse_ip_tags:
            txrx.clear_ip_tag(
                reverse_tag.tag, board_address=reverse_tag.board_address)

    # if clearing routing table entries, clear
    if get_config_bool("Machine", "clear_routing_tables"):
        txrx.clear_multicast_routes()
@overrides(ConfigHandler._close_allocation_controller)
def _close_allocation_controller(self) -> None:
    # Same work as the overridden method, but timed under the
    # MACHINE_OFF category
    FecTimer.start_category(TimerCategory.MACHINE_OFF)
    super()._close_allocation_controller()
    # NOTE(review): not reached if the super call raises, which would
    # leave the category started - confirm that is acceptable
    FecTimer.end_category(TimerCategory.MACHINE_OFF)
[docs]
def stop(self) -> None:
    """
    End running of the simulation.

    The machine is shut down whether or not the stop work succeeds. On
    failure, recovery is attempted, an errored file is written and the
    error is re-raised; the finished file is only written on success.
    """
    self._data_writer.stopping()
    FecTimer.start_category(TimerCategory.SHUTTING_DOWN)
    self._report_energy()

    try:
        # If we have run forever, stop the binaries
        ran_forever = (
            self._data_writer.is_ran_ever()
            and self._data_writer.get_current_run_timesteps() is None
            and not get_config_bool("Machine", "virtual_board")
            and not self._run_until_complete)
        if ran_forever:
            self._do_stop_workflow()
        elif get_config_bool("Reports", "read_provenance_data_on_end"):
            self._do_read_provenance()
        self._stop_remove_data()
    except Exception as stop_error:
        self._recover_from_error(stop_error)
        self._data_writer.write_errored_file(str(stop_error))
        raise
    finally:
        # shut down the machine properly
        self._shutdown()

    self._data_writer.write_finished_file()
    # No matching FecTimer.end_category as shutdown stops timer
def _execute_application_finisher(self) -> None:
    """
    Runs the application finisher, timed as "Application finisher"
    control work.
    """
    with FecTimer("Application finisher", TimerWork.CONTROL):
        application_finisher()
def _do_stop_workflow(self) -> None:
    """
    Runs the application finisher and then the extraction from the machine.
    """
    # The finisher runs before extraction (presumably so the binaries
    # have stopped before data is read - confirm)
    self._execute_application_finisher()
    self._do_extract_from_machine()
[docs]
def stop_run(self) -> None:
    """
    Request that the current infinite run stop.

    A second request, made while one is already pending, is logged as a
    warning and ignored.

    .. note::
        This will need to be called from another thread as the infinite
        run call is blocking.

    :raises SpiNNUtilsException:
        If the stop_run was not expected in the current state.
    """
    # Do not do start category here
    # as called from a different thread while running
    writer = self._data_writer
    if writer.is_stop_already_requested():
        logger.warning("Second Request to stop_run ignored")
    else:
        state_condition = self._state_condition
        # Record the request and wake anything waiting on the condition
        with state_condition:
            writer.request_stop()
            state_condition.notify_all()
[docs]
def continue_simulation(self) -> None:
    """
    Continue a simulation that has been started in stepped mode.

    Sends the data writer's next sync signal to the application, using
    the current transceiver and app id.
    """
    writer = self._data_writer
    sync_signal = writer.get_next_sync_signal()
    transceiver = writer.get_transceiver()
    app_id = writer.get_app_id()
    transceiver.send_signal(app_id, sync_signal)