# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import logging
import os
import errno
import shutil
import spinn_utilities.conf_loader as conf_loader
from spinn_utilities.log import FormatAdapter
from spinn_front_end_common.utilities.helpful_functions import (
read_config, read_config_boolean, read_config_int)
logger = FormatAdapter(logging.getLogger(__name__))
APP_DIRNAME = 'application_generated_data_files'
CONFIG_FILE = "spinnaker.cfg"
FINISHED_FILENAME = "finished"
REPORTS_DIRNAME = "reports"
TIMESTAMP_FILENAME = "time_stamp"
[docs]class ConfigHandler(object):
"""
Subclass of AbstractSpinnaker base that handles function only dependent
of the config and the order its methods are called
"""
__slots__ = [
#
"_app_data_runtime_folder",
#
"_app_data_top_simulation_folder",
# the interface to the cfg files. supports get get_int etc
"_config",
#
"_json_folder",
#
"_provenance_file_path",
#
"_report_default_directory",
#
"_report_simulation_top_directory",
#
"_this_run_time_string",
#
"_use_virtual_board",
]
def __init__(
self, configfile, default_config_paths, validation_cfg):
# global params
if default_config_paths is None:
default_config_paths = []
default_config_paths.insert(0, os.path.join(
os.path.dirname(__file__), CONFIG_FILE))
self._config = conf_loader.load_config(
filename=configfile, defaults=default_config_paths,
validation_cfg=validation_cfg)
# set up machine targeted data
self._use_virtual_board = self._config.getboolean(
"Machine", "virtual_board")
self._app_data_runtime_folder = None
self._app_data_top_simulation_folder = None
self._json_folder = None
self._provenance_file_path = None
self._report_default_directory = None
self._report_simulation_top_directory = None
self._this_run_time_string = None
def _adjust_config(self, runtime):
""" Adjust and checks config based on runtime and mode
:param runtime:
:type runtime: int or bool
:raises ConfigurationException
"""
if self._config.get("Mode", "mode") == "Debug":
for option in self._config.options("Reports"):
# options names are all lower without _ inside config
if option in self.DEBUG_ENABLE_OPTS or option[:5] == "write":
try:
if not self._config.get_bool("Reports", option):
self._config.set("Reports", option, "True")
logger.info("As mode == \"Debug\", [Reports] {} "
"has been set to True", option)
except ValueError:
pass
elif not self._config.getboolean("Reports", "reportsEnabled"):
for option in self._config.options("Reports"):
# options names are all lower without _ inside config
if option in self.REPORT_DISABLE_OPTS or option[:5] == "write":
try:
if not self._config.get_bool("Reports", option):
self._config.set("Reports", option, "False")
logger.info(
"As reportsEnabled == \"False\", [Reports] {} "
"has been set to False", option)
except ValueError:
pass
if runtime is None:
if self._config.getboolean(
"Reports", "write_energy_report") is True:
self._config.set("Reports", "write_energy_report", "False")
logger.info("[Reports]write_energy_report has been set to "
"False as runtime is set to forever")
if self._config.get_bool(
"EnergySavings", "turn_off_board_after_discovery") is True:
self._config.set(
"EnergySavings", "turn_off_board_after_discovery", "False")
logger.info("[EnergySavings]turn_off_board_after_discovery has"
" been set to False as runtime is set to forever")
if self._use_virtual_board:
if self._config.getboolean(
"Reports", "write_energy_report") is True:
self._config.set("Reports", "write_energy_report", "False")
logger.info("[Reports]write_energy_report has been set to "
"False as using virtual boards")
if self._config.get_bool(
"EnergySavings", "turn_off_board_after_discovery") is True:
self._config.set(
"EnergySavings", "turn_off_board_after_discovery", "False")
logger.info("[EnergySavings]turn_off_board_after_discovery has"
" been set to False as s using virtual boards")
if self._config.getboolean(
"Reports", "write_board_chip_report") is True:
self._config.set("Reports", "write_board_chip_report", "False")
logger.info("[Reports]write_board_chip_report has been set to"
" False as using virtual boards")
[docs] def child_folder(self, parent, child_name):
child = os.path.join(parent, child_name)
if not os.path.exists(child):
self._make_dirs(child)
return child
def _remove_excess_folders(self, max_kept, starting_directory):
try:
files_in_report_folder = os.listdir(starting_directory)
# while there's more than the valid max, remove the oldest one
if len(files_in_report_folder) > max_kept:
# sort files into time frame
files_in_report_folder.sort(
key=lambda temp_file: os.path.getmtime(
os.path.join(starting_directory, temp_file)))
# remove only the number of files required, and only if they
# have the finished flag file created
num_files_to_remove = len(files_in_report_folder) - max_kept
files_removed = 0
files_not_closed = 0
for current_oldest_file in files_in_report_folder:
finished_flag = os.path.join(os.path.join(
starting_directory, current_oldest_file),
FINISHED_FILENAME)
if os.path.exists(finished_flag):
shutil.rmtree(os.path.join(
starting_directory, current_oldest_file),
ignore_errors=True)
files_removed += 1
else:
files_not_closed += 1
if files_removed + files_not_closed >= num_files_to_remove:
break
if files_not_closed > max_kept // 4:
logger.warning(
"{} has {} old reports that have not been closed",
starting_directory, files_not_closed)
except IOError:
# This might happen if there is an open file, or more than one
# process in the same folder, but we shouldn't die because of it
pass
def _set_up_report_specifics(self, n_calls_to_run):
"""
:param n_calls_to_run:
the counter of how many times run has been called.
:type n_calls_to_run: int
:return: The folder for this run, the time_stamp
"""
default_report_file_path = self._config.get_str(
"Reports", "default_report_file_path")
# determine common report folder
if default_report_file_path == "DEFAULT":
directory = os.getcwd()
# global reports folder
report_default_directory = self.child_folder(
directory, REPORTS_DIRNAME)
elif default_report_file_path == "REPORTS":
report_default_directory = REPORTS_DIRNAME
if not os.path.exists(report_default_directory):
self._make_dirs(report_default_directory)
else:
report_default_directory = self.child_folder(
default_report_file_path, REPORTS_DIRNAME)
# clear and clean out folders considered not useful anymore
if os.listdir(report_default_directory):
self._remove_excess_folders(
self._config.getint("Reports", "max_reports_kept"),
report_default_directory)
# determine the time slot for later
if self._this_run_time_string is None:
now = datetime.datetime.now()
self._this_run_time_string = (
"{:04}-{:02}-{:02}-{:02}-{:02}-{:02}-{:02}".format(
now.year, now.month, now.day,
now.hour, now.minute, now.second, now.microsecond))
# handle timing app folder and cleaning of report folder from last run
self._report_simulation_top_directory = self.child_folder(
report_default_directory, self._this_run_time_string)
# create sub folder within reports for sub runs
# (where changes need to be recorded)
self._report_default_directory = self.child_folder(
self._report_simulation_top_directory, "run_{}".format(
n_calls_to_run))
# store timestamp in latest/time_stamp for provenance reasons
time_of_run_file_name = os.path.join(
self._report_simulation_top_directory, TIMESTAMP_FILENAME)
with open(time_of_run_file_name, "w") as f:
f.writelines(self._this_run_time_string)
[docs] def set_up_output_application_data_specifics(self, n_calls_to_run):
"""
:param n_calls_to_run:
the counter of how many times run has been called.
:type n_calls_to_run: int
:return: the run folder for this simulation to hold app data
"""
where_to_write_application_data_files = self._config.get(
"Reports", "default_application_data_file_path")
if where_to_write_application_data_files == "DEFAULT":
where_to_write_application_data_files = os.getcwd()
application_generated_data_file_folder = self.child_folder(
where_to_write_application_data_files, APP_DIRNAME)
# add time stamped folder for this run
self._app_data_top_simulation_folder = self.child_folder(
application_generated_data_file_folder, self._this_run_time_string)
# remove folders that are old and above the limit
self._remove_excess_folders(
self._config.getint("Reports", "max_application_binaries_kept"),
application_generated_data_file_folder)
# store timestamp in latest/time_stamp
time_of_run_file_name = os.path.join(
self._app_data_top_simulation_folder, TIMESTAMP_FILENAME)
with open(time_of_run_file_name, "w") as f:
f.writelines(str(self._this_run_time_string))
# create sub folder within reports for sub runs
# (where changes need to be recorded)
self._app_data_runtime_folder = self.child_folder(
self._app_data_top_simulation_folder, "run_{}".format(
n_calls_to_run))
def _set_up_output_folders(self, n_calls_to_run):
""" Sets up all outgoing folders by creating\
a new timestamp folder for each and clearing
:rtype: None
"""
# set up reports default folder
self._set_up_report_specifics(n_calls_to_run)
# set up application report folder
self.set_up_output_application_data_specifics(n_calls_to_run)
if self._read_config_boolean("Reports",
"writePacmanExecutorProvenance"):
self._pacman_executor_provenance_path = os.path.join(
self._report_default_directory,
"pacman_executor_provenance.rpt")
self._json_folder = os.path.join(
self._report_default_directory, "json_files")
if not os.path.exists(self._json_folder):
self._make_dirs(self._json_folder)
# make a folder for the provenance data storage
self._provenance_file_path = os.path.join(
self._report_default_directory, "provenance_data")
if not os.path.exists(self._provenance_file_path):
self._make_dirs(self._provenance_file_path)
[docs] def write_finished_file(self):
# write a finished file that allows file removal to only remove folders
# that are finished
app_file_name = os.path.join(self._app_data_top_simulation_folder,
FINISHED_FILENAME)
with open(app_file_name, "w") as f:
f.writelines("finished")
app_file_name = os.path.join(self._report_simulation_top_directory,
FINISHED_FILENAME)
with open(app_file_name, "w") as f:
f.writelines("finished")
def _read_config(self, section, item):
return read_config(self._config, section, item)
def _read_config_int(self, section, item):
return read_config_int(self._config, section, item)
def _read_config_boolean(self, section, item):
return read_config_boolean(self._config, section, item)
@staticmethod
def _make_dirs(path):
# Workaround for Python 2/3 Compatibility (Python 3 raises on exists)
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise