Source code for spinn_front_end_common.utilities.database.database_connection

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
from threading import Thread
from six import raise_from
from spinn_utilities.log import FormatAdapter
from spinnman.exceptions import (
    SpinnmanIOException, SpinnmanInvalidPacketException,
    SpinnmanTimeoutException)
from spinnman.messages.eieio.command_messages import EIEIOCommandHeader
from spinnman.connections.udp_packet_connections import UDPConnection
from spinnman.constants import EIEIO_COMMAND_IDS as CMDS
from spinn_front_end_common.utilities.constants import NOTIFY_PORT
from .database_reader import DatabaseReader

logger = FormatAdapter(logging.getLogger(__name__))


[docs]class DatabaseConnection(UDPConnection): """ A connection from the toolchain which will be notified when the \ database has been written, and can then respond when the database \ has been read, and further wait for notification that the simulation \ has started. """ __slots__ = [ "__database_callbacks", "__pause_and_stop_callback", "__running", "__start_resume_callback"] def __init__(self, start_resume_callback_function=None, stop_pause_callback_function=None, local_host=None, local_port=NOTIFY_PORT): """ :param start_resume_callback_function: A function to be called when \ the start message has been received. This function should not \ take any parameters or return anything. :type start_resume_callback_function: function() -> None :param local_host: Optional specification of the local hostname or\ IP address of the interface to listen on :type local_host: str :param local_port: Optional specification of the local port to listen \ on. Must match the port that the toolchain will send the \ notification on (19999 by default) :type local_port: int """ super(DatabaseConnection, self).__init__( local_host=local_host, local_port=local_port, remote_host=None, remote_port=None) thread = Thread(name="SpyNNakerDatabaseConnection:{}:{}".format( self.local_ip_address, self.local_port), target=self.__run) self.__database_callbacks = list() self.__start_resume_callback = start_resume_callback_function self.__pause_and_stop_callback = stop_pause_callback_function self.__running = False thread.daemon = True thread.start()
[docs] def add_database_callback(self, database_callback_function): """ Add a database callback to be called when the database is ready. :param database_callback_function: A function to be called when the\ database message has been received. This function should take \ a single parameter, which will be a DatabaseReader object. \ Once the function returns, it will be assumed that the database \ has been read, and the return response will be sent. :type database_callback_function: function(\ :py:class:`spinn_front_end_common.utilities.database.database_reader.DatabaseReader`)\ -> None :raises SpinnmanIOException: If anything goes wrong """ self.__database_callbacks.append(database_callback_function)
def __run(self): # pylint: disable=broad-except self.__running = True logger.info( "{}:{} Waiting for message to indicate that the database is " "ready", self.local_ip_address, self.local_port) try: while self.__running: try: data, address = self.receive_with_address(timeout=3) except SpinnmanTimeoutException: continue self.__read_db(address, data) # Wait for the start of the simulation if self.__start_resume_callback is not None: self.__start_resume() # Wait for the end of the simulation if self.__pause_and_stop_callback is not None: self.__pause_stop() except Exception as e: logger.error("Failure processing database callback", exc_info=True) raise_from(SpinnmanIOException(str(e)), e) finally: self.__running = False def __read_db(self, address, data): # Read the read packet confirmation logger.info("{}:{} Reading database", self.local_ip_address, self.local_port) if len(data) > 2: database_path = data[2:].decode('utf-8') logger.info("database is at {}", database_path) # Call the callback with DatabaseReader(database_path) as db_reader: for db_callback in self.__database_callbacks: db_callback(db_reader) else: logger.warning("Database path was empty - assuming no database") # Send the response logger.info("Notifying the toolchain that the database has been read") self._send_command(CMDS.DATABASE_CONFIRMATION, address) def __start_resume(self): logger.info( "Waiting for message to indicate that the simulation has " "started or resumed") command_code = self._receive_command() if command_code != CMDS.START_RESUME_NOTIFICATION.value: raise SpinnmanInvalidPacketException( "command_code", "expected a start/resume command code now, and did not " "receive it") # Call the callback self.__start_resume_callback() def __pause_stop(self): logger.info( "Waiting for message to indicate that the simulation has " "stopped or paused") command_code = self._receive_command() if command_code != CMDS.STOP_PAUSE_NOTIFICATION.value: raise SpinnmanInvalidPacketException( "command_code", "expected a pause/stop command code now, and did not " "receive it") # Call the callback self.__pause_and_stop_callback() def _send_command(self, command, address): self.send_to(EIEIOCommandHeader(command.value).bytestring, address) def _receive_command(self): return EIEIOCommandHeader.from_bytestring(self.receive(), 0).command
[docs] def close(self): self.__running = False UDPConnection.close(self)