Source code for spinn_front_end_common.interface.buffer_management.buffer_manager

# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations
import ctypes
import difflib
import logging
from typing import (
    Dict, Iterable, List, Optional, Set, Tuple, cast, TYPE_CHECKING)
from spinn_utilities.config_holder import get_config_bool
from spinn_utilities.log import FormatAdapter
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.progress_bar import ProgressBar
from spinnman.messages.eieio.command_messages import EventStopRequest
from spinnman.messages.eieio import EIEIOType
from spinnman.messages.eieio.data_messages import EIEIODataMessage
from pacman.model.graphs.machine import MachineVertex
from pacman.model.placements import Placement
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.constants import BYTES_PER_WORD
from spinn_front_end_common.utilities.exceptions import (
    BufferableRegionTooSmall, SpinnFrontEndException)
from spinn_front_end_common.utilities.helpful_functions import (
    locate_memory_region_for_placement, locate_extra_monitor_mc_receiver)
from spinn_front_end_common.interface.buffer_management.storage_objects \
    import (BuffersSentDeque, BufferDatabase)
from spinn_front_end_common.interface.buffer_management.buffer_models import (
    AbstractReceiveBuffersToHost, AbstractSendsBuffersFromHost)
from spinn_front_end_common.utility_models.streaming_context_manager import (
    StreamingContextManager)
from .recording_utilities import get_recording_header_size
if TYPE_CHECKING:
    from spinn_front_end_common.interface.java_caller import JavaCaller


logger = FormatAdapter(logging.getLogger(__name__))

# The minimum size of any message - this is the headers plus one entry
_MIN_MESSAGE_SIZE = EIEIODataMessage.min_packet_length(
    eieio_type=EIEIOType.KEY_32_BIT, is_timestamp=True)

# The number of bytes in each key to be sent
_N_BYTES_PER_KEY = EIEIOType.KEY_32_BIT.key_bytes

_SDP_MAX_PACKAGE_SIZE = 272

TRAFFIC_IDENTIFIER = "BufferTraffic"

VERIFY = False


class _RecordingRegion(ctypes.LittleEndianStructure):
    """
    Recording Region data
    """
    _fields_ = [
        # Space available for recording
        ("space", ctypes.c_uint32),
        # The size of the recording region
        ("size", ctypes.c_uint32, 31),
        # Whether any data is missing
        ("missing", ctypes.c_uint32, 1),
        # The address of the data
        ("data", ctypes.c_uint32)
    ]


class BufferManager(object):
    """
    Manager of send buffers.
    """

    __slots__ = (
        "__enable_monitors",

        # Set of vertices with buffers to be sent
        "_sender_vertices",

        # Dictionary of sender vertex -> buffers sent
        "_sent_messages",

        # Support class to help call Java
        "_java_caller",

        # The machine controller, in case it wants to make proxied connections
        # for us
        "_machine_controller")

    def __init__(self) -> None:
        self.__enable_monitors: bool = get_config_bool(
            "Machine", "enable_advanced_monitor_support") or False
        # Set of vertices with buffers to be sent
        self._sender_vertices: Set[AbstractSendsBuffersFromHost] = set()

        # Dictionary of sender vertex -> buffers sent
        self._sent_messages: Dict[
            AbstractSendsBuffersFromHost, BuffersSentDeque] = dict()

        self._java_caller: Optional[JavaCaller]
        if FecDataView.has_java_caller():
            with BufferDatabase() as db:
                db.write_session_credentials_to_db()
            self._java_caller = FecDataView.get_java_caller()
            if self.__enable_monitors:
                self._java_caller.set_advanced_monitors()
        else:
            self._java_caller = None

        for placement in FecDataView.iterate_placements_by_vertex_type(
                AbstractSendsBuffersFromHost):
            vertex = cast(AbstractSendsBuffersFromHost, placement.vertex)
            if vertex.buffering_input():
                self._sender_vertices.add(vertex)

    def _request_data(
            self, placement_x: int, placement_y: int, address: int,
            length: int) -> bytes:
        """
        Uses the extra monitor cores for data extraction.

        :param int placement_x:
            the placement X coordinate where data is to be extracted from
        :param int placement_y:
            the placement Y coordinate where data is to be extracted from
        :param int address: the memory address to start at
        :param int length: the number of bytes to extract
        :return: data as a byte array
        :rtype: bytearray
        """
        if not self.__enable_monitors:
            return FecDataView.read_memory(
                placement_x, placement_y, address, length)

        # Round to word boundaries
        initial = address % BYTES_PER_WORD
        address -= initial
        length += initial
        final = (BYTES_PER_WORD - (length % BYTES_PER_WORD)) % BYTES_PER_WORD
        length += final

        sender = FecDataView.get_monitor_by_xy(placement_x, placement_y)
        receiver = locate_extra_monitor_mc_receiver(placement_x, placement_y)
        extra_mon_data = receiver.get_data(
            sender, FecDataView.get_placement_of_vertex(sender),
            address, length)
        if VERIFY:
            txrx_data = FecDataView.read_memory(
                placement_x, placement_y, address, length)
            self._verify_data(extra_mon_data, txrx_data)

        # If we rounded to word boundaries, strip the padding junk
        if initial and final:
            return extra_mon_data[initial:-final]
        elif initial:
            return extra_mon_data[initial:]
        elif final:
            return extra_mon_data[:-final]
        else:
            return extra_mon_data

    @staticmethod
    def _verify_data(extra_mon_data: bytes, txrx_data: bytes):
        sm = difflib.SequenceMatcher(a=extra_mon_data, b=txrx_data)
        failed_index = -1
        for (tag, i1, i2, j1, j2) in sm.get_opcodes():
            if tag == 'replace':
                if failed_index < 0:
                    failed_index = i1
                logger.error(
                    "data differs at {}..{}: got {} instead of {}",
                    i1, i2, repr(txrx_data[j1:j2]),
                    repr(extra_mon_data[i1:i2]))
            elif tag == 'insert':
                if failed_index < 0:
                    failed_index = i1
                logger.error(
                    "data differs at {}: extra {}",
                    i1, repr(txrx_data[j1:j2]))
            elif tag == 'delete':
                if failed_index < 0:
                    failed_index = i1
                logger.error(
                    "data differs at {}: lost {}",
                    i1, repr(extra_mon_data[i1:i2]))
        if failed_index >= 0:
            raise ValueError(f"WRONG (at index {failed_index})")

[docs] def load_initial_buffers(self) -> None: """ Load the initial buffers for the senders using memory writes. """ total_data = 0 for vertex in self._sender_vertices: for region in vertex.get_regions(): total_data += vertex.get_region_buffer_size(region) progress = ProgressBar(total_data, "Loading buffers") for vertex in self._sender_vertices: for region in vertex.get_regions(): self._send_initial_messages(vertex, region, progress) progress.end()
[docs] def reset(self) -> None: """ Resets the buffered regions to start transmitting from the beginning of its expected regions and clears the buffered out data files. """ with BufferDatabase() as db: db.write_session_credentials_to_db() # rewind buffered in for vertex in self._sender_vertices: for region in vertex.get_regions(): vertex.rewind(region)
[docs] def resume(self) -> None: """ Resets any data structures needed before starting running again. """
[docs] def clear_recorded_data( self, x: int, y: int, p: int, recording_region_id: int): """ Removes the recorded data stored in memory. :param int x: placement X coordinate :param int y: placement Y coordinate :param int p: placement processor ID :param int recording_region_id: the recording region ID """ with BufferDatabase() as db: db.clear_region(x, y, p, recording_region_id)
def _create_message_to_send( self, size: int, vertex: AbstractSendsBuffersFromHost, region: int) -> Optional[EIEIODataMessage]: """ Creates a single message to send with the given boundaries. :param int size: The number of bytes available for the whole packet :param AbstractSendsBuffersFromHost vertex: The vertex to get the keys from :param int region: The region of the vertex to get keys from :return: A new message, or `None` if no keys can be added :rtype: None or ~spinnman.messages.eieio.data_messages.EIEIODataMessage """ # If there are no more messages to send, return None if not vertex.is_next_timestamp(region): return None # Create a new message next_timestamp = vertex.get_next_timestamp(region) message = EIEIODataMessage.create( EIEIOType.KEY_32_BIT, timestamp=next_timestamp) # If there is no room for the message, return None if message.size + _N_BYTES_PER_KEY > size: return None # Add keys up to the limit bytes_to_go = size - message.size while (bytes_to_go >= _N_BYTES_PER_KEY and vertex.is_next_key(region, next_timestamp)): key = vertex.get_next_key(region) message.add_key(key) bytes_to_go -= _N_BYTES_PER_KEY return message def _send_initial_messages( self, vertex: AbstractSendsBuffersFromHost, region: int, progress: ProgressBar): """ Send the initial set of messages. :param AbstractSendsBuffersFromHost vertex: The vertex to get the keys from :param int region: The region to get the keys from """ # Get the vertex load details # region_base_address = self._locate_region_address(region, vertex) placement = FecDataView.get_placement_of_vertex( cast(MachineVertex, vertex)) region_base_address = locate_memory_region_for_placement( placement, region) # Add packets until out of space sent_message = False bytes_to_go = vertex.get_region_buffer_size(region) if bytes_to_go % 2 != 0: raise SpinnFrontEndException( f"The buffer region of {vertex} must be divisible by 2") all_data = b"" if vertex.is_empty(region): sent_message = True else: min_size_of_packet = _MIN_MESSAGE_SIZE while (vertex.is_next_timestamp(region) and bytes_to_go > min_size_of_packet): space_available = min(bytes_to_go, _SDP_MAX_PACKAGE_SIZE) next_message = self._create_message_to_send( space_available, vertex, region) if next_message is None: break # Write the message to the memory data = next_message.bytestring all_data += data sent_message = True # Update the positions bytes_to_go -= len(data) progress.update(len(data)) if not sent_message: raise BufferableRegionTooSmall( f"The buffer size {bytes_to_go} is too small for any data to " f"be added for region {region} of vertex {vertex}") # If there are no more messages and there is space, add a stop request if (not vertex.is_next_timestamp(region) and bytes_to_go >= EventStopRequest.get_min_packet_length()): data = EventStopRequest().bytestring # pylint: disable=wrong-spelling-in-comment # logger.debug( # "Writing stop message of {} bytes to {} on {}, {}, {}" # len(data), hex(region_base_address), # placement.x, placement.y, placement.p) all_data += data bytes_to_go -= len(data) progress.update(len(data)) self._sent_messages[vertex] = BuffersSentDeque( region, sent_stop_message=True) # Do the writing all at once for efficiency FecDataView.write_memory( placement.x, placement.y, region_base_address, all_data)
[docs] def get_placement_data(self) -> None: """ Retrieve the data from placed vertices. """ recording_placements = list( FecDataView.iterate_placements_by_vertex_type( AbstractReceiveBuffersToHost)) if self._java_caller is not None: logger.info("Starting buffer extraction using Java") self._java_caller.set_placements(recording_placements) self._java_caller.get_all_data() elif self.__enable_monitors: self.__python_get_data_for_placements_with_monitors( recording_placements) else: self.__python_get_data_for_placements(recording_placements)
def __python_get_data_for_placements_with_monitors( self, recording_placements: List[Placement]): """ :param list(~pacman.model.placements.Placement) recording_placements: Where to get the data from. """ # locate receivers receivers = OrderedSet( locate_extra_monitor_mc_receiver(placement.x, placement.y) for placement in recording_placements) # update transaction id from the machine for all extra monitors for extra_mon in FecDataView.iterate_monitors(): extra_mon.update_transaction_id_from_machine() with StreamingContextManager(receivers): # get data self.__python_get_data_for_placements(recording_placements) def __python_get_data_for_placements( self, recording_placements: List[Placement]): """ :param list(~pacman.model.placements.Placement) recording_placements: Where to get the data from. """ # get data progress = ProgressBar( len(recording_placements), "Extracting buffers from the last run") for placement in progress.over(recording_placements): self._retreive_by_placement(placement)
[docs] def get_data_by_placement( self, placement: Placement, recording_region_id: int) -> Tuple[ bytes, bool]: """ Get the data container for all the data retrieved during the simulation from a specific region area of a core. :param ~pacman.model.placements.Placement placement: the placement to get the data from :param int recording_region_id: desired recording data region :return: an array contained all the data received during the simulation, and a flag indicating if any data was missing :rtype: tuple(bytearray, bool) """ # Ensure that any transfers in progress are complete first if not isinstance(placement.vertex, AbstractReceiveBuffersToHost): raise NotImplementedError( f"vertex {placement.vertex} does not implement " "AbstractReceiveBuffersToHost so no data read") # data flush has been completed - return appropriate data with BufferDatabase() as db: return db.get_region_data( placement.x, placement.y, placement.p, recording_region_id)
def _retreive_by_placement(self, placement: Placement): """ Retrieve the data for a vertex; must be locked first. :param ~pacman.model.placements.Placement placement: the placement to get the data from """ vertex = cast(AbstractReceiveBuffersToHost, placement.vertex) addr = vertex.get_recording_region_base_address(placement) sizes_and_addresses = self._get_region_information( addr, placement.x, placement.y) # Read the data if not already received for region in vertex.get_recorded_region_ids(): # Now read the data and store it size, addr, missing = sizes_and_addresses[region] data = self._request_data( placement.x, placement.y, addr, size) with BufferDatabase() as db: db.store_data_in_region_buffer( placement.x, placement.y, placement.p, region, missing, data) def _get_region_information( self, address: int, x: int, y: int) -> List[Tuple[int, int, bool]]: """ Get the recording information from all regions of a core. :param address: The recording region base address :param x: The X coordinate of the chip containing the data :param y: The Y coordinate of the chip containing the data :return: (size, address, missing flag) for each region """ transceiver = FecDataView.get_transceiver() n_regions = transceiver.read_word(x, y, address) n_bytes = get_recording_header_size(n_regions) data = transceiver.read_memory( x, y, address + BYTES_PER_WORD, n_bytes - BYTES_PER_WORD) data_type = _RecordingRegion * n_regions regions = data_type.from_buffer_copy(data) sizes_and_addresses = [ (r.size, r.data, bool(r.missing)) for r in regions] return sizes_and_addresses @property def sender_vertices(self) -> Iterable[AbstractSendsBuffersFromHost]: """ The vertices which are buffered. :rtype: iterable(AbstractSendsBuffersFromHost) """ return self._sender_vertices