# Source code for spinn_front_end_common.interface.interface_functions.application_runner

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import logging
import time
from spinn_utilities.log import FormatAdapter
from spinnman.messages.scp.enums import Signal
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.utilities.utility_objs import ExecutableType

logger = FormatAdapter(logging.getLogger(__name__))


class _NotificationWrapper(object):
    """ Null-safe facade over an optional notification interface.

    Each method forwards to the wrapped interface when there is one and
    does nothing when the interface is ``None``.
    """

    def __init__(self, notification_interface, wait_on_confirmation):
        """
        :param notification_interface:
            the object notifications are forwarded to, or ``None``
        :param wait_on_confirmation:
            whether :py:meth:`wait_for_confirmation` should be forwarded;
            has no effect when there is no interface
        """
        self._notifier = notification_interface
        # Waiting only makes sense when there is something to wait on
        has_notifier = notification_interface is not None
        self._wait = wait_on_confirmation and has_notifier

    def wait_for_confirmation(self):
        """ Forward the wait request, if waiting was asked for and there \
            is an interface to wait on.
        """
        if not self._wait:
            return
        self._notifier.wait_for_confirmation()

    def send_start_resume_notification(self):
        """ Forward the start/resume notification, if there is an interface.
        """
        if self._notifier is None:
            return
        self._notifier.send_start_resume_notification()

    def send_stop_pause_notification(self):
        """ Forward the stop/pause notification, if there is an interface.
        """
        if self._notifier is None:
            return
        self._notifier.send_stop_pause_notification()


class ApplicationRunner(object):
    """ Ensures all cores are initialised correctly, ran, and completed
    successfully.
    """

    __slots__ = []

    # Wraps up as a PACMAN algorithm
    def __call__(
            self, buffer_manager, wait_on_confirmation,
            notification_interface, executable_types, app_id, txrx,
            runtime, time_scale_factor, no_sync_changes, time_threshold,
            run_until_complete=False):
        """ Wrap the notification interface and run the application.

        All arguments other than the two listed below are handed on
        unchanged to :py:meth:`run_application`.

        :param wait_on_confirmation:
            whether to wait for confirmation from the notification
            interface before starting the application
        :param notification_interface:
            the interface to send start and stop notifications to
        :return: whatever :py:meth:`run_application` returns
        """
        # pylint: disable=too-many-arguments, too-many-locals
        logger.info("*** Running simulation... *** ")

        # Simplify the notifications
        notifier = _NotificationWrapper(
            notification_interface, wait_on_confirmation)

        return self.run_application(
            buffer_manager, notifier, executable_types, app_id, txrx,
            runtime, time_scale_factor, no_sync_changes, time_threshold,
            run_until_complete)

    # The actual runner
    def run_application(
            self, buffer_manager, notifier, executable_types, app_id, txrx,
            runtime, time_scale_factor, no_sync_changes, time_threshold,
            run_until_complete):
        """ Start the application and, unless it runs forever, wait for it \
            to finish.

        :param buffer_manager:
            object whose ``resume``, ``load_initial_buffers`` and ``stop``
            methods are called around the run
        :param notifier:
            object providing ``wait_for_confirmation``,
            ``send_start_resume_notification`` and
            ``send_stop_pause_notification``
        :param executable_types:
            mapping from executable type to the value handed to
            ``txrx.wait_for_cores_to_be_in_state`` for that type
            (presumably the cores running it; confirm against the caller)
        :param app_id: the application ID passed to every ``txrx`` call
        :param txrx:
            object providing ``wait_for_cores_to_be_in_state`` and
            ``send_signal``
        :param runtime:
            how long to run for; ``None`` together with a false
            ``run_until_complete`` means run forever
        :param time_scale_factor: multiplier applied to ``runtime``
        :param no_sync_changes: sync counter
        :param time_threshold:
            timeout used when waiting for the end state after a timed run
        :param run_until_complete:
            if true, wait for the end state instead of sleeping for the
            run time
        :return: the (possibly updated) sync counter
        """
        # pylint: disable=too-many-arguments

        # wait for all cores to be ready
        self._wait_for_start(txrx, app_id, executable_types)

        # set the buffer manager into a resume state, so that if it had ran
        # before it'll work again
        buffer_manager.resume()

        # every thing is in sync0 so load the initial buffers
        buffer_manager.load_initial_buffers()

        # wait till external app is ready for us to start if required
        notifier.wait_for_confirmation()

        # set off the executables that are in sync state
        # (sending to all is just as safe)
        if (ExecutableType.USES_SIMULATION_INTERFACE in executable_types
                or ExecutableType.SYNC in executable_types):
            # locate all signals needed to set off executables
            sync_signal, no_sync_changes = \
                self._determine_simulation_sync_signals(
                    executable_types, no_sync_changes)

            # fire all signals as required
            txrx.send_signal(app_id, sync_signal)

        # Send start notification to external applications
        notifier.send_start_resume_notification()

        # Wait for the application to finish
        if runtime is None and not run_until_complete:
            logger.info("Application is set to run forever; exiting")
            # Do NOT stop the buffer manager; app is using it still
        else:
            try:
                self._run_wait(
                    txrx, app_id, executable_types, run_until_complete,
                    runtime, time_scale_factor, time_threshold)
            finally:
                # Stop the buffer manager after run, even if the wait failed
                buffer_manager.stop()

        # Send stop notification
        if runtime is not None:
            notifier.send_stop_pause_notification()

        return no_sync_changes

    def _run_wait(self, txrx, app_id, executable_types, run_until_complete,
                  runtime, time_scale_factor, time_threshold):
        """ Block until the application has reached its end state.

        For a timed run, sleep for the scaled run time and then wait for
        the end state with ``time_threshold`` as the timeout. For a
        run-until-complete run, wait for the end state with no timeout
        given.
        """
        # pylint: disable=too-many-arguments
        if not run_until_complete:
            # scaled run time divided by 1000 (presumably ms to seconds;
            # confirm units with the caller), plus 0.1s of slack
            time_to_wait = runtime * time_scale_factor / 1000.0 + 0.1
            logger.info(
                "Application started; waiting {}s for it to stop",
                time_to_wait)
            time.sleep(time_to_wait)
            self._wait_for_end(txrx, app_id, executable_types,
                               timeout=time_threshold)
        else:
            logger.info("Application started; waiting until finished")
            self._wait_for_end(txrx, app_id, executable_types)

    @staticmethod
    def _wait_for_start(txrx, app_id, executable_types, timeout=None):
        """ Wait for each executable type to reach its ``start_state``.

        :param txrx: object providing ``wait_for_cores_to_be_in_state``
        :param app_id: the application ID to check
        :param executable_types: mapping from executable type to its cores
        :param timeout: passed through to ``txrx``; ``None`` by default
        """
        for executable_type, cores in executable_types.items():
            txrx.wait_for_cores_to_be_in_state(
                cores, app_id, executable_type.start_state, timeout=timeout)

    @staticmethod
    def _wait_for_end(txrx, app_id, executable_types, timeout=None):
        """ Wait for each executable type to reach its ``end_state``.

        :param txrx: object providing ``wait_for_cores_to_be_in_state``
        :param app_id: the application ID to check
        :param executable_types: mapping from executable type to its cores
        :param timeout: passed through to ``txrx``; ``None`` by default
        """
        for executable_type, cores in executable_types.items():
            txrx.wait_for_cores_to_be_in_state(
                cores, app_id, executable_type.end_state, timeout=timeout)

    @staticmethod
    def _determine_simulation_sync_signals(executable_types, no_sync_changes):
        """ Chooses the sync signal to send and updates the sync counter.

        :param no_sync_changes: sync counter
        :param executable_types: the types of executables
        :return: the sync signal and updated no_sync_changes
        :raises ConfigurationException:
            if SYNC executables are present but the signal chosen for the
            simulation-interface executables is SYNC1
        """
        sync_signal = None

        if ExecutableType.USES_SIMULATION_INTERFACE in executable_types:
            # alternate between the two signals on successive runs
            if no_sync_changes % 2 == 0:
                sync_signal = Signal.SYNC0
            else:
                sync_signal = Signal.SYNC1
            # when it falls out of the running, it'll be in a next sync
            # state, thus update needed
            no_sync_changes += 1

        # handle the sync states, but only send once if they work with
        # the simulation interface requirement
        if ExecutableType.SYNC in executable_types:
            if sync_signal == Signal.SYNC1:
                raise ConfigurationException(
                    "There can only be one SYNC signal per run. This is "
                    "because we cannot ensure the cores have not reached the "
                    "next SYNC state before we send the next SYNC. Resulting "
                    "in uncontrolled behaviour")
            sync_signal = Signal.SYNC0
            no_sync_changes += 1

        return sync_signal, no_sync_changes