Source code for spinn_front_end_common.utility_models.data_speed_up_packet_gatherer_machine_vertex

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import defaultdict
import os
import datetime
import logging
import time
import struct
import sys
from enum import Enum
from six.moves import xrange
from six import reraise, PY2
from spinn_utilities.overrides import overrides
from spinn_utilities.log import FormatAdapter
from spinnman.exceptions import SpinnmanTimeoutException
from spinnman.messages.sdp import SDPMessage, SDPHeader, SDPFlag
from spinnman.messages.scp.impl.iptag_set import IPTagSet
from spinnman.connections.udp_packet_connections import SCAMPConnection
from spinnman.model.enums.cpu_state import CPUState
from pacman.executor.injection_decorator import inject_items
from pacman.model.graphs.common import EdgeTrafficType
from pacman.model.graphs.machine import MachineVertex
from pacman.model.resources import (
    ConstantSDRAM, IPtagResource, ResourceContainer)
from spinn_storage_handlers import FileDataReader
from spinn_front_end_common.utilities.globals_variables import get_simulator
from spinn_front_end_common.utilities.helpful_functions import (
    convert_vertices_to_core_subset, emergency_recover_state_from_failure)
from spinn_front_end_common.abstract_models import (
    AbstractHasAssociatedBinary, AbstractGeneratesDataSpecification)
from spinn_front_end_common.interface.provenance import (
    AbstractProvidesLocalProvenanceData)
from spinn_front_end_common.utilities.utility_objs import (
    ExecutableType, ProvenanceDataItem)
from spinn_front_end_common.utilities.constants import (
    SDP_PORTS, SYSTEM_BYTES_REQUIREMENT, SIMULATION_N_BYTES, BYTES_PER_WORD,
    BYTES_PER_KB)
from spinn_front_end_common.utilities.exceptions import SpinnFrontEndException
from spinn_front_end_common.interface.simulation import simulation_utilities

log = FormatAdapter(logging.getLogger(__name__))
TIMEOUT_RETRY_LIMIT = 20
TIMEOUT_MESSAGE = "Failed to hear from the machine during {} attempts. "\
    "Please try removing firewalls."
_MINOR_LOSS_MESSAGE = (
    "During the extraction of data of {} bytes from memory address {}, "
    "attempt {} had {} sequences that were lost.")
_MINOR_LOSS_THRESHOLD = 10
_MAJOR_LOSS_MESSAGE = (
    "During the extraction of data from chip {}, there were {} cases of "
    "serious data loss. The system recovered, but the speed of download "
    "was compromised. Reduce the number of executing applications and remove "
    "routers between yourself and the SpiNNaker machine to reduce the chance "
    "of this occurring.")
_MAJOR_LOSS_THRESHOLD = 100

#: number of items used up by the retransmit code for its header
SDP_RETRANSMISSION_HEADER_SIZE = 2

#: size of config region in bytes
CONFIG_SIZE = 4 * BYTES_PER_WORD

#: items of data a SDP packet can hold when SCP header removed
WORDS_PER_FULL_PACKET = 68  # 272 bytes as removed SCP header

#: size of items the sequence number uses
SEQUENCE_NUMBER_SIZE_IN_ITEMS = 1

#: items of data from SDP packet with a sequence number
WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM = \
    WORDS_PER_FULL_PACKET - SEQUENCE_NUMBER_SIZE_IN_ITEMS

# points where SDP beats data speed up due to overheads
THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_EXTRACTOR_IN_BYTES = 40000
THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_INPUT_IN_BYTES = 300

#: offset where data in starts on first command
#: (command, base_address, x&y, max_seq_number)
WORDS_FOR_COMMAND_AND_ADDRESS_HEADER = 4
BYTES_FOR_COMMAND_AND_ADDRESS_HEADER = (
    WORDS_FOR_COMMAND_AND_ADDRESS_HEADER * BYTES_PER_WORD)

#: offset where data starts after a command id and seq number
WORDS_FOR_COMMAND_AND_SEQ_HEADER = 2
BYTES_FOR_COMMAND_AND_SEQ_HEADER = (
    WORDS_FOR_COMMAND_AND_SEQ_HEADER * BYTES_PER_WORD)

#: size for data to store when first packet with command and address
WORDS_IN_FULL_PACKET_WITH_ADDRESS = (
    WORDS_PER_FULL_PACKET - WORDS_FOR_COMMAND_AND_ADDRESS_HEADER)
BYTES_IN_FULL_PACKET_WITH_ADDRESS = (
    WORDS_IN_FULL_PACKET_WITH_ADDRESS * BYTES_PER_WORD)

#: size for data in to store when not first packet
WORDS_IN_FULL_PACKET_WITHOUT_ADDRESS = (
    WORDS_PER_FULL_PACKET - WORDS_FOR_COMMAND_AND_SEQ_HEADER)
BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS = (
    WORDS_IN_FULL_PACKET_WITHOUT_ADDRESS * BYTES_PER_WORD)

#: size of data in key space;
#: x, y, key (all ints) for possible 48 chips,
SIZE_DATA_IN_CHIP_TO_KEY_SPACE = (3 * 48 + 1) * BYTES_PER_WORD


class _DATA_REGIONS(Enum):
    """DSG data regions"""
    SYSTEM = 0
    CONFIG = 1
    CHIP_TO_KEY_SPACE = 2


class DATA_OUT_COMMANDS(Enum):
    """command IDs for the SDP packets for data out"""
    START_SENDING = 100
    START_MISSING_SEQ = 1000
    MISSING_SEQ = 1001
    CLEAR = 2000


class DATA_IN_COMMANDS(Enum):
    """command IDs for the SDP packets for data in"""
    SEND_DATA_TO_LOCATION = 200
    SEND_SEQ_DATA = 2000
    SEND_DONE = 2002
    RECEIVE_FIRST_MISSING_SEQ = 2003
    RECEIVE_MISSING_SEQ_DATA = 2004
    RECEIVE_FINISHED = 2005


# precompiled structures
_ONE_WORD = struct.Struct("<I")
_TWO_WORDS = struct.Struct("<II")
_THREE_WORDS = struct.Struct("<III")
_FOUR_WORDS = struct.Struct("<IIII")

# Set to true to check that the data is correct after it has been sent in.
# This is expensive, and only works in Python 3.5 or later.
VERIFY_SENT_DATA = False


def ceildiv(dividend, divisor):
    """ How to divide two possibly-integer numbers and round up.
    """
    assert divisor > 0
    q, r = divmod(dividend, divisor)
    return int(q) + (r != 0)


# SDRAM requirement for storing missing SDP packets seq nums
SDRAM_FOR_MISSING_SDP_SEQ_NUMS = ceildiv(
    120.0 * 1024 * BYTES_PER_KB,
    WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * BYTES_PER_WORD)


[docs]class DataSpeedUpPacketGatherMachineVertex( MachineVertex, AbstractGeneratesDataSpecification, AbstractHasAssociatedBinary, AbstractProvidesLocalProvenanceData): __slots__ = [ "_x", "_y", "_app_id", "_connection", # store of the extra monitors to location. helpful in data in "_extra_monitors_by_chip", # boolean tracker for handling out of order packets "_have_received_missing_seq_count_packet", # path for the data in report "_in_report_path", "_ip_address", # store for the last reinjection status "_last_status", # the max seq num expected given a data retrieval "_max_seq_num", # holder for missing seq nums for data in "_missing_seq_nums_data_in", # holder of data from out "_output", # my placement for future lookup "_placement", # provenance holder "_provenance_data_items", # Count of the runs for provenance data "_run", "_remote_tag", # path to the data out report "_out_report_path", # tracker for expected missing seq nums "_total_expected_missing_seq_packets", "_write_data_speed_up_reports", # data holder for output "_view"] #: base key (really nasty hack to tie in fixed route keys) BASE_KEY = 0xFFFFFFF9 NEW_SEQ_KEY = 0xFFFFFFF8 FIRST_DATA_KEY = 0xFFFFFFF7 END_FLAG_KEY = 0xFFFFFFF6 #: to use with multicast stuff BASE_MASK = 0xFFFFFFFB NEW_SEQ_KEY_OFFSET = 1 FIRST_DATA_KEY_OFFSET = 2 END_FLAG_KEY_OFFSET = 3 # throttle on the transmission _TRANSMISSION_THROTTLE_TIME = 0.000001 # TRAFFIC_TYPE = EdgeTrafficType.MULTICAST TRAFFIC_TYPE = EdgeTrafficType.FIXED_ROUTE #: report name for tracking used routers OUT_REPORT_NAME = "routers_used_in_speed_up_process.rpt" #: report name for tracking performance gains IN_REPORT_NAME = "speeds_gained_in_speed_up_process.rpt" # the end flag is set when the high bit of the sequence number word is set _LAST_MESSAGE_FLAG_BIT_MASK = 0x80000000 # corresponding mask for the actual sequence numbers _SEQUENCE_NUMBER_MASK = 0x7fffffff # time outs used by the protocol for separate bits _TIMEOUT_PER_RECEIVE_IN_SECONDS = 1 _TIMEOUT_FOR_SENDING_IN_SECONDS = 0.01 # end flag for missing seq nums _MISSING_SEQ_NUMS_END_FLAG = 0xFFFFFFFF _ADDRESS_PACKET_BYTE_FORMAT = struct.Struct( "<{}B".format(BYTES_IN_FULL_PACKET_WITH_ADDRESS)) # Router timeouts, in mantissa,exponent form. See datasheet for details _LONG_TIMEOUT = (14, 14) _SHORT_TIMEOUT = (1, 1) _TEMP_TIMEOUT = (15, 4) _ZERO_TIMEOUT = (0, 0) # Initial port for the reverse IP tag (to be replaced later) _TAG_INITIAL_PORT = 10000 def __init__( self, x, y, extra_monitors_by_chip, ip_address, report_default_directory, write_data_speed_up_reports, constraints=None): """ :param x: Where this gatherer is. :type x: int :param y: Where this gatherer is. :type y: int :param extra_monitors_by_chip: UNUSED :type extra_monitors_by_chip: \ dict(tuple(int,int), ExtraMonitorSupportMachineVertex) :param ip_address: \ How to talk directly to the chip where the gatherer is. :type ip_address: str :param report_default_directory: Where reporting is done. :type report_default_directory: str :param write_data_speed_up_reports: \ Whether to write low-level reports on data transfer speeds. :type write_data_speed_up_reports: bool :param constraints: :type constraints: \ iterable(~pacman.model.constraints.AbstractConstraint) """ super(DataSpeedUpPacketGatherMachineVertex, self).__init__( label="SYSTEM:PacketGatherer({},{})".format(x, y), constraints=constraints) # data holders for the output, and sequence numbers self._view = None self._max_seq_num = None self._output = None # store of the extra monitors to location. helpful in data in self._extra_monitors_by_chip = extra_monitors_by_chip self._total_expected_missing_seq_packets = 0 self._have_received_missing_seq_count_packet = False self._missing_seq_nums_data_in = list() self._missing_seq_nums_data_in.append(list()) # Create a connection to be used self._x = x self._y = y self._ip_address = ip_address self._remote_tag = None self._connection = None # local provenance storage self._provenance_data_items = defaultdict(list) self._run = 0 self._placement = None self._app_id = None # create report if it doesn't already exist self._out_report_path = \ os.path.join(report_default_directory, self.OUT_REPORT_NAME) self._in_report_path = \ os.path.join(report_default_directory, self.IN_REPORT_NAME) self._write_data_speed_up_reports = write_data_speed_up_reports # Stored reinjection status for resetting timeouts self._last_status = None def __throttled_send(self, message): """ slows down transmissions to allow spinnaker to keep up. :param message: message to send :param connection: the connection to send down :rtype: None """ # send first message self._connection.send_sdp_message(message) time.sleep(self._TRANSMISSION_THROTTLE_TIME) @property @overrides(MachineVertex.resources_required) def resources_required(self): return self.static_resources_required()
[docs] @staticmethod def static_resources_required(): return ResourceContainer( sdram=ConstantSDRAM( SYSTEM_BYTES_REQUIREMENT + CONFIG_SIZE + SDRAM_FOR_MISSING_SDP_SEQ_NUMS + SIZE_DATA_IN_CHIP_TO_KEY_SPACE), iptags=[IPtagResource( port=DataSpeedUpPacketGatherMachineVertex._TAG_INITIAL_PORT, strip_sdp=True, ip_address="localhost", traffic_identifier="DATA_SPEED_UP")])
[docs] @overrides(AbstractHasAssociatedBinary.get_binary_start_type) def get_binary_start_type(self): return ExecutableType.SYSTEM
[docs] @inject_items({ "machine_graph": "MemoryMachineGraph", "routing_info": "MemoryRoutingInfos", "tags": "MemoryTags", "machine_time_step": "MachineTimeStep", "time_scale_factor": "TimeScaleFactor", "mc_data_chips_to_keys": "DataInMulticastKeyToChipMap", "machine": "MemoryExtendedMachine", "app_id": "APPID" }) @overrides( AbstractGeneratesDataSpecification.generate_data_specification, additional_arguments={ "machine_graph", "routing_info", "tags", "machine_time_step", "time_scale_factor", "mc_data_chips_to_keys", "machine", "app_id" }) def generate_data_specification( self, spec, placement, machine_graph, routing_info, tags, machine_time_step, time_scale_factor, mc_data_chips_to_keys, machine, app_id): """ :param machine_graph: (injected) :type machine_graph: ~pacman.model.graphs.machine.MachineGraph :param routing_info: (injected) :type routing_info: ~pacman.model.routing_info.RoutingInfo :param tags: (injected) :type tags: ~pacman.model.tags.Tags :param machine_time_step: (injected) :type machine_time_step: int :param time_scale_factor: (injected) :type time_scale_factor: int :param mc_data_chips_to_keys: (injected) :type mc_data_chips_to_keys: dict(tuple(int,int), int) :param machine: (injected) :type machine: ~spinn_machine.Machine :param app_id: (injected) :type app_id: int """ # pylint: disable=too-many-arguments, arguments-differ # update my placement for future knowledge self._placement = placement self._app_id = app_id # Create the data regions for hello world self._reserve_memory_regions(spec) # write data for the simulation data item spec.switch_write_focus(_DATA_REGIONS.SYSTEM.value) spec.write_array(simulation_utilities.get_simulation_header_array( self.get_binary_file_name(), machine_time_step, time_scale_factor)) # the keys for the special cases if self.TRAFFIC_TYPE == EdgeTrafficType.MULTICAST: base_key = routing_info.get_first_key_for_edge( list(machine_graph.get_edges_ending_at_vertex(self))[0]) new_seq_key = base_key + self.NEW_SEQ_KEY_OFFSET first_data_key = base_key + self.FIRST_DATA_KEY_OFFSET end_flag_key = base_key + self.END_FLAG_KEY_OFFSET else: new_seq_key = self.NEW_SEQ_KEY first_data_key = self.FIRST_DATA_KEY end_flag_key = self.END_FLAG_KEY spec.switch_write_focus(_DATA_REGIONS.CONFIG.value) spec.write_value(new_seq_key) spec.write_value(first_data_key) spec.write_value(end_flag_key) # locate the tag ID for our data and update with a port # Note: The port doesn't matter as we are going to override this later iptags = tags.get_ip_tags_for_vertex(self) iptag = iptags[0] spec.write_value(iptag.tag) self._remote_tag = iptag.tag # write mc chip key map spec.switch_write_focus(_DATA_REGIONS.CHIP_TO_KEY_SPACE.value) chips_on_board = list(machine.get_existing_xys_on_board( machine.get_chip_at(placement.x, placement.y))) # write how many chips to read spec.write_value(len(chips_on_board)) # write each chip x and y and base key for chip_xy in chips_on_board: board_chip_x, board_chip_y = machine.get_local_xy( machine.get_chip_at(*chip_xy)) spec.write_value(board_chip_x) spec.write_value(board_chip_y) spec.write_value(mc_data_chips_to_keys[chip_xy]) log.debug("for chip {}:{} base key is {}", chip_xy[0], chip_xy[1], mc_data_chips_to_keys[chip_xy]) # End-of-Spec: spec.end_specification()
@staticmethod def _reserve_memory_regions(spec): """ Writes the DSG regions memory sizes. Static so that it can be used\ by the application vertex. :param spec: spec file :param system_size: size of system region :rtype: None """ spec.reserve_memory_region( region=_DATA_REGIONS.SYSTEM.value, size=SIMULATION_N_BYTES, label='systemInfo') spec.reserve_memory_region( region=_DATA_REGIONS.CONFIG.value, size=CONFIG_SIZE, label="config") spec.reserve_memory_region( region=_DATA_REGIONS.CHIP_TO_KEY_SPACE.value, size=SIZE_DATA_IN_CHIP_TO_KEY_SPACE, label="mc_key_map")
[docs] @overrides(AbstractHasAssociatedBinary.get_binary_file_name) def get_binary_file_name(self): return "data_speed_up_packet_gatherer.aplx"
[docs] @overrides(AbstractProvidesLocalProvenanceData.get_local_provenance_data) def get_local_provenance_data(self): self._run += 1 prov_items = list() significant_losses = defaultdict(list) for (placement, memory_address, length_in_bytes) in \ self._provenance_data_items.keys(): # handle duplicates of the same calls times_extracted_the_same_thing = 0 top_level_name = "Provenance_for_{}".format(self._label) for time_taken, lost_seq_nums in self._provenance_data_items[ placement, memory_address, length_in_bytes]: # handle time chip_name = "chip{}:{}".format(placement.x, placement.y) last_name = "Memory_address:{}:Length_in_bytes:{}"\ .format(memory_address, length_in_bytes) if times_extracted_the_same_thing == 0: iteration_name = "run{}".format( self._run) else: iteration_name = "run{}iteration{}".format( self._run, times_extracted_the_same_thing) prov_items.append(ProvenanceDataItem( [top_level_name, "extraction_time", chip_name, last_name, iteration_name], time_taken, report=False, message=None)) times_extracted_the_same_thing += 1 # handle lost sequence numbers for i, n_lost_seq_nums in enumerate(lost_seq_nums): # Zeroes are not reported at all if n_lost_seq_nums: prov_items.append(ProvenanceDataItem( [top_level_name, "lost_seq_nums", chip_name, last_name, iteration_name, "iteration_{}".format(i)], n_lost_seq_nums, report=( n_lost_seq_nums > _MINOR_LOSS_THRESHOLD), message=_MINOR_LOSS_MESSAGE.format( length_in_bytes, memory_address, i, n_lost_seq_nums))) if n_lost_seq_nums > _MAJOR_LOSS_THRESHOLD: significant_losses[placement.x, placement.y] += [i] for chip in significant_losses: n_times = len(significant_losses[chip]) chip_name = "chip{}:{}".format(*chip) prov_items.append(ProvenanceDataItem( [top_level_name, "serious_lost_seq_num_count", chip_name], n_times, report=True, message=_MAJOR_LOSS_MESSAGE.format( chip, n_times))) self._provenance_data_items = defaultdict(list) return prov_items
[docs] @staticmethod def locate_correct_write_data_function_for_chip_location( uses_advanced_monitors, machine, x, y, transceiver, extra_monitor_cores_to_ethernet_connection_map): """ supports other components figuring out which gather and function \ to call for writing data onto spinnaker :param uses_advanced_monitors: \ Whether the system is using advanced monitors :type uses_advanced_monitors: bool :param machine: the SpiNNMachine instance :type machine: ~spinn_machine.Machine :param x: the chip x coordinate to write data to :type x: int :param y: the chip y coordinate to write data to :type y: int :param transceiver: the SpiNNMan instance :type transceiver: ~spinnman.transceiver.Transceiver :param extra_monitor_cores_to_ethernet_connection_map: \ mapping between cores and connections :type extra_monitor_cores_to_ethernet_connection_map: \ dict(tuple(int,int), DataSpeedUpPacketGatherMachineVertex) :return: a write function of either a LPG or the spinnMan :rtype: callable """ if not uses_advanced_monitors: return transceiver.write_memory chip = machine.get_chip_at(x, y) ethernet_connected_chip = machine.get_chip_at( chip.nearest_ethernet_x, chip.nearest_ethernet_y) gatherer = extra_monitor_cores_to_ethernet_connection_map[ ethernet_connected_chip.x, ethernet_connected_chip.y] return gatherer.send_data_into_spinnaker
def _generate_data_in_report( self, time_diff, data_size, x, y, address_written_to, missing_seq_nums): """ writes the data in report for this stage :param time_took_ms: the time taken to write the memory :param data_size: the size of data that was written in bytes :param x: the location in machine where the data was written to X axis :param y: the location in machine where the data was written to Y axis :param address_written_to: where in SDRAM it was written to :param missing_seq_nums: \ the set of missing sequence numbers per data transmission attempt :rtype: None """ if not os.path.isfile(self._in_report_path): with open(self._in_report_path, "w") as writer: writer.write( "x\t\t y\t\t SDRAM address\t\t size in bytes\t\t\t" " time took \t\t\t Mb/s \t\t\t missing sequence numbers\n") writer.write( "------------------------------------------------" "------------------------------------------------" "-------------------------------------------------\n") time_took_ms = float(time_diff.microseconds + time_diff.total_seconds() * 1000000) megabits = (data_size * 8.0) / (1024 * BYTES_PER_KB) if time_took_ms == 0: mbs = "unknown, below threshold" else: mbs = megabits / (float(time_took_ms) / 100000.0) with open(self._in_report_path, "a") as writer: writer.write( "{}\t\t {}\t\t {}\t\t {}\t\t\t\t {}\t\t\t {}\t\t {}\n".format( x, y, address_written_to, data_size, time_took_ms, mbs, missing_seq_nums))
[docs] def send_data_into_spinnaker( self, x, y, base_address, data, n_bytes=None, offset=0, cpu=0, is_filename=False): """ sends a block of data into SpiNNaker to a given chip :param x: chip x for data :type x: int :param y: chip y for data :type y: int :param base_address: the address in SDRAM to start writing memory :type base_address: int :param data: the data to write (or filename to load data from, \ if `is_filename` is True; that's the only time this is a str) :type data: bytes or bytearray or memoryview or str :param n_bytes: how many bytes to read, or None if not set :type n_bytes: int :param offset: where in the data to start from :type offset: int :param is_filename: whether data is actually a file. :type is_filename: bool :rtype: None """ # if file, read in and then process as normal if is_filename: if offset != 0: raise Exception( "when using a file, you can only have a offset of 0") with FileDataReader(data) as reader: # n_bytes=None already means 'read everything' data = reader.read(n_bytes) # pylint: disable=no-member # Number of bytes to write is now length of buffer we have n_bytes = len(data) elif n_bytes is None: n_bytes = len(data) transceiver = get_simulator().transceiver # if not worth using extra monitors, send via SCP if not self._worse_via_scp(n_bytes): # start time recording start = datetime.datetime.now() # write the data transceiver.write_memory( x=x, y=y, base_address=base_address, n_bytes=n_bytes, data=data, offset=offset, is_filename=False, cpu=cpu) # record when finished end = datetime.datetime.now() self._missing_seq_nums_data_in = [[]] else: log.debug("sending {} bytes to {},{} via Data In protocol", n_bytes, x, y) # start time recording start = datetime.datetime.now() # send data self._send_data_via_extra_monitors( transceiver, x, y, base_address, data[offset:n_bytes + offset]) # end time recording end = datetime.datetime.now() if VERIFY_SENT_DATA: original_data = bytes(data[offset:n_bytes + offset]) verified_data = bytes(transceiver.read_memory( x, y, base_address, n_bytes)) if PY2: self.__verify_sent_data_py2( original_data, verified_data, x, y, base_address, n_bytes) else: self.__verify_sent_data_py3( original_data, verified_data, x, y, base_address, n_bytes) # write report if self._write_data_speed_up_reports: self._generate_data_in_report( x=x, y=y, time_diff=end - start, data_size=n_bytes, address_written_to=base_address, missing_seq_nums=self._missing_seq_nums_data_in)
@staticmethod def __verify_sent_data_py2( original_data, verified_data, x, y, base_address, n_bytes): if original_data != verified_data: log.error("VARIANCE: chip:{},{} address:{} len:{}", x, y, base_address, n_bytes) log.error("original:{}", "".join( "%02X" % ord(x) for x in original_data)) log.error("verified:{}", "".join( "%02X" % ord(x) for x in verified_data)) i = 0 for (a, b) in zip(original_data, verified_data): if a != b: break i += 1 raise Exception("damn at " + str(i)) @staticmethod def __verify_sent_data_py3( original_data, verified_data, x, y, base_address, n_bytes): if original_data != verified_data: log.error("VARIANCE: chip:{},{} address:{} len:{}", x, y, base_address, n_bytes) log.error("original:{}", original_data.hex()) log.error("verified:{}", verified_data.hex()) i = 0 for (a, b) in zip(original_data, verified_data): if a != b: break i += 1 raise Exception("damn at " + str(i)) @staticmethod def _worse_via_scp(n_bytes): return (n_bytes is None or n_bytes >= THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_INPUT_IN_BYTES) @staticmethod def __make_sdp_message(placement, port, payload): return SDPMessage( sdp_header=SDPHeader( destination_chip_x=placement.x, destination_chip_y=placement.y, destination_cpu=placement.p, destination_port=port.value, flags=SDPFlag.REPLY_NOT_EXPECTED), data=payload) def _send_data_via_extra_monitors( self, transceiver, destination_chip_x, destination_chip_y, start_address, data_to_write): """ sends data using the extra monitor cores :param transceiver: the SpiNNMan instance :param destination_chip_x: chip x :param destination_chip_y: chip y :param start_address: start address in sdram to write data to :param data_to_write: the data to write :rtype: None """ # how many packets after first one we need to send number_of_packets = ceildiv( len(data_to_write) - BYTES_IN_FULL_PACKET_WITH_ADDRESS, BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS) # determine board chip IDs, as the LPG does not know machine scope IDs machine = transceiver.get_machine_details() chip = machine.get_chip_at(destination_chip_x, destination_chip_y) dest_x, dest_y = machine.get_local_xy(chip) # send first packet to lpg, stating where to send it to data = bytearray(WORDS_PER_FULL_PACKET * BYTES_PER_WORD) _FOUR_WORDS.pack_into( data, 0, DATA_IN_COMMANDS.SEND_DATA_TO_LOCATION.value, start_address, (dest_x << 16) | dest_y, number_of_packets) self._ADDRESS_PACKET_BYTE_FORMAT.pack_into( data, BYTES_FOR_COMMAND_AND_ADDRESS_HEADER, *data_to_write[0:BYTES_IN_FULL_PACKET_WITH_ADDRESS]) # debug # self._print_out_packet_data(data) # send first message self._connection = SCAMPConnection( chip_x=self._x, chip_y=self._y, remote_host=self._ip_address) self.__reprogram_tag(self._connection) self._connection.send_sdp_message(self.__make_sdp_message( self._placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_IN_SPEED_UP, data)) log.debug("sent initial {} bytes", BYTES_IN_FULL_PACKET_WITH_ADDRESS) # send initial attempt at sending all the data self._send_all_data_based_packets(number_of_packets, data_to_write) # verify completed received_confirmation = False time_out_count = 0 while not received_confirmation: try: # try to receive a confirmation of some sort from spinnaker data = self._connection.receive( timeout=self._TIMEOUT_PER_RECEIVE_IN_SECONDS) time_out_count = 0 # check which message type we have received received_confirmation = self._outgoing_process_packet( data, data_to_write) except SpinnmanTimeoutException: # if time out, keep trying # if the timeout has not occurred x times, keep trying if time_out_count > TIMEOUT_RETRY_LIMIT: emergency_recover_state_from_failure( transceiver, self._app_id, self, self._placement) raise SpinnFrontEndException( TIMEOUT_MESSAGE.format(time_out_count)) # reopen the connection and try again time_out_count += 1 remote_port = self._connection.remote_port local_port = self._connection.local_port local_ip = self._connection.local_ip_address remote_ip = self._connection.remote_ip_address self._connection.close() self._connection = SCAMPConnection( local_port=local_port, remote_port=remote_port, local_host=local_ip, remote_host=remote_ip) # if we have not received confirmation of finish, try to # retransmit missing seq nums if not received_confirmation: self._outgoing_retransmit_missing_seq_nums(data_to_write) def _read_in_missing_seq_nums(self, data, data_to_write, position): """ handles a missing seq num packet from spinnaker :param data: the data to translate into missing seq nums :param data_to_write: the data to write :param position: the position in the data to write. :rtype: None """ # find how many elements are in this packet n_elements = (len(data) - position) // BYTES_PER_WORD # store missing self._missing_seq_nums_data_in[-1].extend(struct.unpack_from( "<{}I".format(n_elements), data, position)) # determine if last element is end flag if self._missing_seq_nums_data_in[-1][-1] == \ self._MISSING_SEQ_NUMS_END_FLAG: del self._missing_seq_nums_data_in[-1][-1] self._outgoing_retransmit_missing_seq_nums(data_to_write) if (self._total_expected_missing_seq_packets == 0 and self._have_received_missing_seq_count_packet): self._outgoing_retransmit_missing_seq_nums(data_to_write) def _outgoing_process_packet(self, data, data_to_write): """ processes a packet from SpiNNaker :param data: the packet data :param data_to_write: the data to write to spinnaker :return: if the packet contains a confirmation of complete :rtype: bool """ position = 0 command_id = _ONE_WORD.unpack_from(data, 0)[0] position += BYTES_PER_WORD log.debug("received packet with id {}", command_id) # process first missing if command_id == DATA_IN_COMMANDS.RECEIVE_FIRST_MISSING_SEQ.value: # find total missing self._total_expected_missing_seq_packets += \ _ONE_WORD.unpack_from(data, position)[0] position += BYTES_PER_WORD self._have_received_missing_seq_count_packet = True # write missing seq nums and retransmit if needed self._read_in_missing_seq_nums(data, data_to_write, position) # process missing seq packets if command_id == DATA_IN_COMMANDS.RECEIVE_MISSING_SEQ_DATA.value: # write missing seq nums and retransmit if needed self._total_expected_missing_seq_packets -= 1 self._read_in_missing_seq_nums(data, data_to_write, position) # process the confirmation of all data received return command_id == DATA_IN_COMMANDS.RECEIVE_FINISHED.value def _outgoing_retransmit_missing_seq_nums(self, data_to_write): """ Transmits back into SpiNNaker the missing data based off missing\ sequence numbers :param data_to_write: the data to write. :rtype: None """ for missing_seq_num in self._missing_seq_nums_data_in[-1]: message, _length = self._calculate_data_in_data_from_seq_number( data_to_write, missing_seq_num, DATA_IN_COMMANDS.SEND_SEQ_DATA.value, None) self.__throttled_send(message) self._missing_seq_nums_data_in.append(list()) self._total_expected_missing_seq_packets = 0 self._have_received_missing_seq_count_packet = False self._send_end_flag() def _calculate_position_from_seq_number(self, seq_num): """ Calculates where in the raw data to start reading from, given a\ sequence number :param seq_num: the sequence number to determine position from :return: the position in the byte data :rtype: int """ if seq_num == 0: return 0 return BYTES_IN_FULL_PACKET_WITH_ADDRESS + ( BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS * (seq_num - 1)) def _calculate_data_in_data_from_seq_number( self, data_to_write, seq_num, command_id, position): """ Determine the data needed to be sent to the SpiNNaker machine\ given a sequence number :param data_to_write: the data to write to the SpiNNaker machine :param seq_num: the seq num to ge tthe data for :param position: the position in the data to write to spinnaker :type position: int or None :return: SDP message and how much data has been written :rtype: SDP message """ # check for last packet packet_data_length = BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS # determine position in data if not given if position is None: position = self._calculate_position_from_seq_number(seq_num) # if less than a full packet worth of data, adjust length if position + packet_data_length > len(data_to_write): packet_data_length = len(data_to_write) - position if packet_data_length < 0: raise Exception() # determine the true packet length (with header) packet_length = ( packet_data_length + BYTES_FOR_COMMAND_AND_SEQ_HEADER) # create struct packet_data = bytearray(packet_length) _TWO_WORDS.pack_into(packet_data, 0, command_id, seq_num) struct.pack_into( "<{}B".format(packet_data_length), packet_data, BYTES_FOR_COMMAND_AND_SEQ_HEADER, *data_to_write[position:position+packet_data_length]) # debug # self._print_out_packet_data(packet_data) # build sdp packet message = self.__make_sdp_message( self._placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_IN_SPEED_UP, packet_data) # return message for sending, and the length in data sent return message, packet_data_length def _send_end_flag(self): # send end flag as separate message self._connection.send_sdp_message(self.__make_sdp_message( self._placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_IN_SPEED_UP, _ONE_WORD.pack(DATA_IN_COMMANDS.SEND_DONE.value))) def _send_all_data_based_packets( self, number_of_packets, data_to_write): """ Send all the data as one block :param number_of_packets: the number of packets expected to send :param data_to_write: the data to send :rtype: None """ # where in the data we are currently up to position_in_data = BYTES_IN_FULL_PACKET_WITH_ADDRESS # send rest of data total_data_length = len(data_to_write) for seq_num in range(1, number_of_packets + 1): # put in command flag and seq num message, length_to_send = \ self._calculate_data_in_data_from_seq_number( data_to_write, seq_num, DATA_IN_COMMANDS.SEND_SEQ_DATA.value, position_in_data) position_in_data += length_to_send # send the message self.__throttled_send(message) log.debug("sent seq {} of {} bytes", seq_num, length_to_send) # check for end flag if position_in_data == total_data_length: self._send_end_flag() log.debug("sent end flag")
[docs] @staticmethod def streaming(gatherers, transceiver, extra_monitor_cores, placements): """ Helper method for setting the router timeouts to a state usable\ for data streaming via a Python context manager (i.e., using\ the 'with' statement). :param gatherers: All the gatherers that are to be set :type gatherers: list(DataSpeedUpPacketGatherMachineVertex) :param transceiver: the SpiNNMan instance :type transceiver: ~spinnman.transceiver.Transceiver :param extra_monitor_cores: the extra monitor cores to set :type extra_monitor_cores: \ list(~spinn_front_end_common.utility_models.ExtraMonitorSupportMachineVertex) :param placements: placements object :type placements: ~pacman.model.placements.Placements :rtype: a context manager """ return _StreamingContextManager( gatherers, transceiver, extra_monitor_cores, placements)
[docs] def set_cores_for_data_streaming( self, transceiver, extra_monitor_cores, placements): """ Helper method for setting the router timeouts to a state usable\ for data streaming :param transceiver: the SpiNNMan instance :type transceiver: ~spinnman.transceiver.Transceiver :param extra_monitor_cores: the extra monitor cores to set :type extra_monitor_cores: \ list(~spinn_front_end_common.utility_models.ExtraMonitorSupportMachineVertex) :param placements: placements object :type placements: ~pacman.model.placements.Placements :rtype: None """ lead_monitor = extra_monitor_cores[0] # Store the last reinjection status for resetting # NOTE: This assumes the status is the same on all cores self._last_status = lead_monitor.get_reinjection_status( placements, transceiver) # Set to not inject dropped packets lead_monitor.set_reinjection_packets( placements, extra_monitor_cores, transceiver, point_to_point=False, multicast=False, nearest_neighbour=False, fixed_route=False) # Clear any outstanding packets from reinjection lead_monitor.clear_reinjection_queue( transceiver, placements, extra_monitor_cores) # set time outs lead_monitor.set_router_emergency_timeout( self._SHORT_TIMEOUT, transceiver, placements, extra_monitor_cores) lead_monitor.set_router_time_outs( self._LONG_TIMEOUT, transceiver, placements, extra_monitor_cores)
[docs] @staticmethod def load_application_routing_tables( transceiver, extra_monitor_cores, placements): """ Set all chips to have application table loaded in the router :param transceiver: the SpiNNMan instance :type transceiver: ~spinnman.transceiver.Transceiver :param extra_monitor_cores: the extra monitor cores to set :type extra_monitor_cores: \ list(~spinn_front_end_common.utility_models.ExtraMonitorSupportMachineVertex) :param placements: placements object :type placements: ~pacman.model.placements.Placements :rtype: None """ extra_monitor_cores[0].load_application_mc_routes( placements, extra_monitor_cores, transceiver)
[docs] @staticmethod def load_system_routing_tables( transceiver, extra_monitor_cores, placements): """ Set all chips to have the system table loaded in the router :param transceiver: the SpiNNMan instance :type transceiver: ~spinnman.transceiver.Transceiver :param extra_monitor_cores: the extra monitor cores to set :type extra_monitor_cores: \ list(~spinn_front_end_common.utility_models.ExtraMonitorSupportMachineVertex) :param placements: placements object :type placements: ~pacman.model.placements.Placements :rtype: None """ extra_monitor_cores[0].load_system_mc_routes( placements, extra_monitor_cores, transceiver)
[docs] def unset_cores_for_data_streaming( self, transceiver, extra_monitor_cores, placements): """ Helper method for setting the router timeouts to a state usable\ for data streaming :param transceiver: the SpiNNMan instance :type transceiver: ~spinnman.transceiver.Transceiver :param extra_monitor_cores: the extra monitor cores to set :type extra_monitor_cores: \ list(~spinn_front_end_common.utility_models.ExtraMonitorSupportMachineVertex) :param placements: placements object :type placements: ~pacman.model.placements.Placements :rtype: None """ lead_monitor = extra_monitor_cores[0] # Set the routers to temporary values lead_monitor.set_router_time_outs( self._TEMP_TIMEOUT, transceiver, placements, extra_monitor_cores) lead_monitor.set_router_emergency_timeout( self._ZERO_TIMEOUT, transceiver, placements, extra_monitor_cores) if self._last_status is None: log.warning( "Cores have not been set for data extraction, so can't be" " unset") try: lead_monitor.set_router_time_outs( self._last_status.router_timeout_parameters, transceiver, placements, extra_monitor_cores) lead_monitor.set_router_emergency_timeout( self._last_status.router_emergency_timeout_parameters, transceiver, placements, extra_monitor_cores) lead_monitor.set_reinjection_packets( placements, extra_monitor_cores, transceiver, point_to_point=self._last_status.is_reinjecting_point_to_point, multicast=self._last_status.is_reinjecting_multicast, nearest_neighbour=( self._last_status.is_reinjecting_nearest_neighbour), fixed_route=self._last_status.is_reinjecting_fixed_route) except Exception: # pylint: disable=broad-except log.exception("Error resetting timeouts") log.error("Checking if the cores are OK...") core_subsets = convert_vertices_to_core_subset( extra_monitor_cores, placements) try: error_cores = transceiver.get_cores_not_in_state( core_subsets, {CPUState.RUNNING}) if error_cores: log.error("Cores in an unexpected state: {}".format( error_cores)) except Exception: # pylint: disable=broad-except log.exception("Couldn't get core state")
def __reprogram_tag(self, connection): request = IPTagSet( self._x, self._y, [0, 0, 0, 0], 0, self._remote_tag, strip=True, use_sender=True) data = connection.get_scp_data(request) einfo = None for _ in range(3): try: connection.send(data) _, _, response, offset = \ connection.receive_scp_response() request.get_scp_response().read_bytestring(response, offset) return except SpinnmanTimeoutException: einfo = sys.exc_info() reraise(*einfo)
[docs] def get_data( self, placement, memory_address, length_in_bytes, fixed_routes): """ Gets data from a given core and memory address. :param placement: placement object for where to get data from :type placement: ~pacman.model.placements.Placement :param memory_address: the address in SDRAM to start reading from :type memory_address: int :param length_in_bytes: the length of data to read in bytes :type length_in_bytes: int :param fixed_routes: the fixed routes, used in the report of which\ chips were used by the speed up process :type fixed_routes: dict(tuple(int,int),~spinn_machine.FixedRouteEntry) :return: byte array of the data :rtype: bytearray """ start = float(time.time()) # if asked for no data, just return a empty byte array if length_in_bytes == 0: data = bytearray(0) end = float(time.time()) self._provenance_data_items[ placement, memory_address, length_in_bytes].append((end - start, [0])) return data transceiver = get_simulator().transceiver if (length_in_bytes < THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_EXTRACTOR_IN_BYTES): data = transceiver.read_memory( placement.x, placement.y, memory_address, length_in_bytes) end = float(time.time()) self._provenance_data_items[ placement, memory_address, length_in_bytes].append((end - start, [0])) return data # Update the IP Tag to work through a NAT firewall connection = SCAMPConnection( chip_x=self._x, chip_y=self._y, remote_host=self._ip_address) self.__reprogram_tag(connection) # send connection.send_sdp_message(self.__make_sdp_message( placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_SPEED_UP, _THREE_WORDS.pack( DATA_OUT_COMMANDS.START_SENDING.value, memory_address, length_in_bytes))) # receive self._output = bytearray(length_in_bytes) self._view = memoryview(self._output) self._max_seq_num = self.calculate_max_seq_num() lost_seq_nums = self._receive_data(transceiver, placement, connection) # Stop anything else getting through (and reduce traffic) connection.send_sdp_message(self.__make_sdp_message( placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_SPEED_UP, _ONE_WORD.pack(DATA_OUT_COMMANDS.CLEAR.value))) connection.close() end = float(time.time()) self._provenance_data_items[ placement, memory_address, length_in_bytes].append( (end - start, lost_seq_nums)) # create report elements if self._write_data_speed_up_reports: routers_been_in_use = self._determine_which_routers_were_used( placement, fixed_routes, transceiver.get_machine_details()) self._write_routers_used_into_report( self._out_report_path, routers_been_in_use, placement) return self._output
def _receive_data(self, transceiver, placement, connection): seq_nums = set() lost_seq_nums = list() timeoutcount = 0 finished = False while not finished: try: data = connection.receive( timeout=self._TIMEOUT_PER_RECEIVE_IN_SECONDS) timeoutcount = 0 seq_nums, finished = self._process_data( data, seq_nums, finished, placement, transceiver, lost_seq_nums) except SpinnmanTimeoutException: if timeoutcount > TIMEOUT_RETRY_LIMIT: raise SpinnFrontEndException( "Failed to hear from the machine during {} attempts. " "Please try removing firewalls".format(timeoutcount)) timeoutcount += 1 # self.__reset_connection() if not finished: finished = self._determine_and_retransmit_missing_seq_nums( seq_nums, transceiver, placement, lost_seq_nums) return lost_seq_nums @staticmethod def _determine_which_routers_were_used(placement, fixed_routes, machine): """ Traverse the fixed route paths from a given location to its\ destination. Used for determining which routers were used. :param placement: the source to start from :param fixed_routes: the fixed routes for each router :param machine: the spinnMachine instance :return: list of chip IDs """ routers = list() routers.append((placement.x, placement.y)) entry = fixed_routes[(placement.x, placement.y)] chip_x = placement.x chip_y = placement.y while len(entry.processor_ids) == 0: # can assume one link, as its a minimum spanning tree going to # the root machine_link = machine.get_chip_at( chip_x, chip_y).router.get_link(next(iter(entry.link_ids))) chip_x = machine_link.destination_x chip_y = machine_link.destination_y routers.append((chip_x, chip_y)) entry = fixed_routes[(chip_x, chip_y)] return routers @staticmethod def _write_routers_used_into_report( report_path, routers_been_in_use, placement): """ Write the used routers into a report :param report_path: the path to the report file :param routers_been_in_use: the routers been in use :param placement: the first placement used :rtype: None """ writer_behaviour = "w" if os.path.isfile(report_path): writer_behaviour = "a" with open(report_path, writer_behaviour) as writer: writer.write("[{}:{}:{}] = {}\n".format( placement.x, placement.y, placement.p, routers_been_in_use)) def _calculate_missing_seq_nums(self, seq_nums): """ Determine which sequence numbers we've missed :param seq_nums: the set already acquired :return: list of missing sequence numbers """ return [sn for sn in xrange(0, self._max_seq_num) if sn not in seq_nums] def _determine_and_retransmit_missing_seq_nums( self, seq_nums, transceiver, placement, lost_seq_nums): """ Determine if there are any missing sequence numbers, and if so\ retransmits the missing sequence numbers back to the core for\ retransmission. :param seq_nums: the sequence numbers already received :param transceiver: spinnman instance :param placement: placement instance :return: whether all packets are transmitted :rtype: bool """ # pylint: disable=too-many-locals # locate missing sequence numbers from pile missing_seq_nums = self._calculate_missing_seq_nums(seq_nums) lost_seq_nums.append(len(missing_seq_nums)) # self._print_missing(missing_seq_nums) if not missing_seq_nums: return True # figure n packets given the 2 formats n_packets = 1 length_via_format2 = \ len(missing_seq_nums) - (WORDS_PER_FULL_PACKET - 2) if length_via_format2 > 0: n_packets += ceildiv( length_via_format2, WORDS_PER_FULL_PACKET - 1) # transmit missing sequence as a new SDP packet first = True seq_num_offset = 0 for _ in xrange(n_packets): length_left_in_packet = WORDS_PER_FULL_PACKET offset = 0 # if first, add n packets to list if first: # get left over space / data size size_of_data_left_to_transmit = min( length_left_in_packet - 2, len(missing_seq_nums) - seq_num_offset) # build data holder accordingly data = bytearray( (size_of_data_left_to_transmit + 2) * BYTES_PER_WORD) # pack flag and n packets _ONE_WORD.pack_into( data, offset, DATA_OUT_COMMANDS.START_MISSING_SEQ.value) _ONE_WORD.pack_into(data, BYTES_PER_WORD, n_packets) # update state offset += 2 * BYTES_PER_WORD length_left_in_packet -= 2 first = False else: # just add data # get left over space / data size size_of_data_left_to_transmit = min( WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM, len(missing_seq_nums) - seq_num_offset) # build data holder accordingly data = bytearray( (size_of_data_left_to_transmit + 1) * BYTES_PER_WORD) # pack flag _ONE_WORD.pack_into( data, offset, DATA_OUT_COMMANDS.MISSING_SEQ.value) offset += BYTES_PER_WORD length_left_in_packet -= 1 # fill data field struct.pack_into( "<{}I".format(size_of_data_left_to_transmit), data, offset, *missing_seq_nums[ seq_num_offset: seq_num_offset + size_of_data_left_to_transmit]) seq_num_offset += length_left_in_packet # build SDP message and send it to the core transceiver.send_sdp_message(self.__make_sdp_message( placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_SPEED_UP, data)) # sleep for ensuring core doesn't lose packets time.sleep(self._TIMEOUT_FOR_SENDING_IN_SECONDS) # self._print_packet_num_being_sent(packet_count, n_packets) return False def _process_data( self, data, seq_nums, finished, placement, transceiver, lost_seq_nums): """ Take a packet and processes it see if we're finished yet :param data: the packet data :param seq_nums: the list of sequence numbers received so far :param finished: bool which states if finished or not :param placement: placement object for location on machine :param transceiver: spinnman instance :param lost_seq_nums: the list of n sequence numbers lost per iteration :return: set of data items, if its the first packet, the list of\ sequence numbers, the sequence number received and if its finished """ # pylint: disable=too-many-arguments # self._print_out_packet_data(data) length_of_data = len(data) first_packet_element, = _ONE_WORD.unpack_from(data, 0) # get flags seq_num = first_packet_element & self._SEQUENCE_NUMBER_MASK is_end_of_stream = ( first_packet_element & self._LAST_MESSAGE_FLAG_BIT_MASK) != 0 # check seq num not insane if seq_num > self._max_seq_num: raise Exception( "got an insane sequence number. got {} when " "the max is {} with a length of {}".format( seq_num, self._max_seq_num, length_of_data)) # figure offset for where data is to be put offset = self._calculate_offset(seq_num) # write data true_data_length = offset + length_of_data - BYTES_PER_WORD if not is_end_of_stream or length_of_data != BYTES_PER_WORD: self._write_into_view( offset, true_data_length, data, BYTES_PER_WORD, length_of_data, seq_num, length_of_data, False) # add seq num to list seq_nums.add(seq_num) # if received a last flag on its own, its during retransmission. # check and try again if required if is_end_of_stream: if not self._check(seq_nums): finished = self._determine_and_retransmit_missing_seq_nums( placement=placement, transceiver=transceiver, seq_nums=seq_nums, lost_seq_nums=lost_seq_nums) else: finished = True return seq_nums, finished def _calculate_offset(self, seq_num): return (seq_num * WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * BYTES_PER_WORD) def _write_into_view( self, view_start_position, view_end_position, data, data_start_position, data_end_position, seq_num, packet_length, is_final): """ Puts data into the view :param view_start_position: where in view to start :param view_end_position: where in view to end :param data: the data holder to write from :param data_start_position: where in data holder to start from :param data_end_position: where in data holder to end :param seq_num: the sequence number to figure :rtype: None """ # pylint: disable=too-many-arguments if view_end_position > len(self._output): raise Exception( "I'm trying to add to my output data, but am trying to add " "outside my acceptable output positions! max is {} and I " "received a request to fill to {} for sequence num {} from max" " sequence num {} length of packet {} and final {}".format( len(self._output), view_end_position, seq_num, self._max_seq_num, packet_length, is_final)) self._view[view_start_position: view_end_position] = \ data[data_start_position:data_end_position] def _check(self, seq_nums): """ Verify if the sequence numbers are correct. :param seq_nums: the received sequence numbers :return: Whether all the sequence numbers have been received :rtype: bool """ # hand back seq_nums = sorted(seq_nums) max_needed = self.calculate_max_seq_num() if len(seq_nums) > max_needed + 1: raise Exception("I've received more data than I was expecting!!") return len(seq_nums) == max_needed + 1
[docs] def calculate_max_seq_num(self): """ Deduce the max sequence number expected to be received :return: the biggest sequence num expected :rtype: int """ return ceildiv( len(self._output), WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * BYTES_PER_WORD)
@staticmethod def _print_missing(seq_nums): """ Debug printer for the missing sequence numbers from the pile :param seq_nums: the sequence numbers received so far :rtype: None """ for seq_num in sorted(seq_nums): log.info("from list I'm missing sequence num {}", seq_num) def _print_out_packet_data(self, data): """ Debug prints out the data from the packet :param data: the packet data :rtype: None """ reread_data = struct.unpack("<{}I".format( ceildiv(len(data), BYTES_PER_WORD)), data) log.info("converted data back into readable form is {}", reread_data) @staticmethod def _print_length_of_received_seq_nums(seq_nums, max_needed): """ Debug helper method for figuring out if everything been received :param seq_nums: sequence numbers received :param max_needed: biggest expected to have :rtype: None """ if len(seq_nums) != max_needed: log.info("should have received {} sequence numbers, but received " "{} sequence numbers", max_needed, len(seq_nums)) @staticmethod def _print_packet_num_being_sent(packet_count, n_packets): """ Debug helper for printing missing sequence number packet\ transmission :param packet_count: which packet is being fired :param n_packets: how many packets to fire. :rtype: None """ log.info("send SDP packet with missing sequence numbers: {} of {}", packet_count + 1, n_packets)
class _StreamingContextManager(object): """ The implementation of the context manager object for streaming \ configuration control. """ __slots__ = ["_gatherers", "_monitors", "_placements", "_txrx"] def __init__(self, gatherers, txrx, monitors, placements): self._gatherers = list(gatherers) self._txrx = txrx self._monitors = monitors self._placements = placements def __enter__(self): for gatherer in self._gatherers: gatherer.set_cores_for_data_streaming( self._txrx, self._monitors, self._placements) def __exit__(self, _type, _value, _tb): for gatherer in self._gatherers: gatherer.unset_cores_for_data_streaming( self._txrx, self._monitors, self._placements) return False