# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
try:
from collections.abc import defaultdict
except ImportError:
from collections import defaultdict
import os
import datetime
import logging
import time
import struct
import sys
from enum import Enum
from six.moves import xrange
from six import reraise
from spinn_utilities.overrides import overrides
from spinn_utilities.log import FormatAdapter
from spinnman.exceptions import SpinnmanTimeoutException
from spinnman.messages.sdp import SDPMessage, SDPHeader, SDPFlag
from spinnman.messages.scp.impl.iptag_set import IPTagSet
from spinnman.connections.udp_packet_connections import SCAMPConnection
from spinnman.model.enums.cpu_state import CPUState
from pacman.executor.injection_decorator import inject_items
from pacman.model.graphs.common import EdgeTrafficType
from pacman.model.graphs.machine import MachineVertex
from pacman.model.resources import (
ConstantSDRAM, IPtagResource, ResourceContainer)
from spinn_storage_handlers import FileDataReader
from spinn_front_end_common.utilities.globals_variables import get_simulator
from spinn_front_end_common.utilities.helpful_functions import (
convert_vertices_to_core_subset, emergency_recover_state_from_failure)
from spinn_front_end_common.abstract_models import (
AbstractHasAssociatedBinary, AbstractGeneratesDataSpecification)
from spinn_front_end_common.interface.provenance import (
AbstractProvidesLocalProvenanceData)
from spinn_front_end_common.utilities.utility_objs import (
ExecutableType, ProvenanceDataItem)
from spinn_front_end_common.utilities.constants import (
SDP_PORTS, SYSTEM_BYTES_REQUIREMENT, SIMULATION_N_BYTES)
from spinn_front_end_common.utilities.exceptions import SpinnFrontEndException
from spinn_front_end_common.interface.simulation import simulation_utilities
# Module-level logger; the log calls in this file pass "{}"-style
# placeholders plus arguments rather than pre-formatted strings
log = FormatAdapter(logging.getLogger(__name__))
# How many receive timeouts in a row are tolerated before the data-in
# transfer is abandoned
TIMEOUT_RETRY_LIMIT = 20
TIMEOUT_MESSAGE = (
    "Failed to hear from the machine during {} attempts. "
    "Please try removing firewalls.")

# Provenance message (and reporting threshold) for a single extraction
# attempt that lost some sequences
_MINOR_LOSS_MESSAGE = (
    "During the extraction of data of {} bytes from memory address {}, "
    "attempt {} had {} sequences that were lost.")
_MINOR_LOSS_THRESHOLD = 10

# Provenance message (and threshold) for attempts that lost a lot of data
_MAJOR_LOSS_MESSAGE = (
    "During the extraction of data from chip {}, there were {} cases of "
    "serious data loss. The system recovered, but the speed of download "
    "was compromised. Reduce the number of executing applications and remove "
    "routers between yourself and the SpiNNaker machine to reduce the chance "
    "of this occurring.")
_MAJOR_LOSS_THRESHOLD = 100

# Bytes in one SpiNNaker word
WORD_SIZE = 4

# Number of items the retransmit code uses for its header
SDP_RETRANSMISSION_HEADER_SIZE = 2

# Bytes in the config region (four words)
CONFIG_SIZE = 4 * WORD_SIZE

# Words of data an SDP packet can carry once the SCP header is removed
# (68 words == 272 bytes)
WORDS_PER_FULL_PACKET = 68

# Number of items taken up by a sequence number
SEQUENCE_NUMBER_SIZE_IN_ITEMS = 1

# Words of data in an SDP packet that also carries a sequence number
WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM = (
    WORDS_PER_FULL_PACKET - SEQUENCE_NUMBER_SIZE_IN_ITEMS)

# Sizes at which the speed-up protocols start to beat plain SDP, given
# their overheads
THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_EXTRACTOR_IN_BYTES = 40000
THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_INPUT_IN_BYTES = 300

# Header of the first data-in packet:
# (command, base_address, x&y, max_seq_number)
WORDS_FOR_COMMAND_AND_ADDRESS_HEADER = 4
BYTES_FOR_COMMAND_AND_ADDRESS_HEADER = (
    WORDS_FOR_COMMAND_AND_ADDRESS_HEADER * WORD_SIZE)

# Header of every later data-in packet: (command, sequence number)
WORDS_FOR_COMMAND_AND_SEQ_HEADER = 2
BYTES_FOR_COMMAND_AND_SEQ_HEADER = (
    WORDS_FOR_COMMAND_AND_SEQ_HEADER * WORD_SIZE)

# Room left for data in the first packet (command and address header)
WORDS_IN_FULL_PACKET_WITH_ADDRESS = (
    WORDS_PER_FULL_PACKET - WORDS_FOR_COMMAND_AND_ADDRESS_HEADER)
BYTES_IN_FULL_PACKET_WITH_ADDRESS = (
    WORDS_IN_FULL_PACKET_WITH_ADDRESS * WORD_SIZE)

# Room left for data in every later packet (command and seq header)
WORDS_IN_FULL_PACKET_WITHOUT_ADDRESS = (
    WORDS_PER_FULL_PACKET - WORDS_FOR_COMMAND_AND_SEQ_HEADER)
BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS = (
    WORDS_IN_FULL_PACKET_WITHOUT_ADDRESS * WORD_SIZE)

# Size of the chip-to-key region: one count word, then x, y and key
# (all ints) for each of up to 48 chips
SIZE_DATA_IN_CHIP_TO_KEY_SPACE = (3 * 48 + 1) * WORD_SIZE

# DSG data regions
_DATA_REGIONS = Enum(
    "DATA_REGIONS",
    [('SYSTEM', 0),
     ('CONFIG', 1),
     ('CHIP_TO_KEY_SPACE', 2)])

# Command IDs carried by the SDP packets of the data-out protocol
DATA_OUT_COMMANDS = Enum(
    "DATA_OUT_COMMANDS",
    [("START_SENDING", 100),
     ("START_MISSING_SEQ", 1000),
     ("MISSING_SEQ", 1001),
     ("CLEAR", 2000)])

# Command IDs carried by the SDP packets of the data-in protocol
DATA_IN_COMMANDS = Enum(
    "DATA_IN_COMMANDS",
    [("SEND_DATA_TO_LOCATION", 200),
     ("SEND_SEQ_DATA", 2000),
     ("SEND_DONE", 2002),
     ("RECEIVE_FIRST_MISSING_SEQ", 2003),
     ("RECEIVE_MISSING_SEQ_DATA", 2004),
     ("RECEIVE_FINISHED", 2005)])

# Precompiled little-endian unsigned-word layouts
_ONE_WORD = struct.Struct("<I")
_TWO_WORDS = struct.Struct("<II")
_THREE_WORDS = struct.Struct("<III")
_FOUR_WORDS = struct.Struct("<IIII")

# Set to true to read the data back after sending it in and compare it
# with what was sent. This is expensive, and only works in Python 3.5 or
# later.
VERIFY_SENT_DATA = False
def ceildiv(dividend, divisor):
    """ Divide two numbers (not necessarily integers) and round the result
        up to a whole number.

    :param dividend: the number being divided
    :param divisor: the number to divide by; must be positive
    :return: the quotient, rounded up
    :rtype: int
    """
    assert divisor > 0
    quotient, remainder = divmod(dividend, divisor)
    whole = int(quotient)
    # any left-over remainder means one more whole unit is needed
    if remainder != 0:
        return whole + 1
    return whole
# SDRAM requirement for storing missing SDP packets seq nums: the number of
# sequence-numbered packets needed to carry 120 MiB of data
SDRAM_FOR_MISSING_SDP_SEQ_NUMS = ceildiv(
    dividend=120.0 * 1024 * 1024,
    divisor=WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * WORD_SIZE)
[docs]class DataSpeedUpPacketGatherMachineVertex(
MachineVertex, AbstractGeneratesDataSpecification,
AbstractHasAssociatedBinary, AbstractProvidesLocalProvenanceData):
# Instance attributes used by this class
__slots__ = [
    # chip coordinates given to the constructor; used in the vertex label
    # and when opening the SCAMP connection
    "_x", "_y",
    # application ID, recorded when the data specification is generated
    "_app_id",
    # the SCAMP connection used by the data-in protocol
    "_connection",
    # store of the extra monitors to location. helpful in data in
    "_extra_monitors_by_chip",
    # boolean tracker for handling out of order packets
    "_have_received_missing_seq_count_packet",
    # path for the data in report
    "_in_report_path",
    # remote host used when opening the SCAMP connection
    "_ip_address",
    # store for the last reinjection status
    "_last_status",
    # the max seq num expected given a data retrieval
    "_max_seq_num",
    # holder for missing seq nums for data in
    "_missing_seq_nums_data_in",
    # holder of data from out
    "_output",
    # my placement for future lookup
    "_placement",
    # provenance holder
    "_provenance_data_items",
    # ID of the IP tag allocated to this vertex
    "_remote_tag",
    # path to the data out report
    "_out_report_path",
    # tracker for expected missing seq nums
    "_total_expected_missing_seq_packets",
    # whether the data-in report is written after each transfer
    "_write_data_speed_up_reports",
    # data holder for output
    "_view"]
# base key (really nasty hack to tie in fixed route keys)
BASE_KEY = 0xFFFFFFF9
NEW_SEQ_KEY = 0xFFFFFFF8
FIRST_DATA_KEY = 0xFFFFFFF7
END_FLAG_KEY = 0xFFFFFFF6
# to use with multicast stuff: offsets added to the routed base key
BASE_MASK = 0xFFFFFFFB
NEW_SEQ_KEY_OFFSET = 1
FIRST_DATA_KEY_OFFSET = 2
END_FLAG_KEY_OFFSET = 3
# throttle on the transmission: seconds slept after each throttled send
TRANSMISSION_THROTTLE_TIME = 0.000001
# which kind of traffic carries the data; selects whether the special keys
# come from routing info (multicast) or from the fixed constants above
# TRAFFIC_TYPE = EdgeTrafficType.MULTICAST
TRAFFIC_TYPE = EdgeTrafficType.FIXED_ROUTE
# report names for tracking used routers
OUT_REPORT_NAME = "routers_used_in_speed_up_process.rpt"
IN_REPORT_NAME = "speeds_gained_in_speed_up_process.rpt"
# the end flag is set when the high bit of the sequence number word is set
LAST_MESSAGE_FLAG_BIT_MASK = 0x80000000
# corresponding mask for the actual sequence numbers
SEQUENCE_NUMBER_MASK = 0x7fffffff
# time outs used by the protocol for separate bits
TIMEOUT_PER_RECEIVE_IN_SECONDS = 1
TIME_OUT_FOR_SENDING_IN_SECONDS = 0.01
# end flag for missing seq nums
MISSING_SEQ_NUMS_END_FLAG = 0xFFFFFFFF
# layout of the data part of the first data-in packet: exactly
# BYTES_IN_FULL_PACKET_WITH_ADDRESS unsigned bytes
_ADDRESS_PACKET_BYTE_FORMAT = struct.Struct(
    "<{}B".format(BYTES_IN_FULL_PACKET_WITH_ADDRESS))
# Router timeouts, in mantissa,exponent form. See datasheet for details
LONG_TIMEOUT = (14, 14)
SHORT_TIMEOUT = (1, 1)
TEMP_TIMEOUT = (15, 4)
ZERO_TIMEOUT = (0, 0)
# Initial port for the reverse IP tag (to be replaced later)
_TAG_INITIAL_PORT = 10000
def __init__(
        self, x, y, extra_monitors_by_chip, ip_address,
        report_default_directory,
        write_data_speed_up_reports, constraints=None):
    """
    :param x: chip x coordinate; used in the label and for the connection
    :param y: chip y coordinate; used in the label and for the connection
    :param extra_monitors_by_chip: the extra monitors, stored for later use
    :param ip_address: the remote host to open the connection to
    :param report_default_directory: \
        the directory in which the report files are placed
    :param write_data_speed_up_reports: \
        whether the data-in report is written after each transfer
    :param constraints: constraints passed on to the machine vertex
    """
    super(DataSpeedUpPacketGatherMachineVertex, self).__init__(
        label="SYSTEM:PacketGatherer({},{})".format(x, y),
        constraints=constraints)

    # where this vertex talks to; the connection itself is opened later
    self._x, self._y = x, y
    self._ip_address = ip_address
    self._connection = None
    self._remote_tag = None

    # filled in when the data specification is generated
    self._placement = None
    self._app_id = None

    # holders for the output data and its sequence numbers
    self._output = None
    self._view = None
    self._max_seq_num = None

    # data-in state: the extra monitors, and the tracking of which
    # sequence numbers went missing (one list per transmission attempt)
    self._extra_monitors_by_chip = extra_monitors_by_chip
    self._missing_seq_nums_data_in = [[]]
    self._total_expected_missing_seq_packets = 0
    self._have_received_missing_seq_count_packet = False

    # locally gathered provenance
    self._provenance_data_items = defaultdict(list)

    # where the reports go, and whether to write them
    self._write_data_speed_up_reports = write_data_speed_up_reports
    self._in_report_path = os.path.join(
        report_default_directory, self.IN_REPORT_NAME)
    self._out_report_path = os.path.join(
        report_default_directory, self.OUT_REPORT_NAME)

    # reinjection status to restore once streaming has finished
    self._last_status = None
def __throttled_send(self, message):
    """ Send a message over the current connection and then pause briefly,\
        slowing transmission down so that SpiNNaker can keep up.

    :param message: the SDP message to send
    :rtype: None
    """
    connection = self._connection
    connection.send_sdp_message(message)
    # give the machine a moment before the next packet goes out
    time.sleep(self.TRANSMISSION_THROTTLE_TIME)
@property
@overrides(MachineVertex.resources_required)
def resources_required(self):
    # every instance needs the same resources, so defer to the static
    # form of the calculation
    return self.static_resources_required()
@staticmethod
def static_resources_required():
    """ The resources needed by any vertex of this type: SDRAM for its\
        regions and for the missing sequence numbers, plus one IP tag.

    :rtype: ResourceContainer
    """
    n_sdram_bytes = (
        SYSTEM_BYTES_REQUIREMENT + CONFIG_SIZE +
        SDRAM_FOR_MISSING_SDP_SEQ_NUMS +
        SIZE_DATA_IN_CHIP_TO_KEY_SPACE)
    sdram = ConstantSDRAM(n_sdram_bytes)
    # the port is only a placeholder at this stage
    speed_up_tag = IPtagResource(
        port=DataSpeedUpPacketGatherMachineVertex._TAG_INITIAL_PORT,
        strip_sdp=True, ip_address="localhost",
        traffic_identifier="DATA_SPEED_UP")
    return ResourceContainer(sdram=sdram, iptags=[speed_up_tag])
@overrides(AbstractHasAssociatedBinary.get_binary_start_type)
def get_binary_start_type(self):
    # the binary of this vertex is started as a SYSTEM executable
    return ExecutableType.SYSTEM
@inject_items({
    "machine_graph": "MemoryMachineGraph",
    "routing_info": "MemoryRoutingInfos",
    "tags": "MemoryTags",
    "machine_time_step": "MachineTimeStep",
    "time_scale_factor": "TimeScaleFactor",
    "mc_data_chips_to_keys": "DataInMulticastKeyToChipMap",
    "machine": "MemoryExtendedMachine",
    "app_id": "APPID"
})
@overrides(
    AbstractGeneratesDataSpecification.generate_data_specification,
    additional_arguments={
        "machine_graph", "routing_info", "tags",
        "machine_time_step", "time_scale_factor",
        "mc_data_chips_to_keys", "machine", "app_id"
    })
def generate_data_specification(
        self, spec, placement, machine_graph, routing_info, tags,
        machine_time_step, time_scale_factor, mc_data_chips_to_keys,
        machine, app_id):
    """ Write the three data regions of this vertex: the simulation\
        header, the configuration (three keys and the IP tag ID) and the\
        map from each chip on the board to its key.

    :param spec: the data specification to write to
    :param placement: where this vertex is placed; remembered for later
    :param machine_graph: used to find the edge ending at this vertex \
        when multicast traffic is in use
    :param routing_info: used to find the key of that edge
    :param tags: used to find the IP tag allocated to this vertex
    :param machine_time_step: passed to the simulation header
    :param time_scale_factor: passed to the simulation header
    :param mc_data_chips_to_keys: key for each chip, indexed by (x, y)
    :param machine: the machine description
    :param app_id: the application ID; remembered for later
    """
    # pylint: disable=too-many-arguments, arguments-differ

    # remember where we are and which application we belong to
    self._placement = placement
    self._app_id = app_id

    # reserve all of the regions before filling any of them in
    self._reserve_memory_regions(spec)

    # simulation header
    spec.switch_write_focus(_DATA_REGIONS.SYSTEM.value)
    spec.write_array(simulation_utilities.get_simulation_header_array(
        self.get_binary_file_name(), machine_time_step, time_scale_factor))

    # work out the three special keys: offsets from the routed key when
    # multicast is in use, fixed values otherwise
    if self.TRAFFIC_TYPE == EdgeTrafficType.MULTICAST:
        incoming_edges = list(
            machine_graph.get_edges_ending_at_vertex(self))
        base_key = routing_info.get_first_key_for_edge(incoming_edges[0])
        special_keys = (
            base_key + self.NEW_SEQ_KEY_OFFSET,
            base_key + self.FIRST_DATA_KEY_OFFSET,
            base_key + self.END_FLAG_KEY_OFFSET)
    else:
        special_keys = (
            self.NEW_SEQ_KEY, self.FIRST_DATA_KEY, self.END_FLAG_KEY)

    # configuration: new-sequence key, first-data key, end-flag key
    spec.switch_write_focus(_DATA_REGIONS.CONFIG.value)
    for special_key in special_keys:
        spec.write_value(special_key)

    # ... followed by the ID of our IP tag
    # Note: The port doesn't matter as we are going to override this later
    tag_id = tags.get_ip_tags_for_vertex(self)[0].tag
    spec.write_value(tag_id)
    self._remote_tag = tag_id

    # chip-to-key map: a count, then board-local x, y and key per chip
    spec.switch_write_focus(_DATA_REGIONS.CHIP_TO_KEY_SPACE.value)
    my_chip = machine.get_chip_at(placement.x, placement.y)
    board_xys = list(machine.get_existing_xys_on_board(my_chip))
    spec.write_value(len(board_xys))
    for chip_xy in board_xys:
        local_x, local_y = machine.get_local_xy(
            machine.get_chip_at(*chip_xy))
        spec.write_value(local_x)
        spec.write_value(local_y)
        chip_key = mc_data_chips_to_keys[chip_xy]
        spec.write_value(chip_key)
        log.debug("for chip {}:{} base key is {}",
                  chip_xy[0], chip_xy[1], chip_key)

    # End-of-Spec:
    spec.end_specification()
@staticmethod
def _reserve_memory_regions(spec):
    """ Reserve the DSG regions (system, config and chip-to-key map) with\
        their sizes. Static so that it can be used by the application\
        vertex.

    :param spec: the data specification to reserve the regions in
    :rtype: None
    """
    # (region, size in bytes, label), in the order they are reserved
    regions = (
        (_DATA_REGIONS.SYSTEM, SIMULATION_N_BYTES, 'systemInfo'),
        (_DATA_REGIONS.CONFIG, CONFIG_SIZE, "config"),
        (_DATA_REGIONS.CHIP_TO_KEY_SPACE,
         SIZE_DATA_IN_CHIP_TO_KEY_SPACE, "mc_key_map"))
    for region, n_bytes, label in regions:
        spec.reserve_memory_region(
            region=region.value, size=n_bytes, label=label)
@overrides(AbstractHasAssociatedBinary.get_binary_file_name)
def get_binary_file_name(self):
    # name of the executable associated with this vertex
    return "data_speed_up_packet_gatherer.aplx"
@overrides(AbstractProvidesLocalProvenanceData.get_local_provenance_data)
def get_local_provenance_data(self):
    """ Turn the recorded extraction times and lost sequence counts into\
        provenance items, adding a summary item for each chip that saw\
        serious data loss.

    :return: the provenance items
    :rtype: list(ProvenanceDataItem)
    """
    prov_items = list()
    # per chip, the attempts in which a serious number of sequences went
    significant_losses = defaultdict(list)

    for key, extractions in self._provenance_data_items.items():
        placement, memory_address, length_in_bytes = key
        top_level_name = "Provenance_for_{}".format(self._label)
        chip_xy = (placement.x, placement.y)

        # the same extraction may have been done several times over
        for extraction_index, extraction in enumerate(extractions):
            time_taken, lost_seq_nums = extraction
            chip_name = "chip{}:{}".format(placement.x, placement.y)
            last_name = "Memory_address:{}:Length_in_bytes:{}"\
                .format(memory_address, length_in_bytes)
            iteration_name = "iteration{}".format(extraction_index)

            # how long the extraction took
            prov_items.append(ProvenanceDataItem(
                [top_level_name, "extraction_time", chip_name, last_name,
                 iteration_name],
                time_taken, report=False, message=None))

            # how many sequences each attempt lost
            for i, n_lost_seq_nums in enumerate(lost_seq_nums):
                # Zeroes are not reported at all
                if not n_lost_seq_nums:
                    continue
                is_reportable = n_lost_seq_nums > _MINOR_LOSS_THRESHOLD
                prov_items.append(ProvenanceDataItem(
                    [top_level_name, "lost_seq_nums", chip_name,
                     last_name, iteration_name,
                     "iteration_{}".format(i)],
                    n_lost_seq_nums, report=is_reportable,
                    message=_MINOR_LOSS_MESSAGE.format(
                        length_in_bytes, memory_address, i,
                        n_lost_seq_nums)))
                if n_lost_seq_nums > _MAJOR_LOSS_THRESHOLD:
                    significant_losses[chip_xy].append(i)

    # one always-reported summary item per badly affected chip
    for chip, bad_attempts in significant_losses.items():
        n_times = len(bad_attempts)
        chip_name = "chip{}:{}".format(*chip)
        prov_items.append(ProvenanceDataItem(
            [top_level_name, "serious_lost_seq_num_count", chip_name],
            n_times, report=True, message=_MAJOR_LOSS_MESSAGE.format(
                chip, n_times)))
    return prov_items
@staticmethod
def locate_correct_write_data_function_for_chip_location(
        uses_advanced_monitors, machine, x, y, transceiver,
        extra_monitor_cores_to_ethernet_connection_map):
    """ Lets other components find the function to call when writing\
        data onto SpiNNaker at a given chip.

    :param uses_advanced_monitors: \
        Whether the system is using advanced monitors
    :type uses_advanced_monitors: bool
    :param machine: the SpiNNMachine instance
    :param x: the chip x coordinate to write data to
    :param y: the chip y coordinate to write data to
    :param transceiver: the SpiNNMan instance
    :param extra_monitor_cores_to_ethernet_connection_map: \
        the gatherers, indexed by the (x, y) of their Ethernet chip
    :return: the write function of either a gatherer or the transceiver
    :rtype: func
    """
    if uses_advanced_monitors:
        # use the gatherer on the Ethernet chip nearest the target chip
        target_chip = machine.get_chip_at(x, y)
        ethernet_chip = machine.get_chip_at(
            target_chip.nearest_ethernet_x, target_chip.nearest_ethernet_y)
        gatherer_key = (ethernet_chip.x, ethernet_chip.y)
        return extra_monitor_cores_to_ethernet_connection_map[
            gatherer_key].send_data_into_spinnaker
    return transceiver.write_memory
def _generate_data_in_report(
self, time_diff, data_size, x, y,
address_written_to, missing_seq_nums):
""" writes the data in report for this stage
:param time_took_ms: the time taken to write the memory
:param data_size: the size of data that was written in bytes
:param x: the location in machine where the data was written to X axis
:param y: the location in machine where the data was written to Y axis
:param address_written_to: where in SDRAM it was written to
:param missing_seq_nums: \
the set of missing sequence numbers per data transmission attempt
:rtype: None
"""
if not os.path.isfile(self._in_report_path):
with open(self._in_report_path, "w") as writer:
writer.write(
"x\t\t y\t\t SDRAM address\t\t size in bytes\t\t\t"
" time took \t\t\t Mb/s \t\t\t missing sequence numbers\n")
writer.write(
"------------------------------------------------"
"------------------------------------------------"
"-------------------------------------------------\n")
time_took_ms = float(time_diff.microseconds +
time_diff.total_seconds() * 1000000)
megabits = (data_size * 8.0) / (1024.0 * 1024.0)
if time_took_ms == 0:
mbs = "unknown, below threshold"
else:
mbs = megabits / (float(time_took_ms) / 100000.0)
with open(self._in_report_path, "a") as writer:
writer.write(
"{}\t\t {}\t\t {}\t\t {}\t\t\t\t {}\t\t\t {}\t\t {}\n".format(
x, y, address_written_to, data_size, time_took_ms,
mbs, missing_seq_nums))
def send_data_into_spinnaker(
        self, x, y, base_address, data, n_bytes=None, offset=0,
        cpu=0, is_filename=False):
    """ sends a block of data into SpiNNaker to a given chip

    Small blocks are written with the transceiver directly; larger ones
    go through the data-in speed-up protocol. A line is added to the
    data-in report afterwards if reports were requested.

    :param x: chip x for data
    :param y: chip y for data
    :param base_address: the address in SDRAM to start writing memory
    :param data: the data to write, or the name of the file holding it
    :param n_bytes: how many bytes to write, or None to write everything
    :param offset: where in the data to start from
    :param cpu: passed to the transceiver when it does the write
    :param is_filename: whether data is actually a file.
    :type is_filename: bool
    :rtype: None
    :raises Exception: if a file is used with a non-zero offset
    """
    # if file, read in and then process as normal
    if is_filename:
        if offset != 0:
            raise Exception(
                "when using a file, you can only have a offset of 0")
        reader = FileDataReader(data)
        try:
            if n_bytes is None:
                n_bytes = os.stat(data).st_size
                data = reader.readall()
            else:
                data = reader.read(n_bytes)
        finally:
            # do not leak the file handle, whether or not the read worked
            reader.close()
    elif n_bytes is None:
        n_bytes = len(data)
    transceiver = get_simulator().transceiver

    # if not worth using extra monitors, send via SCP
    if not self._worse_via_scp(n_bytes):
        # start time recording
        start = datetime.datetime.now()
        # write the data
        transceiver.write_memory(
            x=x, y=y, base_address=base_address, n_bytes=n_bytes,
            data=data, offset=offset, is_filename=False, cpu=cpu)
        # record when finished
        end = datetime.datetime.now()
        # nothing can go missing on this route
        self._missing_seq_nums_data_in = [[]]
    else:
        log.debug("sending {} bytes to {},{} via Data In protocol",
                  n_bytes, x, y)
        # start time recording
        start = datetime.datetime.now()
        # send data
        self._send_data_via_extra_monitors(
            transceiver, x, y, base_address, data[offset:n_bytes + offset])
        # end time recording
        end = datetime.datetime.now()

    # optional (expensive) read-back of what was just written
    if VERIFY_SENT_DATA:
        original_data = bytes(data[offset:n_bytes + offset])
        verified_data = bytes(transceiver.read_memory(
            x, y, base_address, n_bytes))
        if original_data != verified_data:
            log.error("VARIANCE: chip:{},{} address:{} len:{}",
                      x, y, base_address, n_bytes)
            log.error("original:{}", original_data.hex())
            log.error("verified:{}", verified_data.hex())
            # find the index of the first byte that differs
            i = 0
            for (a, b) in zip(original_data, verified_data):
                if a != b:
                    break
                i += 1
            raise Exception("damn at " + str(i))

    # write report
    if self._write_data_speed_up_reports:
        self._generate_data_in_report(
            x=x, y=y, time_diff=end - start,
            data_size=n_bytes, address_written_to=base_address,
            missing_seq_nums=self._missing_seq_nums_data_in)
@staticmethod
def _worse_via_scp(n_bytes):
    """ Whether a block is big enough that the data-in protocol should be\
        used for it rather than a plain write.

    :param n_bytes: the size of the block, or None if not known
    :rtype: bool
    """
    # an unknown size is treated as large
    if n_bytes is None:
        return True
    return n_bytes >= THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_INPUT_IN_BYTES
@staticmethod
def __make_sdp_message(placement, port, payload):
    """ Build an SDP message, for which no reply is expected, addressed\
        to the core of a placement.

    :param placement: gives the chip and core to send to
    :param port: the SDP port to send to (an enum member)
    :param payload: the data of the message
    :rtype: SDPMessage
    """
    header = SDPHeader(
        destination_chip_x=placement.x,
        destination_chip_y=placement.y,
        destination_cpu=placement.p,
        destination_port=port.value,
        flags=SDPFlag.REPLY_NOT_EXPECTED)
    return SDPMessage(sdp_header=header, data=payload)
def _send_data_via_extra_monitors(
        self, transceiver, destination_chip_x, destination_chip_y,
        start_address, data_to_write):
    """ sends data using the extra monitor cores

    The first packet carries the command, target address, target chip
    and packet count plus the first bytes of data; the rest of the data
    follows in sequence-numbered packets. The method then waits for
    replies, resending whatever is reported missing, until the machine
    confirms that everything arrived.

    :param transceiver: the SpiNNMan instance
    :param destination_chip_x: chip x
    :param destination_chip_y: chip y
    :param start_address: start address in sdram to write data to
    :param data_to_write: the data to write
    :rtype: None
    :raises SpinnFrontEndException: \
        if too many receives in a row time out
    """
    # how many packets after first one we need to send
    number_of_packets = ceildiv(
        len(data_to_write) - BYTES_IN_FULL_PACKET_WITH_ADDRESS,
        BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS)
    # determine board chip IDs, as the LPG does not know machine scope IDs
    machine = transceiver.get_machine_details()
    chip = machine.get_chip_at(destination_chip_x, destination_chip_y)
    dest_x, dest_y = machine.get_local_xy(chip)
    # send first packet to lpg, stating where to send it to
    data = bytearray(WORDS_PER_FULL_PACKET * WORD_SIZE)
    # header: command, address, chip (x in the high half-word, y in the
    # low) and the number of packets that follow
    _FOUR_WORDS.pack_into(
        data, 0, DATA_IN_COMMANDS.SEND_DATA_TO_LOCATION.value,
        start_address, (dest_x << 16) | dest_y, number_of_packets)
    # NOTE(review): the fixed-size format needs exactly
    # BYTES_IN_FULL_PACKET_WITH_ADDRESS values, so data shorter than that
    # would fail here; send_data_into_spinnaker only comes this way for
    # blocks of at least
    # THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_INPUT_IN_BYTES bytes
    self._ADDRESS_PACKET_BYTE_FORMAT.pack_into(
        data, BYTES_FOR_COMMAND_AND_ADDRESS_HEADER,
        *data_to_write[0:BYTES_IN_FULL_PACKET_WITH_ADDRESS])
    # debug
    # self._print_out_packet_data(data)
    # send first message, over a freshly opened connection
    self._connection = SCAMPConnection(
        chip_x=self._x, chip_y=self._y, remote_host=self._ip_address)
    self.__reprogram_tag(self._connection)
    self._connection.send_sdp_message(self.__make_sdp_message(
        self._placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_IN_SPEED_UP,
        data))
    log.debug("sent initial {} bytes", BYTES_IN_FULL_PACKET_WITH_ADDRESS)
    # send initial attempt at sending all the data
    self._send_all_data_based_packets(number_of_packets, data_to_write)
    # verify completed
    received_confirmation = False
    time_out_count = 0
    while not received_confirmation:
        try:
            # try to receive a confirmation of some sort from spinnaker
            data = self._connection.receive(
                timeout=self.TIMEOUT_PER_RECEIVE_IN_SECONDS)
            # something arrived, so the run of timeouts is over
            time_out_count = 0
            # check which message type we have received
            received_confirmation = self._outgoing_process_packet(
                data, data_to_write)
        except SpinnmanTimeoutException:  # if time out, keep trying
            # if the timeout has not occurred x times, keep trying
            if time_out_count > TIMEOUT_RETRY_LIMIT:
                emergency_recover_state_from_failure(
                    transceiver, self._app_id, self, self._placement)
                raise SpinnFrontEndException(
                    TIMEOUT_MESSAGE.format(time_out_count))
            # reopen the connection and try again, keeping the same
            # ports and addresses as the connection being replaced
            time_out_count += 1
            remote_port = self._connection.remote_port
            local_port = self._connection.local_port
            local_ip = self._connection.local_ip_address
            remote_ip = self._connection.remote_ip_address
            self._connection.close()
            self._connection = SCAMPConnection(
                local_port=local_port, remote_port=remote_port,
                local_host=local_ip, remote_host=remote_ip)
            # if we have not received confirmation of finish, try to
            # retransmit missing seq nums
            # NOTE(review): nesting reconstructed; placed inside the
            # timeout handler as the test would be dead code after the
            # loop
            if not received_confirmation:
                self._outgoing_retransmit_missing_seq_nums(data_to_write)
def _read_in_missing_seq_nums(self, data, data_to_write, position):
    """ handles a missing seq num packet from spinnaker

    The sequence numbers in the packet are added to the current list of
    missing ones. The missing data is then retransmitted if the packet
    ended with the end flag, or if a count packet has been received and
    no further missing-sequence packets are expected.

    :param data: the data to translate into missing seq nums
    :param data_to_write: the data to write
    :param position: \
        where in the packet the sequence numbers start, in bytes
    :rtype: None
    """
    # find how many elements are in this packet
    n_elements = (len(data) - position) // WORD_SIZE

    # store missing
    current_missing = self._missing_seq_nums_data_in[-1]
    current_missing.extend(struct.unpack_from(
        "<{}I".format(n_elements), data, position))

    # determine if last element is end flag; there is no last element to
    # look at when nothing has been stored yet (e.g. a packet without a
    # payload), and indexing it then would raise an IndexError
    if (current_missing and
            current_missing[-1] == self.MISSING_SEQ_NUMS_END_FLAG):
        del current_missing[-1]
        self._outgoing_retransmit_missing_seq_nums(data_to_write)

    # retransmitting resets the count-packet flag, so this cannot send a
    # second time for the same packet
    if (self._total_expected_missing_seq_packets == 0 and
            self._have_received_missing_seq_count_packet):
        self._outgoing_retransmit_missing_seq_nums(data_to_write)
def _outgoing_process_packet(self, data, data_to_write):
    """ Handle one packet received from SpiNNaker during data in,\
        dispatching on the command in its first word.

    :param data: the packet data
    :param data_to_write: the data being written to SpiNNaker
    :return: whether the packet confirms that all data was received
    :rtype: bool
    """
    (command_id,) = _ONE_WORD.unpack_from(data, 0)
    log.debug("received packet with id {}", command_id)
    after_command = WORD_SIZE

    if command_id == DATA_IN_COMMANDS.RECEIVE_FIRST_MISSING_SEQ.value:
        # second word: how many missing-sequence packets are to follow
        (n_to_follow,) = _ONE_WORD.unpack_from(data, after_command)
        self._total_expected_missing_seq_packets += n_to_follow
        self._have_received_missing_seq_count_packet = True
        # the rest is sequence numbers; retransmit if needed
        self._read_in_missing_seq_nums(
            data, data_to_write, after_command + WORD_SIZE)
    elif command_id == DATA_IN_COMMANDS.RECEIVE_MISSING_SEQ_DATA.value:
        # one fewer packet to wait for; everything after the command is
        # sequence numbers; retransmit if needed
        self._total_expected_missing_seq_packets -= 1
        self._read_in_missing_seq_nums(data, data_to_write, after_command)

    # only the finished command counts as confirmation
    return command_id == DATA_IN_COMMANDS.RECEIVE_FINISHED.value
def _outgoing_retransmit_missing_seq_nums(self, data_to_write):
    """ Send again the data for each sequence number currently listed as\
        missing, start a fresh list for the next attempt, reset the\
        missing-packet tracking and finish with the end flag.

    :param data_to_write: the data to write.
    :rtype: None
    """
    resend_command = DATA_IN_COMMANDS.SEND_SEQ_DATA.value
    for seq_num in self._missing_seq_nums_data_in[-1]:
        # position is worked out from the sequence number
        packet = self._calculate_data_in_data_from_seq_number(
            data_to_write, seq_num, resend_command, None)[0]
        self.__throttled_send(packet)

    # get ready to track what goes missing from this attempt
    self._missing_seq_nums_data_in.append(list())
    self._have_received_missing_seq_count_packet = False
    self._total_expected_missing_seq_packets = 0
    self._send_end_flag()
def _calculate_position_from_seq_number(self, seq_num):
    """ Work out where in the raw data the packet with a given sequence\
        number starts.

    :param seq_num: the sequence number to determine position from
    :return: the position in the byte data
    :rtype: int
    """
    if seq_num != 0:
        # skip the (smaller) first packet, then the full ones before us
        n_full_packets_before = seq_num - 1
        return BYTES_IN_FULL_PACKET_WITH_ADDRESS + (
            BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS * n_full_packets_before)
    return 0
def _calculate_data_in_data_from_seq_number(
        self, data_to_write, seq_num, command_id, position):
    """ Determine the data needed to be sent to the SpiNNaker machine\
        given a sequence number

    :param data_to_write: the data to write to the SpiNNaker machine
    :param seq_num: the seq num to get the data for
    :param command_id: the command to put at the start of the packet
    :param position: the position in the data to write to spinnaker, \
        or None to work it out from the sequence number
    :type position: int or None
    :return: SDP message and how much data has been written
    :rtype: tuple(SDPMessage, int)
    :raises Exception: if the position is beyond the end of the data
    """
    # assume a full packet until shown otherwise
    packet_data_length = BYTES_IN_FULL_PACKET_WITHOUT_ADDRESS

    # determine position in data if not given
    if position is None:
        position = self._calculate_position_from_seq_number(seq_num)

    # if less than a full packet worth of data, adjust length
    if position + packet_data_length > len(data_to_write):
        packet_data_length = len(data_to_write) - position

    if packet_data_length < 0:
        # say what went wrong, so that a bad sequence number can be
        # diagnosed
        raise Exception(
            "sequence number {} starts at byte {}, which is beyond the "
            "end of the {} bytes of data".format(
                seq_num, position, len(data_to_write)))

    # determine the true packet length (with header)
    packet_length = (
        packet_data_length + BYTES_FOR_COMMAND_AND_SEQ_HEADER)

    # create struct: command and sequence number, then the data bytes
    packet_data = bytearray(packet_length)
    _TWO_WORDS.pack_into(packet_data, 0, command_id, seq_num)
    struct.pack_into(
        "<{}B".format(packet_data_length), packet_data,
        BYTES_FOR_COMMAND_AND_SEQ_HEADER,
        *data_to_write[position:position+packet_data_length])

    # debug
    # self._print_out_packet_data(packet_data)

    # build sdp packet
    message = self.__make_sdp_message(
        self._placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_IN_SPEED_UP,
        packet_data)

    # return message for sending, and the length in data sent
    return message, packet_data_length
def _send_end_flag(self):
    """ Send the SEND_DONE command as a message of its own.

    :rtype: None
    """
    done_payload = _ONE_WORD.pack(DATA_IN_COMMANDS.SEND_DONE.value)
    done_message = self.__make_sdp_message(
        self._placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_IN_SPEED_UP,
        done_payload)
    self._connection.send_sdp_message(done_message)
def _send_all_data_based_packets(
        self, number_of_packets, data_to_write):
    """ Send all the data as one block

    Sends everything after the part of the data that went in the first
    packet, as packets numbered from 1, followed by the end flag once
    the end of the data has been reached.

    :param number_of_packets: the number of packets expected to send
    :param data_to_write: the data to send
    :rtype: None
    """
    # where in the data we are currently up to; the first packet has
    # already taken this many bytes
    position_in_data = BYTES_IN_FULL_PACKET_WITH_ADDRESS
    # send rest of data
    total_data_length = len(data_to_write)
    for seq_num in range(1, number_of_packets + 1):
        # put in command flag and seq num
        message, length_to_send = \
            self._calculate_data_in_data_from_seq_number(
                data_to_write, seq_num,
                DATA_IN_COMMANDS.SEND_SEQ_DATA.value, position_in_data)
        position_in_data += length_to_send
        # send the message
        self.__throttled_send(message)
        log.debug("sent seq {} of {} bytes", seq_num, length_to_send)
        # check for end flag
        # NOTE(review): nesting reconstructed; whether this test sits
        # inside or after the loop only matters when number_of_packets
        # is 0 - confirm against the original layout
        if position_in_data == total_data_length:
            self._send_end_flag()
            log.debug("sent end flag")
@staticmethod
def streaming(gatherers, transceiver, extra_monitor_cores, placements):
    """ Get a Python context manager (i.e., something to use with the\
        'with' statement) for putting the router timeouts into a state\
        usable for data streaming.

    :param gatherers: All the gatherers that are to be set
    :param transceiver: the SpiNNMan instance
    :param extra_monitor_cores: the extra monitor cores to set
    :param placements: placements object
    :rtype: a context manager
    """
    streaming_context = _StreamingContextManager(
        gatherers, transceiver, extra_monitor_cores, placements)
    return streaming_context
def set_cores_for_data_streaming(
        self, transceiver, extra_monitor_cores, placements):
    """ Put the routers into a state usable for data streaming:\
        remember the current reinjection status, turn reinjection off,\
        clear the reinjection queue and set the router timeouts.

    :param transceiver: the SpiNNMan instance
    :param extra_monitor_cores: the extra monitor cores to set
    :param placements: placements object
    :rtype: None
    """
    # all of the requests are made through the first monitor
    first_monitor = extra_monitor_cores[0]

    # Keep the current status so that it can be put back afterwards
    # NOTE: This assumes the status is the same on all cores
    self._last_status = first_monitor.get_reinjection_status(
        placements, transceiver)

    # No packet type is to be reinjected while streaming
    reinject_nothing = dict(
        point_to_point=False, multicast=False, nearest_neighbour=False,
        fixed_route=False)
    first_monitor.set_reinjection_packets(
        placements, extra_monitor_cores, transceiver, **reinject_nothing)

    # Drop anything still waiting to be reinjected
    first_monitor.clear_reinjection_queue(
        transceiver, placements, extra_monitor_cores)

    # Emergency timeout first, then the main router timeout
    first_monitor.set_router_emergency_timeout(
        self.SHORT_TIMEOUT, transceiver, placements, extra_monitor_cores)
    first_monitor.set_router_time_outs(
        self.LONG_TIMEOUT, transceiver, placements, extra_monitor_cores)
@staticmethod
def load_application_routing_tables(
        transceiver, extra_monitor_cores, placements):
    """ Have the application multicast routes loaded, asking via the\
        first of the extra monitor cores.

    :param transceiver: the SpiNNMan instance
    :param extra_monitor_cores: the extra monitor cores to set
    :param placements: placements object
    :rtype: None
    """
    first_monitor = extra_monitor_cores[0]
    first_monitor.load_application_mc_routes(
        placements, extra_monitor_cores, transceiver)
@staticmethod
def load_system_routing_tables(
        transceiver, extra_monitor_cores, placements):
    """ Have the system multicast routes loaded, asking via the first of\
        the extra monitor cores.

    :param transceiver: the SpiNNMan instance
    :param extra_monitor_cores: the extra monitor cores to set
    :param placements: placements object
    :rtype: None
    """
    first_monitor = extra_monitor_cores[0]
    first_monitor.load_system_mc_routes(
        placements, extra_monitor_cores, transceiver)
def unset_cores_for_data_streaming(
        self, transceiver, extra_monitor_cores, placements):
    """ Helper method for putting the router timeouts and the\
        reinjection settings back to what they were before the cores\
        were set up for data streaming.

    The routers are first moved to temporary timeout values
    (``TEMP_TIMEOUT`` / ``ZERO_TIMEOUT``). If a status was saved in
    ``self._last_status``, the timeouts and reinjection flags recorded
    there are then restored. Failures during the restore are logged
    rather than raised, and the state of the extra monitor cores is
    reported to help diagnosis.

    :param transceiver: the SpiNNMan instance
    :param extra_monitor_cores: the extra monitor cores to reset; the\
        first one is used to issue every request
    :param placements: placements object
    :rtype: None
    """
    lead_monitor = extra_monitor_cores[0]

    # Set the routers to temporary values
    lead_monitor.set_router_time_outs(
        self.TEMP_TIMEOUT, transceiver, placements, extra_monitor_cores)
    lead_monitor.set_router_emergency_timeout(
        self.ZERO_TIMEOUT, transceiver, placements, extra_monitor_cores)

    last_status = self._last_status
    if last_status is None:
        log.warning(
            "Cores have not been set for data extraction, so can't be"
            " unset")
        # There is no saved status to restore; carrying on would only
        # fail on None and be misreported as an error resetting timeouts
        return

    try:
        lead_monitor.set_router_time_outs(
            last_status.router_timeout_parameters,
            transceiver, placements, extra_monitor_cores)
        lead_monitor.set_router_emergency_timeout(
            last_status.router_emergency_timeout_parameters,
            transceiver, placements, extra_monitor_cores)
        lead_monitor.set_reinjection_packets(
            placements, extra_monitor_cores, transceiver,
            point_to_point=last_status.is_reinjecting_point_to_point,
            multicast=last_status.is_reinjecting_multicast,
            nearest_neighbour=(
                last_status.is_reinjecting_nearest_neighbour),
            fixed_route=last_status.is_reinjecting_fixed_route)
    except Exception:  # pylint: disable=broad-except
        # Deliberately best-effort: report the problem and carry on
        log.exception("Error resetting timeouts")
        log.error("Checking if the cores are OK...")
        core_subsets = convert_vertices_to_core_subset(
            extra_monitor_cores, placements)
        try:
            error_cores = transceiver.get_cores_not_in_state(
                core_subsets, {CPUState.RUNNING})
            if error_cores:
                # Pass the value as an argument, as the other log calls
                # in this file do, instead of pre-formatting the message
                log.error(
                    "Cores in an unexpected state: {}", error_cores)
        except Exception:  # pylint: disable=broad-except
            log.exception("Couldn't get core state")
def __reprogram_tag(self, connection):
    """ Send an IP tag set request for ``self._remote_tag`` over the\
        given connection, trying up to three times.

    The request is built with ``strip=True`` and ``use_sender=True``
    and an all-zero address and port.

    :param connection: the connection used to send the request and to\
        read the response
    :rtype: None
    :raises SpinnmanTimeoutException: if every attempt timed out; the\
        last timeout is the one re-raised
    """
    tag_request = IPTagSet(
        self._x, self._y, [0, 0, 0, 0], 0,
        self._remote_tag, strip=True, use_sender=True)
    payload = connection.get_scp_data(tag_request)

    last_timeout = None
    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        try:
            connection.send(payload)
            _, _, reply, reply_offset = connection.receive_scp_response()
            tag_request.get_scp_response().read_bytestring(
                reply, reply_offset)
            return
        except SpinnmanTimeoutException:
            # remember the failure and have another go
            last_timeout = sys.exc_info()

    # every attempt timed out
    reraise(*last_timeout)
def get_data(
        self, placement, memory_address, length_in_bytes, fixed_routes):
    """ Gets data from a given core and memory address.

    Three routes are used depending on the amount of data requested:

    * nothing requested: an empty byte array is returned at once;
    * fewer bytes than\
      ``THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_EXTRACTOR_IN_BYTES``: the\
      transceiver's ``read_memory`` is used;
    * otherwise: the data is streamed back over a dedicated connection.

    In every case the elapsed time and the list of lost sequence number
    counts are appended to ``self._provenance_data_items`` under the key
    ``(placement, memory_address, length_in_bytes)``.

    :param placement: placement object for where to get data from
    :param memory_address: the address in SDRAM to start reading from
    :param length_in_bytes: the length of data to read in bytes
    :param fixed_routes: the fixed routes, used in the report of which\
        chips were used by the speed up process
    :return: byte array of the data
    """
    start = time.time()
    provenance_key = (placement, memory_address, length_in_bytes)

    # if asked for no data, just return a empty byte array
    if length_in_bytes == 0:
        self._provenance_data_items[provenance_key].append(
            (time.time() - start, [0]))
        return bytearray(0)

    transceiver = get_simulator().transceiver

    # small reads go through the transceiver directly
    if (length_in_bytes <
            THRESHOLD_WHERE_SDP_BETTER_THAN_DATA_EXTRACTOR_IN_BYTES):
        data = transceiver.read_memory(
            placement.x, placement.y, memory_address, length_in_bytes)
        self._provenance_data_items[provenance_key].append(
            (time.time() - start, [0]))
        return data

    # Update the IP Tag to work through a NAT firewall
    connection = SCAMPConnection(
        chip_x=self._x, chip_y=self._y, remote_host=self._ip_address)
    try:
        self.__reprogram_tag(connection)

        # send
        connection.send_sdp_message(self.__make_sdp_message(
            placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_SPEED_UP,
            _THREE_WORDS.pack(
                DATA_OUT_COMMANDS.START_SENDING.value,
                memory_address, length_in_bytes)))

        # receive
        self._output = bytearray(length_in_bytes)
        self._view = memoryview(self._output)
        self._max_seq_num = self.calculate_max_seq_num()
        lost_seq_nums = self._receive_data(
            transceiver, placement, connection)

        # Stop anything else getting through (and reduce traffic)
        connection.send_sdp_message(self.__make_sdp_message(
            placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_SPEED_UP,
            _ONE_WORD.pack(DATA_OUT_COMMANDS.CLEAR.value)))
    finally:
        # Always release the connection, even when the transfer failed,
        # so that a failed extraction does not leak it
        connection.close()

    self._provenance_data_items[provenance_key].append(
        (time.time() - start, lost_seq_nums))

    # create report elements
    if self._write_data_speed_up_reports:
        routers_been_in_use = self._determine_which_routers_were_used(
            placement, fixed_routes, transceiver.get_machine_details())
        self._write_routers_used_into_report(
            self._out_report_path, routers_been_in_use, placement)

    return self._output
def _receive_data(self, transceiver, placement, connection):
seq_nums = set()
lost_seq_nums = list()
timeoutcount = 0
finished = False
while not finished:
try:
data = connection.receive(
timeout=self.TIMEOUT_PER_RECEIVE_IN_SECONDS)
timeoutcount = 0
seq_nums, finished = self._process_data(
data, seq_nums, finished, placement, transceiver,
lost_seq_nums)
except SpinnmanTimeoutException:
if timeoutcount > TIMEOUT_RETRY_LIMIT:
raise SpinnFrontEndException(
"Failed to hear from the machine during {} attempts. "
"Please try removing firewalls".format(timeoutcount))
timeoutcount += 1
# self.__reset_connection()
if not finished:
finished = self._determine_and_retransmit_missing_seq_nums(
seq_nums, transceiver, placement, lost_seq_nums)
return lost_seq_nums
@staticmethod
def _determine_which_routers_were_used(placement, fixed_routes, machine):
""" Traverse the fixed route paths from a given location to its\
destination. Used for determining which routers were used.
:param placement: the source to start from
:param fixed_routes: the fixed routes for each router
:param machine: the spinnMachine instance
:return: list of chip IDs
"""
routers = list()
routers.append((placement.x, placement.y))
entry = fixed_routes[(placement.x, placement.y)]
chip_x = placement.x
chip_y = placement.y
while len(entry.processor_ids) == 0:
# can assume one link, as its a minimum spanning tree going to
# the root
machine_link = machine.get_chip_at(
chip_x, chip_y).router.get_link(next(iter(entry.link_ids)))
chip_x = machine_link.destination_x
chip_y = machine_link.destination_y
routers.append((chip_x, chip_y))
entry = fixed_routes[(chip_x, chip_y)]
return routers
@staticmethod
def _write_routers_used_into_report(
report_path, routers_been_in_use, placement):
""" Write the used routers into a report
:param report_path: the path to the report file
:param routers_been_in_use: the routers been in use
:param placement: the first placement used
:rtype: None
"""
writer_behaviour = "w"
if os.path.isfile(report_path):
writer_behaviour = "a"
with open(report_path, writer_behaviour) as writer:
writer.write("[{}:{}:{}] = {}\n".format(
placement.x, placement.y, placement.p, routers_been_in_use))
def _calculate_missing_seq_nums(self, seq_nums):
    """ Work out which sequence numbers have not been seen yet.

    Every number from 0 up to, but not including, ``self._max_seq_num``
    is checked.

    :param seq_nums: the set already acquired
    :return: list of missing sequence numbers, in ascending order
    """
    missing = []
    for candidate in xrange(0, self._max_seq_num):
        if candidate not in seq_nums:
            missing.append(candidate)
    return missing
def _determine_and_retransmit_missing_seq_nums(
self, seq_nums, transceiver, placement, lost_seq_nums):
""" Determine if there are any missing sequence numbers, and if so\
retransmits the missing sequence numbers back to the core for\
retransmission.
:param seq_nums: the sequence numbers already received
:param transceiver: spinnman instance
:param placement: placement instance
:return: whether all packets are transmitted
:rtype: bool
"""
# pylint: disable=too-many-locals
# locate missing sequence numbers from pile
missing_seq_nums = self._calculate_missing_seq_nums(seq_nums)
lost_seq_nums.append(len(missing_seq_nums))
# self._print_missing(missing_seq_nums)
if not missing_seq_nums:
return True
# figure n packets given the 2 formats
n_packets = 1
length_via_format2 = \
len(missing_seq_nums) - (WORDS_PER_FULL_PACKET - 2)
if length_via_format2 > 0:
n_packets += ceildiv(
length_via_format2, WORDS_PER_FULL_PACKET - 1)
# transmit missing sequence as a new SDP packet
first = True
seq_num_offset = 0
for _ in xrange(n_packets):
length_left_in_packet = WORDS_PER_FULL_PACKET
offset = 0
# if first, add n packets to list
if first:
# get left over space / data size
size_of_data_left_to_transmit = min(
length_left_in_packet - 2,
len(missing_seq_nums) - seq_num_offset)
# build data holder accordingly
data = bytearray(
(size_of_data_left_to_transmit + 2) * WORD_SIZE)
# pack flag and n packets
_ONE_WORD.pack_into(
data, offset, DATA_OUT_COMMANDS.START_MISSING_SEQ.value)
_ONE_WORD.pack_into(data, WORD_SIZE, n_packets)
# update state
offset += 2 * WORD_SIZE
length_left_in_packet -= 2
first = False
else: # just add data
# get left over space / data size
size_of_data_left_to_transmit = min(
WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM,
len(missing_seq_nums) - seq_num_offset)
# build data holder accordingly
data = bytearray(
(size_of_data_left_to_transmit + 1) * WORD_SIZE)
# pack flag
_ONE_WORD.pack_into(
data, offset, DATA_OUT_COMMANDS.MISSING_SEQ.value)
offset += 1 * WORD_SIZE
length_left_in_packet -= 1
# fill data field
struct.pack_into(
"<{}I".format(size_of_data_left_to_transmit), data, offset,
*missing_seq_nums[
seq_num_offset:
seq_num_offset + size_of_data_left_to_transmit])
seq_num_offset += length_left_in_packet
# build SDP message and send it to the core
transceiver.send_sdp_message(self.__make_sdp_message(
placement, SDP_PORTS.EXTRA_MONITOR_CORE_DATA_SPEED_UP, data))
# sleep for ensuring core doesn't lose packets
time.sleep(self.TIME_OUT_FOR_SENDING_IN_SECONDS)
# self._print_packet_num_being_sent(packet_count, n_packets)
return False
def _process_data(
        self, data, seq_nums, finished, placement, transceiver,
        lost_seq_nums):
    """ Handle one received packet: store its payload and decide\
        whether the transfer is complete.

    The first word of the packet holds the sequence number together
    with the end-of-stream flag. The words after it are copied into the
    output buffer, unless the packet is an end-of-stream marker that
    carries nothing but that first word.

    :param data: the packet data
    :param seq_nums: the set of sequence numbers received so far; the\
        sequence number of this packet is added to it
    :param finished: bool which states if finished or not; handed back\
        untouched unless this packet carries the end-of-stream flag
    :param placement: placement object for location on machine
    :param transceiver: spinnman instance
    :param lost_seq_nums: the list of n sequence numbers lost per\
        iteration
    :return: the sequence numbers received so far and whether the\
        transfer is finished
    :raises Exception: if the sequence number is bigger than the\
        largest one expected
    """
    # pylint: disable=too-many-arguments
    # self._print_out_packet_data(data)
    packet_length = len(data)
    (header,) = _ONE_WORD.unpack_from(data, 0)

    # split the first word into its two parts
    seq_num = header & self.SEQUENCE_NUMBER_MASK
    last_flag = header & self.LAST_MESSAGE_FLAG_BIT_MASK
    is_end_of_stream = last_flag != 0

    # check seq num not insane
    if seq_num > self._max_seq_num:
        raise Exception(
            "got an insane sequence number. got {} when "
            "the max is {} with a length of {}".format(
                seq_num, self._max_seq_num, packet_length))

    # where in the output the payload of this packet belongs
    view_start = self._calculate_offset(seq_num)
    view_end = view_start + packet_length - WORD_SIZE

    # a lone end-of-stream word has no payload to store
    flag_only_packet = is_end_of_stream and packet_length == WORD_SIZE
    if not flag_only_packet:
        self._write_into_view(
            view_start, view_end, data, WORD_SIZE,
            packet_length, seq_num, packet_length, False)

    seq_nums.add(seq_num)

    if not is_end_of_stream:
        return seq_nums, finished

    # if received a last flag on its own, its during retransmission.
    # check and try again if required
    if self._check(seq_nums):
        return seq_nums, True
    finished = self._determine_and_retransmit_missing_seq_nums(
        seq_nums=seq_nums, transceiver=transceiver,
        placement=placement, lost_seq_nums=lost_seq_nums)
    return seq_nums, finished
def _calculate_offset(self, seq_num):
    """ Work out where in the output buffer the payload of the packet\
        with the given sequence number starts.

    :param seq_num: the sequence number of the packet
    :return: the position in bytes from the start of the output
    """
    words_before = seq_num * WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM
    return words_before * WORD_SIZE
def _write_into_view(
self, view_start_position, view_end_position,
data, data_start_position, data_end_position, seq_num,
packet_length, is_final):
""" Puts data into the view
:param view_start_position: where in view to start
:param view_end_position: where in view to end
:param data: the data holder to write from
:param data_start_position: where in data holder to start from
:param data_end_position: where in data holder to end
:param seq_num: the sequence number to figure
:rtype: None
"""
# pylint: disable=too-many-arguments
if view_end_position > len(self._output):
raise Exception(
"I'm trying to add to my output data, but am trying to add "
"outside my acceptable output positions! max is {} and I "
"received a request to fill to {} for sequence num {} from max"
" sequence num {} length of packet {} and final {}".format(
len(self._output), view_end_position, seq_num,
self._max_seq_num, packet_length, is_final))
self._view[view_start_position: view_end_position] = \
data[data_start_position:data_end_position]
def _check(self, seq_nums):
""" Verify if the sequence numbers are correct.
:param seq_nums: the received sequence numbers
:return: Whether all the sequence numbers have been received
:rtype: bool
"""
# hand back
seq_nums = sorted(seq_nums)
max_needed = self.calculate_max_seq_num()
if len(seq_nums) > max_needed + 1:
raise Exception("I've received more data than I was expecting!!")
return len(seq_nums) == max_needed + 1
def calculate_max_seq_num(self):
    """ Work out the biggest sequence number expected for the current\
        output buffer.

    The size of the output buffer is divided, using ``ceildiv``, by the
    number of bytes given by
    ``WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * WORD_SIZE``.

    :return: int of the biggest sequence num expected
    """
    bytes_per_packet = WORDS_PER_FULL_PACKET_WITH_SEQUENCE_NUM * WORD_SIZE
    total_bytes = len(self._output)
    return ceildiv(total_bytes, bytes_per_packet)
@staticmethod
def _print_missing(seq_nums):
    """ Debug printer that logs every given sequence number, in\
        ascending order, as one that is missing.

    :param seq_nums: the sequence numbers to report
    :rtype: None
    """
    ordered = list(seq_nums)
    ordered.sort()
    for missing_seq_num in ordered:
        log.info(
            "from list I'm missing sequence num {}", missing_seq_num)
def _print_out_packet_data(self, data):
    """ Debug printer that logs the packet contents as a tuple of\
        little-endian unsigned 32-bit words.

    :param data: the packet data
    :rtype: None
    """
    n_words = ceildiv(len(data), WORD_SIZE)
    words = struct.unpack("<{}I".format(n_words), data)
    log.info("converted data back into readable form is {}", words)
@staticmethod
def _print_length_of_received_seq_nums(seq_nums, max_needed):
""" Debug helper method for figuring out if everything been received
:param seq_nums: sequence numbers received
:param max_needed: biggest expected to have
:rtype: None
"""
if len(seq_nums) != max_needed:
log.info("should have received {} sequence numbers, but received "
"{} sequence numbers", max_needed, len(seq_nums))
@staticmethod
def _print_packet_num_being_sent(packet_count, n_packets):
    """ Debug helper that logs which of the packets holding missing\
        sequence numbers is being sent.

    :param packet_count: which packet is being fired, counting from 0
    :param n_packets: how many packets to fire.
    :rtype: None
    """
    ordinal = packet_count + 1
    log.info("send SDP packet with missing sequence numbers: {} of {}",
             ordinal, n_packets)
class _StreamingContextManager(object):
""" The implementation of the context manager object for streaming \
configuration control.
"""
__slots__ = ["_gatherers", "_monitors", "_placements", "_txrx"]
def __init__(self, gatherers, txrx, monitors, placements):
self._gatherers = list(gatherers)
self._txrx = txrx
self._monitors = monitors
self._placements = placements
def __enter__(self):
for gatherer in self._gatherers:
gatherer.set_cores_for_data_streaming(
self._txrx, self._monitors, self._placements)
def __exit__(self, _type, _value, _tb):
for gatherer in self._gatherers:
gatherer.unset_cores_for_data_streaming(
self._txrx, self._monitors, self._placements)
return False