Source code for spinn_front_end_common.interface.config_handler

# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import datetime
import logging
import os
import errno
import shutil
import time
import spinn_utilities.conf_loader as conf_loader
from spinn_utilities.log import FormatAdapter
from spinn_machine import Machine
from spinn_front_end_common.utilities.exceptions import ConfigurationException
from spinn_front_end_common.utilities.helpful_functions import (
    read_config, read_config_boolean, read_config_int)

logger = FormatAdapter(logging.getLogger(__name__))

APP_DIRNAME = 'application_generated_data_files'
CONFIG_FILE = "spinnaker.cfg"
FINISHED_FILENAME = "finished"
REPORTS_DIRNAME = "reports"
TIMESTAMP_FILENAME = "time_stamp"


[docs]class ConfigHandler(object): """ Subclass of AbstractSpinnaker base that handles function only dependent of the config and the order its methods are called """ __slots__ = [ # "_app_data_runtime_folder", # "_app_data_top_simulation_folder", # the interface to the cfg files. supports get get_int etc "_config", # "_json_folder", # "_provenance_file_path", # "_app_provenance_file_path", # "_system_provenance_file_path", # "_report_default_directory", # "_report_simulation_top_directory", # "_this_run_time_string", # "_use_virtual_board", # If not None, path to append pacman executor provenance info to "_pacman_executor_provenance_path", ] def __init__(self, configfile, default_config_paths, validation_cfg): # global params if default_config_paths is None: default_config_paths = [] default_config_paths.insert(0, os.path.join( os.path.dirname(__file__), CONFIG_FILE)) self._config = conf_loader.load_config( filename=configfile, defaults=default_config_paths, validation_cfg=validation_cfg) # set up machine targeted data self._use_virtual_board = self._config.getboolean( "Machine", "virtual_board") # Pass max_machine_cores to Machine so if effects everything! max_machine_core = self._read_config_int("Machine", "max_machine_core") if max_machine_core is not None: Machine.set_max_cores_per_chip(max_machine_core) self._app_data_runtime_folder = None self._app_data_top_simulation_folder = None self._json_folder = None self._provenance_file_path = None self._report_default_directory = None self._report_simulation_top_directory = None self._this_run_time_string = None def _adjust_config(self, runtime, debug_enable_opts, report_disable_opts): """ Adjust and checks config based on runtime and mode :param runtime: :type runtime: int or bool :type debug_enable_opts: frozenset(str) :type report_disable_opts: frozenset(str) :raises ConfigurationException """ if self._config.get("Mode", "mode") == "Debug": for option in self._config.options("Reports"): # options names are all lower without _ inside config if option in debug_enable_opts or option[:5] == "write": try: if not self._config.get_bool("Reports", option): self._config.set("Reports", option, "True") logger.info("As mode == \"Debug\", [Reports] {} " "has been set to True", option) except ValueError: pass elif not self._config.getboolean("Reports", "reportsEnabled"): for option in self._config.options("Reports"): # options names are all lower without _ inside config if option in report_disable_opts or option[:5] == "write": try: if not self._config.get_bool("Reports", option): self._config.set("Reports", option, "False") logger.info( "As reportsEnabled == \"False\", [Reports] {} " "has been set to False", option) except ValueError: pass if runtime is None: if self._config.getboolean( "Reports", "write_energy_report") is True: self._config.set("Reports", "write_energy_report", "False") logger.info("[Reports]write_energy_report has been set to " "False as runtime is set to forever") if self._config.get_bool( "EnergySavings", "turn_off_board_after_discovery") is True: self._config.set( "EnergySavings", "turn_off_board_after_discovery", "False") logger.info("[EnergySavings]turn_off_board_after_discovery has" " been set to False as runtime is set to forever") if self._use_virtual_board: if self._config.getboolean( "Reports", "write_energy_report") is True: self._config.set("Reports", "write_energy_report", "False") logger.info("[Reports]write_energy_report has been set to " "False as using virtual boards") if self._config.get_bool( "EnergySavings", "turn_off_board_after_discovery") is True: self._config.set( "EnergySavings", "turn_off_board_after_discovery", "False") logger.info("[EnergySavings]turn_off_board_after_discovery has" " been set to False as s using virtual boards") if self._config.getboolean( "Reports", "write_board_chip_report") is True: self._config.set("Reports", "write_board_chip_report", "False") logger.info("[Reports]write_board_chip_report has been set to" " False as using virtual boards")
[docs] def child_folder(self, parent, child_name, must_create=False): """ :param must_create: If `True`, the directory named by `child_name`\ (but not necessarily its parents) must be created by this call,\ and an exception will be thrown if this fails. :raises OSError: if the directory existed ahead of time and creation\ was required by the user """ child = os.path.join(parent, child_name) if must_create: # Throws OSError or FileExistsError (a subclass of OSError) if the # directory exists. os.makedirs(child) elif not os.path.exists(child): self._make_dirs(child) return child
def _remove_excess_folders(self, max_kept, starting_directory): try: files_in_report_folder = os.listdir(starting_directory) # while there's more than the valid max, remove the oldest one if len(files_in_report_folder) > max_kept: # sort files into time frame files_in_report_folder.sort( key=lambda temp_file: os.path.getmtime( os.path.join(starting_directory, temp_file))) # remove only the number of files required, and only if they # have the finished flag file created num_files_to_remove = len(files_in_report_folder) - max_kept files_removed = 0 files_not_closed = 0 for current_oldest_file in files_in_report_folder: finished_flag = os.path.join(os.path.join( starting_directory, current_oldest_file), FINISHED_FILENAME) if os.path.exists(finished_flag): shutil.rmtree(os.path.join( starting_directory, current_oldest_file), ignore_errors=True) files_removed += 1 else: files_not_closed += 1 if files_removed + files_not_closed >= num_files_to_remove: break if files_not_closed > max_kept // 4: logger.warning( "{} has {} old reports that have not been closed", starting_directory, files_not_closed) except IOError: # This might happen if there is an open file, or more than one # process in the same folder, but we shouldn't die because of it pass def _set_up_report_specifics(self, n_calls_to_run): """ :param n_calls_to_run: \ the counter of how many times run has been called. :type n_calls_to_run: int :return: The folder for this run, the time_stamp """ default_report_file_path = self._config.get_str( "Reports", "default_report_file_path") # determine common report folder if default_report_file_path == "DEFAULT": directory = os.getcwd() # global reports folder report_default_directory = self.child_folder( directory, REPORTS_DIRNAME) elif default_report_file_path == "REPORTS": report_default_directory = REPORTS_DIRNAME if not os.path.exists(report_default_directory): self._make_dirs(report_default_directory) else: report_default_directory = self.child_folder( default_report_file_path, REPORTS_DIRNAME) # clear and clean out folders considered not useful anymore if os.listdir(report_default_directory): self._remove_excess_folders( self._config.getint("Reports", "max_reports_kept"), report_default_directory) # determine the time slot for later while also making the report folder if self._this_run_time_string is None: while True: try: timestamp = self.__make_timestamp() self._report_simulation_top_directory = self.child_folder( report_default_directory, timestamp, must_create=True) self._this_run_time_string = timestamp break except OSError: time.sleep(0.5) else: self._report_simulation_top_directory = self.child_folder( report_default_directory, self._this_run_time_string) # create sub folder within reports for sub runs # (where changes need to be recorded) self._report_default_directory = self.child_folder( self._report_simulation_top_directory, "run_{}".format( n_calls_to_run)) # store timestamp in latest/time_stamp for provenance reasons time_of_run_file_name = os.path.join( self._report_simulation_top_directory, TIMESTAMP_FILENAME) with open(time_of_run_file_name, "w") as f: f.writelines(self._this_run_time_string) @staticmethod def __make_timestamp(): now = datetime.datetime.now() return "{:04}-{:02}-{:02}-{:02}-{:02}-{:02}-{:02}".format( now.year, now.month, now.day, now.hour, now.minute, now.second, now.microsecond)
[docs] def set_up_output_application_data_specifics(self, n_calls_to_run): """ :param n_calls_to_run: \ the counter of how many times run has been called. :type n_calls_to_run: int :return: the run folder for this simulation to hold app data """ where_to_write_application_data_files = self._config.get( "Reports", "default_application_data_file_path") if where_to_write_application_data_files == "DEFAULT": where_to_write_application_data_files = os.getcwd() application_generated_data_file_folder = self.child_folder( where_to_write_application_data_files, APP_DIRNAME) # add time stamped folder for this run self._app_data_top_simulation_folder = self.child_folder( application_generated_data_file_folder, self._this_run_time_string) # remove folders that are old and above the limit self._remove_excess_folders( self._config.getint("Reports", "max_application_binaries_kept"), application_generated_data_file_folder) # store timestamp in latest/time_stamp time_of_run_file_name = os.path.join( self._app_data_top_simulation_folder, TIMESTAMP_FILENAME) with open(time_of_run_file_name, "w") as f: f.writelines(str(self._this_run_time_string)) # create sub folder within reports for sub runs # (where changes need to be recorded) self._app_data_runtime_folder = self.child_folder( self._app_data_top_simulation_folder, "run_{}".format( n_calls_to_run))
def _set_up_output_folders(self, n_calls_to_run): """ Sets up all outgoing folders by creating\ a new timestamp folder for each and clearing :rtype: None """ # set up reports default folder self._set_up_report_specifics(n_calls_to_run) # set up application report folder self.set_up_output_application_data_specifics(n_calls_to_run) if self._read_config_boolean("Reports", "writePacmanExecutorProvenance"): self._pacman_executor_provenance_path = os.path.join( self._report_default_directory, "pacman_executor_provenance.rpt") self._json_folder = os.path.join( self._report_default_directory, "json_files") if not os.path.exists(self._json_folder): self._make_dirs(self._json_folder) # make a folder for the provenance data storage self._provenance_file_path = os.path.join( self._report_default_directory, "provenance_data") if not os.path.exists(self._provenance_file_path): self._make_dirs(self._provenance_file_path) # make application folder for provenance data storage self._app_provenance_file_path = os.path.join( self._provenance_file_path, "app_provenance_data") if not os.path.exists(self._app_provenance_file_path): self._make_dirs(self._app_provenance_file_path) # make system folder for provenance data storage self._system_provenance_file_path = os.path.join( self._provenance_file_path, "system_provenance_data") if not os.path.exists(self._system_provenance_file_path): self._make_dirs(self._system_provenance_file_path)
[docs] def write_finished_file(self): """ Write a finished file that allows file removal to only remove \ folders that are finished. """ app_file_name = os.path.join(self._app_data_top_simulation_folder, FINISHED_FILENAME) with open(app_file_name, "w") as f: f.writelines("finished") app_file_name = os.path.join(self._report_simulation_top_directory, FINISHED_FILENAME) with open(app_file_name, "w") as f: f.writelines("finished")
def _read_config(self, section, item): return read_config(self._config, section, item) def _read_config_int(self, section, item): return read_config_int(self._config, section, item) def _read_config_boolean(self, section, item): return read_config_boolean(self._config, section, item) @property def machine_time_step(self): return self._read_config_int("Machine", "machine_time_step") @machine_time_step.setter def machine_time_step(self, new_value): self._config.set("Machine", "machine_time_step", new_value) @property def time_scale_factor(self): return self._read_config_int("Machine", "time_scale_factor") @time_scale_factor.setter def time_scale_factor(self, new_value): self._config.set("Machine", "time_scale_factor", new_value)
[docs] def set_up_timings(self, machine_time_step=None, time_scale_factor=None): """ Set up timings of the machine :param machine_time_step:\ An explicitly specified time step for the machine. If None,\ the value is read from the config :param time_scale_factor:\ An explicitly specified time scale factor for the simulation.\ If None, the value is read from the config """ # set up timings if machine_time_step is not None: self.machine_time_step = machine_time_step if self.machine_time_step <= 0: raise ConfigurationException( "invalid machine_time_step {}: must greater than zero".format( self.machine_time_step)) if time_scale_factor is not None: self.time_scale_factor = time_scale_factor
@staticmethod def _make_dirs(path): # Workaround for Python 2/3 Compatibility (Python 3 raises on exists) try: os.makedirs(path) except OSError as e: if e.errno != errno.EEXIST: raise