# Copyright (c) 2021 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import math
import os
import time
from typing import Dict, Optional, Tuple
from spinn_utilities.config_holder import (
get_config_int, get_config_int_or_none, get_config_str)
from spinn_utilities.log import FormatAdapter
from spinn_utilities.overrides import overrides
from spinn_utilities.typing.coords import XY
from spinn_machine import Chip, CoreSubsets, RoutingEntry
from spinnman.data.spinnman_data_writer import SpiNNManDataWriter
from spinnman.messages.scp.enums.signal import Signal
from spinnman.model import ExecutableTargets
from spinnman.model.enums import ExecutableType
from pacman.data.pacman_data_writer import PacmanDataWriter
from pacman.model.routing_tables import MulticastRoutingTables
from pacman.model.graphs.application import ApplicationVertex
from spinn_front_end_common.utilities.notification_protocol import (
NotificationProtocol)
from spinn_front_end_common.interface.buffer_management import BufferManager
from spinn_front_end_common.interface.interface_functions.spalloc_allocator \
import SpallocJobController
from spinn_front_end_common.interface.java_caller import JavaCaller
from spinn_front_end_common.utilities.constants import (
MICRO_TO_MILLISECOND_CONVERSION, MICRO_TO_SECOND_CONVERSION)
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.utility_models import (
DataSpeedUpPacketGatherMachineVertex, ExtraMonitorSupportMachineVertex)
from spinn_front_end_common.abstract_models.impl import (
MachineAllocationController)
from .fec_data_view import FecDataView, _FecDataModel
logger = FormatAdapter(logging.getLogger(__name__))
__temp_dir = None
REPORTS_DIRNAME = "reports"
[docs]
class FecDataWriter(PacmanDataWriter, SpiNNManDataWriter, FecDataView):
"""
See :py:class:`~spinn_utilities.data.utils_data_writer.UtilsDataWriter`.
This class is designed to only be used directly by
:py:class:`AbstractSpinnakerBase`
and within the Non-PyNN repositories unit tests as all methods are
available to subclasses.
"""
__fec_data = _FecDataModel()
__slots__ = ()
# pylint: disable=protected-access
@overrides(PacmanDataWriter._mock)
def _mock(self) -> None:
PacmanDataWriter._mock(self)
self._spinnman_mock()
self.__fec_data._clear()
# run numbers start at 1 and when not running this is the next one
self.__fec_data._run_number = 1
self.set_up_timings(1000, 1)
@overrides(PacmanDataWriter._setup)
def _setup(self) -> None:
PacmanDataWriter._setup(self)
self._spinnman_setup()
self.__fec_data._clear()
# run numbers start at 1 and when not running this is the next one
self.__fec_data._run_number = 1
self.__create_reports_directory()
self.__create_timestamp_directory()
self.__create_run_dir_path()
[docs]
@overrides(PacmanDataWriter.finish_run)
def finish_run(self) -> None:
PacmanDataWriter.finish_run(self)
assert self.__fec_data._run_number is not None
self.__fec_data._run_number += 1
@overrides(PacmanDataWriter._hard_reset)
def _hard_reset(self) -> None:
if self.is_ran_last():
self.__fec_data._reset_number += 1
PacmanDataWriter._hard_reset(self)
SpiNNManDataWriter._local_hard_reset(self)
self.__fec_data._hard_reset()
self.__create_run_dir_path()
@overrides(PacmanDataWriter._soft_reset)
def _soft_reset(self) -> None:
if self.is_ran_last():
self.__fec_data._reset_number += 1
PacmanDataWriter._soft_reset(self)
SpiNNManDataWriter._local_soft_reset(self)
self.__fec_data._soft_reset()
def __create_run_dir_path(self) -> None:
self.set_run_dir_path(self._child_folder(
self.__fec_data._timestamp_dir_path,
f"run_{self.__fec_data._run_number}"))
def __create_reports_directory(self) -> None:
default_report_file_path = get_config_str(
"Reports", "default_report_file_path")
# determine common report folder
if default_report_file_path == "DEFAULT":
directory = os.getcwd()
else:
directory = default_report_file_path
# global reports folder
self.set_report_dir_path(
self._child_folder(directory, REPORTS_DIRNAME))
def __create_timestamp_directory(self) -> None:
while True:
try:
now = datetime.datetime.now()
timestamp = (
f"{now.year:04}-{now.month:02}-{now.day:02}-{now.hour:02}"
f"-{now.minute:02}-{now.second:02}-{now.microsecond:06}")
self.__fec_data._timestamp_dir_path = self._child_folder(
self.get_report_dir_path(), timestamp, must_create=True)
return
except OSError:
time.sleep(0.5)
[docs]
def set_allocation_controller(self, allocation_controller: Optional[
MachineAllocationController]):
"""
Sets the allocation controller variable.
:param MachineAllocationController allocation_controller:
"""
if allocation_controller and not isinstance(
allocation_controller, MachineAllocationController):
raise TypeError(
"allocation_controller must be a MachineAllocationController")
self.__fec_data._spalloc_job = None
self.__fec_data._allocation_controller = allocation_controller
if allocation_controller is None:
return
if allocation_controller.proxying:
if not isinstance(allocation_controller, SpallocJobController):
raise NotImplementedError(
"Expecting only the SpallocJobController to be proxying")
self.__fec_data._spalloc_job = allocation_controller.job
[docs]
def set_buffer_manager(self, buffer_manager: BufferManager):
"""
Sets the Buffer manager variable.
:param BufferManager buffer_manager:
"""
if not isinstance(buffer_manager, BufferManager):
raise TypeError("buffer_manager must be a BufferManager")
self.__fec_data._buffer_manager = buffer_manager
[docs]
def increment_current_run_timesteps(self, increment: Optional[int]):
"""
Increment the current_run_timesteps and sets first_machine_time_step.
A `None` increment signals run_forever
:param increment: The timesteps for this do_run loop
:type increment: int or None
"""
if increment is None:
if self.__fec_data._current_run_timesteps != 0:
raise NotImplementedError("Run forever after another run")
self.__fec_data._current_run_timesteps = None
return
if not isinstance(increment, int):
raise TypeError("increment should be an int (or None")
if increment < 0:
raise ConfigurationException(
f"increment {increment} must not be negative")
if self.__fec_data._current_run_timesteps is None:
raise NotImplementedError("Run after run forever")
self.__fec_data._first_machine_time_step = \
self.__fec_data._current_run_timesteps
self.__fec_data._current_run_timesteps += increment
[docs]
def set_max_run_time_steps(self, max_run_time_steps: int):
"""
Sets the max_run_time_steps value
:param int max_run_time_steps: new value
"""
if not isinstance(max_run_time_steps, int):
raise TypeError("max_run_time_steps should be an int")
if max_run_time_steps <= 0:
raise ConfigurationException(
f"max_run_time_steps {max_run_time_steps} must be positive")
self.__fec_data._max_run_time_steps = max_run_time_steps
[docs]
def set_up_timings(
self, simulation_time_step_us: Optional[int],
time_scale_factor: Optional[float],
default_time_scale_factor: Optional[float] = None):
"""
Set up timings for the simulation.
:param simulation_time_step_us:
An explicitly specified time step for the simulation in .
If `None`, the value is read from the configuration
:type simulation_time_step_us: int or None
:param time_scale_factor:
An explicitly specified time scale factor for the simulation.
If `None`, the value is read from the configuration
:type time_scale_factor: float or None
:param default_time_scale_factor:
A back up time scale factor for the simulation.
Only used if time_scale_factor parameter and configuration are
both `None`.
If `None`, the value is based on `simulation_time_step`
:type default_time_scale_factor: float or None
"""
try:
self._set_simulation_time_step(simulation_time_step_us)
self._set_time_scale_factor(
time_scale_factor, default_time_scale_factor)
self._set_hardware_timestep()
except ConfigurationException:
self.__fec_data._simulation_time_step_us = None
self.__fec_data._simulation_time_step_ms = None
self.__fec_data._simulation_time_step_per_ms = None
self.__fec_data._simulation_time_step_per_s = None
self.__fec_data._simulation_time_step_s = None
self.__fec_data._time_scale_factor = None
self.__fec_data._hardware_time_step_us = None
self.__fec_data._hardware_time_step_ms = None
raise
def _set_simulation_time_step(
self, simulation_time_step_us: Optional[int]):
"""
:param simulation_time_step_us:
An explicitly specified time step for the simulation. If `None`,
the value is read from the configuration
:type simulation_time_step: int or None
"""
if simulation_time_step_us is None:
simulation_time_step_us = get_config_int(
"Machine", "simulation_time_step")
if not isinstance(simulation_time_step_us, int):
raise TypeError("simulation_time_step_us should be an int")
if simulation_time_step_us <= 0:
raise ConfigurationException(
f'invalid simulation_time_step {simulation_time_step_us}'
': must greater than zero')
self.__fec_data._simulation_time_step_us = simulation_time_step_us
self.__fec_data._simulation_time_step_ms = (
simulation_time_step_us / MICRO_TO_MILLISECOND_CONVERSION)
self.__fec_data._simulation_time_step_per_ms = (
MICRO_TO_MILLISECOND_CONVERSION / simulation_time_step_us)
self.__fec_data._simulation_time_step_per_s = (
MICRO_TO_SECOND_CONVERSION / simulation_time_step_us)
self.__fec_data._simulation_time_step_s = (
simulation_time_step_us / MICRO_TO_SECOND_CONVERSION)
def _set_time_scale_factor(
self, time_scale_factor: Optional[float],
default_time_scale_factor: Optional[float]):
"""
Set up time_scale_factor.
If time_scale_factor is provide that is used
Then if configuration is not `None` that is used
Then if default is provided that is used
Lastly it is set based on the simulation_time_step
:param time_scale_factor:
An explicitly specified time scale factor for the simulation.
If `None`, the value is read from the configuration
:type time_scale_factor: float or None
"""
if time_scale_factor is None:
# Note while this reads from the cfg the cfg default is None
time_scale_factor = get_config_int_or_none(
"Machine", "time_scale_factor")
if time_scale_factor is None:
if default_time_scale_factor is not None:
time_scale_factor = default_time_scale_factor
if time_scale_factor is None:
time_scale_factor = max(
1.0, math.ceil(self.get_simulation_time_step_per_ms()))
if time_scale_factor > 1.0:
logger.warning(
"A timestep was entered that has forced SpiNNaker to "
"automatically slow the simulation down from real time "
f"by a factor of {time_scale_factor}.")
if not isinstance(time_scale_factor, (int, float)):
raise TypeError("app_id should be an int (or float)")
if time_scale_factor <= 0:
raise ConfigurationException(
f'invalid time_scale_factor {time_scale_factor}'
': must greater than zero')
self.__fec_data._time_scale_factor = time_scale_factor
def _set_hardware_timestep(self) -> None:
raw = (self.get_simulation_time_step_us() *
self.get_time_scale_factor())
rounded = round(raw)
if abs(rounded - raw) > 0.0001:
raise ConfigurationException(
"The multiplication of simulation time step in microseconds: "
f"{self.get_simulation_time_step_us()} and times scale factor"
f": {self.get_time_scale_factor()} produced a non integer "
f"hardware time step of {raw}")
logger.info(
"Setting hardware timestep as {} microseconds based on "
"simulation time step of {} and timescale factor of {}",
rounded, self.get_simulation_time_step_us(),
self.get_time_scale_factor())
self.__fec_data._hardware_time_step_us = rounded
self.__fec_data._hardware_time_step_ms = (
rounded / MICRO_TO_MILLISECOND_CONVERSION)
[docs]
def set_system_multicast_routing_data(
self, data: Tuple[
MulticastRoutingTables, Dict[XY, int], Dict[XY, int]]):
"""
Sets the system_multicast_routing_data.
These are: `data_in_multicast_routing_tables`,
`data_in_multicast_key_to_chip_map`,
`system_multicast_router_timeout_keys`
:param data: new value
:type data:
tuple(~pacman.model.routing_tables.MulticastRoutingTables,
dict(tuple(int,int),int), dict(tuple(int,int),int))
"""
routing_tables, key_to_chip_map, timeout_keys = data
if not isinstance(routing_tables, MulticastRoutingTables):
raise TypeError("First element must be a MulticastRoutingTables")
if not isinstance(key_to_chip_map, dict):
raise TypeError("Second element must be dict")
if not isinstance(timeout_keys, dict):
raise TypeError("Third element must be a dict")
self.__fec_data._data_in_multicast_key_to_chip_map = key_to_chip_map
self.__fec_data._data_in_multicast_routing_tables = routing_tables
self.__fec_data._system_multicast_router_timeout_keys = timeout_keys
[docs]
def set_n_required(self, n_boards_required: Optional[int],
n_chips_required: Optional[int]):
"""
Sets (if not `None`) the number of boards/chips requested by the user.
:param n_boards_required:
`None` or the number of boards requested by the user
:type n_boards_required: int or None
:param n_chips_required:
`None` or the number of chips requested by the user
:type n_chips_required: int or None
"""
if n_boards_required is None:
if n_chips_required is None:
return
elif not isinstance(n_chips_required, int):
raise TypeError("n_chips_required must be an int (or None)")
if n_chips_required <= 0:
raise ConfigurationException(
"n_chips_required must be positive and not "
f"{n_chips_required}")
else:
if n_chips_required is not None:
raise ConfigurationException(
"Illegal call with both both param provided as "
f"{n_boards_required}, {n_chips_required}")
if not isinstance(n_boards_required, int):
raise TypeError("n_boards_required must be an int (or None)")
if n_boards_required <= 0:
raise ConfigurationException(
"n_boards_required must be positive and not "
f"{n_boards_required}")
if self.__fec_data._n_boards_required is not None or \
self.__fec_data._n_chips_required is not None:
raise ConfigurationException(
"Illegal second call to set_n_required")
self.__fec_data._n_boards_required = n_boards_required
self.__fec_data._n_chips_required = n_chips_required
[docs]
def set_n_chips_in_graph(self, n_chips_in_graph: int):
"""
Sets the number of chips needed by the graph.
:param int n_chips_in_graph:
"""
if not isinstance(n_chips_in_graph, int):
raise TypeError("n_chips_in_graph must be an int (or None)")
if n_chips_in_graph <= 0:
raise ConfigurationException(
"n_chips_in_graph must be positive and not "
f"{n_chips_in_graph}")
self.__fec_data._n_chips_in_graph = n_chips_in_graph
[docs]
def set_ipaddress(self, ip_address: str):
"""
:param str ip_address:
"""
if not isinstance(ip_address, str):
raise TypeError("ipaddress must be a str")
self.__fec_data._ipaddress = ip_address
[docs]
def set_fixed_routes(
self, fixed_routes: Dict[Tuple[int, int], RoutingEntry]):
"""
:param fixed_routes:
:type fixed_routes:
dict((int, int), ~spinn_machine.RoutingEntry)
"""
if not isinstance(fixed_routes, dict):
raise TypeError("fixed_routes must be a dict")
self.__fec_data._fixed_routes = fixed_routes
[docs]
def set_java_caller(self, java_caller: JavaCaller):
"""
:param JavaCaller java_caller:
"""
if not isinstance(java_caller, JavaCaller):
raise TypeError("java_caller must be a JavaCaller")
self.__fec_data._java_caller = java_caller
[docs]
def reset_sync_signal(self) -> None:
"""
Returns the sync signal to the default value.
"""
self.__fec_data._next_sync_signal = Signal.SYNC0
[docs]
def set_executable_types(self, executable_types: Dict[
ExecutableType, CoreSubsets]):
"""
:param executable_types:
:type executable_types: dict(
~spinnman.model.enum.ExecutableType,
~spinn_machine.CoreSubsets)
"""
if not isinstance(executable_types, dict):
raise TypeError("executable_types must be a Dict")
self.__fec_data._executable_types = executable_types
[docs]
def set_live_packet_gatherer_parameters(self, params):
"""
testing method will not work outside of mock
"""
if not self._is_mocked():
raise NotImplementedError("This call is only for testing")
self.__fec_data._live_packet_recorder_params = params
[docs]
def set_database_file_path(self, database_file_path: Optional[str]):
"""
Sets the database_file_path variable. Possibly to `None`.
:param database_file_path:
:type database_file_path: str or None
"""
if not isinstance(database_file_path, (str, type(None))):
raise TypeError("database_file_path must be a str or None")
self.__fec_data._database_file_path = database_file_path
[docs]
def set_executable_targets(self, executable_targets: ExecutableTargets):
"""
Sets the executable_targets
:param ~spinnman.model.ExecutableTargets executable_targets:
"""
if not isinstance(executable_targets, ExecutableTargets):
raise TypeError("executable_targets must be a ExecutableTargets")
self.__fec_data._executable_targets = executable_targets
[docs]
def set_ds_database_path(self, ds_database_path: str):
"""
Sets the Data Spec targets database.
:param str ds_database_path: Existing path to the database
"""
if not os.path.isfile(ds_database_path):
raise TypeError("ds_database path must be a filee")
self.__fec_data._ds_database_path = ds_database_path
def __gatherer_map_error(self) -> TypeError:
return TypeError(
"gatherer_map must be a dict(Chip, "
"DataSpeedUpPacketGatherMachineVertex)")
[docs]
def set_gatherer_map(self, gatherer_map: Dict[
Chip, DataSpeedUpPacketGatherMachineVertex]):
"""
Sets the map of Chip to Gatherer Vertices.
:param gatherer_map:
:type gatherer_map: dict(Chip, DataSpeedUpPacketGatherMachineVertex)
"""
if not isinstance(gatherer_map, dict):
raise self.__gatherer_map_error()
try:
for chip, vertex in gatherer_map.items():
if not isinstance(chip, Chip):
raise self.__gatherer_map_error()
if not isinstance(
vertex, DataSpeedUpPacketGatherMachineVertex):
raise self.__gatherer_map_error()
break # assume if first is OK all are
except Exception as ex: # pylint: disable=broad-except
raise self.__gatherer_map_error() from ex
self.__fec_data._gatherer_map = gatherer_map
def __monitor_map_error(self) -> TypeError:
return TypeError(
"monitor_map must be a dict(Chip, "
"ExtraMonitorSupportMachineVertex)")
[docs]
def set_monitor_map(self, monitor_map: Dict[
Chip, ExtraMonitorSupportMachineVertex]):
"""
Sets the map of Chip to Monitor Vertices.
:param monitor_map:
:type monitor_map:
dict(Chip, ExtraMonitorSupportMachineVertex)
"""
if not isinstance(monitor_map, dict):
raise self.__monitor_map_error()
try:
for chip, vertex in monitor_map.items():
if not isinstance(chip, Chip):
raise self.__monitor_map_error()
if not isinstance(vertex, ExtraMonitorSupportMachineVertex):
raise self.__monitor_map_error()
break # assume if first is OK all are
except TypeError:
raise
except Exception as ex: # pylint: disable=broad-except
raise self.__monitor_map_error() from ex
self.__fec_data._monitor_map = monitor_map
[docs]
def set_notification_protocol(
self, notification_protocol: NotificationProtocol):
"""
Sets the notification_protocol.
:param NotificationProtocol notification_protocol:
"""
self.__fec_data._clear_notification_protocol()
if not isinstance(notification_protocol, NotificationProtocol):
raise TypeError(
"notification_protocol must be a NotificationProtocol")
self.__fec_data._notification_protocol = notification_protocol
[docs]
def clear_notification_protocol(self) -> None:
"""
Closes an existing notification_protocol and sets the value to `None`.
If no notification_protocol exist this method silently returns.
If the close causes an Exception it is logged and ignored
"""
self.__fec_data._clear_notification_protocol()
[docs]
@classmethod
@overrides(FecDataView.add_vertex)
def add_vertex(cls, vertex: ApplicationVertex):
# Avoid the safety check in FecDataView
PacmanDataWriter.add_vertex(vertex)
[docs]
def next_run_step(self) -> int:
"""
Starts or increases the run step count.
Run steps start at 1
:return: The next step number
:rtype: int
"""
if self.__fec_data._run_step is None:
self.__fec_data._run_step = 1
return 1
self.__fec_data._run_step += 1
return self.__fec_data._run_step
[docs]
def clear_run_steps(self) -> None:
"""
Clears the run step.
get_run_step will go back to returning `None`
next_run_step will restart at 1
"""
self.__fec_data._run_step = None