Source code for spinn_front_end_common.interface.abstract_spinnaker_base

# Copyright (c) 2016 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
main interface for the SpiNNaker tools
"""
from __future__ import annotations
import logging
import math
import os
import re
import signal
import sys
import threading
import types
from threading import Condition
from typing import (
    Dict, Iterable, Optional, Sequence, Tuple, Type,
    TypeVar, Union, cast, final)

import ebrains_drive  # type: ignore[import]
from numpy import __version__ as numpy_version
import requests

from spinn_utilities import __version__ as spinn_utils_version
from spinn_utilities.config_holder import (
    get_config_bool, get_config_int, get_config_str, get_config_str_or_none,
    is_config_none, set_config)
from spinn_utilities.log import FormatAdapter
from spinn_utilities.typing.coords import XY

from spinn_machine import __version__ as spinn_machine_version
from spinn_machine import CoreSubsets

from spinnman import __version__ as spinnman_version
from spinnman.exceptions import SpiNNManCoresNotInStateException
from spinnman.model.cpu_infos import CPUInfos
from spinnman.model.enums import CPUState, ExecutableType

from spalloc_client import (  # type: ignore[import]
    __version__ as spalloc_version)

from pacman import __version__ as pacman_version
from pacman.exceptions import PacmanPlaceException
from pacman.model.graphs.application import ApplicationEdge, ApplicationVertex
from pacman.model.graphs import AbstractVirtual
from pacman.model.resources import AbstractSDRAM
from pacman.model.partitioner_splitters.splitter_reset import splitter_reset
from pacman.model.placements import Placements
from pacman.model.routing_tables import MulticastRoutingTables
from pacman.operations.fixed_route_router import fixed_route_router
from pacman.operations.partition_algorithms import splitter_partitioner
from pacman.operations.placer_algorithms import place_application_graph
from pacman.operations.router_algorithms import route_application_graph
from pacman.operations.router_compressors import (
    pair_compressor, range_compressor)
from pacman.operations.router_compressors.ordered_covering_router_compressor \
    import ordered_covering_compressor
from pacman.operations.routing_info_allocator_algorithms.\
    zoned_routing_info_allocator import (flexible_allocate, global_allocate)
from pacman.operations.routing_table_generators import (
    basic_routing_table_generator, merged_routing_table_generator)
from pacman.operations.tag_allocator_algorithms import basic_tag_allocator

from spinn_front_end_common import __version__ as fec_version
from spinn_front_end_common import common_model_binaries
from spinn_front_end_common.abstract_models import (
    AbstractVertexWithEdgeToDependentVertices,
    AbstractCanReset)
from spinn_front_end_common.abstract_models.impl import (
    MachineAllocationController)
from spinn_front_end_common.data.fec_data_view import FecDataView
from spinn_front_end_common.interface.buffer_management import BufferManager
from spinn_front_end_common.interface.buffer_management.storage_objects \
    import BufferDatabase
from spinn_front_end_common.interface.config_handler import ConfigHandler
from spinn_front_end_common.interface.interface_functions import (
    application_finisher, application_runner,
    chip_io_buf_clearer, chip_io_buf_extractor,
    chip_provenance_updater, chip_runtime_updater, compute_energy_used,
    create_notification_protocol, database_interface,
    reload_dsg_regions, energy_provenance_reporter,
    load_application_data_specs, load_system_data_specs,
    graph_binary_gatherer, graph_data_specification_writer,
    graph_provenance_gatherer,
    host_based_bit_field_router_compressor, hbp_allocator,
    insert_chip_power_monitors_to_graphs,
    insert_extra_monitor_vertices_to_graphs, split_lpg_vertices,
    load_app_images, load_fixed_routes, load_sys_images,
    locate_executable_start_type, machine_generator,
    placements_provenance_gatherer, profile_data_gatherer,
    read_routing_tables_from_machine, router_provenance_gatherer,
    routing_setup, routing_table_loader,
    sdram_outgoing_partition_allocator, spalloc_allocator,
    system_multicast_routing_generator,
    tags_loader, virtual_machine_generator, add_command_senders)
from spinn_front_end_common.interface.interface_functions.\
    host_no_bitfield_router_compression import (
        ordered_covering_compression, pair_compression)
from spinn_front_end_common.interface.provenance import (
    FecTimer, GlobalProvenance, TimerCategory, TimerWork)
from spinn_front_end_common.interface.splitter_selectors import (
    splitter_selector)
from spinn_front_end_common.interface.java_caller import JavaCaller
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.utilities.report_functions import (
    bitfield_compressor_report, board_chip_report, EnergyReport,
    fixed_route_from_machine_report, memory_map_on_host_report,
    memory_map_on_host_chip_report, network_specification,
    routing_table_from_machine_report, tags_from_machine_report,
    write_json_machine, write_json_placements,
    write_json_routing_tables, drift_report)
from spinn_front_end_common.utilities.iobuf_extractor import IOBufExtractor
from spinn_front_end_common.utility_models import (
    DataSpeedUpPacketGatherMachineVertex)
from spinn_front_end_common.utilities.report_functions.reports import (
    generate_comparison_router_report, partitioner_report,
    placer_reports_with_application_graph,
    router_compressed_summary_report, routing_info_report,
    router_report_from_compressed_router_tables,
    router_report_from_paths,
    router_report_from_router_tables, router_summary_report,
    sdram_usage_report_per_chip,
    tag_allocator_report)
from spinn_front_end_common.data.fec_data_writer import FecDataWriter

try:
    from scipy import __version__ as scipy_version
except ImportError:
    scipy_version = "scipy not installed"

logger = FormatAdapter(logging.getLogger(__name__))
_T = TypeVar("_T")

SHARED_PATH = re.compile(r".*\/shared\/([^\/]+)")
SHARED_GROUP = 1
SHARED_WITH_PATH = re.compile(r".*\/Shared with (all|groups|me)\/([^\/]+)")
SHARED_WITH_GROUP = 2


[docs] class AbstractSpinnakerBase(ConfigHandler): """ Main interface into the tools logic flow. """ # pylint: disable=broad-except __slots__ = ( # Condition object used for waiting for stop # Set during init and the used but never new object "_state_condition", # Set when run_until_complete is specified by the user "_run_until_complete", # "_raise_keyboard_interrupt", # original value which is used in exception handling and control c "__sys_excepthook", # All beyond this point new for no extractor # The data is not new but now it is held direct and not via inputs # Flag to say is compressed routing tables are on machine # TODO remove this when the data change only algorithms are done "_multicast_routes_loaded") def __init__( self, data_writer_cls: Optional[Type[FecDataWriter]] = None): """ :param FecDataWriter data_writer_cls: The Global data writer class """ # pylint: disable=too-many-arguments super().__init__(data_writer_cls) FecTimer.start_category(TimerCategory.WAITING) FecTimer.start_category(TimerCategory.SETTING_UP) # output locations of binaries to be searched for end user info logger.info( "Will search these locations for binaries: {}", self._data_writer.get_executable_finder().binary_paths) self._multicast_routes_loaded = False # holder for timing and running related values self._run_until_complete = False self._state_condition = Condition() # folders self._set_up_report_specifics() # Setup for signal handling self._raise_keyboard_interrupt = False self._create_version_provenance() self.__sys_excepthook = sys.excepthook FecTimer.setup(self) self._data_writer.register_binary_search_path( os.path.dirname(common_model_binaries.__file__)) self._data_writer.set_machine_generator(self._get_machine) FecTimer.end_category(TimerCategory.SETTING_UP) def _hard_reset(self) -> None: """ This clears all data that if no longer valid after a hard reset """ if self._data_writer.has_transceiver(): self._data_writer.get_transceiver().stop_application( self._data_writer.get_app_id()) self.__close_allocation_controller() self._data_writer.hard_reset() self._multicast_routes_loaded = False def _machine_clear(self) -> None: pass def _setup_java_caller(self) -> None: if get_config_bool("Java", "use_java"): self._data_writer.set_java_caller(JavaCaller()) def __signal_handler(self, _signal, _frame) -> None: """ Handles closing down of script via keyboard interrupt :param _signal: the signal received (ignored) :param _frame: frame executed in (ignored) """ # If we are to raise the keyboard interrupt, do so if self._raise_keyboard_interrupt: raise KeyboardInterrupt logger.error("User has cancelled simulation") self._shutdown() @property def __bearer_token(self) -> Optional[str]: """ :return: The OIDC bearer token :rtype: str or None """ # Try using Jupyter if we have the right variables jupyter_token = os.getenv("JUPYTERHUB_API_TOKEN") jupyter_ip = os.getenv("JUPYTERHUB_SERVICE_HOST") jupyter_port = os.getenv("JUPYTERHUB_SERVICE_PORT") if (jupyter_token is not None and jupyter_ip is not None and jupyter_port is not None): jupyter_url = (f"http://{jupyter_ip}:{jupyter_port}/services/" "access-token-service/access-token") headers = {"Authorization": f"Token {jupyter_token}"} response = requests.get(jupyter_url, headers=headers, timeout=10) return response.json().get('access_token') # Try a simple environment variable, or None if that doesn't exist return os.getenv("OIDC_BEARER_TOKEN") @property def __group_collab_or_job(self) -> Dict[str, str]: """ :return: The group, collab, or NMPI Job ID to associate with jobs :rtype: dict() """ # Try to get a NMPI Job nmpi_job = os.getenv("NMPI_JOB_ID") if nmpi_job is not None and nmpi_job != "": nmpi_user = os.getenv("NMPI_USER") if nmpi_user is not None and nmpi_user != "": logger.info("Requesting job for NMPI job {}, user {}", nmpi_job, nmpi_user) return {"nmpi_job": nmpi_job, "nmpi_user": nmpi_user} logger.info("Requesting spalloc job for NMPI job {}", nmpi_job) return {"nmpi_job": nmpi_job} # Try to get the collab from the path cwd = os.getcwd() match_obj = SHARED_PATH.match(cwd) if match_obj: collab = self.__get_collab_id_from_folder( match_obj.group(SHARED_GROUP)) if collab is not None: return collab match_obj = SHARED_WITH_PATH.match(cwd) if match_obj: collab = self.__get_collab_id_from_folder( match_obj.group(SHARED_WITH_GROUP)) if collab is not None: return collab # Try to use the config to get a group group = get_config_str_or_none("Machine", "spalloc_group") if group is not None: return {"group": group} # Nothing ventured, nothing gained return {} def __get_collab_id_from_folder( self, folder: str) -> Optional[Dict[str, str]]: """ Currently hacky way to get the EBRAINS collab id from the drive folder, replicated from the NMPI collab template. """ token = self.__bearer_token if token is None: return None ebrains_drive_client = ebrains_drive.connect(token=token) repo_by_title = ebrains_drive_client.repos.get_repos_by_name(folder) if len(repo_by_title) != 1: logger.warning(f"The repository for collab {folder} could not be" " found; continuing as if not in a collaboratory") return {} # Owner is formatted as collab-<collab_id>-<permission>, and we want # to extract the <collab-id> owner = repo_by_title[0].owner collab_id = owner[:owner.rindex("-")] collab_id = collab_id[collab_id.find("-") + 1:] logger.info(f"Requesting job in collaboratory {collab_id}") return {"collab": collab_id}
[docs] def exception_handler( self, exc_type: Type[BaseException], value: BaseException, traceback_obj: Optional[types.TracebackType]): """ Handler of exceptions. :param type exc_type: the type of exception received :param Exception value: the value of the exception :param traceback traceback_obj: the trace back stuff """ logger.error("Shutdown on exception") self._shutdown() return self.__sys_excepthook(exc_type, value, traceback_obj)
def _should_run(self) -> bool: """ Checks if the simulation should run. Will warn the user if there is no need to run :return: True if and only if one of the graphs has vertices in it :raises ConfigurationException: If the current state does not support a new run call """ if self._data_writer.get_n_vertices() > 0: return True logger.warning( "Your graph has no vertices in it. " "Therefore the run call will exit immediately.") return False
[docs] def run_until_complete(self, n_steps: Optional[int] = None): """ Run a simulation until it completes. :param int n_steps: If not `None`, this specifies that the simulation should be requested to run for the given number of steps. The host will still wait until the simulation itself says it has completed. """ FecTimer.start_category(TimerCategory.RUN_OTHER) self._run_until_complete = True self._run(n_steps, sync_time=0.0) FecTimer.end_category(TimerCategory.RUN_OTHER)
[docs] def run(self, run_time: Optional[float], sync_time: float = 0): """ Run a simulation for a fixed amount of time. :param int run_time: the run duration in milliseconds. :param float sync_time: If not 0, this specifies that the simulation should pause after this duration. The continue_simulation() method must then be called for the simulation to continue. """ FecTimer.start_category(TimerCategory.RUN_OTHER) if self._run_until_complete: raise NotImplementedError("run after run_until_complete") self._run(run_time, sync_time) FecTimer.end_category(TimerCategory.RUN_OTHER)
def __timesteps(self, time_in_ms: float) -> int: """ Get a number of timesteps for a given time in milliseconds. :return: The number of timesteps :rtype: int """ time_step_ms = self._data_writer.get_simulation_time_step_ms() n_time_steps = int(math.ceil(time_in_ms / time_step_ms)) calc_time = n_time_steps * time_step_ms # Allow for minor float errors if abs(time_in_ms - calc_time) > 0.00001: logger.warning( "Time of {}ms " "is not a multiple of the machine time step of {}ms " "and has therefore been rounded up to {}ms", time_in_ms, time_step_ms, calc_time) return n_time_steps def _calc_run_time(self, run_time: Optional[float]) -> Union[ Tuple[int, float], Tuple[None, None]]: """ Calculates n_machine_time_steps and total_run_time based on run_time and machine_time_step. This method rounds the run up to the next timestep as discussed in https://github.com/SpiNNakerManchester/sPyNNaker/issues/149 If run_time is `None` (run forever) both values will be `None` :param run_time: time user requested to run for in milliseconds :type run_time: float or None :return: n_machine_time_steps as a whole int and total_run_time in milliseconds :rtype: tuple(int,float) or tuple(None,None) """ if run_time is None: return None, None n_machine_time_steps = self.__timesteps(run_time) total_run_timesteps = ( (self._data_writer.get_current_run_timesteps() or 0) + n_machine_time_steps) total_run_time = ( total_run_timesteps * self._data_writer.get_hardware_time_step_ms()) logger.info( f"Simulating for {n_machine_time_steps} " f"{self._data_writer.get_simulation_time_step_ms()} ms timesteps " f"using a hardware timestep of " f"{self._data_writer.get_hardware_time_step_us()} us") return n_machine_time_steps, total_run_time def _run(self, run_time: Optional[float], sync_time: float): self._data_writer.start_run() try: self.__run(run_time, sync_time) self._data_writer.finish_run() except Exception: # if in debug mode, do not shut down machine if get_config_str("Mode", "mode") != "Debug": try: self.stop() except Exception as stop_e: logger.exception(f"Error {stop_e} when attempting to stop") self._data_writer.shut_down() raise @staticmethod def __is_main_thread() -> bool: """ :return: Whether this is the main thread. :rtype: bool """ return threading.get_ident() == threading.main_thread().ident def __run(self, run_time: Optional[float], sync_time: float): """ The main internal run function. :param int run_time: the run duration in milliseconds. :param int sync_time: the time in milliseconds between synchronisations, or 0 to disable. """ if not self._should_run(): return # verify that we can keep doing auto pause and resume if self._data_writer.is_ran_ever(): can_keep_running = all( executable_type.supports_auto_pause_and_resume for executable_type in self._data_writer.get_executable_types()) if not can_keep_running: raise NotImplementedError( "Only binaries that use the simulation interface can be" " run more than once") self._adjust_config(run_time) # Install the Control-C handler if self.__is_main_thread(): signal.signal(signal.SIGINT, self.__signal_handler) self._raise_keyboard_interrupt = True sys.excepthook = self.__sys_excepthook logger.info("Starting execution process") n_machine_time_steps, total_run_time = self._calc_run_time(run_time) if FecDataView.has_allocation_controller(): FecDataView.get_allocation_controller().extend_allocation( total_run_time or 0.0) n_sync_steps = self.__timesteps(sync_time) # If we have never run before, or the graph has changed, # start by performing mapping if (self._data_writer.get_requires_mapping() and self._data_writer.is_ran_last()): self.stop() raise NotImplementedError( "The network cannot be changed between runs without" " resetting") # If we have reset and the graph has changed, stop any running # application if (self._data_writer.get_requires_data_generation() and self._data_writer.has_transceiver()): self._data_writer.get_transceiver().stop_application( self._data_writer.get_app_id()) self._data_writer.reset_sync_signal() # build the graphs to modify with system requirements if self._data_writer.get_requires_mapping(): if self._data_writer.is_soft_reset(): # wipe out stuff associated with past mapping self._hard_reset() FecTimer.setup(self) self._add_dependent_verts_and_edges_for_application_graph() if get_config_bool("Buffers", "use_auto_pause_and_resume"): self._data_writer.set_plan_n_timesteps(get_config_int( "Buffers", "minimum_auto_time_steps")) else: self._data_writer.set_plan_n_timesteps(n_machine_time_steps) self._do_mapping(total_run_time) if not self._data_writer.is_ran_last(): self._do_write_metadata() # Check if anything has per-timestep SDRAM usage is_per_timestep_sdram = any( placement.vertex.sdram_required.per_timestep for placement in self._data_writer.iterate_placemements()) # Disable auto pause and resume if the binary can't do it if not get_config_bool("Machine", "virtual_board"): for executable_type in self._data_writer.get_executable_types(): if not executable_type.supports_auto_pause_and_resume: set_config("Buffers", "use_auto_pause_and_resume", "False") break # Work out the maximum run duration given all recordings if not self._data_writer.has_max_run_time_steps(): self._data_writer.set_max_run_time_steps( self._deduce_data_n_timesteps()) # Work out an array of timesteps to perform steps: Optional[Sequence[Optional[int]]] = None if (not get_config_bool("Buffers", "use_auto_pause_and_resume") or not is_per_timestep_sdram): # Runs should only be in units of max_run_time_steps at most if is_per_timestep_sdram and ( n_machine_time_steps is None or (self._data_writer.get_max_run_time_steps() < n_machine_time_steps)): raise ConfigurationException( "The SDRAM required by one or more vertices is based on " "the run time, so the run time is limited to " f"{self._data_writer.get_max_run_time_steps()} time steps") steps = [n_machine_time_steps] elif run_time is not None: # With auto pause and resume, any time step is possible but run # time more than the first will guarantee that run will be called # more than once steps = self._generate_steps(n_machine_time_steps) # requires data_generation includes never run and requires_mapping if self._data_writer.get_requires_data_generation(): self._do_load() # Run for each of the given steps if run_time is not None: assert steps is not None logger.info("Running for {} steps for a total of {}ms", len(steps), run_time) for step in steps: run_step = self._data_writer.next_run_step() logger.info(f"Run {run_step} of {len(steps)}") self._do_run(step, n_sync_steps) self._data_writer.clear_run_steps() elif run_time is None and self._run_until_complete: logger.info("Running until complete") self._do_run(None, n_sync_steps) elif (not get_config_bool( "Buffers", "use_auto_pause_and_resume") or not is_per_timestep_sdram): logger.info("Running forever") self._do_run(None, n_sync_steps) logger.info("Waiting for stop request") with self._state_condition: while self._data_writer.is_no_stop_requested(): self._state_condition.wait() else: logger.info("Running forever in steps of {}ms", self._data_writer.get_max_run_time_steps()) while self._data_writer.is_no_stop_requested(): logger.info(f"Run {self._data_writer.next_run_step()}") self._do_run( self._data_writer.get_max_run_time_steps(), n_sync_steps) self._data_writer.clear_run_steps() # Indicate that the signal handler needs to act if self.__is_main_thread(): self._raise_keyboard_interrupt = False sys.excepthook = self.exception_handler @final def _add_commands_to_command_sender(self, system_placements: Placements): """ Runs, times and logs the VirtualMachineGenerator if required. May set then "machine" value """ with FecTimer("Command Sender Adder", TimerWork.OTHER): all_command_senders = add_command_senders(system_placements) # add the edges from the command senders to the dependent vertices for command_sender in all_command_senders: self._data_writer.add_vertex(command_sender) edges, partition_ids = command_sender.edges_and_partitions() for edge, partition_id in zip(edges, partition_ids): self._data_writer.add_edge(edge, partition_id) @final def _add_dependent_verts_and_edges_for_application_graph(self) -> None: # cache vertices to allow insertion during iteration vertices = list(self._data_writer.get_vertices_by_type( AbstractVertexWithEdgeToDependentVertices)) for vertex in vertices: v = cast(ApplicationVertex, vertex) for dpt_vtx in vertex.dependent_vertices(): if dpt_vtx.has_been_added_to_graph(): continue self._data_writer.add_vertex(dpt_vtx) edge_partition_ids = vertex.\ edge_partition_identifiers_for_dependent_vertex(dpt_vtx) for edge_identifier in edge_partition_ids: self._data_writer.add_edge( ApplicationEdge(v, dpt_vtx), edge_identifier) @final def _deduce_data_n_timesteps(self) -> int: """ Operates the auto pause and resume functionality by figuring out how many timer ticks a simulation can run before SDRAM runs out, and breaks simulation into chunks of that long. :return: max time a simulation can run. :rtype: int """ # Go through the placements and find how much SDRAM is used # on each chip usage_by_chip: Dict[XY, AbstractSDRAM] = dict() for place in self._data_writer.iterate_placemements(): if isinstance(place.vertex, AbstractVirtual): continue sdram = place.vertex.sdram_required if (place.x, place.y) in usage_by_chip: usage_by_chip[place.x, place.y] += sdram else: usage_by_chip[place.x, place.y] = sdram # Go through the chips and divide up the remaining SDRAM, finding # the minimum number of machine timesteps to assign max_time_steps = sys.maxsize for (x, y), sdram in usage_by_chip.items(): size = self._data_writer.get_chip_at(x, y).sdram if sdram.fixed > size: raise PacmanPlaceException( f"Too much SDRAM has been allocated on chip {x}, {y}: " f"{sdram.fixed} of {size}") if sdram.per_timestep: max_this_chip = int((size - sdram.fixed) // sdram.per_timestep) max_time_steps = min(max_time_steps, max_this_chip) return max_time_steps def _generate_steps(self, n_steps: Optional[int]) -> Sequence[int]: """ Generates the list of "timer" runs. These are usually in terms of time steps, but need not be. :param int n_steps: the total runtime in machine time steps :return: list of time step lengths :rtype: list(int) """ if n_steps is None or n_steps == 0: return [0] n_steps_per_segment = self._data_writer.get_max_run_time_steps() n_full_iterations = int(math.floor(n_steps / n_steps_per_segment)) left_over_steps = n_steps - n_full_iterations * n_steps_per_segment steps = [int(n_steps_per_segment)] * n_full_iterations if left_over_steps: steps.append(int(left_over_steps)) return steps def _execute_get_virtual_machine(self) -> None: """ Runs, times and logs the VirtualMachineGenerator if required. May set then "machine" value """ with FecTimer("Virtual machine generator", TimerWork.OTHER): self._data_writer.set_machine(virtual_machine_generator()) self._data_writer.set_ipaddress("virtual") def _execute_allocator(self, total_run_time: Optional[float]) -> Optional[ Tuple[str, int, Optional[str], bool, bool, Optional[Dict[XY, str]], MachineAllocationController]]: """ Runs, times and logs the SpallocAllocator or HBPAllocator if required. :param total_run_time: The total run time to request :type total_run_time: int or None :return: machine name, machine version, BMP details (if any), reset on startup flag, auto-detect BMP, SCAMP connection details, boot port, allocation controller :rtype: tuple(str, int, object, bool, bool, object, object, MachineAllocationController) """ if self._data_writer.has_machine(): return None if not is_config_none("Machine", "spalloc_server"): with FecTimer("SpallocAllocator", TimerWork.OTHER): return spalloc_allocator( self.__bearer_token, **self.__group_collab_or_job) if not is_config_none("Machine", "remote_spinnaker_url"): with FecTimer("HBPAllocator", TimerWork.OTHER): # TODO: Would passing the bearer token to this ever make sense? return hbp_allocator(total_run_time) return None def _execute_machine_generator(self, allocator_data: Optional[Tuple[ str, int, Optional[str], bool, bool, Optional[Dict[XY, str]], MachineAllocationController]]) -> None: """ Runs, times and logs the MachineGenerator if required. May set the "machine" value if not already set :param allocator_data: `None` or (machine name, machine version, BMP details (if any), reset on startup flag, auto-detect BMP, SCAMP connection details, boot port, allocation controller) :type allocator_data: None or tuple(str, int, object, bool, bool, object, object, MachineAllocationController) """ if self._data_writer.has_machine(): return machine_name = get_config_str_or_none("Machine", "machine_name") if machine_name is not None: self._data_writer.set_ipaddress(machine_name) bmp_details = get_config_str_or_none("Machine", "bmp_names") auto_detect_bmp = get_config_bool("Machine", "auto_detect_bmp") scamp_connection_data = None reset_machine = get_config_bool( "Machine", "reset_machine_on_startup") board_version = FecDataView.get_machine_version().number elif allocator_data: (ipaddress, board_version, bmp_details, reset_machine, auto_detect_bmp, scamp_connection_data, machine_allocation_controller) = allocator_data self._data_writer.set_ipaddress(ipaddress) self._data_writer.set_allocation_controller( machine_allocation_controller) else: return with FecTimer("Machine generator", TimerWork.GET_MACHINE): machine, transceiver = machine_generator( bmp_details, board_version, auto_detect_bmp or False, scamp_connection_data, reset_machine or False) self._data_writer.set_transceiver(transceiver) self._data_writer.set_machine(machine) def _get_known_machine(self, total_run_time: float = 0.0): """ The Python machine description object. :param float total_run_time: The total run time to request :rtype: ~spinn_machine.Machine """ if not self._data_writer.has_machine(): if get_config_bool("Machine", "virtual_board"): self._execute_get_virtual_machine() else: allocator_data = self._execute_allocator(total_run_time) self._execute_machine_generator(allocator_data) def _get_machine(self) -> None: """ The factory method to get a machine. """ FecTimer.start_category(TimerCategory.GET_MACHINE, True) if self._data_writer.is_user_mode() and \ self._data_writer.is_soft_reset(): # Make the reset hard logger.warning( "Calling Get machine after a reset force a hard reset and " "therefore generate a new machine") self._hard_reset() self._get_known_machine() if not self._data_writer.has_machine(): raise ConfigurationException( "Not enough information provided to supply a machine") FecTimer.end_category(TimerCategory.GET_MACHINE) def _create_version_provenance(self) -> None: """ Add the version information to the provenance data at the start. """ with GlobalProvenance() as db: db.insert_version("spinn_utilities_version", spinn_utils_version) db.insert_version("spinn_machine_version", spinn_machine_version) db.insert_version("spalloc_version", spalloc_version) db.insert_version("spinnman_version", spinnman_version) db.insert_version("pacman_version", pacman_version) db.insert_version("front_end_common_version", fec_version) db.insert_version("numpy_version", numpy_version) db.insert_version("scipy_version", scipy_version) def _do_extra_mapping_algorithms(self) -> None: """ Allows overriding classes to add algorithms. """ def _json_machine(self) -> None: """ Runs, times and logs WriteJsonMachine if required. """ with FecTimer("Json machine", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_json_machine"): return write_json_machine() def _report_network_specification(self) -> None: """ Runs, times and logs the Network Specification report is requested. """ with FecTimer( "Network Specification report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_network_specification_report"): return network_specification() def _execute_split_lpg_vertices(self, system_placements: Placements): """ Runs, times and logs the SplitLPGVertices if required. """ with FecTimer("Split Live Gather Vertices", TimerWork.OTHER): split_lpg_vertices(system_placements) def _report_board_chip(self) -> None: """ Runs, times and logs the BoardChipReport is requested. """ with FecTimer("Board chip report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_board_chip_report"): return board_chip_report() if FecDataView.has_allocation_controller(): FecDataView.get_allocation_controller().make_report( os.path.join( FecDataView.get_run_dir_path(), "machine_allocation.rpt")) def _execute_splitter_reset(self) -> None: """ Runs, times and logs the splitter_reset. """ with FecTimer("Splitter reset", TimerWork.OTHER): splitter_reset() # Overridden by sPyNNaker to choose an extended algorithm def _execute_splitter_selector(self) -> None: """ Runs, times and logs the SplitterSelector. """ with FecTimer("Splitter selector", TimerWork.OTHER): splitter_selector() def _execute_delay_support_adder(self) -> None: """ Stub to allow sPyNNaker to add delay supports. """ # Overridden by sPyNNaker to choose a different algorithm def _execute_splitter_partitioner(self) -> None: """ Runs, times and logs the SplitterPartitioner if required. """ if self._data_writer.get_n_vertices() == 0: return with FecTimer("Splitter partitioner", TimerWork.OTHER): self._data_writer.set_n_chips_in_graph(splitter_partitioner()) def _execute_insert_chip_power_monitors( self, system_placements: Placements): """ Run, time and log the InsertChipPowerMonitorsToGraphs if required. """ with FecTimer("Insert chip power monitors", TimerWork.OTHER) as timer: if timer.skip_if_cfg_false("Reports", "write_energy_report"): return insert_chip_power_monitors_to_graphs(system_placements) @final def _execute_insert_extra_monitor_vertices( self, system_placements: Placements): """ Run, time and log the InsertExtraMonitorVerticesToGraphs if required. """ with FecTimer( "Insert extra monitor vertices", TimerWork.OTHER) as timer: if timer.skip_if_cfgs_false( "Machine", "enable_advanced_monitor_support", "enable_reinjection"): return # inserter checks for None app graph not an empty one gather_map, monitor_map = insert_extra_monitor_vertices_to_graphs( system_placements) self._data_writer.set_gatherer_map(gather_map) self._data_writer.set_monitor_map(monitor_map) def _report_partitioner(self) -> None: """ Write, times and logs the partitioner_report if needed. """ with FecTimer("Partitioner report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_partitioner_reports"): return partitioner_report() @property def get_number_of_available_cores_on_machine(self) -> int: """ The number of available cores on the machine after taking into account preallocated resources. :return: number of available cores :rtype: int """ machine = self._data_writer.get_machine() # get cores of machine cores = machine.total_available_user_cores ethernets = len(machine.ethernet_connected_chips) cores -= ((machine.n_chips - ethernets) * self._data_writer.get_all_monitor_cores()) cores -= ethernets * self._data_writer.get_ethernet_monitor_cores() return cores def _execute_application_placer(self, system_placements: Placements): """ Runs, times and logs the Application Placer. Sets the "placements" data .. note:: Calling of this method is based on the configuration placer value """ with FecTimer("Application Placer", TimerWork.OTHER): self._data_writer.set_placements(place_application_graph( system_placements)) def _do_placer(self, system_placements: Placements): """ Runs, times and logs one of the placers. Sets the "placements" data Which placer is run depends on the configuration placer value This method is the entry point for adding a new Placer :raise ConfigurationException: if the configuration place value is unexpected """ name = get_config_str("Mapping", "placer") if name == "ApplicationPlacer": return self._execute_application_placer(system_placements) if "," in name: raise ConfigurationException( "Only a single algorithm is supported for placer") raise ConfigurationException( f"Unexpected cfg setting placer: {name}") def _do_write_metadata(self) -> None: """ Do the various functions to write metadata to the SQLite files. """ with FecTimer("Record vertex labels to database", TimerWork.REPORT): with BufferDatabase() as db: db.store_vertex_labels() @final def _execute_system_multicast_routing_generator(self) -> None: """ Runs, times and logs the SystemMulticastRoutingGenerator if required. May sets the data "data_in_multicast_routing_tables", "data_in_multicast_key_to_chip_map" and "system_multicast_router_timeout_keys" """ with FecTimer("System multicast routing generator", TimerWork.OTHER) as timer: if timer.skip_if_cfgs_false( "Machine", "enable_advanced_monitor_support", "enable_reinjection"): return data = system_multicast_routing_generator() self._data_writer.set_system_multicast_routing_data(data) @final def _execute_fixed_route_router(self) -> None: """ Runs, times and logs the FixedRouteRouter if required. May set the "fixed_routes" data. """ with FecTimer("Fixed route router", TimerWork.OTHER) as timer: if timer.skip_if_cfg_false( "Machine", "enable_advanced_monitor_support"): return self._data_writer.set_fixed_routes(fixed_route_router( DataSpeedUpPacketGatherMachineVertex)) def _report_placements_with_application_graph(self) -> None: """ Writes, times and logs the application graph placer report if requested. """ if self._data_writer.get_n_vertices() == 0: return with FecTimer("Placements with application graph report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_application_graph_placer_report"): return placer_reports_with_application_graph() def _json_placements(self) -> None: """ Does, times and logs the writing of placements as JSON if requested. """ with FecTimer("Json placements", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_json_placements"): return write_json_placements() @final def _execute_application_router(self) -> None: """ Runs, times and logs the ApplicationRouter. Sets the "routing_table_by_partition" data if called .. note:: Calling of this method is based on the configuration router value """ with FecTimer("Application Router", TimerWork.RUNNING): self._data_writer.set_routing_table_by_partition( route_application_graph()) @final def _do_routing(self) -> None: """ Runs, times and logs one of the routers. Sets the "routing_table_by_partition" data Which router is run depends on the configuration router value This method is the entry point for adding a new Router :raise ConfigurationException: if the configuration router value is unexpected """ name = get_config_str("Mapping", "router") if name == "ApplicationRouter": return self._execute_application_router() if "," in name: raise ConfigurationException( "Only a single algorithm is supported for router") raise ConfigurationException( f"Unexpected cfg setting router: {name}") def _execute_basic_tag_allocator(self) -> None: """ Runs, times and logs the Tag Allocator. Sets the "tag" data """ with FecTimer("Basic tag allocator", TimerWork.OTHER): self._data_writer.set_tags(basic_tag_allocator()) def _report_tag_allocations(self) -> None: """ Write, times and logs the tag allocator report if requested. """ with FecTimer("Tag allocator report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_tag_allocation_reports"): return tag_allocator_report() @final def _execute_global_allocate( self, extra_allocations: Iterable[Tuple[ApplicationVertex, str]]): """ Runs, times and logs the Global Zoned Routing Info Allocator. Sets "routing_info" is called .. note:: Calling of this method is based on the configuration info_allocator value """ with FecTimer("Global allocate", TimerWork.OTHER): self._data_writer.set_routing_infos( global_allocate(extra_allocations)) @final def _execute_flexible_allocate( self, extra_allocations: Iterable[Tuple[ApplicationVertex, str]]): """ Runs, times and logs the Zoned Routing Info Allocator. Sets "routing_info" is called .. note:: Calling of this method is based on the configuration info_allocator value """ with FecTimer("Zoned routing info allocator", TimerWork.OTHER): self._data_writer.set_routing_infos( flexible_allocate(extra_allocations)) @final def _do_info_allocator(self) -> None: """ Runs, times and logs one of the info allocators. Sets the "routing_info" data Which allocator is run depends on the configuration info_allocator value. This method is the entry point for adding a new Info Allocator :param list(tuple(ApplicationVertex,str)) extra_allocations: Additional (vertex, partition identifier) pairs to allocate keys to. These might not appear in partitions in the graph due to being added by the system. :raise ConfigurationException: if the configuration info_allocator value is unexpected """ name = get_config_str("Mapping", "info_allocator") if name == "GlobalZonedRoutingInfoAllocator": return self._execute_global_allocate([]) if name == "ZonedRoutingInfoAllocator": return self._execute_flexible_allocate([]) if "," in name: raise ConfigurationException( "Only a single algorithm is supported for info_allocator") raise ConfigurationException( f"Unexpected cfg setting info_allocator: {name}") def _report_router_info(self) -> None: """ Writes, times and logs the router info report if requested. """ with FecTimer("Router info report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_router_info_report"): return routing_info_report([]) @final def _execute_basic_routing_table_generator(self) -> None: """ Runs, times and logs the Routing Table Generator. .. note:: Currently no other Routing Table Generator supported. To add an additional Generator copy the pattern of do_placer """ with FecTimer("Basic routing table generator", TimerWork.OTHER): self._data_writer.set_uncompressed(basic_routing_table_generator()) @final def _execute_merged_routing_table_generator(self) -> None: """ Runs, times and logs the Routing Table Generator. .. note:: Currently no other Routing Table Generator supported. To add an additional Generator copy the pattern of do_placer """ with FecTimer("Merged routing table generator", TimerWork.OTHER): self._data_writer.set_uncompressed( merged_routing_table_generator()) # TODO Nuke ZonedRoutingTableGenerator @final def _do_routing_table_generator(self) -> None: """ Runs, times and logs one of the routing table generators. Sets the "routing_info" data Which allocator is run depends on the configuration's `routing_table_generator` value. This method is the entry point for adding a new routing table generator. :raise ConfigurationException: if the configuration's `routing_table_generator` value is unexpected """ name = get_config_str("Mapping", "routing_table_generator") if name == "BasicRoutingTableGenerator": return self._execute_basic_routing_table_generator() if name == "MergedRoutingTableGenerator": return self._execute_merged_routing_table_generator() if "," in name: raise ConfigurationException( "Only a single algorithm is supported for" " routing_table_generator") raise ConfigurationException( f"Unexpected cfg setting routing_table_generator: {name}") def _report_routers(self) -> None: """ Write, times and logs the router report if requested. """ with FecTimer("Router report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_router_reports"): return router_report_from_paths() def _report_router_summary(self) -> None: """ Write, times and logs the router summary report if requested. """ with FecTimer("Router summary report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_router_summary_report"): return router_summary_report() def _json_routing_tables(self) -> None: """ Write, time and log the routing tables as JSON if requested. """ with FecTimer("Json routing tables", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_json_routing_tables"): return write_json_routing_tables(self._data_writer.get_uncompressed()) # Output ignored as never used def _report_drift(self, start: bool) -> None: """ Write, time and log the inter-board timer drift. :param bool start: Is this the start or the end """ with FecTimer("Drift report", TimerWork.REPORT) as timer: if timer.skip_if_virtual_board(): return if start and timer.skip_if_cfg_false( "Reports", "write_drift_report_start"): return if not start and timer.skip_if_cfg_false( "Reports", "write_drift_report_end"): return drift_report() @final def _execute_locate_executable_start_type(self) -> None: """ Runs, times and logs LocateExecutableStartType if required. May set the executable_types data. """ with FecTimer("Locate executable start type", TimerWork.OTHER): self._data_writer.set_executable_types( locate_executable_start_type()) @final def _execute_buffer_manager_creator(self) -> None: """ Run, times and logs the buffer manager creator if required. May set the buffer_manager data """ if self._data_writer.has_buffer_manager(): return with FecTimer("Buffer manager creator", TimerWork.OTHER) as timer: if timer.skip_if_virtual_board(): return self._data_writer.set_buffer_manager(BufferManager()) def _execute_sdram_outgoing_partition_allocator(self) -> None: """ Runs, times and logs the SDRAMOutgoingPartitionAllocator. """ with FecTimer("SDRAM outgoing partition allocator", TimerWork.OTHER): sdram_outgoing_partition_allocator() def _execute_control_sync(self, do_sync: bool) -> None: """ Control synchronisation on board. :param bool do_sync: Whether to enable synchronisation """ with FecTimer("Control Sync", TimerWork.CONTROL) as timer: if timer.skip_if_virtual_board(): return self._data_writer.get_transceiver().control_sync(do_sync) def _do_mapping(self, total_run_time: Optional[float]) -> None: """ Runs, times and logs all the algorithms in the mapping stage. :param float total_run_time: """ FecTimer.start_category(TimerCategory.MAPPING) self._setup_java_caller() self._do_extra_mapping_algorithms() self._report_network_specification() self._execute_splitter_reset() self._execute_splitter_selector() self._execute_delay_support_adder() self._execute_splitter_partitioner() allocator_data = self._execute_allocator(total_run_time) self._execute_machine_generator(allocator_data) self._json_machine() self._report_board_chip() system_placements = Placements() self._add_commands_to_command_sender(system_placements) self._execute_split_lpg_vertices(system_placements) self._execute_insert_chip_power_monitors(system_placements) self._execute_insert_extra_monitor_vertices(system_placements) self._report_partitioner() self._do_placer(system_placements) self._report_placements_with_application_graph() self._json_placements() self._execute_system_multicast_routing_generator() self._execute_fixed_route_router() self._do_routing() self._execute_basic_tag_allocator() self._report_tag_allocations() self._do_info_allocator() self._report_router_info() self._do_routing_table_generator() self._report_uncompressed_routing_table() self._report_routers() self._report_router_summary() self._json_routing_tables() self._execute_locate_executable_start_type() self._execute_buffer_manager_creator() FecTimer.end_category(TimerCategory.MAPPING) # Overridden by spy which adds placement_order def _execute_graph_data_specification_writer(self) -> None: """ Runs, times, and logs the GraphDataSpecificationWriter. Creates and fills the data spec database """ with FecTimer("Graph data specification writer", TimerWork.OTHER): self._data_writer.set_ds_database_path( graph_data_specification_writer()) def _do_data_generation(self) -> None: """ Runs, Times and logs the data generation. """ self._execute_sdram_outgoing_partition_allocator() self._execute_graph_data_specification_writer() def _execute_routing_setup(self) -> None: """ Runs, times and logs the RoutingSetup if required. """ if self._multicast_routes_loaded: return with FecTimer("Routing setup", TimerWork.LOADING) as timer: if timer.skip_if_virtual_board(): return # Only needs the x and y of chips with routing tables routing_setup() def _execute_graph_binary_gatherer(self) -> None: """ Runs, times and logs the GraphBinaryGatherer if required. """ with FecTimer("Graph binary gatherer", TimerWork.OTHER) as timer: try: self._data_writer.set_executable_targets( graph_binary_gatherer()) except KeyError: if get_config_bool("Machine", "virtual_board"): logger.warning( "Ignoring executable not found as using virtual") timer.error("executable not found and virtual board") return raise @final def _execute_host_bitfield_compressor(self) -> Optional[ MulticastRoutingTables]: """ Runs, times and logs the HostBasedBitFieldRouterCompressor .. note:: Calling of this method is based on the configuration compressor or virtual_compressor value :return: Compressed routing tables :rtype: ~pacman.model.routing_tables.MulticastRoutingTables """ with FecTimer("Host based bitfield router compressor", TimerWork.OTHER) as timer: if timer.skip_if_virtual_board(): return None self._multicast_routes_loaded = False compressed = host_based_bit_field_router_compressor() return compressed @final def _execute_ordered_covering_compressor(self) -> MulticastRoutingTables: """ Runs, times and logs the OrderedCoveringCompressor. .. note:: Calling of this method is based on the configuration compressor or virtual_compressor value :return: Compressed routing tables :rtype: ~pacman.model.routing_tables.MulticastRoutingTables """ with FecTimer("Ordered covering compressor", TimerWork.OTHER) as timer: self._multicast_routes_loaded = False precompressed = self._data_writer.get_precompressed() if self._compression_skipable(precompressed): timer.skip("Tables already small enough") return precompressed return ordered_covering_compressor() @final def _execute_ordered_covering_compression(self) -> Optional[ MulticastRoutingTables]: """ Runs, times and logs the ordered covering compressor on machine. .. note:: Calling of this method is based on the configuration compressor or virtual_compressor value """ with FecTimer("Ordered covering compressor", TimerWork.COMPRESSING) as timer: if timer.skip_if_virtual_board(): return None precompressed = self._data_writer.get_precompressed() if self._compression_skipable(precompressed): timer.skip("Tables already small enough") self._multicast_routes_loaded = False return precompressed ordered_covering_compression() self._multicast_routes_loaded = True return None @final def _execute_pair_compressor(self) -> MulticastRoutingTables: """ Runs, times and logs the PairCompressor. .. note:: Calling of this method is based on the configuration compressor or virtual_compressor value :return: Compressed routing table :rtype: ~pacman.model.routing_tables.MulticastRoutingTables """ with FecTimer("Pair compressor", TimerWork.OTHER) as timer: precompressed = self._data_writer.get_precompressed() self._multicast_routes_loaded = False if self._compression_skipable(precompressed): timer.skip("Tables already small enough") return precompressed return pair_compressor() @final def _execute_pair_compression(self) -> Optional[MulticastRoutingTables]: """ Runs, times and logs the pair compressor on machine. .. note:: Calling of this method is based on the configuration compressor or virtual_compressor value """ with FecTimer("Pair on chip router compression", TimerWork.COMPRESSING) as timer: if timer.skip_if_virtual_board(): return None precompressed = self._data_writer.get_precompressed() if self._compression_skipable(precompressed): timer.skip("Tables already small enough") self._multicast_routes_loaded = False return precompressed pair_compression() self._multicast_routes_loaded = True return None @final def _execute_pair_unordered_compressor(self) -> MulticastRoutingTables: """ Runs, times and logs the CheckedUnorderedPairCompressor. .. note:: Calling of this method is based on the configuration compressor or virtual_compressor value :return: compressed routing tables :rtype: ~pacman.model.routing_tables.MulticastRoutingTables """ with FecTimer("Pair unordered compressor", TimerWork.OTHER) as timer: self._multicast_routes_loaded = False precompressed = self._data_writer.get_precompressed() if self._compression_skipable(precompressed): timer.skip("Tables already small enough") return precompressed return pair_compressor(ordered=False) def _compressor_name(self) -> Tuple[str, bool]: if get_config_bool("Machine", "virtual_board"): name = get_config_str_or_none("Mapping", "virtual_compressor") if name is None: logger.info("As no virtual_compressor specified " "using compressor setting") name = get_config_str("Mapping", "compressor") else: name = get_config_str("Mapping", "compressor") pre_compress = "BitField" not in name return name, pre_compress def _compression_skipable(self, tables) -> bool: if get_config_bool( "Mapping", "router_table_compress_as_far_as_possible"): return False machine = self._data_writer.get_machine() return (tables.get_max_number_of_entries() <= machine.min_n_router_enteries) def _execute_pre_compression(self, pre_compress: bool): name = get_config_str_or_none("Mapping", "precompressor") if not pre_compress or name is None: # Declare the precompressed data to be the uncompressed data self._data_writer.set_precompressed( self._data_writer.get_uncompressed()) return elif name != "Ranged": raise ConfigurationException( f"Unexpected cfg setting precompressor: {name}") with FecTimer("Ranged Compressor", TimerWork.OTHER) as timer: if self._compression_skipable( self._data_writer.get_uncompressed()): timer.skip("Tables already small enough") self._data_writer.set_precompressed( self._data_writer.get_uncompressed()) return self._data_writer.set_precompressed(range_compressor()) def _do_early_compression(self, name: str) -> Optional[ MulticastRoutingTables]: """ Calls a compressor based on the name provided. .. note:: This method is the entry point for adding a new compressor that can or must run early. :param str name: Name of a compressor :return: CompressedRoutingTables (likely to be `None)`, RouterCompressorProvenanceItems (may be an empty list) :rtype: tuple(~pacman.model.routing_tables.MulticastRoutingTables or None, list(ProvenanceDataItem)) :raise ConfigurationException: if the name is not expected """ if name == "OrderedCoveringCompressor": return self._execute_ordered_covering_compressor() elif name == "OrderedCoveringOnChipRouterCompression": return self._execute_ordered_covering_compression() elif name == "PairCompressor": return self._execute_pair_compressor() elif name == "PairOnChipRouterCompression": return self._execute_pair_compression() elif name == "PairUnorderedCompressor": return self._execute_pair_unordered_compressor() # delay compression until later return None def _do_delayed_compression( self, name: str, compressed: Optional[MulticastRoutingTables]) -> Optional[ MulticastRoutingTables]: """ Run compression that must be delayed until later. .. note:: This method is the entry point for adding a new compressor that can not run at the normal place :param str name: Name of a compressor :return: CompressedRoutingTables (likely to be `None`), RouterCompressorProvenanceItems (may be an empty list) :rtype: ~pacman.model.routing_tables.MulticastRoutingTables or None :raise ConfigurationException: if the name is not expected """ if self._multicast_routes_loaded or compressed: # Already compressed return compressed # overridden in spy to handle: # SpynnakerMachineBitFieldOrderedCoveringCompressor # SpynnakerMachineBitFieldPairRouterCompressor if name == "HostBasedBitFieldRouterCompressor": return self._execute_host_bitfield_compressor() if "," in name: raise ConfigurationException( "Only a single algorithm is supported for compressor") raise ConfigurationException( f"Unexpected cfg setting compressor: {name}") @final def _execute_load_routing_tables( self, compressed: Optional[MulticastRoutingTables]) -> None: """ Runs, times and logs the RoutingTableLoader if required. :param compressed: :type compressed: ~.MulticastRoutingTables or None """ if not compressed: return with FecTimer("Routing table loader", TimerWork.LOADING) as timer: self._multicast_routes_loaded = True if timer.skip_if_virtual_board(): return routing_table_loader(compressed) def _report_uncompressed_routing_table(self) -> None: """ Runs, times and logs the router report from router tables if requested. """ with FecTimer("Uncompressed routing table report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_routing_table_reports"): return router_report_from_router_tables() def _report_bit_field_compressor(self) -> None: """ Runs, times and logs the BitFieldCompressorReport if requested. """ with FecTimer("Bitfield compressor report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_bit_field_compressor_report"): return # BitFieldSummary output ignored as never used bitfield_compressor_report() def _execute_load_fixed_routes(self) -> None: """ Runs, times and logs Load Fixed Routes if required. """ with FecTimer("Load fixed routes", TimerWork.LOADING) as timer: if timer.skip_if_cfg_false( "Machine", "enable_advanced_monitor_support"): return if timer.skip_if_virtual_board(): return load_fixed_routes() def _execute_load_system_data_specification(self) -> None: """ Runs, times and logs the load_system_data_specs if required. """ with FecTimer( "Load system data specification", TimerWork.OTHER) as timer: if timer.skip_if_virtual_board(): return load_system_data_specs() def _execute_load_system_executable_images(self) -> None: """ Runs, times and logs the loading of executable images. """ with FecTimer( "Load executable system Images", TimerWork.LOADING) as timer: if timer.skip_if_virtual_board(): return load_sys_images() def _execute_load_application_data_specification(self) -> None: """ Runs, times and logs :py:meth:`load_application_data_specs` if required. :return: map of placement and DSG data, and loaded data flag. :rtype: dict(tuple(int,int,int),DataWritten) or DsWriteInfo """ with FecTimer("Load Application data specification", TimerWork.LOADING) as timer: if timer.skip_if_virtual_board(): return return load_application_data_specs() def _execute_tags_from_machine_report(self) -> None: """ Run, times and logs the TagsFromMachineReport if requested. """ with FecTimer( "Tags from machine report", TimerWork.EXTRACTING) as timer: if timer.skip_if_virtual_board(): return if timer.skip_if_cfg_false( "Reports", "write_tag_allocation_reports"): return tags_from_machine_report() def _execute_load_tags(self) -> None: """ Runs, times and logs the Tags Loader if required. """ # TODO why: if graph_changed or data_changed: with FecTimer("Tags Loader", TimerWork.LOADING) as timer: if timer.skip_if_virtual_board(): return tags_loader() def _do_extra_load_algorithms(self) -> None: """ Runs, times and logs any extra load algorithms. """ def _report_memory_on_host(self) -> None: """ Runs, times and logs MemoryMapOnHostReport if requested. """ with FecTimer("Memory report", TimerWork.REPORT) as timer: if timer.skip_if_virtual_board(): return if timer.skip_if_cfg_false( "Reports", "write_memory_map_report"): return memory_map_on_host_report() def _report_memory_on_chip(self) -> None: """ Runs, times and logs MemoryMapOnHostChipReport if requested. """ with FecTimer("Memory report", TimerWork.REPORT) as timer: if timer.skip_if_virtual_board(): return if timer.skip_if_cfg_false( "Reports", "write_memory_map_report"): return memory_map_on_host_chip_report() # TODO consider different cfg flags def _report_compressed(self, compressed: Optional[ MulticastRoutingTables]) -> None: """ Runs, times and logs the compressor reports if requested. :param compressed: :type compressed: ~.MulticastRoutingTables or None """ with FecTimer("Compressor report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_routing_table_reports"): return if timer.skip_if_cfg_false( "Reports", "write_routing_tables_from_machine_reports"): return if compressed is None: if timer.skip_if_virtual_board(): return compressed = read_routing_tables_from_machine() router_report_from_compressed_router_tables(compressed) generate_comparison_router_report(compressed) router_compressed_summary_report(compressed) routing_table_from_machine_report(compressed) def _report_fixed_routes(self) -> None: """ Runs, times and logs the FixedRouteFromMachineReport if requested. """ with FecTimer("Fixed route report", TimerWork.REPORT) as timer: if timer.skip_if_virtual_board(): return if timer.skip_if_cfg_false( "Machine", "enable_advanced_monitor_support"): return # TODO at the same time as LoadFixedRoutes? fixed_route_from_machine_report() def _execute_application_load_executables(self) -> None: """ Algorithms needed for loading the binaries to the SpiNNaker machine. """ with FecTimer("Load executable app images", TimerWork.LOADING) as timer: if timer.skip_if_virtual_board(): return load_app_images() def _do_load(self) -> None: """ Runs, times and logs the load algorithms. """ FecTimer.start_category(TimerCategory.LOADING) if self._data_writer.get_requires_mapping(): self._execute_routing_setup() self._execute_graph_binary_gatherer() # loading_algorithms compressor, pre_compress = self._compressor_name() self._execute_pre_compression(pre_compress) compressed = self._do_early_compression(compressor) self._do_data_generation() self._execute_control_sync(False) if self._data_writer.get_requires_mapping(): self._execute_load_fixed_routes() self._execute_load_system_data_specification() self._execute_load_system_executable_images() self._execute_load_tags() self._execute_load_application_data_specification() self._do_extra_load_algorithms() compressed = self._do_delayed_compression(compressor, compressed) self._execute_load_routing_tables(compressed) self._report_bit_field_compressor() # TODO Was master correct to run the report first? self._execute_tags_from_machine_report() if self._data_writer.get_requires_mapping(): self._report_memory_on_host() self._report_memory_on_chip() self._report_compressed(compressed) self._report_fixed_routes() self._execute_application_load_executables() FecTimer.end_category(TimerCategory.LOADING) def _report_sdram_usage_per_chip(self) -> None: # TODO why in do run with FecTimer("Sdram usage per chip report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false( "Reports", "write_sdram_usage_report_per_chip"): return sdram_usage_report_per_chip() def _execute_dsg_region_reloader(self) -> None: """ Runs, times and logs the DSGRegionReloader if required. Reload any parameters over the loaded data if we have already run and not using a virtual board and the data hasn't already been regenerated """ if not self._data_writer.is_ran_ever(): return if self._data_writer.is_hard_reset(): return with FecTimer("DSG region reloader", TimerWork.LOADING) as timer: if timer.skip_if_virtual_board(): return reload_dsg_regions() def _execute_graph_provenance_gatherer(self) -> None: """ Runs, times and log the GraphProvenanceGatherer if requested. """ with FecTimer("Graph provenance gatherer", TimerWork.OTHER) as timer: if timer.skip_if_cfg_false("Reports", "read_graph_provenance_data"): return graph_provenance_gatherer() def _execute_placements_provenance_gatherer(self) -> None: """ Runs, times and log the PlacementsProvenanceGatherer if requested. """ with FecTimer( "Placements provenance gatherer", TimerWork.OTHER) as timer: if timer.skip_if_cfg_false("Reports", "read_placements_provenance_data"): return if timer.skip_if_virtual_board(): return # Also used in recover from error where is is not all placements placements_provenance_gatherer( self._data_writer.get_n_placements(), self._data_writer.iterate_placemements()) def _execute_router_provenance_gatherer(self) -> None: """ Runs, times and log the RouterProvenanceGatherer if requested. """ with FecTimer( "Router provenance gatherer", TimerWork.EXTRACTING) as timer: if timer.skip_if_cfg_false("Reports", "read_router_provenance_data"): return if timer.skip_if_virtual_board(): return router_provenance_gatherer() def _execute_profile_data_gatherer(self) -> None: """ Runs, times and logs the ProfileDataGatherer if requested. """ with FecTimer("Profile data gatherer", TimerWork.EXTRACTING) as timer: if timer.skip_if_cfg_false("Reports", "read_profile_data"): return if timer.skip_if_virtual_board(): return profile_data_gatherer() def _do_read_provenance(self) -> None: """ Runs, times and log the methods that gather provenance. :rtype: list(ProvenanceDataItem) """ self._execute_graph_provenance_gatherer() self._execute_placements_provenance_gatherer() self._execute_router_provenance_gatherer() self._execute_profile_data_gatherer() def _report_energy(self) -> None: """ Runs, times and logs the energy report if requested. """ with FecTimer("Energy report", TimerWork.REPORT) as timer: if timer.skip_if_cfg_false("Reports", "write_energy_report"): return if timer.skip_if_virtual_board(): return # TODO runtime is None power_used = compute_energy_used() energy_provenance_reporter(power_used) # create energy reporter energy_reporter = EnergyReport() # run energy report energy_reporter.write_energy_report(power_used) def _do_provenance_reports(self) -> None: """ Runs any reports based on provenance. """ def _execute_clear_io_buf(self) -> None: """ Runs, times and logs the ChipIOBufClearer if required. """ if self._data_writer.get_current_run_timesteps() is None: return with FecTimer("Clear IO buffer", TimerWork.CONTROL) as timer: if timer.skip_if_virtual_board(): return # TODO Why check empty_graph is always false?? if timer.skip_if_cfg_false("Reports", "clear_iobuf_during_run"): return chip_io_buf_clearer() def _execute_runtime_update(self, n_sync_steps: int) -> None: """ Runs, times and logs the runtime updater if required. :param int n_sync_steps: The number of timesteps between synchronisations """ with FecTimer("Runtime Update", TimerWork.LOADING) as timer: if timer.skip_if_virtual_board(): return if (ExecutableType.USES_SIMULATION_INTERFACE in self._data_writer.get_executable_types()): chip_runtime_updater(n_sync_steps) else: timer.skip("No Simulation Interface used") def _execute_create_database_interface( self, run_time: Optional[float]) -> None: """ Runs, times and logs Database Interface Creator. Sets the _database_file_path data object :param int run_time: the run duration in milliseconds. """ with FecTimer("Create database interface", TimerWork.OTHER): # Used to used compressed routing tables if available on host # TODO consider not saving router tables. self._data_writer.set_database_file_path( database_interface(run_time)) def _execute_create_notifiaction_protocol(self) -> None: """ Runs, times and logs the creation of the Notification Protocol. Sets the notification_interface data object """ with FecTimer("Create notification protocol", TimerWork.OTHER): self._data_writer.set_notification_protocol( create_notification_protocol()) def _execute_runner( self, n_sync_steps: int, run_time: Optional[float]) -> None: """ Runs, times and logs the ApplicationRunner. :param int n_sync_steps: The number of timesteps between synchronisations :param int run_time: the run duration in milliseconds. """ with FecTimer(FecTimer.APPLICATION_RUNNER, TimerWork.RUNNING) as timer: if timer.skip_if_virtual_board(): return # Don't timeout if a stepped mode is in operation if n_sync_steps: time_threshold = None else: time_threshold = get_config_int( "Machine", "post_simulation_overrun_before_error") application_runner( run_time, time_threshold, self._run_until_complete) def _execute_extract_iobuff(self) -> None: """ Runs, times and logs the ChipIOBufExtractor if required. """ with FecTimer("Extract IO buff", TimerWork.EXTRACTING) as timer: if timer.skip_if_virtual_board(): return if timer.skip_if_cfg_false("Reports", "extract_iobuf"): return # ErrorMessages, WarnMessages output ignored as never used! chip_io_buf_extractor() def _execute_buffer_extractor(self) -> None: """ Runs, times and logs the BufferExtractor if required. """ with FecTimer("Buffer extractor", TimerWork.EXTRACT_DATA) as timer: if timer.skip_if_virtual_board(): return bm = self._data_writer.get_buffer_manager() bm.get_placement_data() def _do_extract_from_machine(self) -> None: """ Runs, times and logs the steps to extract data from the machine. :param run_time: the run duration in milliseconds. :type run_time: int or None """ self._execute_extract_iobuff() self._execute_buffer_extractor() self._execute_clear_io_buf() # FinaliseTimingData never needed as just pushed self._ to inputs self._do_read_provenance() self._report_energy() self._do_provenance_reports() def __do_run( self, n_machine_time_steps: Optional[int], n_sync_steps: int) -> None: """ Runs, times and logs the do run steps. :param n_machine_time_steps: Number of timesteps run :type n_machine_time_steps: int or None :param int n_sync_steps: The number of timesteps between synchronisations """ # TODO virtual board FecTimer.start_category(TimerCategory.RUN_LOOP) run_time = None if n_machine_time_steps is not None: run_time = (n_machine_time_steps * self._data_writer.get_simulation_time_step_ms()) self._data_writer.increment_current_run_timesteps( n_machine_time_steps) self._report_sdram_usage_per_chip() self._report_drift(start=True) if self._data_writer.get_requires_mapping(): self._execute_create_database_interface(run_time) self._execute_create_notifiaction_protocol() if (self._data_writer.is_ran_ever() and not self._data_writer.get_requires_mapping() and not self._data_writer.get_requires_data_generation()): self._execute_dsg_region_reloader() self._execute_runtime_update(n_sync_steps) self._execute_runner(n_sync_steps, run_time) if n_machine_time_steps is not None or self._run_until_complete: self._do_extract_from_machine() # reset at the end of each do_run cycle self._report_drift(start=False) self._execute_control_sync(True) FecTimer.end_category(TimerCategory.RUN_LOOP) def _do_run( self, n_machine_time_steps: Optional[int], n_sync_steps: int) -> None: """ Runs, times and logs the do run steps. :param n_machine_time_steps: Number of timesteps run :type n_machine_time_steps: int or None :param int n_sync_steps: The number of timesteps between synchronisations """ try: self.__do_run(n_machine_time_steps, n_sync_steps) except KeyboardInterrupt: logger.error("User has aborted the simulation") self._shutdown() sys.exit(1) except Exception as run_e: self._recover_from_error(run_e) # re-raise exception raise run_e def _recover_from_error(self, exception: Exception) -> None: """ :param Exception exception: """ try: self.__recover_from_error(exception) except Exception as rec_e: logger.exception( f"Error {rec_e} when attempting to recover from error") def __recover_from_error(self, exception: Exception) -> None: """ :param Exception exception: """ # if exception has an exception, print to system logger.error("An error has occurred during simulation") # Print the detail including the traceback logger.error(exception) logger.info("\n\nAttempting to extract data\n\n") # Extract router provenance try: router_provenance_gatherer() except Exception: logger.exception("Error reading router provenance") # Find the cores that are not in an expected state unsuccessful_cores = CPUInfos() if isinstance(exception, SpiNNManCoresNotInStateException): unsuccessful_cores = exception.failed_core_states() # If there are no cores in a bad state, find those not yet in # their finished state transceiver = self._data_writer.get_transceiver() if not unsuccessful_cores: for executable_type, core_subsets in \ self._data_writer.get_executable_types().items(): unsuccessful_cores = transceiver.get_cpu_infos( core_subsets, executable_type.end_state, False) # Print the details of error cores logger.error(unsuccessful_cores.get_status_string()) # Find the cores that are not in RTE i.e. that can still be read non_rte_cores = unsuccessful_cores.infos_not_in_states( [CPUState.RUN_TIME_EXCEPTION, CPUState.WATCHDOG]) # If there are any cores that are not in RTE, extract data from them if (non_rte_cores and ExecutableType.USES_SIMULATION_INTERFACE in self._data_writer.get_executable_types()): non_rte_core_subsets = CoreSubsets() for (x, y, p) in non_rte_cores: non_rte_core_subsets.add_processor(x, y, p) # Attempt to force the cores to write provenance and exit try: chip_provenance_updater(non_rte_core_subsets) except Exception: logger.exception("Could not update provenance on chip") # Extract any written provenance data try: transceiver = self._data_writer.get_transceiver() finished_cores = transceiver.get_cpu_infos( non_rte_core_subsets, CPUState.FINISHED, True) finished_placements = Placements() for (x, y, p) in finished_cores: try: placement = self._data_writer.\ get_placement_on_processor(x, y, p) finished_placements.add_placement(placement) except Exception: # pylint: disable=broad-except pass # already recovering from error placements_provenance_gatherer( finished_placements.n_placements, finished_placements.placements) except Exception as pro_e: logger.exception(f"Could not read provenance due to {pro_e}") # Read IOBUF where possible (that should be everywhere) iobuf = IOBufExtractor() try: errors, warnings = iobuf.extract_iobuf() except Exception: logger.exception("Could not get iobuf") errors, warnings = [], [] # Print the IOBUFs self._print_iobuf(errors, warnings) @staticmethod def _print_iobuf(errors: Iterable[str], warnings: Iterable[str]): """ :param list(str) errors: :param list(str) warnings: """ for warning in warnings: logger.warning(warning) for error in errors: logger.error(error)
[docs] def reset(self) -> None: """ Puts the simulation back at time zero. """ FecTimer.start_category(TimerCategory.RESETTING) if not self._data_writer.is_ran_last(): if not self._data_writer.is_ran_ever(): logger.error("Ignoring the reset before the run") else: logger.error("Ignoring the repeated reset call") return logger.info("Resetting") if self._data_writer.get_user_accessed_machine(): logger.warning( "A reset after a get machine call is always hard and " "therefore the previous machine is no longer valid") self._hard_reset() else: self._data_writer.soft_reset() # rewind the buffers from the buffer manager, to start at the beginning # of the simulation again and clear buffered out if self._data_writer.has_buffer_manager(): self._data_writer.get_buffer_manager().reset() # Reset the graph off the machine, to set things to time 0 self.__reset_graph_elements() FecTimer.end_category(TimerCategory.RESETTING)
def __repr__(self) -> str: if self._data_writer.has_ipaddress(): return (f"general front end instance for machine " f"{self._data_writer.get_ipaddress()}") else: return "general front end instance no machine set" def _shutdown(self) -> None: # if stopping on machine, clear IP tags and routing table self.__clear() # stop the transceiver and allocation controller if self._data_writer.has_transceiver(): transceiver = self._data_writer.get_transceiver() transceiver.stop_application(self._data_writer.get_app_id()) self.__close_allocation_controller() self._data_writer.clear_notification_protocol() FecTimer.stop_category_timing() self._data_writer.shut_down() def __clear(self) -> None: if not self._data_writer.has_transceiver(): return transceiver = self._data_writer.get_transceiver() if get_config_bool("Machine", "clear_tags"): for ip_tag in self._data_writer.get_tags().ip_tags: transceiver.clear_ip_tag( ip_tag.tag, board_address=ip_tag.board_address) for reverse_ip_tag in self._data_writer.get_tags().reverse_ip_tags: transceiver.clear_ip_tag( reverse_ip_tag.tag, board_address=reverse_ip_tag.board_address) # if clearing routing table entries, clear if get_config_bool("Machine", "clear_routing_tables"): for router_table in self._data_writer.get_uncompressed(): transceiver.clear_multicast_routes( router_table.x, router_table.y) def __close_allocation_controller(self) -> None: if FecDataView.has_allocation_controller(): FecDataView.get_allocation_controller().close() self._data_writer.set_allocation_controller(None)
[docs] def stop(self) -> None: """ End running of the simulation. """ self._data_writer.stopping() FecTimer.start_category(TimerCategory.SHUTTING_DOWN) # If we have run forever, stop the binaries try: if (self._data_writer.is_ran_ever() and self._data_writer.get_current_run_timesteps() is None and not get_config_bool("Machine", "virtual_board") and not self._run_until_complete): self._do_stop_workflow() elif get_config_bool("Reports", "read_provenance_data_on_end"): self._do_read_provenance() except Exception as e: self._recover_from_error(e) self.write_errored_file() raise finally: # shut down the machine properly self._shutdown() self.write_finished_file()
# No matching FecTimer.end_category as shutdown stops timer def _execute_application_finisher(self) -> None: with FecTimer("Application finisher", TimerWork.CONTROL): application_finisher() def _do_stop_workflow(self) -> None: self._execute_application_finisher() self._do_extract_from_machine()
[docs] def stop_run(self) -> None: """ Request that the current infinite run stop. .. note:: This will need to be called from another thread as the infinite run call is blocking. :raises SpiNNUtilsException: If the stop_run was not expected in the current state. """ # Do not do start category here # as called from a different thread while running if self._data_writer.is_stop_already_requested(): logger.warning("Second Request to stop_run ignored") return with self._state_condition: self._data_writer.request_stop() self._state_condition.notify_all()
[docs] def continue_simulation(self) -> None: """ Continue a simulation that has been started in stepped mode. """ sync_signal = self._data_writer.get_next_sync_signal() transceiver = self._data_writer.get_transceiver() transceiver.send_signal(self._data_writer.get_app_id(), sync_signal)
@staticmethod def __reset_object(obj) -> None: # Reset an object if appropriate if isinstance(obj, AbstractCanReset): obj.reset_to_first_timestep() def __reset_graph_elements(self) -> None: # Reset any object that can reset for vertex in self._data_writer.iterate_vertices(): self.__reset_object(vertex) for p in self._data_writer.iterate_partitions(): for edge in p.edges: self.__reset_object(edge)