Source code for spinn_front_end_common.utilities.helpful_functions

# Copyright (c) 2014 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import os
import logging
import struct
from typing import (
    Any, Collection, Dict, Iterable, List, Optional, Tuple, TYPE_CHECKING)
from spinn_utilities.log import FormatAdapter
from spinn_machine import CoreSubsets, Chip, Machine, MulticastRoutingEntry
from spinnman.model.enums import CPUState, ExecutableType
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.constants import (
    APP_PTR_TABLE_HEADER_BYTE_SIZE, APP_PTR_TABLE_REGION_BYTE_SIZE)
from spinn_front_end_common.utilities.exceptions import ConfigurationException
if TYPE_CHECKING:
    from pacman.model.placements import Placement
    from pacman.model.graphs.machine import MachineVertex
    from spinn_front_end_common.utility_models import (
        DataSpeedUpPacketGatherMachineVertex)

logger = FormatAdapter(logging.getLogger(__name__))
_n_word_structs: List[Optional[struct.Struct]] = []


[docs] def locate_extra_monitor_mc_receiver( placement_x: int, placement_y: int ) -> DataSpeedUpPacketGatherMachineVertex: """ Get the data speed up gatherer that can be used to talk to a particular chip. This will be on the same board. :param int placement_x: The X coordinate of the reference chip :param int placement_y: The Y coordinate of the reference chip :rtype: DataSpeedUpPacketGatherMachineVertex """ chip = FecDataView.get_chip_at(placement_x, placement_y) return FecDataView.get_gatherer_by_xy( chip.nearest_ethernet_x, chip.nearest_ethernet_y)
[docs] def read_data(x: int, y: int, address: int, length: int, data_format: str) -> int: """ Reads and converts a single data item from memory. :param int x: chip x :param int y: chip y :param int address: base address of the SDRAM chip to read :param int length: length to read :param str data_format: the format to read memory (see :py:func:`struct.pack`) :return: whatever is produced by unpacking the data :rtype: tuple """ data = FecDataView.read_memory(x, y, address, length) return struct.unpack_from(data_format, data)[0]
[docs] def get_region_base_address_offset( app_data_base_address: int, region: int) -> int: """ Find the address of the of a given region for the DSG. :param int app_data_base_address: base address for the core :param int region: the region ID we're looking for """ return (app_data_base_address + APP_PTR_TABLE_HEADER_BYTE_SIZE + (region * APP_PTR_TABLE_REGION_BYTE_SIZE))
[docs] def locate_memory_region_for_placement( placement: Placement, region: int) -> int: """ Get the address of a region for a placement. :param int region: the region to locate the base address of :param ~pacman.model.placements.Placement placement: the placement object to get the region address of :return: the address :rtype: int """ transceiver = FecDataView.get_transceiver() regions_base_address = transceiver.get_region_base_address( placement.x, placement.y, placement.p) # Get the position of the region in the pointer table element_addr = get_region_base_address_offset(regions_base_address, region) # Get the actual address of the region return transceiver.read_word(placement.x, placement.y, element_addr)
[docs] def convert_string_into_chip_and_core_subset( cores: Optional[str]) -> CoreSubsets: """ Translate a string list of cores into a core subset. :param cores: string representing down cores formatted as x,y,p[:x,y,p]* :type cores: str or None :rtype: ~spinn_machine.CoreSubsets """ ignored_cores = CoreSubsets() if cores is not None and cores != "None": for downed_core in cores.split(":"): x, y, processor_id = downed_core.split(",") ignored_cores.add_processor(int(x), int(y), int(processor_id)) return ignored_cores
[docs] def flood_fill_binary_to_spinnaker(binary: str) -> int: """ Flood fills a binary to SpiNNaker. :param str binary: The name of the file containing the APLX binary to load :return: the number of cores it was loaded onto :rtype: int """ executable_targets = FecDataView.get_executable_targets() core_subset = executable_targets.get_cores_for_binary(binary) FecDataView.get_transceiver().execute_flood( core_subset, binary, FecDataView.get_app_id(), wait=True) return len(core_subset)
[docs] def generate_unique_folder_name( folder: str, filename: str, extension: str) -> str: """ Generate a unique file name with a given extension in a given folder. :param str folder: where to put this unique file :param str filename: the name of the first part of the file without extension :param str extension: extension of the file :return: file path with a unique addition :rtype: str """ new_file_path = os.path.join(folder, f"{filename}{extension}") count = 2 while os.path.exists(new_file_path): new_file_path = os.path.join(folder, f"{filename}_{count}{extension}") count += 1 return new_file_path
[docs] def get_ethernet_chip(machine: Machine, board_address: str) -> Chip: """ Locate the chip with the given board IP address. :param ~spinn_machine.Machine machine: the SpiNNaker machine :param str board_address: the board address to locate the chip of. :return: The chip that supports that board address :rtype: ~spinn_machine.Chip :raises ConfigurationException: when that board address has no chip associated with it """ for chip in machine.ethernet_connected_chips: if chip.ip_address == board_address: return chip raise ConfigurationException( "cannot find the Ethernet connected chip with the " f"board address {board_address}")
[docs] def determine_flow_states( executable_types: Dict[ExecutableType, Any], no_sync_changes: int) -> Tuple[ Dict[ExecutableType, Collection[CPUState]], Dict[ExecutableType, Collection[CPUState]]]: """ Get the start and end states for these executable types. :param dict(ExecutableType,any) executable_types: the execute types to locate start and end states from :param int no_sync_changes: the number of times sync signals been sent :return: dict of executable type to states. :rtype: tuple(dict(ExecutableType,tuple(~spinnman.model.enums.CPUState)), dict(ExecutableType,tuple(~spinnman.model.enums.CPUState))) """ expected_start_states: Dict[ExecutableType, Collection[CPUState]] = dict() expected_end_states: Dict[ExecutableType, Collection[CPUState]] = dict() for start_type in executable_types.keys(): # cores that ignore all control and are just running if start_type == ExecutableType.RUNNING: expected_start_states[ExecutableType.RUNNING] = ( CPUState.RUNNING, CPUState.FINISHED) expected_end_states[ExecutableType.RUNNING] = ( CPUState.RUNNING, CPUState.FINISHED) # cores that require a sync barrier elif start_type == ExecutableType.SYNC: expected_start_states[ExecutableType.SYNC] = (CPUState.SYNC0,) expected_end_states[ExecutableType.SYNC] = (CPUState.FINISHED,) # cores that use our simulation interface elif start_type == ExecutableType.USES_SIMULATION_INTERFACE: if no_sync_changes % 2 == 0: expected_start_states[start_type] = (CPUState.SYNC0,) else: expected_start_states[start_type] = (CPUState.SYNC1,) expected_end_states[start_type] = (CPUState.PAUSED,) # if no states, go boom. if not expected_start_states: raise ConfigurationException( f"Unknown executable start types {executable_types}") return expected_start_states, expected_end_states
[docs] def convert_vertices_to_core_subset( vertices: Iterable[MachineVertex]) -> CoreSubsets: """ Converts vertices into core subsets. :param iterable(~pacman.model.graphs.machine.MachineVertex) vertices: the vertices to convert to core subsets :return: the CoreSubSets of the vertices :rtype: ~spinn_machine.CoreSubsets """ core_subsets = CoreSubsets() for vertex in vertices: placement = FecDataView.get_placement_of_vertex(vertex) core_subsets.add_processor(placement.x, placement.y, placement.p) return core_subsets
[docs] def n_word_struct(n_words: int) -> struct.Struct: """ Manages a precompiled cache of :py:class`~struct.Struct`\\s for parsing blocks of words. Thus, this:: data = n_word_struct(n_words).unpack(data_blob) Is much like doing this:: data = struct.unpack(f"<{n_words}I", data_blob) except quite a bit more efficient because things are shared including the cost of parsing the format. :param int n_words: The number of *SpiNNaker words* to be handled. :return: A struct for working with that many words. :rtype: ~struct.Struct """ # pylint: disable=global-statement global _n_word_structs while len(_n_word_structs) < n_words + 1: _n_word_structs += [None] * (n_words + 1 - len(_n_word_structs)) s = _n_word_structs[n_words] if s is not None: return s new_struct = struct.Struct(f"<{n_words}I") _n_word_structs[n_words] = new_struct return new_struct
[docs] def get_defaultable_source_id(entry: MulticastRoutingEntry) -> int: """ Hack to support the source requirement for the router compressor on chip. :param ~spinn_machine.MulticastRoutingEntry entry: the multicast router table entry. :return: return the source value :rtype: int """ if entry.defaultable: return (list(entry.link_ids)[0] + 3) % 6 elif entry.link_ids: return list(entry.link_ids)[0] return 0