# spinn_utilites imports
from spinn_utilities.progress_bar import ProgressBar
# spinnman imports
from spinnman.constants import UDP_MESSAGE_MAX_SIZE
from spinnman.connections.udp_packet_connections import EIEIOConnection
from spinnman.messages.eieio.command_messages \
import EIEIOCommandMessage, StopRequests, SpinnakerRequestReadData
from spinnman.messages.eieio.command_messages \
import HostDataRead, SpinnakerRequestBuffers, PaddingRequest
from spinnman.messages.eieio.command_messages \
import HostSendSequencedData, EventStopRequest
from spinnman.utilities import utility_functions
from spinnman.messages.sdp import SDPHeader, SDPMessage, SDPFlag
from spinnman.exceptions import SpinnmanInvalidPacketException
from spinnman.messages.eieio import EIEIOType, create_eieio_command
from spinnman.messages.eieio.data_messages import EIEIODataMessage
# front end common imports
from spinn_front_end_common.utilities import helpful_functions as funs
from spinn_front_end_common.utilities import exceptions
from spinn_front_end_common.interface.buffer_management.storage_objects \
import BuffersSentDeque, BufferedReceivingData, ChannelBufferState
from spinn_front_end_common.utilities.constants \
import SDP_PORTS, BUFFERING_OPERATIONS
from .recording_utilities import TRAFFIC_IDENTIFIER, \
get_last_sequence_number, get_region_pointer
# general imports
import threading
import logging
logger = logging.getLogger(__name__)
# The minimum size of any message - this is the headers plus one entry
_MIN_MESSAGE_SIZE = EIEIODataMessage.min_packet_length(
eieio_type=EIEIOType.KEY_32_BIT, is_timestamp=True)
# The number of bytes in each key to be sent
_N_BYTES_PER_KEY = EIEIOType.KEY_32_BIT.key_bytes # @UndefinedVariable
[docs]class BufferManager(object):
""" Manager of send buffers
"""
__slots__ = [
# placements object
"_placements",
# list of tags
"_tags",
# SpiNNMan instance
"_transceiver",
# Set of (ip_address, port) that are being listened to for the tags
"_seen_tags",
# Set of vertices with buffers to be sent
"_sender_vertices",
# Dictionary of sender vertex -> buffers sent
"_sent_messages",
# storage area for received data from cores
"_received_data",
# Lock to avoid multiple messages being processed at the same time
"_thread_lock_buffer_out",
# Lock to avoid multiple messages being processed at the same time
"_thread_lock_buffer_in",
# bool flag
"_finished",
# listener port
"_listener_port",
# Store to file flag
"_store_to_file"
]
def __init__(self, placements, tags, transceiver, store_to_file=False):
"""
:param placements: The placements of the vertices
:type placements:\
:py:class:`pacman.model.placements.Placements`
:param tags: The tags assigned to the vertices
:type tags: :py:class:`pacman.model.tags.Tags`
:param transceiver: The transceiver to use for sending and receiving\
information
:type transceiver: :py:class:`spinnman.transceiver.Transceiver`
:param store_to_file: True if the data should be temporarily stored\
in a file instead of in RAM (default uses RAM)
:type store_to_file: bool
"""
self._placements = placements
self._tags = tags
self._transceiver = transceiver
# Set of (ip_address, port) that are being listened to for the tags
self._seen_tags = set()
# Set of vertices with buffers to be sent
self._sender_vertices = set()
# Dictionary of sender vertex -> buffers sent
self._sent_messages = dict()
# storage area for received data from cores
self._received_data = BufferedReceivingData(store_to_file)
self._store_to_file = store_to_file
# Lock to avoid multiple messages being processed at the same time
self._thread_lock_buffer_out = threading.Lock()
self._thread_lock_buffer_in = threading.Lock()
self._finished = False
self._listener_port = None
[docs] def receive_buffer_command_message(self, packet):
""" Handle an EIEIO command message for the buffers
:param packet: The eieio message received
:type packet:\
:py:class:`spinnman.messages.eieio.command_messages.eieio_command_message.EIEIOCommandMessage`
"""
try:
if not self._finished:
if isinstance(packet, SpinnakerRequestBuffers):
with self._thread_lock_buffer_in:
vertex = self._placements.get_vertex_on_processor(
packet.x, packet.y, packet.p)
if vertex in self._sender_vertices:
# logger.debug(
# "received send request with sequence: {1:d},"
# " space available: {0:d}".format(
# packet.space_available,
# packet.sequence_no))
# noinspection PyBroadException
try:
self._send_messages(
packet.space_available, vertex,
packet.region_id, packet.sequence_no)
except Exception:
logger.warn("problem when sending messages",
exc_info=True)
elif isinstance(packet, SpinnakerRequestReadData):
with self._thread_lock_buffer_out:
# logger.debug(
# "received {} read request(s) with sequence: {},"
# " from chip ({},{}, core {}".format(
# packet.n_requests, packet.sequence_no,
# packet.x, packet.y, packet.p))
try:
self._retrieve_and_store_data(packet)
except Exception:
logger.warn("problem when handling data",
exc_info=True)
elif isinstance(packet, EIEIOCommandMessage):
raise SpinnmanInvalidPacketException(
str(packet.__class__),
"The command packet is invalid for buffer management: "
"command id {0:d}".format(packet.eieio_header.command))
else:
raise SpinnmanInvalidPacketException(
packet.__class__,
"The command packet is invalid for buffer management")
except Exception:
logger.warn("problem when processing received packet",
exc_info=True)
def _create_connection(self, tag):
connection = self._transceiver.register_udp_listener(
self.receive_buffer_command_message, EIEIOConnection,
local_port=tag.port, local_host=tag.ip_address)
self._seen_tags.add((tag.ip_address, connection.local_port))
utility_functions.send_port_trigger_message(
connection, tag.board_address)
logger.info(
"Listening for packets using tag {} on {}:{}".format(
tag.tag, connection.local_ip_address,
connection.local_port))
return connection
def _add_buffer_listeners(self, vertex):
""" Add listeners for buffered data for the given vertex
"""
# Find a tag for receiving buffer data
tags = self._tags.get_ip_tags_for_vertex(vertex)
if tags is not None:
# locate tag associated with the buffer manager traffic
for tag in tags:
if tag.traffic_identifier == TRAFFIC_IDENTIFIER:
# If the tag port is not assigned create a connection\
# and assign the port. Note that this *should* \
# update the port number in any tags being shared
if tag.port is None:
# If connection already setup, ensure subsequent
# boards use same listener port in their tag
if self._listener_port is None:
connection = self._create_connection(tag)
tag.port = connection.local_port
self._listener_port = connection.local_port
else:
tag.port = self._listener_port
# In case we have tags with different specified ports,\
# also allow the tag to be created here
elif (tag.ip_address, tag.port) not in self._seen_tags:
self._create_connection(tag)
[docs] def add_receiving_vertex(self, vertex):
""" Add a vertex into the managed list for vertices\
which require buffers to be received from them during runtime
"""
self._add_buffer_listeners(vertex)
[docs] def add_sender_vertex(self, vertex):
""" Add a vertex into the managed list for vertices
which require buffers to be sent to them during runtime
:param vertex: the vertex to be managed
:type vertex:\
:py:class:`spinnaker.pyNN.models.abstract_models.buffer_models.AbstractSendsBuffersFromHost`
"""
self._sender_vertices.add(vertex)
self._add_buffer_listeners(vertex)
[docs] def load_initial_buffers(self):
""" Load the initial buffers for the senders using mem writes
"""
total_data = 0
for vertex in self._sender_vertices:
for region in vertex.get_regions():
total_data += vertex.get_region_buffer_size(region)
progress = ProgressBar(
total_data, "Loading buffers ({} bytes)".format(total_data))
for vertex in self._sender_vertices:
for region in vertex.get_regions():
self._send_initial_messages(vertex, region, progress)
progress.end()
[docs] def reset(self):
""" Resets the buffered regions to start transmitting from the\
beginning of its expected regions and clears the buffered out data\
files
"""
# reset buffered out
self._received_data = BufferedReceivingData(self._store_to_file)
# rewind buffered in
for vertex in self._sender_vertices:
for region in vertex.get_regions():
vertex.rewind(region)
[docs] def resume(self):
""" Resets any data structures needed before starting running again
"""
# update the received data items
self._received_data.resume()
[docs] def clear_recorded_data(self, x, y, p, recording_region_id):
""" Removes the recorded data stored in memory.
:param x: placement x coord
:param y: placement y coord
:param p: placement p coord
:param recording_region_id: the recording region id
"""
self._received_data.clear(x, y, p, recording_region_id)
def _generate_end_buffering_state_from_machine(
self, placement, state_region_base_address):
# retrieve channel state memory area
channel_state_data = str(self._transceiver.read_memory(
placement.x, placement.y, state_region_base_address,
ChannelBufferState.size_of_channel_state()))
return ChannelBufferState.create_from_bytearray(channel_state_data)
def _create_message_to_send(self, size, vertex, region):
""" Creates a single message to send with the given boundaries.
:param size: The number of bytes available for the whole packet
:type size: int
:param vertex: The vertex to get the keys from
:type vertex:\
:py:class:`spynnaker.pyNN.models.abstract_models.buffer_models.AbstractSendsBuffersFromHost`
:param region: The region of the vertex to get keys from
:type region: int
:return: A new message, or None if no keys can be added
:rtype: None or\
:py:class:`spinnman.messages.eieio.data_messages.EIEIODataMessage`
"""
# If there are no more messages to send, return None
if not vertex.is_next_timestamp(region):
return None
# Create a new message
next_timestamp = vertex.get_next_timestamp(region)
message = EIEIODataMessage.create(
EIEIOType.KEY_32_BIT, timestamp=next_timestamp)
# If there is no room for the message, return None
if message.size + _N_BYTES_PER_KEY > size:
return None
# Add keys up to the limit
bytes_to_go = size - message.size
while (bytes_to_go >= _N_BYTES_PER_KEY and
vertex.is_next_key(region, next_timestamp)):
key = vertex.get_next_key(region)
message.add_key(key)
bytes_to_go -= _N_BYTES_PER_KEY
return message
def _send_initial_messages(self, vertex, region, progress):
""" Send the initial set of messages
:param vertex: The vertex to get the keys from
:type vertex:\
:py:class:`spynnaker.pyNN.models.abstract_models.buffer_models.AbstractSendsBuffersFromHost`
:param region: The region to get the keys from
:type region: int
:return: A list of messages
:rtype: list of\
:py:class:`spinnman.messages.eieio.data_messages.EIEIODataMessage`
"""
# Get the vertex load details
# region_base_address = self._locate_region_address(region, vertex)
region_base_address = funs.locate_memory_region_for_placement(
self._placements.get_placement_of_vertex(vertex), region,
self._transceiver)
placement = self._placements.get_placement_of_vertex(vertex)
# Add packets until out of space
sent_message = False
bytes_to_go = vertex.get_region_buffer_size(region)
if bytes_to_go % 2 != 0:
raise exceptions.SpinnFrontEndException(
"The buffer region of {} must be divisible by 2".format(
vertex))
all_data = ""
if vertex.is_empty(region):
sent_message = True
else:
min_size_of_packet = _MIN_MESSAGE_SIZE
while (vertex.is_next_timestamp(region) and
bytes_to_go > min_size_of_packet):
space_available = min(bytes_to_go, 280)
next_message = self._create_message_to_send(
space_available, vertex, region)
if next_message is None:
break
# Write the message to the memory
data = next_message.bytestring
all_data += data
sent_message = True
# Update the positions
bytes_to_go -= len(data)
progress.update(len(data))
if not sent_message:
raise exceptions.BufferableRegionTooSmall(
"The buffer size {} is too small for any data to be added for"
" region {} of vertex {}".format(bytes_to_go, region, vertex))
# If there are no more messages and there is space, add a stop request
if (not vertex.is_next_timestamp(region) and
bytes_to_go >= EventStopRequest.get_min_packet_length()):
data = EventStopRequest().bytestring
# logger.debug(
# "Writing stop message of {} bytes to {} on {}, {}, {}".format(
# len(data), hex(region_base_address),
# placement.x, placement.y, placement.p))
all_data += data
bytes_to_go -= len(data)
progress.update(len(data))
self._sent_messages[vertex] = BuffersSentDeque(
region, sent_stop_message=True)
# If there is any space left, add padding
if bytes_to_go > 0:
padding_packet = PaddingRequest()
n_packets = bytes_to_go / padding_packet.get_min_packet_length()
data = padding_packet.bytestring
data *= n_packets
all_data += data
# Do the writing all at once for efficiency
self._transceiver.write_memory(
placement.x, placement.y, region_base_address, all_data)
def _send_messages(self, size, vertex, region, sequence_no):
""" Send a set of messages
"""
# Get the sent messages for the vertex
if vertex not in self._sent_messages:
self._sent_messages[vertex] = BuffersSentDeque(region)
sent_messages = self._sent_messages[vertex]
# If the sequence number is outside the window, return no messages
if not sent_messages.update_last_received_sequence_number(sequence_no):
return list()
# Remote the existing packets from the size available
bytes_to_go = size
for message in sent_messages.messages:
if isinstance(message.eieio_data_message, EIEIODataMessage):
bytes_to_go -= message.eieio_data_message.size
else:
bytes_to_go -= (message.eieio_data_message
.get_min_packet_length())
# Add messages up to the limits
while (vertex.is_next_timestamp(region) and
not sent_messages.is_full and bytes_to_go > 0):
space_available = min(
bytes_to_go,
UDP_MESSAGE_MAX_SIZE -
HostSendSequencedData.get_min_packet_length())
# logger.debug(
# "Bytes to go {}, space available {}".format(
# bytes_to_go, space_available))
next_message = self._create_message_to_send(
space_available, vertex, region)
if next_message is None:
break
sent_messages.add_message_to_send(next_message)
bytes_to_go -= next_message.size
# logger.debug("Adding additional buffer of {} bytes".format(
# next_message.size))
# If the vertex is empty, send the stop messages if there is space
if (not sent_messages.is_full and
not vertex.is_next_timestamp(region) and
bytes_to_go >= EventStopRequest.get_min_packet_length()):
sent_messages.send_stop_message()
# If there are no more messages, turn off requests for more messages
if not vertex.is_next_timestamp(region) and sent_messages.is_empty():
# logger.debug("Sending stop")
self._send_request(vertex, StopRequests())
# Send the messages
for message in sent_messages.messages:
# logger.debug("Sending message with sequence {}".format(
# message.sequence_no))
self._send_request(vertex, message)
def _send_request(self, vertex, message):
""" Sends a request
:param vertex: The vertex to send to
:param message: The message to send
"""
placement = self._placements.get_placement_of_vertex(vertex)
sdp_header = SDPHeader(
destination_chip_x=placement.x, destination_chip_y=placement.y,
destination_cpu=placement.p, flags=SDPFlag.REPLY_NOT_EXPECTED,
destination_port=SDP_PORTS.INPUT_BUFFERING_SDP_PORT.value)
sdp_message = SDPMessage(sdp_header, message.bytestring)
self._transceiver.send_sdp_message(sdp_message)
[docs] def stop(self):
""" Indicates that the simulation has finished, so no further\
outstanding requests need to be processed
"""
with self._thread_lock_buffer_in:
with self._thread_lock_buffer_out:
self._finished = True
[docs] def get_data_for_vertex(self, placement, recording_region_id):
""" Get a pointer to the data container for all the data retrieved\
during the simulation from a specific region area of a core
:param placement: the placement to get the data from
:type placement: pacman.model.placements.Placement
:param recording_region_id: desired recording data region
:type recording_region_id: int
:return: pointer to a class which inherits from\
AbstractBufferedDataStorage
:rtype:\
:py:class:`spinn_front_end_common.interface.buffer_management.buffer_models.AbstractBufferedDataStorage`
"""
recording_data_address = \
placement.vertex.get_recording_region_base_address(
self._transceiver, placement)
# Ensure the last sequence number sent has been retrieved
if not self._received_data.is_end_buffering_sequence_number_stored(
placement.x, placement.y, placement.p):
self._received_data.store_end_buffering_sequence_number(
placement.x, placement.y, placement.p,
get_last_sequence_number(
placement, self._transceiver, recording_data_address))
# Read the data if not already received
if not self._received_data.is_data_from_region_flushed(
placement.x, placement.y, placement.p,
recording_region_id):
# Read the end state of the recording for this region
if not self._received_data.is_end_buffering_state_recovered(
placement.x, placement.y, placement.p,
recording_region_id):
end_state_address = get_region_pointer(
placement, self._transceiver, recording_data_address,
recording_region_id)
end_state = self._generate_end_buffering_state_from_machine(
placement, end_state_address)
self._received_data.store_end_buffering_state(
placement.x, placement.y, placement.p, recording_region_id,
end_state)
else:
end_state = self._received_data.\
get_end_buffering_state(
placement.x, placement.y, placement.p,
recording_region_id)
start_ptr = end_state.start_address
write_ptr = end_state.current_write
end_ptr = end_state.end_address
read_ptr = end_state.current_read
# current read needs to be adjusted in case the last portion of the
# memory has already been read, but the HostDataRead packet has not
# been processed by the chip before simulation finished
# This situation is identified by the sequence number of the last
# packet sent to this core and the core internal state of the
# output buffering finite state machine
seq_no_last_ack_packet = \
self._received_data.last_sequence_no_for_core(
placement.x, placement.y, placement.p)
# get the last sequence number
last_sequence_number = \
self._received_data.get_end_buffering_sequence_number(
placement.x, placement.y, placement.p)
if last_sequence_number == seq_no_last_ack_packet:
# if the last ACK packet has not been processed on the chip,
# process it now
last_sent_ack_sdp_packet = \
self._received_data.last_sent_packet_to_core(
placement.x, placement.y, placement.p)
last_sent_ack_packet = \
create_eieio_command.read_eieio_command_message(
last_sent_ack_sdp_packet.data, 0)
if not isinstance(last_sent_ack_packet, HostDataRead):
raise Exception(
"Something somewhere went terribly wrong - "
"I was looking for a HostDataRead packet, "
"while I got {0:s}".format(last_sent_ack_packet))
for i in xrange(last_sent_ack_packet.n_requests):
last_ack_packet_is_of_this_region = \
recording_region_id == \
last_sent_ack_packet.region_id(i)
if (last_ack_packet_is_of_this_region and
not end_state.is_state_updated):
read_ptr += last_sent_ack_packet.space_read(i)
if (read_ptr == write_ptr or
(read_ptr == end_ptr and
write_ptr == start_ptr)):
end_state.update_last_operation(
BUFFERING_OPERATIONS.BUFFER_READ.value)
if read_ptr == end_ptr:
read_ptr = start_ptr
elif read_ptr > end_ptr:
raise Exception(
"Something somewhere went terribly wrong - "
"I was reading beyond the region area some "
"unknown data".format(
last_sent_ack_packet))
end_state.update_read_pointer(read_ptr)
end_state.set_update_completed()
# now state is updated, read back values for read pointer and
# last operation performed
last_operation = end_state.last_buffer_operation
read_ptr = end_state.current_read
# now read_ptr is updated, check memory to read
if read_ptr < write_ptr:
length = write_ptr - read_ptr
data = self._transceiver.read_memory(
placement.x, placement.y, read_ptr, length)
self._received_data.flushing_data_from_region(
placement.x, placement.y, placement.p, recording_region_id,
data)
elif read_ptr > write_ptr:
length = end_ptr - read_ptr
if length < 0:
raise exceptions.ConfigurationException(
"The amount of data to read is negative!")
data = self._transceiver.read_memory(
placement.x, placement.y, read_ptr, length)
self._received_data.store_data_in_region_buffer(
placement.x, placement.y, placement.p, recording_region_id,
data)
read_ptr = start_ptr
length = write_ptr - read_ptr
data = self._transceiver.read_memory(
placement.x, placement.y, read_ptr, length)
self._received_data.flushing_data_from_region(
placement.x, placement.y, placement.p, recording_region_id,
data)
elif (read_ptr == write_ptr and
last_operation == BUFFERING_OPERATIONS.BUFFER_WRITE.value):
length = end_ptr - read_ptr
data = self._transceiver.read_memory(
placement.x, placement.y, read_ptr, length)
self._received_data.store_data_in_region_buffer(
placement.x, placement.y, placement.p, recording_region_id,
data)
read_ptr = start_ptr
length = write_ptr - read_ptr
data = self._transceiver.read_memory(
placement.x, placement.y, read_ptr, length)
self._received_data.flushing_data_from_region(
placement.x, placement.y, placement.p, recording_region_id,
data)
elif (read_ptr == write_ptr and
last_operation == BUFFERING_OPERATIONS.BUFFER_READ.value):
data = bytearray()
self._received_data.flushing_data_from_region(
placement.x, placement.y, placement.p, recording_region_id,
data)
# data flush has been completed - return appropriate data
# the two returns can be exchanged - one returns data and the other
# returns a pointer to the structure holding the data
return self._received_data.get_region_data_pointer(
placement.x, placement.y, placement.p, recording_region_id)
def _retrieve_and_store_data(self, packet):
""" Following a SpinnakerRequestReadData packet, the data stored\
during the simulation needs to be read by the host and stored in a\
data structure, following the specifications of buffering out\
technique
:param packet: SpinnakerRequestReadData packet received from the\
SpiNNaker system
:type packet:\
:py:class:`spinnman.messages.eieio.command_messages.spinnaker_request_read_data.SpinnakerRequestReadData`
:rtype: None
"""
x = packet.x
y = packet.y
p = packet.p
# check packet sequence number
pkt_seq = packet.sequence_no
last_pkt_seq = self._received_data.last_sequence_no_for_core(x, y, p)
next_pkt_seq = (last_pkt_seq + 1) % 256
if pkt_seq != next_pkt_seq:
# this sequence number is incorrect
# re-sent last HostDataRead packet sent
last_packet_sent = self._received_data.last_sent_packet_to_core(
x, y, p)
if last_packet_sent is not None:
self._transceiver.send_sdp_message(last_packet_sent)
else:
raise Exception(
"{}, {}, {}: Something somewhere went terribly wrong - "
"The packet sequence numbers have gone wrong "
"somewhere: the packet sent from the board "
"has incorrect sequence number, but the host "
"never sent one acknowledge".format(x, y, p))
return
# read data from memory, store it and create data for return ACK packet
n_requests = packet.n_requests
new_channel = list()
new_region_id = list()
new_space_read = list()
new_n_requests = 0
for i in xrange(n_requests):
length = packet.space_to_be_read(i)
if length > 0:
new_n_requests += 1
start_address = packet.start_address(i)
region_id = packet.region_id(i)
channel = packet.channel(i)
data = self._transceiver.read_memory(
x, y, start_address, length)
self._received_data.store_data_in_region_buffer(
x, y, p, region_id, data)
new_channel.append(channel)
new_region_id.append(region_id)
new_space_read.append(length)
# create return acknowledge packet with data stored
ack_packet = HostDataRead(
new_n_requests, pkt_seq, new_channel, new_region_id,
new_space_read)
ack_packet_data = ack_packet.bytestring
# create SDP header and message
return_message_header = SDPHeader(
destination_port=SDP_PORTS.OUTPUT_BUFFERING_SDP_PORT.value,
destination_cpu=p, destination_chip_x=x, destination_chip_y=y,
flags=SDPFlag.REPLY_NOT_EXPECTED)
return_message = SDPMessage(return_message_header, ack_packet_data)
# storage of last packet received
self._received_data.store_last_received_packet_from_core(
x, y, p, packet)
self._received_data.update_sequence_no_for_core(x, y, p, pkt_seq)
# store last sent message and send to the appropriate core
self._received_data.store_last_sent_packet_to_core(
x, y, p, return_message)
self._transceiver.send_sdp_message(return_message)
@property
def sender_vertices(self):
""" The vertices which are buffered
"""
return self._sender_vertices
@property
def reload_buffer_files(self):
""" The file paths for each buffered region for each sender vertex
"""
return self._reload_buffer_file_paths