Source code for spinn_front_end_common.interface.interface_functions.notification_protocol

from spinn_front_end_common.utilities.notification_protocol \
    import NotificationProtocol as Notification

import logging


logger = logging.getLogger(__name__)


[docs]class NotificationProtocol(object): """ The notification protocol for external device interaction """ __slots__ = [ # the notification protocol for talking to external devices "_notification_protocol" ] def __call__( self, wait_for_read_confirmation, socket_addresses, database_file_path): # notification protocol self._notification_protocol = Notification( socket_addresses, wait_for_read_confirmation) self.send_read_notification(database_file_path) return self
[docs] def wait_for_confirmation(self): """ Waits for devices to confirm they have read the database via the\ notification protocol :rtype: None: """ self._notification_protocol.wait_for_confirmation()
[docs] def send_read_notification(self, database_directory): """ Send the read notifications via the notification protocol :param database_directory: the path to the database :rtype: None: """ self._notification_protocol.send_read_notification(database_directory)
[docs] def send_start_resume_notification(self): """ Send the start notifications via the notification protocol :rtype: None: """ self._notification_protocol.send_start_resume_notification()
[docs] def send_stop_pause_notification(self): """ Send the stop or pause notifications via the notification protocol :rtype: None: """ self._notification_protocol.send_stop_pause_notification()
[docs] def stop(self): """ Ends the notification protocol :rtype: None: """ logger.debug("[data_base_thread] Stopping") self._notification_protocol.close()