Source code for spinn_front_end_common.utility_models.data_speed_up_packet_gatherer_machine_vertex

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import os
import datetime
import logging
import time
import struct
from enum import Enum, IntEnum
from typing import (
    Any, BinaryIO, Final, Iterable, List, Optional, Set, Tuple, Union,
    TYPE_CHECKING)

from spinn_utilities.config_holder import get_config_bool
from spinn_utilities.overrides import overrides
from spinn_utilities.log import FormatAdapter
from spinn_utilities.typing.coords import XY

from spinn_machine import Chip

from spinnman.exceptions import SpinnmanTimeoutException
from spinnman.messages.sdp import SDPMessage, SDPHeader, SDPFlag
from spinnman.model.enums import (
    CPUState, ExecutableType, SDP_PORTS, UserRegister)
from spinnman.connections.udp_packet_connections import SCAMPConnection

from pacman.model.graphs.machine import MachineVertex
from pacman.model.resources import ConstantSDRAM, IPtagResource
from pacman.model.placements import Placement

from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.interface.provenance import ProvenanceWriter
from spinn_front_end_common.utilities.helpful_functions import (
    convert_vertices_to_core_subset, n_word_struct)
from spinn_front_end_common.utilities.emergency_recovery import (
    emergency_recover_state_from_failure)
from spinn_front_end_common.abstract_models import (
    AbstractHasAssociatedBinary, AbstractGeneratesDataSpecification)
from spinn_front_end_common.interface.provenance import (
    AbstractProvidesProvenanceDataFromMachine)
from spinn_front_end_common.utilities.constants import (
    BYTES_PER_WORD, BYTES_PER_KB)
from spinn_front_end_common.utilities.utility_calls import (
    get_region_base_address_offset, open_scp_connection, retarget_tag)
from spinn_front_end_common.utilities.exceptions import SpinnFrontEndException
from spinn_front_end_common.utilities.scp import ReinjectorControlProcess
from spinn_front_end_common.utilities.utility_objs import ReInjectionStatus
from spinn_front_end_common.interface.ds import DataSpecificationGenerator
if TYPE_CHECKING:
    from .extra_monitor_support_machine_vertex import \
        ExtraMonitorSupportMachineVertex

log = FormatAdapter(logging.getLogger(__name__))

# shift needed for the destination x coordinate in the word
DEST_X_SHIFT = 16

TIMEOUT_RETRY_LIMIT = 100
_MINOR_LOSS_THRESHOLD = 10

# cap for stopping wrap arounds
TRANSACTION_ID_CAP = 0xFFFFFFFF

#: number of items used up by the retransmit code for its header
SDP_RETRANSMISSION_HEADER_SIZE = 2

#: size of config region in bytes:
#: 1. new sequence key, 2. first data key, 3. transaction id key,
#: 4. end flag key, 5. base key, 6. iptag tag
CONFIG_SIZE = 6 * BYTES_PER_WORD

#: words of data an SDP packet can hold once the SCP header is removed
WORDS_PER_FULL_PACKET = 68  # 272 bytes without the SCP header

#: size of items the sequence number uses
SEQUENCE_NUMBER_SIZE_IN_ITEMS = 1

#: transaction id size in words
TRANSACTION_ID_SIZE_IN_ITEMS = 1

#: the size in words of the command flag
COMMAND_SIZE_IN_ITEMS = 1

#: offset for missing sequence starts in first packet
WORDS_FOR_COMMAND_N_MISSING_TRANSACTION = 3

#: offset for missing sequence starts in later packets
WORDS_FOR_COMMAND_TRANSACTION = (
    COMMAND_SIZE_IN_ITEMS + TRANSACTION_ID_SIZE_IN_ITEMS)

BYTES_FOR_SEQ_AND_TRANSACTION_ID = (
    (SEQUENCE_NUMBER_SIZE_IN_ITEMS + TRANSACTION_ID_SIZE_IN_ITEMS) *
    BYTES_PER_WORD)

#: words of data in an SDP packet that carries a sequence number
WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM = (
    WORDS_PER_FULL_PACKET - SEQUENCE_NUMBER_SIZE_IN_ITEMS -
    TRANSACTION_ID_SIZE_IN_ITEMS)

#: offset where the data payload starts in a data-in packet
#: (command, transaction_id, sequence number)
WORDS_FOR_COMMAND_AND_KEY = 3
BYTES_FOR_COMMAND_AND_ADDRESS_HEADER = (
    WORDS_FOR_COMMAND_AND_KEY * BYTES_PER_WORD)

#: offset where the data payload starts in reception (command, transaction id)
WORDS_FOR_RECEPTION_COMMAND_AND_ADDRESS_HEADER = 2
BYTES_FOR_RECEPTION_COMMAND_AND_ADDRESS_HEADER = (
    WORDS_FOR_RECEPTION_COMMAND_AND_ADDRESS_HEADER * BYTES_PER_WORD)

#: size of data in the first packet, which also carries command and address
WORDS_IN_FULL_PACKET_WITH_KEY = (
    WORDS_PER_FULL_PACKET - WORDS_FOR_COMMAND_AND_KEY)
BYTES_IN_FULL_PACKET_WITH_KEY = (
    WORDS_IN_FULL_PACKET_WITH_KEY * BYTES_PER_WORD)

#: size of data in key space:
#: x, y, key (all int values) for the possible 48 chips, plus the number
#: of chips to read and the reinjector base key
SIZE_DATA_IN_CHIP_TO_KEY_SPACE = ((3 * 48) + 2) * BYTES_PER_WORD
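
# --- Editor's sketch (not part of the upstream module): a consistency
# check of the derived packet-layout constants above, assuming the usual
# 4-byte BYTES_PER_WORD. A full 68-word packet loses two words to the
# sequence number and transaction id, or three to command/id/sequence.
assert WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM == WORDS_PER_FULL_PACKET - 2
assert WORDS_IN_FULL_PACKET_WITH_KEY == WORDS_PER_FULL_PACKET - 3
assert BYTES_IN_FULL_PACKET_WITH_KEY == 65 * BYTES_PER_WORD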


class _DataRegions(IntEnum):
    """
    DSG data regions.
    """
    CONFIG = 0
    CHIP_TO_KEY_SPACE = 1
    PROVENANCE_REGION = 2


class _ProvLabels(str, Enum):
    SENT = "Sent_SDP_Packets"
    RECEIVED = "Received_SDP_Packets"
    IN_STREAMS = "Speed_Up_Input_Streams"
    OUT_STREAMS = "Speed_Up_Output_Streams"


class _DataOutCommands(IntEnum):
    """
    Command IDs for the SDP packets for data out.
    """
    START_SENDING = 100
    START_MISSING_SEQ = 1000
    MISSING_SEQ = 1001
    CLEAR = 2000


class _DataInCommands(IntEnum):
    """
    Command IDs for the SDP packets for data in.
    """
    SEND_DATA_TO_LOCATION = 200
    SEND_SEQ_DATA = 2000
    SEND_TELL = 2001
    RECEIVE_MISSING_SEQ_DATA = 2002
    RECEIVE_FINISHED = 2003


# precompiled structures
_ONE_WORD = struct.Struct("<I")
_TWO_WORDS = struct.Struct("<II")
_THREE_WORDS = struct.Struct("<III")
_FOUR_WORDS = struct.Struct("<IIII")
_FIVE_WORDS = struct.Struct("<IIIII")
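
# --- Editor's sketch (not part of the upstream module): every message in
# this protocol leads with little-endian words packed by these structs; a
# command/transaction-id header round-trips like so (42 is arbitrary):
_example = _TWO_WORDS.pack(_DataInCommands.SEND_TELL, 42)
assert _TWO_WORDS.unpack_from(_example, 0) == (_DataInCommands.SEND_TELL, 42)
del _example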


# Set to True to check that the data is correct after it has been sent in.
# This is expensive.
VERIFY_SENT_DATA = False

# provenance data size
_PROVENANCE_DATA_SIZE: Final = _FOUR_WORDS.size


def ceildiv(dividend: float, divisor: float) -> int:
    """
    Divide two numbers (which need not be integers) and round up.
    """
    assert divisor > 0
    q, r = divmod(dividend, divisor)
    return int(q) + (r != 0)
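
# --- Editor's sketch (not part of the upstream module): ceildiv rounds
# up, and accepts floats as well as ints (hence the int(q) above), which
# the SDRAM sizing below relies on.
assert ceildiv(272, 272) == 1
assert ceildiv(273, 272) == 2
assert ceildiv(0.5, 2) == 1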


# SDRAM requirement for storing the sequence numbers of missing SDP packets
SDRAM_FOR_MISSING_SDP_SEQ_NUMS = ceildiv(
    120.0 * 1024 * BYTES_PER_KB,
    WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * BYTES_PER_WORD)


class DataSpeedUpPacketGatherMachineVertex(
        MachineVertex, AbstractGeneratesDataSpecification,
        AbstractHasAssociatedBinary,
        AbstractProvidesProvenanceDataFromMachine):
    """
    Machine vertex for handling fast data transfer between host and SpiNNaker.
    This machine vertex is only ever placed on chips with a working Ethernet
    connection; it collaborates with the
    :py:class:`ExtraMonitorSupportMachineVertex` to write data on other chips.

    .. note::
        This is an unusual machine vertex, in that it has no associated
        application vertex.
    """
    __slots__ = (
        # x coordinate
        "_x",
        # y coordinate
        "_y",
        # word with x and y
        "_coord_word",
        # transaction id
        "_transaction_id",
        # IP address
        "_ip_address",
        # store for the last reinjection status
        "_last_status",
        # the max sequence number expected given a data retrieval
        "_max_seq_num",
        # holder for missing sequence numbers for data in
        "_missing_seq_nums_data_in",
        # holder of data from out
        "_output",
        # my placement for future lookup
        "__placement",
        # Count of the runs for provenance data
        "_run",
        "_remote_tag",
        # data holder for output
        "_view")

    #: base key (really nasty hack to tie in fixed route keys)
    BASE_KEY = 0xFFFFFFF9
    NEW_SEQ_KEY = 0xFFFFFFF8
    FIRST_DATA_KEY = 0xFFFFFFF7
    END_FLAG_KEY = 0xFFFFFFF6
    TRANSACTION_ID_KEY = 0xFFFFFFF5

    #: mask to use with the multicast keys
    # (reinjection acknowledgements have to be fixed route)
    BASE_MASK = 0xFFFFFFFB
    NEW_SEQ_KEY_OFFSET = 1
    FIRST_DATA_KEY_OFFSET = 2
    END_FLAG_KEY_OFFSET = 3
    TRANSACTION_ID_KEY_OFFSET = 4

    # throttle on the transmission
    _TRANSMISSION_THROTTLE_TIME = 0.000001

    #: report name for tracking used routers
    OUT_REPORT_NAME = "routers_used_in_speed_up_process.rpt"
    #: report name for tracking performance gains
    IN_REPORT_NAME = "speeds_gained_in_speed_up_process.rpt"

    # the end flag is set when the high bit of the sequence number word is set
    _LAST_MESSAGE_FLAG_BIT_MASK = 0x80000000
    # corresponding mask for the actual sequence numbers
    _SEQUENCE_NUMBER_MASK = 0x7fffffff
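
    # --- Editor's sketch (not in the upstream file): decoding a received
    # data-out word with the two masks above; the high bit marks the last
    # message, the rest is the sequence number:
    #
    #     word, = _ONE_WORD.unpack_from(data, 0)
    #     seq_num = word & self._SEQUENCE_NUMBER_MASK
    #     is_last = bool(word & self._LAST_MESSAGE_FLAG_BIT_MASK)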

    # timeouts used by the protocol for its various parts
    _TIMEOUT_PER_RECEIVE_IN_SECONDS = 2
    _TIMEOUT_FOR_SENDING_IN_SECONDS = 0.01

    # end flag for missing sequence numbers
    _MISSING_SEQ_NUMS_END_FLAG = 0xFFFFFFFF

    # flag for saying missing all sequence numbers
    FLAG_FOR_MISSING_ALL_SEQUENCES = 0xFFFFFFFE

    _ADDRESS_PACKET_BYTE_FORMAT = struct.Struct(
        f"<{BYTES_IN_FULL_PACKET_WITH_KEY}B")

    # Router timeouts, in mantissa,exponent form. See datasheet for details
    _LONG_TIMEOUT = (14, 14)
    _SHORT_TIMEOUT = (1, 1)
    _TEMP_TIMEOUT = (15, 4)
    _ZERO_TIMEOUT = (0, 0)

    # Initial port for the reverse IP tag (to be replaced later)
    _TAG_INITIAL_PORT = 10000

    def __init__(self, x: int, y: int, ip_address: str):
        """
        :param int x: Where this gatherer is.
        :param int y: Where this gatherer is.
        :param str ip_address:
            How to talk directly to the chip where the gatherer is.
        """
        super().__init__(
            label=f"SYSTEM:PacketGatherer({x},{y})", app_vertex=None)

        # data holders for the output, and sequence numbers
        self._view: Optional[memoryview] = None
        self._max_seq_num = 0
        self._output: Optional[bytearray] = None

        self._transaction_id = 0

        self._missing_seq_nums_data_in: List[Set[int]] = list()

        # Create a connection to be used
        self._x, self._y = x, y
        self._coord_word: Optional[int] = None
        self._ip_address = ip_address
        self._remote_tag: Optional[int] = None

        # local provenance storage
        self._run = 0
        self.__placement: Optional[Placement] = None

        # Stored reinjection status for resetting timeouts
        self._last_status: Optional[ReInjectionStatus] = None

    def __throttled_send(
            self, message: SDPMessage, connection: SCAMPConnection):
        """
        Slows down transmissions to allow SpiNNaker to keep up.

        :param ~.SDPMessage message: message to send
        :param connection: the connection to send the message on
        :type connection:
        """
        # send the message, then pause so the machine can keep up
        connection.send_sdp_message(message)
        time.sleep(self._TRANSMISSION_THROTTLE_TIME)

    @property
    @overrides(MachineVertex.sdram_required)
    def sdram_required(self) -> ConstantSDRAM:
        return ConstantSDRAM(
                CONFIG_SIZE + SDRAM_FOR_MISSING_SDP_SEQ_NUMS +
                SIZE_DATA_IN_CHIP_TO_KEY_SPACE + _PROVENANCE_DATA_SIZE)

    @property
    @overrides(MachineVertex.iptags)
    def iptags(self) -> List[IPtagResource]:
        return [IPtagResource(
            port=self._TAG_INITIAL_PORT, strip_sdp=True,
            ip_address="localhost", traffic_identifier="DATA_SPEED_UP")]

    def _read_transaction_id_from_machine(self) -> None:
        """
        Looks up from the machine what the current transaction ID is
        and updates the data speed up gatherer.
        """
        self._transaction_id = FecDataView.get_transceiver().read_user(
            self._placement.x, self._placement.y, self._placement.p,
            UserRegister.USER_1)


    @overrides(AbstractHasAssociatedBinary.get_binary_start_type)
    def get_binary_start_type(self) -> ExecutableType:
        return ExecutableType.SYSTEM

    @overrides(AbstractGeneratesDataSpecification.generate_data_specification)
    def generate_data_specification(
            self, spec: DataSpecificationGenerator, placement: Placement):
        # pylint: disable=unsubscriptable-object
        # update my placement for future knowledge
        self.__placement = placement

        # Create the data regions
        self._reserve_memory_regions(spec)

        # the keys for the special cases
        new_seq_key = self.NEW_SEQ_KEY
        first_data_key = self.FIRST_DATA_KEY
        end_flag_key = self.END_FLAG_KEY
        base_key = self.BASE_KEY
        transaction_id_key = self.TRANSACTION_ID_KEY

        spec.switch_write_focus(_DataRegions.CONFIG)
        spec.write_value(new_seq_key)
        spec.write_value(first_data_key)
        spec.write_value(transaction_id_key)
        spec.write_value(end_flag_key)
        spec.write_value(base_key)

        # locate the tag ID for our data and update with a port
        # Note: The port doesn't matter as we are going to override this later
        iptags = FecDataView.get_tags().get_ip_tags_for_vertex(self)
        if iptags is None:
            raise SpinnFrontEndException("no allocated IPTag")
        iptag = iptags[0]
        spec.write_value(iptag.tag)
        self._remote_tag = iptag.tag

        # write multicast chip key map
        machine = FecDataView.get_machine()
        spec.switch_write_focus(_DataRegions.CHIP_TO_KEY_SPACE)
        chip_xys_on_board = list(machine.get_existing_xys_on_board(
            machine[placement.xy]))

        # write how many chips to read
        spec.write_value(len(chip_xys_on_board))

        # write the broadcast keys for timeouts
        router_timeout_key = (
            FecDataView.get_system_multicast_router_timeout_keys())
        # pylint: disable=unsubscriptable-object
        spec.write_value(router_timeout_key[placement.xy])

        mc_data_chips_to_keys = (
            FecDataView.get_data_in_multicast_key_to_chip_map())

        # write each chip x and y and base key
        for chip_xy in chip_xys_on_board:
            local_x, local_y = machine.get_local_xy(machine[chip_xy])
            spec.write_value(local_x)
            spec.write_value(local_y)
            spec.write_value(mc_data_chips_to_keys[chip_xy])
            log.debug("for chip {}:{} base key is {}",
                      chip_xy[0], chip_xy[1], mc_data_chips_to_keys[chip_xy])

        # End-of-Spec:
        spec.end_specification()

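    # --- Editor's note (sketch): generate_data_specification above lays
    # out the CONFIG region as six consecutive words, matching CONFIG_SIZE:
    #
    #     word 0: NEW_SEQ_KEY            word 3: END_FLAG_KEY
    #     word 1: FIRST_DATA_KEY         word 4: BASE_KEY
    #     word 2: TRANSACTION_ID_KEY     word 5: IPTag tag number
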
    @property
    def _placement(self) -> Placement:
        if self.__placement is None:
            raise SpinnFrontEndException("placement not known")
        return self.__placement

    def _reserve_memory_regions(self, spec: DataSpecificationGenerator):
        """
        Reserves the DSG data regions and their sizes.

        :param ~.DataSpecificationGenerator spec: the spec to write to
        """
        spec.reserve_memory_region(
            region=_DataRegions.CONFIG,
            size=CONFIG_SIZE,
            label="config")
        spec.reserve_memory_region(
            region=_DataRegions.CHIP_TO_KEY_SPACE,
            size=SIZE_DATA_IN_CHIP_TO_KEY_SPACE,
            label="mc_key_map")
        spec.reserve_memory_region(
            region=_DataRegions.PROVENANCE_REGION,
            size=_PROVENANCE_DATA_SIZE,
            label="Provenance")

    @overrides(AbstractHasAssociatedBinary.get_binary_file_name)
    def get_binary_file_name(self) -> str:
        return "data_speed_up_packet_gatherer.aplx"

    def _generate_data_in_report(
            self, time_diff, data_size: int, x: int, y: int,
            address_written_to: int, missing_seq_nums):
        """
        Writes the data-in report for this stage.

        :param ~datetime.timedelta time_diff:
            the time taken to write the memory
        :param int data_size: the size of data that was written, in bytes
        :param int x:
            the x coordinate of the chip in the machine where the data was
            written
        :param int y:
            the y coordinate of the chip in the machine where the data was
            written
        :param int address_written_to: where in SDRAM it was written to
        :param list(set(int)) missing_seq_nums:
            the set of missing sequence numbers per data transmission attempt
        """
        dir_path = FecDataView.get_run_dir_path()
        in_report_path = os.path.join(dir_path, self.IN_REPORT_NAME)
        if not os.path.isfile(in_report_path):
            with open(in_report_path, "w", encoding="utf-8") as writer:
                writer.write(
                    "x\t\t y\t\t SDRAM address\t\t size in bytes\t\t\t"
                    " time took \t\t\t Mb/s \t\t\t missing sequence numbers\n")
                writer.write(
                    "------------------------------------------------"
                    "------------------------------------------------"
                    "-------------------------------------------------\n")

        # time taken, in microseconds
        time_took_us = time_diff.total_seconds() * 1000000
        megabits = (data_size * 8.0) / (1024 * BYTES_PER_KB)
        if time_took_us == 0:
            mbs: Any = "unknown, below threshold"
        else:
            # megabits per second
            mbs = megabits / (time_took_us / 1000000.0)

        with open(in_report_path, "a", encoding="utf-8") as writer:
            writer.write(
                f"{x}\t\t {y}\t\t {address_written_to}\t\t {data_size}\t\t"
                f"\t\t {time_took_us}\t\t\t {mbs}\t\t {missing_seq_nums}\n")

    def send_data_into_spinnaker(
            self, x: int, y: int, base_address: int,
            data: Union[BinaryIO, bytes, str, int], *,
            n_bytes: Optional[int] = None, offset: int = 0,
            cpu: int = 0):
        # pylint: disable=unused-argument
        """
        Sends a block of data into SpiNNaker to a given chip.

        :param int x: chip x for data
        :param int y: chip y for data
        :param int base_address:
            the address in SDRAM to start writing memory
        :param data:
            the data to write, or the filename to load data from
            (if a string)
        :type data: bytes or bytearray or str
        :param int n_bytes: how many bytes to read, or `None` if not set
        :param int offset: where in the data to start from
        :param int cpu: Ignored; we can only target SDRAM, so it is unused
        """
        # if file, read in and then process as normal
        if isinstance(data, str):
            if offset != 0:
                raise ValueError(
                    "when using a file, you can only have an offset of 0")
            with open(data, "rb") as reader:
                # n_bytes=None already means 'read everything'
                data = reader.read(n_bytes)
            # Number of bytes to write is now length of buffer we have
            if n_bytes is None:
                n_bytes = len(data)
            else:
                n_bytes = min(n_bytes, len(data))
        elif not isinstance(data, (bytes, bytearray)):
            raise ValueError("that type of data is not supported")

        if n_bytes is None:
            n_bytes = len(data)
        if n_bytes < 0:
            raise ValueError("cannot write a negative amount of data")
        destination = FecDataView.get_chip_at(x, y)

        # start time recording
        start = datetime.datetime.now()
        # send data
        self._send_data_via_extra_monitors(
            destination, base_address, data[offset:n_bytes + offset])
        # end time recording
        end = datetime.datetime.now()

        if VERIFY_SENT_DATA:
            original_data = bytes(data[offset:n_bytes + offset])
            transceiver = FecDataView.get_transceiver()
            verified_data = bytes(transceiver.read_memory(
                x, y, base_address, n_bytes))
            self.__verify_sent_data(
                original_data, verified_data, x, y, base_address, n_bytes)

        # write report
        if get_config_bool("Reports", "write_data_speed_up_reports"):
            self._generate_data_in_report(
                x=x, y=y, time_diff=end - start,
                data_size=n_bytes, address_written_to=base_address,
                missing_seq_nums=self._missing_seq_nums_data_in)

    @staticmethod
    def __verify_sent_data(
            original_data: bytes, verified_data: bytes, x: int, y: int,
            base_address: int, n_bytes: int):
        if original_data != verified_data:
            log.error("VARIANCE: chip:{},{} address:{} len:{}",
                      x, y, base_address, n_bytes)
            log.error("original:{}", original_data.hex())
            log.error("verified:{}", verified_data.hex())
            for i, (a, b) in enumerate(zip(original_data, verified_data)):
                if a != b:
                    raise ValueError(f"Mismatch found at position {i}")

    def __make_data_in_message(self, payload: bytes) -> SDPMessage:
        return SDPMessage(
            sdp_header=SDPHeader(
                destination_chip_x=self._placement.x,
                destination_chip_y=self._placement.y,
                destination_cpu=self._placement.p,
                destination_port=(
                    SDP_PORTS.EXTRA_MONITOR_CORE_DATA_IN_SPEED_UP.value),
                flags=SDPFlag.REPLY_NOT_EXPECTED),
            data=payload)

    @staticmethod
    def __make_data_out_message(
            placement: Placement, payload: bytes) -> SDPMessage:
        return SDPMessage(
            sdp_header=SDPHeader(
                destination_chip_x=placement.x,
                destination_chip_y=placement.y,
                destination_cpu=placement.p,
                destination_port=(
                    SDP_PORTS.EXTRA_MONITOR_CORE_DATA_SPEED_UP.value),
                flags=SDPFlag.REPLY_NOT_EXPECTED),
            data=payload)

    def __open_connection(self) -> SCAMPConnection:
        """
        Open an SCP connection and make our tag target it.

        :return: The opened connection, ready for use.
        :rtype: ~.SCAMPConnection
        """
        assert self._remote_tag is not None
        connection = open_scp_connection(self._x, self._y, self._ip_address)
        retarget_tag(connection, self._x, self._y, self._remote_tag)
        return connection

    def _send_data_via_extra_monitors(
            self, destination_chip: Chip, start_address: int,
            data_to_write: bytes):
        """
        Sends data using the extra monitor cores.

        :param Chip destination_chip: chip to send to
        :param int start_address: start address in SDRAM to write data to
        :param bytes data_to_write: the data to write
        """
        # Set up the connection
        with self.__open_connection() as connection:
            # how many packets after the first one we need to send
            self._max_seq_num = ceildiv(
                len(data_to_write), BYTES_IN_FULL_PACKET_WITH_KEY)

            # determine board chip IDs, as the LPG does not know
            # machine scope IDs
            machine = FecDataView.get_machine()
            dest_x, dest_y = machine.get_local_xy(destination_chip)
            self._coord_word = (dest_x << DEST_X_SHIFT) | dest_y

            # for safety, check the transaction id from the machine before
            # updating
            self._read_transaction_id_from_machine()
            self._transaction_id = (
                self._transaction_id + 1) & TRANSACTION_ID_CAP
            time_out_count = 0

            # verify completed
            received_confirmation = False
            while not received_confirmation:
                # send initial attempt at sending all the data
                self._send_all_data_based_packets(
                    data_to_write, start_address, connection)

                # Don't create a missing buffer until at least one packet
                # has come back.
                missing: Optional[Set[int]] = None

                while not received_confirmation:
                    try:
                        # try to receive a confirmation of some sort from
                        # SpiNNaker
                        data = connection.receive(
                            timeout=self._TIMEOUT_PER_RECEIVE_IN_SECONDS)
                        time_out_count = 0

                        # Read command and transaction id
                        (cmd, transaction_id) = _TWO_WORDS.unpack_from(
                            data, 0)

                        # If wrong transaction id, ignore packet
                        if self._transaction_id != transaction_id:
                            continue

                        # Decide what to do with the packet
                        if cmd == _DataInCommands.RECEIVE_FINISHED:
                            received_confirmation = True
                            break

                        if cmd != _DataInCommands.RECEIVE_MISSING_SEQ_DATA:
                            raise ValueError(
                                f"Unknown command {cmd} received")

                        # The currently received packet reports missing
                        # sequence numbers. Accumulate them and dispatch
                        # the retransmission when we've got them all.
                        if missing is None:
                            missing = set()
                            self._missing_seq_nums_data_in.append(missing)

                        seen_last, seen_all = self._read_in_missing_seq_nums(
                            data,
                            BYTES_FOR_RECEPTION_COMMAND_AND_ADDRESS_HEADER,
                            missing)

                        # Check that we've seen something that implies we
                        # are ready to retransmit.
                        if seen_all or seen_last:
                            self._outgoing_retransmit_missing_seq_nums(
                                data_to_write, missing, connection)
                            missing.clear()
                    except SpinnmanTimeoutException as e:
                        # if the timeout has not occurred x times, keep trying
                        time_out_count += 1
                        if time_out_count > TIMEOUT_RETRY_LIMIT:
                            emergency_recover_state_from_failure(
                                self, self._placement)
                            raise SpinnFrontEndException(
                                "Failed to hear from the machine during "
                                f"{time_out_count} attempts. "
                                "Please try removing firewalls.") from e

                        # If we never received a packet, we will never have
                        # created the buffer, so send everything again
                        if missing is None:
                            break

                        self._outgoing_retransmit_missing_seq_nums(
                            data_to_write, missing, connection)
                        missing.clear()

    def _read_in_missing_seq_nums(
            self, data: bytes, position: int,
            seq_nums: Set[int]) -> Tuple[bool, bool]:
        """
        Handles a missing sequence number packet from SpiNNaker.

        :param data: the data to translate into missing sequence numbers
        :type data: bytearray or bytes
        :param int position: the position in the data to read from
        :param set(int) seq_nums: a set of sequence numbers to add to
        :return: seen_last flag and seen_all flag
        :rtype: tuple(bool, bool)
        """
        # find how many elements are in this packet
        n_elements = (len(data) - position) // BYTES_PER_WORD

        # store missing
        new_seq_nums = n_word_struct(n_elements).unpack_from(
            data, position)

        # add missing sequence numbers accordingly
        seen_last = False
        seen_all = False
        if new_seq_nums[-1] == self._MISSING_SEQ_NUMS_END_FLAG:
            new_seq_nums = new_seq_nums[:-1]
            seen_last = True
        if new_seq_nums[-1] == self.FLAG_FOR_MISSING_ALL_SEQUENCES:
            for missing_seq in range(self._max_seq_num or 0):
                seq_nums.add(missing_seq)
            seen_all = True
        else:
            seq_nums.update(new_seq_nums)
        return seen_last, seen_all

    def _outgoing_retransmit_missing_seq_nums(
            self, data_to_write: bytes, missing: Set[int],
            connection: SCAMPConnection):
        """
        Transmits the missing sequence numbers back into SpiNNaker so the
        corresponding data can be resent.

        :param bytes data_to_write: the data to write
        :param set(int) missing: a set of missing sequence numbers
        :param connection: how to talk to the board
        :type connection:
            ~spinnman.connections.udp_packet_connections.SCAMPConnection
        """
        missing_seqs_as_list = sorted(missing)

        # send sequence data
        for missing_seq_num in missing_seqs_as_list:
            message, _length = self.__make_data_in_stream_message(
                data_to_write, missing_seq_num, None)
            self.__throttled_send(message, connection)

        # request an update on what is missing
        self.__send_tell_flag(connection)

    @staticmethod
    def __position_from_seq_number(seq_num: int) -> int:
        """
        Calculates where in the raw data to start reading from, given a
        sequence number.

        :param int seq_num: the sequence number to determine position from
        :return: the position in the byte data
        :rtype: int
        """
        return BYTES_IN_FULL_PACKET_WITH_KEY * seq_num

    def __make_data_in_stream_message(
            self, data_to_write: bytes, seq_num: int,
            position: Optional[int]) -> Tuple[SDPMessage, int]:
        """
        Determine the data needed to be sent to the SpiNNaker machine
        given a sequence number.

        :param bytes data_to_write:
            the data to write to the SpiNNaker machine
        :param int seq_num: the sequence number to get the data for
        :param position:
            the position in the data to write to SpiNNaker, or `None` to
            infer it from the sequence number
        :type position: int or None
        :return: SDP message, and how much data will be sent by it
        :rtype: tuple(~.SDPMessage, int)
        """
        # check for last packet
        packet_data_length = BYTES_IN_FULL_PACKET_WITH_KEY

        # determine position in data if not given
        if position is None:
            position = self.__position_from_seq_number(seq_num)

        # if less than a full packet worth of data, adjust length
        if position + packet_data_length > len(data_to_write):
            packet_data_length = len(data_to_write) - position

        if packet_data_length < 0:
            raise ValueError(
                f"weird packet data length {packet_data_length}")

        # create message body
        packet_data = _THREE_WORDS.pack(
            _DataInCommands.SEND_SEQ_DATA, self._transaction_id,
            seq_num) + data_to_write[position:position + packet_data_length]

        # return message for sending, and the length of data sent
        return self.__make_data_in_message(packet_data), packet_data_length

    def __send_location(
            self, start_address: int, connection: SCAMPConnection):
        """
        Send the write location as a separate message.

        :param int start_address: SDRAM location
        :param SCAMPConnection connection: how to talk to the board
        """
        connection.send_sdp_message(self.__make_data_in_message(
            _FIVE_WORDS.pack(
                _DataInCommands.SEND_DATA_TO_LOCATION,
                self._transaction_id, start_address, self._coord_word,
                self._max_seq_num - 1)))
        log.debug(
            "start address for transaction {} is {}",
            self._transaction_id, start_address)

    def __send_tell_flag(self, connection: SCAMPConnection) -> None:
        """
        Send the tell flag as a separate message.

        :param SCAMPConnection connection: how to talk to the board
        """
        connection.send_sdp_message(self.__make_data_in_message(
            _TWO_WORDS.pack(
                _DataInCommands.SEND_TELL, self._transaction_id)))

    def _send_all_data_based_packets(
            self, data_to_write: bytes, start_address: int,
            connection: SCAMPConnection):
        """
        Send all the data as one block.

        :param bytes data_to_write: the data to send
        :param int start_address: where in SDRAM to start writing
        :param SCAMPConnection connection: how to talk to the board
        """
        # Send the location
        self.__send_location(start_address, connection)

        # where in the data we are currently up to
        position_in_data = 0

        # send rest of data
        for seq_num in range(self._max_seq_num or 0):
            # put in command flag and sequence number
            message, length_to_send = self.__make_data_in_stream_message(
                data_to_write, seq_num, position_in_data)
            position_in_data += length_to_send

            # send the message
            self.__throttled_send(message, connection)
            log.debug("sent sequence {} of {} bytes", seq_num,
                      length_to_send)

        # check for end flag
        self.__send_tell_flag(connection)
        log.debug("sent end flag")

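    # --- Editor's note (sketch): the data-in exchange implemented above,
    # seen from the host side:
    #
    #     1. SEND_DATA_TO_LOCATION  (transaction id, address, chip, max seq)
    #     2. SEND_SEQ_DATA * n      (sequence-numbered payload chunks)
    #     3. SEND_TELL              (ask the monitor what is still missing)
    #     4. retransmit on RECEIVE_MISSING_SEQ_DATA replies, looping until
    #        RECEIVE_FINISHED arrives.
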
    def set_cores_for_data_streaming(self) -> None:
        """
        Helper method for setting the router timeouts to a state usable
        for data streaming.
        """
        lead_monitor = FecDataView.get_monitor_by_xy(0, 0)
        # Store the last reinjection status for resetting
        # NOTE: This assumes the status is the same on all cores
        self._last_status = lead_monitor.get_reinjection_status()

        # Set to not inject dropped packets
        lead_monitor.set_reinjection_packets(
            point_to_point=False, multicast=False, nearest_neighbour=False,
            fixed_route=False)

        # Clear any outstanding packets from reinjection
        self.clear_reinjection_queue()

        # set timeouts
        self.set_router_wait2_timeout(self._SHORT_TIMEOUT)
        self.set_router_wait1_timeout(self._LONG_TIMEOUT)

    @staticmethod
    def load_application_routing_tables() -> None:
        """
        Set all chips to have the application table loaded in the router.
        """
        FecDataView.get_monitor_by_xy(0, 0).load_application_mc_routes()

    @staticmethod
    def load_system_routing_tables() -> None:
        """
        Set all chips to have the system table loaded in the router.
        """
        FecDataView.get_monitor_by_xy(0, 0).load_system_mc_routes()

    def set_router_wait1_timeout(self, timeout: Tuple[int, int]):
        """
        Set the wait1 field for a set of routers.

        :param tuple(int,int) timeout:
            The mantissa and exponent of the timeout value, each between
            0 and 15
        """
        mantissa, exponent = timeout
        core_subsets = convert_vertices_to_core_subset([self])
        process = ReinjectorControlProcess(
            FecDataView.get_scamp_connection_selector())
        try:
            process.set_wait1_timeout(mantissa, exponent, core_subsets)
        except:  # noqa: E722
            emergency_recover_state_from_failure(
                self, FecDataView.get_placement_of_vertex(self))
            raise

    def set_router_wait2_timeout(self, timeout: Tuple[int, int]):
        """
        Set the wait2 field for a set of routers.

        :param tuple(int,int) timeout:
            The mantissa and exponent of the timeout value, each between
            0 and 15
        """
        mantissa, exponent = timeout
        core_subsets = convert_vertices_to_core_subset([self])
        process = ReinjectorControlProcess(
            FecDataView.get_scamp_connection_selector())
        try:
            process.set_wait2_timeout(mantissa, exponent, core_subsets)
        except:  # noqa: E722
            emergency_recover_state_from_failure(
                self, FecDataView.get_placement_of_vertex(self))
            raise

    def clear_reinjection_queue(self) -> None:
        """
        Clears the queues for reinjection.
        """
        core_subsets = convert_vertices_to_core_subset([self])
        process = ReinjectorControlProcess(
            FecDataView.get_scamp_connection_selector())
        try:
            process.clear_queue(core_subsets)
        except:  # noqa: E722
            emergency_recover_state_from_failure(
                self, FecDataView.get_placement_of_vertex(self))
            raise

    def unset_cores_for_data_streaming(self) -> None:
        """
        Helper method for restoring the router timeouts to normal after
        being in a state usable for data streaming.
        """
        # Set the routers to temporary values
        self.set_router_wait1_timeout(self._TEMP_TIMEOUT)
        self.set_router_wait2_timeout(self._ZERO_TIMEOUT)

        if self._last_status is None:
            log.warning(
                "Cores have not been set for data extraction, so they"
                " cannot be unset")
            return
        try:
            self.set_router_wait1_timeout(
                self._last_status.router_wait1_timeout_parameters)
            self.set_router_wait2_timeout(
                self._last_status.router_wait2_timeout_parameters)

            lead_monitor = FecDataView.get_monitor_by_xy(0, 0)
            lead_monitor.set_reinjection_packets(
                point_to_point=(
                    self._last_status.is_reinjecting_point_to_point),
                multicast=self._last_status.is_reinjecting_multicast,
                nearest_neighbour=(
                    self._last_status.is_reinjecting_nearest_neighbour),
                fixed_route=self._last_status.is_reinjecting_fixed_route)
        except Exception:  # pylint: disable=broad-except
            log.exception("Error resetting timeouts")
            log.error("Checking if the cores are OK...")
            core_subsets = convert_vertices_to_core_subset(
                FecDataView.iterate_monitors())
            try:
                transceiver = FecDataView.get_transceiver()
                error_cores = transceiver.get_cpu_infos(
                    core_subsets, CPUState.RUNNING, include=False)
                if error_cores:
                    log.error("Cores in an unexpected state: {}",
                              error_cores)
            except Exception:  # pylint: disable=broad-except
                log.exception("Couldn't get core state")

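    # --- Editor's note (sketch): a typical call pattern around a read, so
    # the router timeouts are always restored even on failure:
    #
    #     gatherer.set_cores_for_data_streaming()
    #     try:
    #         data = gatherer.get_data(monitor, placement, addr, n_bytes)
    #     finally:
    #         gatherer.unset_cores_for_data_streaming()
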
    def get_data(
            self, extra_monitor: ExtraMonitorSupportMachineVertex,
            placement: Placement, memory_address: int,
            length_in_bytes: int) -> bytes:
        """
        Gets data from a given core and memory address.

        :param ExtraMonitorSupportMachineVertex extra_monitor:
            the extra monitor used for this data
        :param ~pacman.model.placements.Placement placement:
            placement object for where to get data from
        :param int memory_address: the address in SDRAM to start reading from
        :param int length_in_bytes: the length of data to read in bytes
        :return: byte array of the data
        :rtype: bytearray
        """
        # create report elements
        if get_config_bool("Reports", "write_data_speed_up_reports"):
            self._report_routers_used_for_out(placement)

        start = float(time.time())
        # if asked for no data, just return an empty byte array
        if length_in_bytes == 0:
            data = bytearray(0)
            end = float(time.time())
            with ProvenanceWriter() as db:
                # TODO Why log the time to not read???
                db.insert_gatherer(
                    placement.x, placement.y, memory_address,
                    length_in_bytes, self._run, "No Extraction time",
                    end - start)
            return data

        # Update the IP Tag to work through a NAT firewall
        with self.__open_connection() as connection:
            # update transaction id for extra monitor
            extra_monitor.update_transaction_id()
            transaction_id = extra_monitor.transaction_id

            # send
            connection.send_sdp_message(self.__make_data_out_message(
                placement, _FOUR_WORDS.pack(
                    _DataOutCommands.START_SENDING, transaction_id,
                    memory_address, length_in_bytes)))

            # receive
            self._output = bytearray(length_in_bytes)
            self._view = memoryview(self._output)
            self._max_seq_num = self.__calculate_max_seq_num()
            lost_seq_nums = self._receive_data(
                placement, connection, transaction_id)

            # Stop anything else getting through (and reduce traffic)
            connection.send_sdp_message(self.__make_data_out_message(
                placement, _TWO_WORDS.pack(
                    _DataOutCommands.CLEAR, transaction_id)))

        end = float(time.time())
        with ProvenanceWriter() as db:
            db.insert_gatherer(
                placement.x, placement.y, memory_address, length_in_bytes,
                self._run, "Extraction time", end - start)
            for lost_seq_num in lost_seq_nums:
                if lost_seq_num > _MINOR_LOSS_THRESHOLD:
                    db.insert_report(
                        f"During the extraction of data of "
                        f"{length_in_bytes} bytes from memory address "
                        f"{memory_address} on chip "
                        f"({placement.x}, {placement.y}), {lost_seq_num} "
                        f"sequences were lost.")
                if lost_seq_num > 0:
                    db.insert_gatherer(
                        placement.x, placement.y, memory_address,
                        length_in_bytes, self._run, "Lost_seq_nums",
                        lost_seq_num)
        return self._output

    def _receive_data(
            self, placement: Placement, connection: SCAMPConnection,
            transaction_id: int) -> List[int]:
        """
        :param ~.Placement placement:
        :param ~.SCAMPConnection connection:
        :param int transaction_id:
        :rtype: list(int)
        """
        seq_nums: Set[int] = set()
        lost_seq_nums: List[int] = list()
        timeoutcount = 0
        finished = False
        while not finished:
            try:
                data = connection.receive(
                    self._TIMEOUT_PER_RECEIVE_IN_SECONDS)
                response_transaction_id, = _ONE_WORD.unpack_from(data, 4)
                if transaction_id == response_transaction_id:
                    timeoutcount = 0
                    seq_nums, finished = self._process_data(
                        data, seq_nums, finished, placement, lost_seq_nums,
                        transaction_id, connection)
                else:
                    log.info(
                        "ignoring packet as transaction id should be {}"
                        " but is {}", transaction_id,
                        response_transaction_id)
            except SpinnmanTimeoutException as e:
                if timeoutcount > TIMEOUT_RETRY_LIMIT:
                    raise SpinnFrontEndException(
                        "Failed to hear from the machine during "
                        f"{timeoutcount} attempts. "
                        "Please try removing firewalls") from e
                timeoutcount += 1
                # self.__reset_connection()
                if not finished:
                    finished = \
                        self._determine_and_retransmit_missing_seq_nums(
                            seq_nums, placement, lost_seq_nums,
                            transaction_id, connection)
        return lost_seq_nums

    @staticmethod
    def __describe_fixed_route_from(placement: Placement) -> List[XY]:
        """
        Traverse the fixed route paths from a given location to its
        destination. Used for determining which routers were used.

        :param ~.Placement placement: the source to start from
        :return: list of chip locations
        :rtype: list(tuple(int,int))
        """
        routers = [placement.xy]
        fixed_routes = FecDataView.get_fixed_routes()
        # pylint: disable=unsubscriptable-object
        chip = placement.chip
        entry = fixed_routes[placement.xy]
        while not entry.processor_ids:
            # can assume one link, as it's a minimum spanning tree going
            # to the root
            link = chip.router.get_link(next(iter(entry.link_ids)))
            assert link is not None
            chip = FecDataView.get_chip_at(
                link.destination_x, link.destination_y)
            routers.append((link.destination_x, link.destination_y))
            entry = fixed_routes[(link.destination_x, link.destination_y)]
        return routers

    def _report_routers_used_for_out(self, placement: Placement):
        """
        Write the used routers into a report.

        :param ~.Placement placement:
            The placement that we have been routing data out from
        """
        routers_used = self.__describe_fixed_route_from(placement)
        dir_path = FecDataView.get_run_dir_path()
        out_report_path = os.path.join(dir_path, self.OUT_REPORT_NAME)
        with open(out_report_path, "a", encoding="utf-8") as writer:
            writer.write(
                f"[{placement.x}:{placement.y}:{placement.p}] "
                f"= {routers_used}\n")

    def __missing_seq_nums(self, seq_nums: Set[int]) -> List[int]:
        """
        Determine which sequence numbers we've missed.

        :param set(int) seq_nums: the set already acquired
        :return: list of missing sequence numbers
        :rtype: list(int)
        """
        return [sn for sn in range(self._max_seq_num) if sn not in seq_nums]

    def _determine_and_retransmit_missing_seq_nums(
            self, seq_nums: Set[int], placement: Placement,
            lost_seq_nums: List[int], transaction_id: int,
            connection: SCAMPConnection) -> bool:
        """
        Determine if there are any missing sequence numbers, and if so,
        transmit them back to the core so that it can retransmit the
        corresponding data.

        :param set(int) seq_nums: the sequence numbers already received
        :param ~.Placement placement: placement instance
        :param list(int) lost_seq_nums:
            the list of how many sequence numbers were lost per iteration
        :param int transaction_id: transaction ID of this stream
        :param SCAMPConnection connection: how to talk to the board
        :return: whether all packets have been transmitted
        :rtype: bool
        """
        # locate missing sequence numbers from pile
        missing_seq_nums = self.__missing_seq_nums(seq_nums)
        lost_seq_nums.append(len(missing_seq_nums))
        # for seq_num in sorted(seq_nums):
        #     log.debug("from list I'm missing sequence number {}", seq_num)
        if not missing_seq_nums:
            return True

        # figure out the number of packets, given the two formats
        n_packets = 1
        length_via_format2 = len(missing_seq_nums) - (
            WORDS_PER_FULL_PACKET - WORDS_FOR_COMMAND_N_MISSING_TRANSACTION)
        if length_via_format2 > 0:
            n_packets += ceildiv(
                length_via_format2,
                WORDS_PER_FULL_PACKET - WORDS_FOR_COMMAND_TRANSACTION)
        # log.debug("missing packets = {}", n_packets)

        # transmit missing sequence numbers as new SDP packets
        first = True
        seq_num_offset = 0
        for _packet_count in range(n_packets):
            length_left_in_packet = WORDS_PER_FULL_PACKET
            offset = 0

            # if first, add n packets to list
            if first:
                # get left over space / data size
                size_of_data_left_to_transmit = min(
                    length_left_in_packet -
                    WORDS_FOR_COMMAND_N_MISSING_TRANSACTION,
                    len(missing_seq_nums) - seq_num_offset)

                # build data holder accordingly
                data = bytearray(
                    (size_of_data_left_to_transmit +
                     WORDS_FOR_COMMAND_N_MISSING_TRANSACTION) *
                    BYTES_PER_WORD)

                # pack flag and n packets
                _THREE_WORDS.pack_into(
                    data, 0, _DataOutCommands.START_MISSING_SEQ,
                    transaction_id, n_packets)

                # update state
                offset += (
                    WORDS_FOR_COMMAND_N_MISSING_TRANSACTION *
                    BYTES_PER_WORD)
                length_left_in_packet -= (
                    WORDS_FOR_COMMAND_N_MISSING_TRANSACTION)
                first = False
            else:
                # just add data
                # get left over space / data size
                size_of_data_left_to_transmit = min(
                    WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM,
                    len(missing_seq_nums) - seq_num_offset)

                # build data holder accordingly
                data = bytearray(
                    (size_of_data_left_to_transmit +
                     WORDS_FOR_COMMAND_TRANSACTION) * BYTES_PER_WORD)

                # pack flag
                _TWO_WORDS.pack_into(
                    data, offset, _DataOutCommands.MISSING_SEQ,
                    transaction_id)
                offset += BYTES_PER_WORD * WORDS_FOR_COMMAND_TRANSACTION
                length_left_in_packet -= WORDS_FOR_COMMAND_TRANSACTION

            # fill data field
            n_word_struct(size_of_data_left_to_transmit).pack_into(
                data, offset, *missing_seq_nums[
                    seq_num_offset:
                    seq_num_offset + size_of_data_left_to_transmit])
            seq_num_offset += length_left_in_packet

            # build SDP message and send it to the core
            connection.send_sdp_message(self.__make_data_out_message(
                placement, data))

            # sleep to ensure the core doesn't lose packets
            time.sleep(self._TIMEOUT_FOR_SENDING_IN_SECONDS)
            # log.debug(
            #     "send SDP packet with missing sequence numbers: {} of {}",
            #     _packet_count + 1, n_packets)
        return False

    def _process_data(
            self, data: bytes, seq_nums: Set[int], finished: bool,
            placement: Placement, lost_seq_nums: List[int],
            transaction_id: int,
            connection: SCAMPConnection) -> Tuple[Set[int], bool]:
        """
        Take a packet and process it to see if we're finished yet.

        :param bytes data: the packet data
        :param set(int) seq_nums: the set of sequence numbers received so far
        :param bool finished: whether the stream has already finished
        :param ~.Placement placement:
            placement object for location on machine
        :param list(int) lost_seq_nums:
            the list of how many sequence numbers were lost per iteration
        :param int transaction_id: the transaction ID for this stream
        :param SCAMPConnection connection: how to talk to the board
        :return: the updated set of sequence numbers, and whether we have
            finished
        :rtype: tuple(set(int), bool)
        """
        # pylint: disable=too-many-arguments
        length_of_data = len(data)
        first_packet_element, = _ONE_WORD.unpack_from(data, 0)

        # get flags
        seq_num = first_packet_element & self._SEQUENCE_NUMBER_MASK
        is_end_of_stream = (
            first_packet_element & self._LAST_MESSAGE_FLAG_BIT_MASK) != 0

        # check sequence number is sane
        if seq_num > self._max_seq_num:
            raise ValueError(
                f"got an insane sequence number: got {seq_num} when "
                f"the max is {self._max_seq_num} "
                f"with a length of {length_of_data}")

        # figure offset for where data is to be put
        offset = self.__offset(seq_num)

        # write data
        # read offset into the data is at byte 8, as the first 4 bytes are
        # the sequence number and the second 4 are the transaction id
        true_data_length = (
            offset + length_of_data - BYTES_FOR_SEQ_AND_TRANSACTION_ID)
        if (not is_end_of_stream or
                length_of_data != BYTES_FOR_SEQ_AND_TRANSACTION_ID):
            self.__write_into_view(
                offset, true_data_length, data,
                BYTES_FOR_SEQ_AND_TRANSACTION_ID, length_of_data)

        # add sequence number to list
        seq_nums.add(seq_num)

        # if we received a last flag on its own, it's during retransmission;
        # check and try again if required
        if is_end_of_stream:
            if not self.__check(seq_nums):
                finished = self._determine_and_retransmit_missing_seq_nums(
                    seq_nums, placement, lost_seq_nums, transaction_id,
                    connection)
            else:
                finished = True
        return seq_nums, finished

    @staticmethod
    def __offset(seq_num: int) -> int:
        """
        :param int seq_num:
        :rtype: int
        """
        return (seq_num * WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM *
                BYTES_PER_WORD)

    def __write_into_view(
            self, view_start_position: int, view_end_position: int,
            data: bytes, data_start_position: int, data_end_position: int):
        """
        Puts data into the view.

        :param int view_start_position: where in the view to start
        :param int view_end_position: where in the view to end
        :param bytes data: the data holder to write from
        :param int data_start_position: where in the data holder to start
        :param int data_end_position: where in the data holder to end
        :raises ValueError: If the position to write to is out of range
        """
        # pylint: disable=too-many-arguments
        if self._view is None or self._output is None:
            raise SpinnFrontEndException("no current target buffer")
        if view_end_position > len(self._output):
            raise ValueError(
                f"End position {view_end_position} > "
                f"output length {len(self._output)}")
        self._view[view_start_position:view_end_position] = \
            data[data_start_position:data_end_position]

    def __check(self, seq_nums: Iterable[int]) -> bool:
        """
        Verify if the sequence numbers are correct.

        :param iterable(int) seq_nums: the received sequence numbers
        :return: Whether all the sequence numbers have been received
        :rtype: bool
        """
        # hand back
        seq_nums = sorted(seq_nums)
        max_needed = self.__calculate_max_seq_num()
        if len(seq_nums) > max_needed + 1:
            raise ValueError(f"too many seq_nums: {len(seq_nums)}")
        return len(seq_nums) == max_needed + 1

    def __calculate_max_seq_num(self) -> int:
        """
        Deduce the max sequence number expected to be received.

        :return: the biggest sequence number expected
        :rtype: int
        """
        if self._output is None:
            raise SpinnFrontEndException("no receiving buffer")
        return ceildiv(
            len(self._output),
            WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * BYTES_PER_WORD)

    @staticmethod
    def __provenance_address(x: int, y: int, p: int) -> int:
        txrx = FecDataView.get_transceiver()
        region_table = txrx.get_region_base_address(x, y, p)

        # Get the provenance region base address
        prov_region_entry_address = get_region_base_address_offset(
            region_table, _DataRegions.PROVENANCE_REGION)
        return txrx.read_word(x, y, prov_region_entry_address)

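    # --- Editor's note (sketch): the provenance region read below holds
    # four little-endian words, unpacked in this order by _FOUR_WORDS:
    #
    #     word 0: SDP packets sent          word 2: data-in streams
    #     word 1: SDP packets received      word 3: data-out streams
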
    @overrides(AbstractProvidesProvenanceDataFromMachine
               .get_provenance_data_from_machine)
    def get_provenance_data_from_machine(self, placement: Placement):
        x, y, p = placement.x, placement.y, placement.p
        # Get the App Data for the core
        data = FecDataView.read_memory(
            x, y, self.__provenance_address(x, y, p), _PROVENANCE_DATA_SIZE)
        n_sdp_sent, n_sdp_recvd, n_in_streams, n_out_streams = (
            _FOUR_WORDS.unpack_from(data))
        with ProvenanceWriter() as db:
            db.insert_core(x, y, p, _ProvLabels.SENT, n_sdp_sent)
            db.insert_core(x, y, p, _ProvLabels.RECEIVED, n_sdp_recvd)
            db.insert_core(x, y, p, _ProvLabels.IN_STREAMS, n_in_streams)
            db.insert_core(x, y, p, _ProvLabels.OUT_STREAMS, n_out_streams)