Source code for spinn_front_end_common.interface.buffer_management.buffer_manager

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from six.moves import xrange
from spinn_utilities.log import FormatAdapter
from spinn_utilities.ordered_set import OrderedSet
from spinn_utilities.progress_bar import ProgressBar
from spinnman.constants import UDP_MESSAGE_MAX_SIZE
from spinnman.connections.udp_packet_connections import EIEIOConnection
from spinnman.messages.eieio.command_messages import (
    EIEIOCommandMessage, StopRequests, SpinnakerRequestReadData,
    HostDataReadAck, HostDataRead, SpinnakerRequestBuffers,
    HostSendSequencedData, EventStopRequest)
from spinnman.utilities import utility_functions
from spinnman.messages.sdp import SDPHeader, SDPMessage, SDPFlag
from spinnman.messages.eieio import EIEIOType, create_eieio_command
from spinnman.messages.eieio.data_messages import EIEIODataMessage
from spinn_front_end_common.utilities.constants import (
    SDP_PORTS, BUFFERING_OPERATIONS)
from spinn_front_end_common.utilities.exceptions import (
    BufferableRegionTooSmall, ConfigurationException, SpinnFrontEndException)
from spinn_front_end_common.utilities.helpful_functions import (
    locate_memory_region_for_placement, locate_extra_monitor_mc_receiver)
from spinn_front_end_common.interface.buffer_management.storage_objects \
    import (
        BuffersSentDeque, BufferedReceivingData, ChannelBufferState)
from spinn_front_end_common.interface.buffer_management.buffer_models \
    import (
        AbstractReceiveBuffersToHost)
from .recording_utilities import (
    TRAFFIC_IDENTIFIER, get_last_sequence_number, get_region_pointer)

logger = FormatAdapter(logging.getLogger(__name__))

# The minimum size of any message - this is the headers plus one entry
_MIN_MESSAGE_SIZE = EIEIODataMessage.min_packet_length(
    eieio_type=EIEIOType.KEY_32_BIT, is_timestamp=True)

# The number of bytes in each key to be sent
_N_BYTES_PER_KEY = EIEIOType.KEY_32_BIT.key_bytes  # @UndefinedVariable

_SDP_MAX_PACKAGE_SIZE = 272
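
# Illustrative sketch (not part of the original module): the constants above
# bound how many 32-bit keys a single host-side message can carry, mirroring
# the logic of BufferManager._create_message_to_send below.  The names here
# are only for illustration and are assumptions, not library API:
#
#     _example_header_size = EIEIODataMessage.create(
#         EIEIOType.KEY_32_BIT, timestamp=0).size
#     _example_max_keys_per_message = (
#         (_SDP_MAX_PACKAGE_SIZE - _example_header_size) // _N_BYTES_PER_KEY)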


class BufferManager(object):
    """ Manager of send buffers.
    """

    __slots__ = [
        # placements object
        "_placements",

        # list of tags
        "_tags",

        # SpiNNMan instance
        "_transceiver",

        # Set of (ip_address, port) that are being listened to for the tags
        "_seen_tags",

        # Set of vertices with buffers to be sent
        "_sender_vertices",

        # Dictionary of sender vertex -> buffers sent
        "_sent_messages",

        # storage area for received data from cores
        "_received_data",

        # Lock to avoid multiple messages being processed at the same time
        "_thread_lock_buffer_out",

        # Lock to avoid multiple messages being processed at the same time
        "_thread_lock_buffer_in",

        # bool flag
        "_finished",

        # listener port
        "_listener_port",

        # Buffering out thread pool
        "_buffering_out_thread_pool",

        # the extra monitor cores which support faster data extraction
        "_extra_monitor_cores",

        # the extra_monitor to Ethernet connection map
        "_packet_gather_cores_to_ethernet_connection_map",

        # monitor cores via chip ID
        "_extra_monitor_cores_by_chip",

        # fixed routes, used by the speed up functionality for reports
        "_fixed_routes",

        # machine object
        "_machine",

        # flag for what data extraction to use
        "_uses_advanced_monitors",

        # Support class to help call Java
        "_java_caller"
    ]

    def __init__(self, placements, tags, transceiver, extra_monitor_cores,
                 packet_gather_cores_to_ethernet_connection_map,
                 extra_monitor_to_chip_mapping, machine, fixed_routes,
                 uses_advanced_monitors, report_folder, java_caller=None):
        """
        :param placements: The placements of the vertices
        :type placements: ~pacman.model.placements.Placements
        :param tags: The tags assigned to the vertices
        :type tags: ~pacman.model.tags.Tags
        :param transceiver: \
            The transceiver to use for sending and receiving information
        :type transceiver: ~spinnman.transceiver.Transceiver
        :param packet_gather_cores_to_ethernet_connection_map: \
            mapping of cores to the gatherer vertex placed on them
        :type packet_gather_cores_to_ethernet_connection_map: \
            dict(tuple(int,int),\
            ~spinn_front_end_common.utility_models.DataSpeedUpPacketGatherMachineVertex)
        :param report_folder: The directory for reports which includes the \
            file to use as an SQL database.
        :type report_folder: str
        :param java_caller: Support class to call Java, or None to use python
        :type java_caller: \
            ~spinn_front_end_common.interface.java_caller.JavaCaller
        """
        # pylint: disable=too-many-arguments
        self._placements = placements
        self._tags = tags
        self._transceiver = transceiver
        self._extra_monitor_cores = extra_monitor_cores
        self._packet_gather_cores_to_ethernet_connection_map = \
            packet_gather_cores_to_ethernet_connection_map
        self._extra_monitor_cores_by_chip = extra_monitor_to_chip_mapping
        self._fixed_routes = fixed_routes
        self._machine = machine
        self._uses_advanced_monitors = uses_advanced_monitors

        # Set of (ip_address, port) that are being listened to for the tags
        self._seen_tags = set()

        # Set of vertices with buffers to be sent
        self._sender_vertices = set()

        # Dictionary of sender vertex -> buffers sent
        self._sent_messages = dict()

        # storage area for received data from cores
        self._received_data = BufferedReceivingData(report_folder)

        # Lock to avoid multiple messages being processed at the same time
        self._thread_lock_buffer_out = threading.RLock()
        self._thread_lock_buffer_in = threading.RLock()
        self._buffering_out_thread_pool = ThreadPoolExecutor(max_workers=1)

        self._finished = False
        self._listener_port = None
        self._java_caller = java_caller
        if self._java_caller is not None:
            self._java_caller.set_machine(machine)
            self._java_caller.set_report_folder(report_folder)
            if self._uses_advanced_monitors:
                self._java_caller.set_advanced_monitors(
                    self._placements, self._tags,
                    self._extra_monitor_cores_by_chip,
                    self._packet_gather_cores_to_ethernet_connection_map)

    def _request_data(self, transceiver, placement_x, placement_y, address,
                      length):
        """ Uses the extra monitor cores for data extraction.

        :param transceiver: the spinnman interface
        :type transceiver: :py:class:`spinnman.transceiver.Transceiver`
        :param placement_x: \
            the placement x coord where data is to be extracted from
        :type placement_x: int
        :param placement_y: \
            the placement y coord where data is to be extracted from
        :type placement_y: int
        :param address: the memory address to start at
        :type address: int
        :param length: the number of bytes to extract
        :type length: int
        :return: data as a byte array
        """
        # pylint: disable=too-many-arguments
        if not self._uses_advanced_monitors:
            return transceiver.read_memory(
                placement_x, placement_y, address, length)

        sender = self._extra_monitor_cores_by_chip[placement_x, placement_y]
        receiver = locate_extra_monitor_mc_receiver(
            self._machine, placement_x, placement_y,
            self._packet_gather_cores_to_ethernet_connection_map)
        return receiver.get_data(
            self._placements.get_placement_of_vertex(sender), address, length,
            self._fixed_routes)

    def _receive_buffer_command_message(self, packet):
        """ Handle an EIEIO command message for the buffers.

        :param packet: The EIEIO message received
        :type packet: \
            :py:class:`spinnman.messages.eieio.command_messages.eieio_command_message.EIEIOCommandMessage`
        """
        # pylint: disable=broad-except
        if isinstance(packet, SpinnakerRequestBuffers):
            # noinspection PyBroadException
            try:
                self.__request_buffers(packet)
            except Exception:
                logger.exception("problem when sending messages")
        elif isinstance(packet, SpinnakerRequestReadData):
            try:
                self.__request_read_data(packet)
            except Exception:
                logger.exception("problem when handling data")
        elif isinstance(packet, EIEIOCommandMessage):
            logger.error(
                "The command packet is invalid for buffer management: "
                "command ID {}", packet.eieio_header.command)
        else:
            logger.error(
                "The command packet is invalid for buffer management")

    # Factored out of receive_buffer_command_message to keep code readable
    def __request_buffers(self, packet):
        if not self._finished:
            with self._thread_lock_buffer_in:
                vertex = self._placements.get_vertex_on_processor(
                    packet.x, packet.y, packet.p)
                if vertex in self._sender_vertices:
                    self._send_messages(
                        packet.space_available, vertex, packet.region_id,
                        packet.sequence_no)

    # Factored out of receive_buffer_command_message to keep code readable
    def __request_read_data(self, packet):
        if not self._finished:
            # Send an ACK message to stop the core sending more messages
            ack_message_header = SDPHeader(
                destination_port=(
                    SDP_PORTS.OUTPUT_BUFFERING_SDP_PORT.value),
                destination_cpu=packet.p, destination_chip_x=packet.x,
                destination_chip_y=packet.y,
                flags=SDPFlag.REPLY_NOT_EXPECTED)
            ack_message_data = HostDataReadAck(packet.sequence_no)
            ack_message = SDPMessage(
                ack_message_header, ack_message_data.bytestring)
            self._transceiver.send_sdp_message(ack_message)
            self._buffering_out_thread_pool.submit(
                self._process_buffered_in_packet, packet)

    def _create_connection(self, tag):
        connection = self._transceiver.register_udp_listener(
            self._receive_buffer_command_message, EIEIOConnection,
            local_port=tag.port, local_host=tag.ip_address)
        self._seen_tags.add((tag.ip_address, connection.local_port))
        utility_functions.send_port_trigger_message(
            connection, tag.board_address)
        logger.info(
            "Listening for packets using tag {} on {}:{}",
            tag.tag, connection.local_ip_address, connection.local_port)
        return connection

    def _add_buffer_listeners(self, vertex):
        """ Add listeners for buffered data for the given vertex
        """
        # Find a tag for receiving buffer data
        tags = self._tags.get_ip_tags_for_vertex(vertex)
        if tags is not None:
            # locate tag associated with the buffer manager traffic
            for tag in tags:
                if tag.traffic_identifier == TRAFFIC_IDENTIFIER:
                    # If the tag port is not assigned, create a connection and
                    # assign the port.  Note that this *should* update the
                    # port number in any tags being shared.
                    if tag.port is None:
                        # If connection already set up, ensure subsequent
                        # boards use same listener port in their tag
                        if self._listener_port is None:
                            connection = self._create_connection(tag)
                            tag.port = connection.local_port
                            self._listener_port = connection.local_port
                        else:
                            tag.port = self._listener_port

                    # In case we have tags with different specified ports,
                    # also allow the tag to be created here
                    elif (tag.ip_address, tag.port) not in self._seen_tags:
                        self._create_connection(tag)
    def add_receiving_vertex(self, vertex):
        """ Add a vertex into the managed list for vertices which require\
            buffers to be received from them during runtime.

        :param vertex: the vertex to be managed
        :type vertex: \
            ~spinn_front_end_common.interface.buffer_management.buffer_models.AbstractReceiveBuffersToHost
        """
        self._add_buffer_listeners(vertex)
    def add_sender_vertex(self, vertex):
        """ Add a vertex into the managed list for vertices which require\
            buffers to be sent to them during runtime.

        :param vertex: the vertex to be managed
        :type vertex: \
            ~spinn_front_end_common.interface.buffer_management.buffer_models.AbstractSendsBuffersFromHost
        """
        self._sender_vertices.add(vertex)
        self._add_buffer_listeners(vertex)
    def load_initial_buffers(self):
        """ Load the initial buffers for the senders using memory writes.
        """
        total_data = 0
        for vertex in self._sender_vertices:
            for region in vertex.get_regions():
                total_data += vertex.get_region_buffer_size(region)

        progress = ProgressBar(total_data, "Loading buffers")
        for vertex in self._sender_vertices:
            for region in vertex.get_regions():
                self._send_initial_messages(vertex, region, progress)
        progress.end()
    def reset(self):
        """ Resets the buffered regions to start transmitting from the\
            beginning of their expected regions and clears the buffered out\
            data files.
        """
        self._received_data.reset()

        # rewind buffered in
        for vertex in self._sender_vertices:
            for region in vertex.get_regions():
                vertex.rewind(region)

        self._finished = False
    def resume(self):
        """ Resets any data structures needed before starting running again.
        """
        # update the received data items
        self._received_data.resume()
        self._finished = False
    def clear_recorded_data(self, x, y, p, recording_region_id):
        """ Removes the recorded data stored in memory.

        :param x: placement x coordinate
        :type x: int
        :param y: placement y coordinate
        :type y: int
        :param p: placement p coordinate
        :type p: int
        :param recording_region_id: the recording region ID
        :type recording_region_id: int
        """
        self._received_data.clear(x, y, p, recording_region_id)
    def _generate_end_buffering_state_from_machine(
            self, placement, state_region_base_address):
        # retrieve channel state memory area
        channel_state_data = self._request_data(
            transceiver=self._transceiver, placement_x=placement.x,
            address=state_region_base_address, placement_y=placement.y,
            length=ChannelBufferState.size_of_channel_state())
        return ChannelBufferState.create_from_bytearray(channel_state_data)

    def _create_message_to_send(self, size, vertex, region):
        """ Creates a single message to send with the given boundaries.

        :param size: The number of bytes available for the whole packet
        :type size: int
        :param vertex: The vertex to get the keys from
        :type vertex: \
            :py:class:`~spinn_front_end_common.interface.buffer_management.buffer_models.AbstractSendsBuffersFromHost`
        :param region: The region of the vertex to get keys from
        :type region: int
        :return: A new message, or None if no keys can be added
        :rtype: None or \
            :py:class:`~spinnman.messages.eieio.data_messages.EIEIODataMessage`
        """
        # If there are no more messages to send, return None
        if not vertex.is_next_timestamp(region):
            return None

        # Create a new message
        next_timestamp = vertex.get_next_timestamp(region)
        message = EIEIODataMessage.create(
            EIEIOType.KEY_32_BIT, timestamp=next_timestamp)

        # If there is no room for the message, return None
        if message.size + _N_BYTES_PER_KEY > size:
            return None

        # Add keys up to the limit
        bytes_to_go = size - message.size
        while (bytes_to_go >= _N_BYTES_PER_KEY and
                vertex.is_next_key(region, next_timestamp)):
            key = vertex.get_next_key(region)
            message.add_key(key)
            bytes_to_go -= _N_BYTES_PER_KEY
        return message

    def _send_initial_messages(self, vertex, region, progress):
        """ Send the initial set of messages.

        :param vertex: The vertex to get the keys from
        :type vertex: \
            :py:class:`~spinn_front_end_common.interface.buffer_management.buffer_models.AbstractSendsBuffersFromHost`
        :param region: The region to get the keys from
        :type region: int
        :return: A list of messages
        :rtype: \
            list(:py:class:`~spinnman.messages.eieio.data_messages.EIEIODataMessage`)
        """
        # Get the vertex load details
        # region_base_address = self._locate_region_address(region, vertex)
        placement = self._placements.get_placement_of_vertex(vertex)
        region_base_address = locate_memory_region_for_placement(
            placement, region, self._transceiver)

        # Add packets until out of space
        sent_message = False
        bytes_to_go = vertex.get_region_buffer_size(region)
        if bytes_to_go % 2 != 0:
            raise SpinnFrontEndException(
                "The buffer region of {} must be divisible by 2".format(
                    vertex))
        all_data = b""
        if vertex.is_empty(region):
            sent_message = True
        else:
            min_size_of_packet = _MIN_MESSAGE_SIZE
            while (vertex.is_next_timestamp(region) and
                    bytes_to_go > min_size_of_packet):
                space_available = min(bytes_to_go, _SDP_MAX_PACKAGE_SIZE)
                next_message = self._create_message_to_send(
                    space_available, vertex, region)
                if next_message is None:
                    break

                # Write the message to the memory
                data = next_message.bytestring
                all_data += data
                sent_message = True

                # Update the positions
                bytes_to_go -= len(data)
                progress.update(len(data))

        if not sent_message:
            raise BufferableRegionTooSmall(
                "The buffer size {} is too small for any data to be added for"
                " region {} of vertex {}".format(bytes_to_go, region, vertex))

        # If there are no more messages and there is space, add a stop request
        if (not vertex.is_next_timestamp(region) and
                bytes_to_go >= EventStopRequest.get_min_packet_length()):
            data = EventStopRequest().bytestring
            # logger.debug(
            #     "Writing stop message of {} bytes to {} on {}, {}, {}".format(
            #         len(data), hex(region_base_address),
            #         placement.x, placement.y, placement.p))
            all_data += data
            bytes_to_go -= len(data)
            progress.update(len(data))
            self._sent_messages[vertex] = BuffersSentDeque(
                region, sent_stop_message=True)

        # Do the writing all at once for efficiency
        self._transceiver.write_memory(
            placement.x, placement.y, region_base_address, all_data)

    def _send_messages(self, size, vertex, region, sequence_no):
        """ Send a set of messages.
        """
        # Get the sent messages for the vertex
        if vertex not in self._sent_messages:
            self._sent_messages[vertex] = BuffersSentDeque(region)
        sent_messages = self._sent_messages[vertex]

        # If the sequence number is outside the window, return no messages
        if not sent_messages.update_last_received_sequence_number(
                sequence_no):
            return list()

        # Remove the existing packets from the size available
        bytes_to_go = size
        for message in sent_messages.messages:
            if isinstance(message.eieio_data_message, EIEIODataMessage):
                bytes_to_go -= message.eieio_data_message.size
            else:
                bytes_to_go -= (message.eieio_data_message
                                .get_min_packet_length())

        # Add messages up to the limits
        while (vertex.is_next_timestamp(region) and
                not sent_messages.is_full and bytes_to_go > 0):

            space_available = min(
                bytes_to_go,
                UDP_MESSAGE_MAX_SIZE -
                HostSendSequencedData.get_min_packet_length())
            # logger.debug(
            #     "Bytes to go {}, space available {}".format(
            #         bytes_to_go, space_available))
            next_message = self._create_message_to_send(
                space_available, vertex, region)
            if next_message is None:
                break
            sent_messages.add_message_to_send(next_message)
            bytes_to_go -= next_message.size
            # logger.debug("Adding additional buffer of {} bytes".format(
            #     next_message.size))

        # If the vertex is empty, send the stop messages if there is space
        if (not sent_messages.is_full and
                not vertex.is_next_timestamp(region) and
                bytes_to_go >= EventStopRequest.get_min_packet_length()):
            sent_messages.send_stop_message()

        # If there are no more messages, turn off requests for more messages
        if not vertex.is_next_timestamp(region) and sent_messages.is_empty():
            # logger.debug("Sending stop")
            self._send_request(vertex, StopRequests())

        # Send the messages
        for message in sent_messages.messages:
            # logger.debug("Sending message with sequence {}".format(
            #     message.sequence_no))
            self._send_request(vertex, message)

    def _send_request(self, vertex, message):
        """ Sends a request.

        :param vertex: The vertex to send to
        :param message: The message to send
        """
        placement = self._placements.get_placement_of_vertex(vertex)
        sdp_header = SDPHeader(
            destination_chip_x=placement.x, destination_chip_y=placement.y,
            destination_cpu=placement.p, flags=SDPFlag.REPLY_NOT_EXPECTED,
            destination_port=SDP_PORTS.INPUT_BUFFERING_SDP_PORT.value)
        sdp_message = SDPMessage(sdp_header, message.bytestring)
        self._transceiver.send_sdp_message(sdp_message)
    def stop(self):
        """ Indicates that the simulation has finished, so no further\
            outstanding requests need to be processed.
        """
        with self._thread_lock_buffer_in:
            with self._thread_lock_buffer_out:
                self._finished = True
    def get_data_for_placements(self, placements, progress=None):
        """
        :param placements: Where to get the data from.
        :type placements: ~pacman.model.placements.Placements
        :param progress: How to measure/display the progress.
        :type progress: ~spinn_utilities.progress_bar.ProgressBar or None
        """
        if self._java_caller is not None:
            self._java_caller.set_placements(placements, self._transceiver)

        with self._thread_lock_buffer_out:
            if self._java_caller is not None:
                self._java_caller.get_all_data()
                if progress:
                    progress.end()
            elif self._uses_advanced_monitors:
                self.__old_get_data_for_placements_with_monitors(
                    placements, progress)
            else:
                self.__old_get_data_for_placements(placements, progress)
    def __old_get_data_for_placements_with_monitors(
            self, placements, progress):
        # locate receivers
        receivers = list(OrderedSet(
            locate_extra_monitor_mc_receiver(
                self._machine, placement.x, placement.y,
                self._packet_gather_cores_to_ethernet_connection_map)
            for placement in placements))

        # Ugly, to avoid an import loop...
        with receivers[0].streaming(
                receivers, self._transceiver, self._extra_monitor_cores,
                self._placements):
            # get data
            self.__old_get_data_for_placements(placements, progress)

    def __old_get_data_for_placements(self, placements, progress):
        # get data
        for placement in placements:
            vertex = placement.vertex
            for recording_region_id in vertex.get_recorded_region_ids():
                self._retreive_by_placement(placement, recording_region_id)
                if progress is not None:
                    progress.update()
    def get_data_for_vertex(self, placement, recording_region_id):
        """ It is no longer possible to get access to the data pointer.

            Use :py:meth:`get_data_by_placement` instead; it returns the data
            that ``pointer.read_all()`` used to return, and the missing flag
            as before.
        """
        raise NotImplementedError("Use get_data_by_placement instead.")
    def get_data_by_placement(self, placement, recording_region_id):
        """ Get the data container for all the data retrieved\
            during the simulation from a specific region area of a core.

        :param placement: the placement to get the data from
        :type placement: ~pacman.model.placements.Placement
        :param recording_region_id: desired recording data region
        :type recording_region_id: int
        :return: an array containing all the data received during the\
            simulation, and a flag indicating if any data was missing
        :rtype: tuple(bytearray, bool)
        """
        # Ensure that any transfers in progress are complete first
        if not isinstance(placement.vertex, AbstractReceiveBuffersToHost):
            raise NotImplementedError(
                "vertex {} does not implement AbstractReceiveBuffersToHost "
                "so no data read".format(placement.vertex))
        with self._thread_lock_buffer_out:
            # data flush has been completed - return appropriate data
            (byte_array, missing) = self._received_data.get_region_data(
                placement.x, placement.y, placement.p, recording_region_id)
            return byte_array, missing
    def _retreive_by_placement(self, placement, recording_region_id):
        """ Retrieve the data for a vertex; must be locked first.

        :param placement: the placement to get the data from
        :type placement: ~pacman.model.placements.Placement
        :param recording_region_id: desired recording data region
        :type recording_region_id: int
        """
        recording_data_address = \
            placement.vertex.get_recording_region_base_address(
                self._transceiver, placement)

        # Ensure the last sequence number sent has been retrieved
        if not self._received_data.is_end_buffering_sequence_number_stored(
                placement.x, placement.y, placement.p):
            self._received_data.store_end_buffering_sequence_number(
                placement.x, placement.y, placement.p,
                get_last_sequence_number(
                    placement, self._transceiver, recording_data_address))

        # Read the data if not already received
        if not self._received_data.is_data_from_region_flushed(
                placement.x, placement.y, placement.p, recording_region_id):

            # Read the end state of the recording for this region
            if not self._received_data.is_end_buffering_state_recovered(
                    placement.x, placement.y, placement.p,
                    recording_region_id):
                end_state = self._generate_end_buffering_state_from_machine(
                    placement, get_region_pointer(
                        placement, self._transceiver, recording_data_address,
                        recording_region_id))
                self._received_data.store_end_buffering_state(
                    placement.x, placement.y, placement.p,
                    recording_region_id, end_state)
            else:
                end_state = self._received_data.get_end_buffering_state(
                    placement.x, placement.y, placement.p,
                    recording_region_id)

            # The current read pointer needs to be adjusted in case the last
            # portion of the memory has already been read, but the
            # HostDataRead packet had not been processed by the chip before
            # the simulation finished.  This situation is identified by the
            # sequence number of the last packet sent to this core and the
            # core's internal state of the output buffering finite state
            # machine.
            seq_no_last_ack_packet = \
                self._received_data.last_sequence_no_for_core(
                    placement.x, placement.y, placement.p)

            # get the sequence number the core was expecting to see next
            core_next_sequence_number = \
                self._received_data.get_end_buffering_sequence_number(
                    placement.x, placement.y, placement.p)

            # if the core was expecting to see our last sent sequence,
            # it must not have received it
            if core_next_sequence_number == seq_no_last_ack_packet:
                self._process_last_ack(
                    placement, recording_region_id, end_state)

            # now the state is updated, read back the values for the read
            # pointer and the last operation performed
            last_operation = end_state.last_buffer_operation
            start_ptr = end_state.start_address
            end_ptr = end_state.end_address
            write_ptr = end_state.current_write
            read_ptr = end_state.current_read

            # now read_ptr is updated, check memory to read
            if read_ptr < write_ptr:
                length = write_ptr - read_ptr
                logger.debug(
                    "< Reading {} bytes from {}, {}, {}: {} for region {}",
                    length, placement.x, placement.y, placement.p,
                    hex(read_ptr), recording_region_id)
                data = self._request_data(
                    transceiver=self._transceiver, placement_x=placement.x,
                    address=read_ptr, length=length, placement_y=placement.y)
                self._received_data.flushing_data_from_region(
                    placement.x, placement.y, placement.p,
                    recording_region_id, data)

            elif read_ptr > write_ptr:
                length = end_ptr - read_ptr
                if length < 0:
                    raise ConfigurationException(
                        "The amount of data to read is negative!")
                logger.debug(
                    "> Reading {} bytes from {}, {}, {}: {} for region {}",
                    length, placement.x, placement.y, placement.p,
                    hex(read_ptr), recording_region_id)
                data = self._request_data(
                    transceiver=self._transceiver, placement_x=placement.x,
                    address=read_ptr, length=length, placement_y=placement.y)
                self._received_data.store_data_in_region_buffer(
                    placement.x, placement.y, placement.p,
                    recording_region_id, data)
                read_ptr = start_ptr
                length = write_ptr - read_ptr
                logger.debug(
                    "Reading {} bytes from {}, {}, {}: {} for region {}",
                    length, placement.x, placement.y, placement.p,
                    hex(read_ptr), recording_region_id)
                data = self._request_data(
                    transceiver=self._transceiver, placement_x=placement.x,
                    address=read_ptr, length=length, placement_y=placement.y)
                self._received_data.flushing_data_from_region(
                    placement.x, placement.y, placement.p,
                    recording_region_id, data)

            elif (read_ptr == write_ptr and
                    last_operation ==
                    BUFFERING_OPERATIONS.BUFFER_WRITE.value):
                length = end_ptr - read_ptr
                logger.debug(
                    "= Reading {} bytes from {}, {}, {}: {} for region {}",
                    length, placement.x, placement.y, placement.p,
                    hex(read_ptr), recording_region_id)
                data = self._request_data(
                    transceiver=self._transceiver, placement_x=placement.x,
                    address=read_ptr, length=length, placement_y=placement.y)
                self._received_data.store_data_in_region_buffer(
                    placement.x, placement.y, placement.p,
                    recording_region_id, data)
                read_ptr = start_ptr
                length = write_ptr - read_ptr
                logger.debug(
                    "Reading {} bytes from {}, {}, {}: {} for region {}",
                    length, placement.x, placement.y, placement.p,
                    hex(read_ptr), recording_region_id)
                data = self._request_data(
                    transceiver=self._transceiver, placement_x=placement.x,
                    address=read_ptr, length=length, placement_y=placement.y)
                self._received_data.flushing_data_from_region(
                    placement.x, placement.y, placement.p,
                    recording_region_id, data)

            elif (read_ptr == write_ptr and
                    last_operation ==
                    BUFFERING_OPERATIONS.BUFFER_READ.value):
                data = bytearray()
                self._received_data.flushing_data_from_region(
                    placement.x, placement.y, placement.p,
                    recording_region_id, data)

    def _process_last_ack(self, placement, region_id, end_state):
        # if the last ACK packet has not been processed on the chip,
        # process it now
        last_sent_ack = self._received_data.last_sent_packet_to_core(
            placement.x, placement.y, placement.p)
        last_sent_ack = create_eieio_command.read_eieio_command_message(
            last_sent_ack.data, 0)
        if not isinstance(last_sent_ack, HostDataRead):
            raise Exception(
                "Something somewhere went terribly wrong; looking for a "
                "HostDataRead packet, while I got {0:s}".format(last_sent_ack))

        start_ptr = end_state.start_address
        write_ptr = end_state.current_write
        end_ptr = end_state.end_address
        read_ptr = end_state.current_read

        for i in xrange(last_sent_ack.n_requests):
            in_region = region_id == last_sent_ack.region_id(i)
            if in_region and not end_state.is_state_updated:
                read_ptr += last_sent_ack.space_read(i)
                if (read_ptr == write_ptr or
                        (read_ptr == end_ptr and write_ptr == start_ptr)):
                    end_state.update_last_operation(
                        BUFFERING_OPERATIONS.BUFFER_READ.value)
                if read_ptr == end_ptr:
                    read_ptr = start_ptr
                elif read_ptr > end_ptr:
                    raise Exception(
                        "Something somewhere went terribly wrong; I was "
                        "reading beyond the region area")
        end_state.update_read_pointer(read_ptr)
        end_state.set_update_completed()

    def _process_buffered_in_packet(self, packet):
        # pylint: disable=broad-except
        logger.debug(
            "received {} read request(s) with sequence: {},"
            " from chip ({},{}), core {}",
            packet.n_requests, packet.sequence_no,
            packet.x, packet.y, packet.p)
        try:
            with self._thread_lock_buffer_out:
                if not self._finished:
                    self._retrieve_and_store_data(packet)
        except Exception:
            logger.warning("problem when handling data", exc_info=True)

    def _retrieve_and_store_data(self, packet):
        """ Following a SpinnakerRequestReadData packet, the data stored\
            during the simulation needs to be read by the host and stored in\
            a data structure, following the specifications of the buffering\
            out technique.

        :param packet: \
            SpinnakerRequestReadData packet received from the SpiNNaker system
        :type packet: \
            :py:class:`~spinnman.messages.eieio.command_messages.SpinnakerRequestReadData`
        :rtype: None
        """
        x = packet.x
        y = packet.y
        p = packet.p

        # check packet sequence number
        pkt_seq = packet.sequence_no
        last_pkt_seq = self._received_data.last_sequence_no_for_core(x, y, p)
        next_pkt_seq = (last_pkt_seq + 1) % 256
        if pkt_seq != next_pkt_seq:
            # this sequence number is incorrect;
            # re-send the last HostDataRead packet sent
            last_packet_sent = self._received_data.last_sent_packet_to_core(
                x, y, p)
            if last_packet_sent is None:
                raise Exception(
                    "{}, {}, {}: Something somewhere went terribly wrong - "
                    "The packet sequence numbers have gone wrong somewhere: "
                    "the packet sent from the board has an incorrect "
                    "sequence number, but the host never sent an "
                    "acknowledge".format(x, y, p))
            self._transceiver.send_sdp_message(last_packet_sent)
            return

        # read data from memory, store it and create data for return ACK
        # packet
        ack_packet = self._assemble_ack_packet(x, y, p, packet, pkt_seq)

        # create SDP header and message
        return_message = SDPMessage(
            SDPHeader(
                destination_port=SDP_PORTS.OUTPUT_BUFFERING_SDP_PORT.value,
                destination_cpu=p, destination_chip_x=x,
                destination_chip_y=y, flags=SDPFlag.REPLY_NOT_EXPECTED),
            ack_packet.bytestring)

        # storage of last packet received
        self._received_data.store_last_received_packet_from_core(
            x, y, p, packet)
        self._received_data.update_sequence_no_for_core(x, y, p, pkt_seq)

        # store last sent message and send to the appropriate core
        self._received_data.store_last_sent_packet_to_core(
            x, y, p, return_message)
        self._transceiver.send_sdp_message(return_message)

    def _assemble_ack_packet(self, x, y, p, packet, pkt_seq):
        # pylint: disable=too-many-arguments
        channels = list()
        region_ids = list()
        space_read = list()
        for i in xrange(packet.n_requests):
            length = packet.space_to_be_read(i)
            if not length:
                continue
            start_address = packet.start_address(i)
            region_id = packet.region_id(i)
            channel = packet.channel(i)
            logger.debug(
                "Buffer receive Reading {} bytes from {}, {}, {}:"
                " {} for region {}, channel {}",
                length, x, y, p, hex(start_address), region_id, channel)

            # Note this *always* uses the transceiver, as fast data transfer
            # isn't guaranteed to work whilst a simulation is running!
            self._received_data.store_data_in_region_buffer(
                x, y, p, region_id, self._transceiver.read_memory(
                    x, y, start_address, length))
            channels.append(channel)
            region_ids.append(region_id)
            space_read.append(length)

        # create return acknowledge packet with data stored
        return HostDataRead(
            len(channels), pkt_seq, channels, region_ids, space_read)

    @property
    def sender_vertices(self):
        """ The vertices which are buffered.
        """
        return self._sender_vertices
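

# Usage sketch (illustrative, not part of the original module).  The objects
# referenced below (placements, tags, transceiver, machine, report_folder and
# the two vertex lists) are assumed to come from the usual front-end tool
# chain; this only shows the expected call order for a simple set-up that
# does not use the advanced monitors or the Java support.
#
#     buffer_manager = BufferManager(
#         placements, tags, transceiver, extra_monitor_cores=None,
#         packet_gather_cores_to_ethernet_connection_map=None,
#         extra_monitor_to_chip_mapping=None, machine=machine,
#         fixed_routes=None, uses_advanced_monitors=False,
#         report_folder=report_folder)
#     for vertex in receiving_vertices:
#         buffer_manager.add_receiving_vertex(vertex)
#     for vertex in sending_vertices:
#         buffer_manager.add_sender_vertex(vertex)
#     buffer_manager.load_initial_buffers()
#     # ... run the simulation ...
#     data, missing = buffer_manager.get_data_by_placement(
#         placements.get_placement_of_vertex(vertex), recording_region_id=0)
#     buffer_manager.stop()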