Source code for spinn_front_end_common.utilities.database.database_connection

# spinnman imports
from spinnman.exceptions \
    import SpinnmanIOException, SpinnmanInvalidPacketException, \
    SpinnmanTimeoutException
from spinnman.messages.eieio.command_messages import EIEIOCommandHeader
from spinnman.connections.udp_packet_connections import UDPConnection
from spinnman.constants import EIEIO_COMMAND_IDS as CMDS

# FrontEndCommon imports
from .database_reader import DatabaseReader

# general imports
from threading import Thread
import logging

logger = logging.getLogger(__name__)


[docs]class DatabaseConnection(UDPConnection, Thread): """ A connection from the toolchain which will be notified\ when the database has been written, and can then respond when the\ database has been read, and further wait for notification that the\ simulation has started. """ def __init__(self, start_resume_callback_function=None, stop_pause_callback_function=None, local_host=None, local_port=19999): """ :param start_resume_callback_function: A function to be called when the start message has been received. This function should not take any parameters or return anything. :type start_resume_callback_function: function() -> None :param local_host: Optional specification of the local hostname or\ ip address of the interface to listen on :type local_host: str :param local_port: Optional specification of the local port to listen\ on. Must match the port that the toolchain will send the\ notification on (19999 by default) :type local_port: int """ UDPConnection.__init__( self, local_host=local_host, local_port=local_port, remote_host=None, remote_port=None) Thread.__init__( self, name="SpyNNakerDatabaseConnection:{}:{}".format( self.local_ip_address, self.local_port)) self._database_callback_functions = list() self._start_resume_callback_function = start_resume_callback_function self._pause_and_stop_callback_function = stop_pause_callback_function self._running = False self.daemon = True self.start()
[docs] def add_database_callback(self, database_callback_function): """ Add a database callback to be called when the database is ready :param database_callback_function: A function to be called when the\ database message has been received. This function should take \ a single parameter, which will be a DatabaseReader object. \ Once the function returns, it will be assumed that the database \ has been read, and the return response will be sent. :type database_callback_function: function(\ :py:class:`spynnaker_external_devices.pyNN.connections.database_reader.DatabaseReader`)\ -> None """ self._database_callback_functions.append(database_callback_function)
[docs] def run(self): self._running = True logger.info( "{}:{} Waiting for message to indicate that the database is " "ready".format(self.local_ip_address, self.local_port)) while self._running: try: data, address = self._retrieve_database_address() if data is not None: self._process_message(address, data) except Exception as e: logger.error("Failure processing database callback", exc_info=True) raise SpinnmanIOException(str(e))
def _process_message(self, address, data): # Read the read packet confirmation logger.info("{}:{} Reading database".format( self.local_ip_address, self.local_port)) database_path = str(data[2:]) # Call the callback database_reader = DatabaseReader(database_path) for database_callback in self._database_callback_functions: database_callback(database_reader) database_reader.close() # Send the response logger.info("Notifying the toolchain that the database has been read") self._send_command(1, address) # Wait for the start of the simulation if self._start_resume_callback_function is not None: logger.info( "Waiting for message to indicate that the simulation has " "started or resumed") command_code = self._receive_command() if command_code != CMDS.START_RESUME_NOTIFICATION.value: raise SpinnmanInvalidPacketException( "command_code", "expected a start/resume command code now, and did not " "receive it") # Call the callback self._start_resume_callback_function() if self._pause_and_stop_callback_function is not None: logger.info( "Waiting for message to indicate that the simulation has " "stopped or paused") command_code = self._receive_command() if command_code != CMDS.STOP_PAUSE_NOTIFICATION.value: raise SpinnmanInvalidPacketException( "command_code", "expected a pause/stop command code now, and did not " "receive it") # Call the callback self._pause_and_stop_callback_function() def _send_command(self, command, address): self.send_to(EIEIOCommandHeader(command).bytestring, address) def _receive_command(self): return EIEIOCommandHeader.from_bytestring(self.receive(), 0).command def _retrieve_database_address(self): try: data, address = self.receive_with_address(timeout=3) return data, address except SpinnmanTimeoutException: return None, None except SpinnmanIOException: raise
[docs] def close(self): self._running = False UDPConnection.close(self)