Source code for spinn_front_end_common.utility_models.command_sender
# pacman imports
from pacman.model.decorators import overrides
from pacman.model.graphs.application import ApplicationEdge
from pacman.model.constraints.key_allocator_constraints \
import FixedKeyAndMaskConstraint
from pacman.model.graphs.application import ApplicationVertex
from pacman.model.resources import ResourceContainer, SDRAMResource
from pacman.model.routing_info import BaseKeyAndMask
from pacman.executor.injection_decorator import inject_items
# spinn front end common imports
from spinn_front_end_common.abstract_models import \
AbstractProvidesOutgoingPartitionConstraints, AbstractHasAssociatedBinary
from spinn_front_end_common.abstract_models import \
AbstractVertexWithEdgeToDependentVertices
from spinn_front_end_common.abstract_models \
import AbstractGeneratesDataSpecification
from spinn_front_end_common.utilities import constants
from .command_sender_machine_vertex import CommandSenderMachineVertex
from spinn_front_end_common.utilities.utility_objs import ExecutableStartType
[docs]class CommandSender(
ApplicationVertex, AbstractGeneratesDataSpecification,
AbstractHasAssociatedBinary,
AbstractProvidesOutgoingPartitionConstraints):
""" A utility for sending commands to a vertex (possibly an external\
device) at fixed times in the simulation
"""
# all commands will use this mask
_DEFAULT_COMMAND_MASK = 0xFFFFFFFF
def __init__(self, label, constraints):
ApplicationVertex.__init__(self, label, constraints, 1)
self._timed_commands = list()
self._commands_at_start_resume = list()
self._commands_at_pause_stop = list()
self._partition_id_to_keys = dict()
self._keys_to_partition_id = dict()
self._edge_partition_id_counter = 0
self._vertex_to_key_map = dict()
[docs] def add_commands(
self, start_resume_commands, pause_stop_commands,
timed_commands, vertex_to_send_to):
""" Add commands to be sent down a given edge
:param start_resume_commands: The commands to send when the simulation\
starts or resumes from pause
:type start_resume_commands: iterable of\
:py:class:`spinn_front_end_common.utility_models.multi_cast_command.MultiCastCommand`
:param pause_stop_commands: the commands to send when the simulation\
stops or pauses after running
:type pause_stop_commands: iterable of\
:py:class:`spinn_front_end_common.utility_models.multi_cast_command.MultiCastCommand`
:param timed_commands: The commands to send at specific times
:type timed_commands: iterable of\
:py:class:`spinn_front_end_common.utility_models.multi_cast_command.MultiCastCommand`
:param vertex_to_send_to: The vertex these commands are to be sent to
"""
# container for keys for partition mapping (remove duplicates)
command_keys = set()
self._vertex_to_key_map[vertex_to_send_to] = set()
# update holders
self._commands_at_start_resume.extend(start_resume_commands)
self._commands_at_pause_stop.extend(pause_stop_commands)
self._timed_commands.extend(timed_commands)
for commands in (
start_resume_commands, pause_stop_commands, timed_commands):
for command in commands:
# track keys
command_keys.add(command.key)
self._vertex_to_key_map[vertex_to_send_to].add(command.key)
# create mapping between keys and partitions via partition constraint
for key in command_keys:
partition_id = "COMMANDS{}".format(self._edge_partition_id_counter)
self._keys_to_partition_id[key] = partition_id
self._partition_id_to_keys[partition_id] = key
self._edge_partition_id_counter += 1
[docs] @inject_items({
"machine_time_step": "MachineTimeStep",
"time_scale_factor": "TimeScaleFactor",
"n_machine_time_steps": "RunTimeMachineTimeSteps"
})
@overrides(
AbstractGeneratesDataSpecification.generate_data_specification,
additional_arguments={
"machine_time_step", "time_scale_factor", "n_machine_time_steps"
})
def generate_data_specification(
self, spec, placement, machine_time_step, time_scale_factor,
n_machine_time_steps):
placement.vertex.generate_data_specification(
spec, placement, machine_time_step, time_scale_factor,
n_machine_time_steps)
[docs] @overrides(ApplicationVertex.create_machine_vertex)
def create_machine_vertex(
self, vertex_slice, resources_required, label=None,
constraints=None):
return CommandSenderMachineVertex(
constraints, resources_required, label,
self._commands_at_start_resume, self._commands_at_pause_stop,
self._timed_commands)
[docs] @overrides(ApplicationVertex.get_resources_used_by_atoms)
def get_resources_used_by_atoms(self, vertex_slice):
sdram = (
CommandSenderMachineVertex.get_timed_commands_bytes(
self._timed_commands) +
CommandSenderMachineVertex.get_n_command_bytes(
self._commands_at_start_resume) +
CommandSenderMachineVertex.get_n_command_bytes(
self._commands_at_pause_stop) +
constants.SYSTEM_BYTES_REQUIREMENT +
CommandSenderMachineVertex.get_provenance_data_size(0) +
(CommandSenderMachineVertex.get_number_of_mallocs_used_by_dsg() *
constants.SARK_PER_MALLOC_SDRAM_USAGE))
# Return the SDRAM and 1 core
return ResourceContainer(sdram=SDRAMResource(sdram))
@property
@overrides(ApplicationVertex.n_atoms)
def n_atoms(self):
return 1
[docs] @overrides(AbstractHasAssociatedBinary.get_binary_file_name)
def get_binary_file_name(self):
""" Return a string representation of the models binary
"""
return CommandSenderMachineVertex.BINARY_FILE_NAME
[docs] @overrides(AbstractVertexWithEdgeToDependentVertices.dependent_vertices)
def dependent_vertices(self):
return self._vertex_to_key_map.keys()
[docs] def edges_and_partitions(self):
edges = list()
partition_ids = list()
keys_added = set()
for vertex in self._vertex_to_key_map:
for key in self._vertex_to_key_map[vertex]:
if key not in keys_added:
keys_added.add(key)
app_edge = ApplicationEdge(self, vertex)
edges.append(app_edge)
partition_ids.append(self._keys_to_partition_id[key])
return edges, partition_ids
[docs] @overrides(AbstractProvidesOutgoingPartitionConstraints.
get_outgoing_partition_constraints)
def get_outgoing_partition_constraints(self, partition):
return [FixedKeyAndMaskConstraint([
BaseKeyAndMask(
self._partition_id_to_keys[partition.identifier],
self._DEFAULT_COMMAND_MASK)
])]
[docs] @overrides(AbstractHasAssociatedBinary.get_binary_start_type)
def get_binary_start_type(self):
return ExecutableStartType.USES_SIMULATION_INTERFACE