# Source code for spinn_front_end_common.interface.interface_functions.chip_iobuf_extractor
# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import re
from spinn_front_end_common.utilities.utility_objs import ExecutableType
from spinn_utilities.log import FormatAdapter
from spinn_utilities.make_tools.replacer import Replacer
from spinn_utilities.progress_bar import ProgressBar
from spinn_front_end_common.utilities.helpful_functions import (
convert_string_into_chip_and_core_subset)
from spinn_machine.core_subsets import CoreSubsets
from spinnman.model.io_buffer import IOBuffer
# Module-level logger for this file, wrapped in the project's FormatAdapter.
logger = FormatAdapter(logging.getLogger(__name__))
# Patterns recognising flagged IOBUF lines of the form
#     [ERROR] (<location>): <message>
#     [WARNING] (<location>): <message>
# The location group is non-greedy so that it stops at the FIRST "): ";
# a greedy group would swallow part of any message that itself contains
# "): " and leave only the tail of the message in the text group.
ERROR_ENTRY = re.compile(r"\[ERROR\]\s+\((.*?)\):\s+(.*)")
WARNING_ENTRY = re.compile(r"\[WARNING\]\s+\((.*?)\):\s+(.*)")

# Indices of the capture groups in the two patterns above.
ENTRY_FILE = 1  # the parenthesised location
ENTRY_TEXT = 2  # the message following the location
class ChipIOBufExtractor(object):
    """ Extract the logging output buffers (IOBUFs) from the machine, write
        them to one file per core, and collect the lines that are flagged
        as errors or warnings.
    """

    __slots__ = ["_filename_template", "_recovery_mode"]

    def __init__(self, recovery_mode=False,
                 filename_template="iobuf_for_chip_{}_{}_processor_id_{}.txt"):
        """
        :param recovery_mode:
            If true, IOBUFs are fetched one core at a time and a failure on
            one core is recorded in that core's output instead of being
            raised. Stored as a bool.
        :param filename_template:
            ``str.format`` template for the output file name; it is
            formatted with the x, y and p of each retrieved buffer.
        """
        self._filename_template = filename_template
        self._recovery_mode = bool(recovery_mode)

    def __call__(
            self, transceiver, executable_targets, executable_finder,
            app_provenance_file_path, system_provenance_file_path,
            binary_executable_types, from_cores="ALL", binary_types=None):
        """ Extract the IOBUFs of the selected cores and write them to disk.

        :param transceiver:
            Object whose ``get_iobuf(core_subsets)`` yields the buffers.
        :param executable_targets:
            Provides ``binaries`` and ``get_cores_for_binary(binary)``.
        :param executable_finder:
            Provides ``get_executable_paths(binary_types)``; only used when
            ``binary_types`` is given.
        :param app_provenance_file_path:
            Directory for the output of binaries that are not of type
            ``ExecutableType.SYSTEM``.
        :param system_provenance_file_path:
            Directory for the output of ``ExecutableType.SYSTEM`` binaries.
        :param binary_executable_types:
            Mapping from binary to its ``ExecutableType``.
        :param from_cores:
            ``"ALL"`` to read every core of every binary; otherwise a
            string understood by
            ``convert_string_into_chip_and_core_subset`` naming the cores
            to read; or a false value to select by ``binary_types`` only.
        :param binary_types:
            Passed to ``executable_finder.get_executable_paths``; all cores
            of the binaries it returns are read.
        :return: ``(error_entries, warn_entries)``, two lists of strings.
        """
        error_entries = list()
        warn_entries = list()

        # Nothing selected at all: nothing to read.
        if not from_cores and not binary_types:
            return error_entries, warn_entries

        label = (("Recovering" if self._recovery_mode else "Extracting")
                 + " IOBUF from the machine")

        def extract(binary, core_subsets):
            # Route the output to the system or application directory and
            # read the given cores of this binary.
            prov_path = self._provenance_path_for(
                binary_executable_types[binary],
                app_provenance_file_path, system_provenance_file_path)
            self._run_for_core_subsets(
                core_subsets, binary, transceiver, prov_path,
                error_entries, warn_entries)

        if from_cores == "ALL":
            # all the cores
            progress = ProgressBar(len(executable_targets.binaries), label)
            for binary in progress.over(executable_targets.binaries):
                extract(
                    binary, executable_targets.get_cores_for_binary(binary))
        elif from_cores:
            # some hard coded cores, plus (when binary_types is given) every
            # core of the binaries of those types
            progress = ProgressBar(len(executable_targets.binaries), label)
            if binary_types:
                whole_binaries = executable_finder.get_executable_paths(
                    binary_types)
            else:
                whole_binaries = ()
            iocores = convert_string_into_chip_and_core_subset(from_cores)
            for binary in progress.over(executable_targets.binaries):
                core_subsets = executable_targets.get_cores_for_binary(
                    binary)
                if binary not in whole_binaries:
                    core_subsets = iocores.intersect(core_subsets)
                # Skip binaries with no selected cores.
                if core_subsets:
                    extract(binary, core_subsets)
        else:
            # some binaries
            binaries = executable_finder.get_executable_paths(binary_types)
            progress = ProgressBar(len(binaries), label)
            for binary in progress.over(binaries):
                extract(
                    binary, executable_targets.get_cores_for_binary(binary))

        return error_entries, warn_entries

    @staticmethod
    def _provenance_path_for(
            executable_type, app_provenance_file_path,
            system_provenance_file_path):
        """ Pick the output directory for a binary of the given type.

        :return: the system path for ``ExecutableType.SYSTEM``, otherwise
            the application path.
        """
        if executable_type == ExecutableType.SYSTEM:
            return system_provenance_file_path
        return app_provenance_file_path

    def _run_for_core_subsets(
            self, core_subsets, binary, transceiver, provenance_file_path,
            error_entries, warn_entries):
        """ Read the IOBUFs of the given cores, append them to the per-core
            files and record flagged lines.

        :param core_subsets: The cores to read.
        :param binary: The binary running on those cores; used to build the
            ``Replacer`` that post-processes every line.
        :param transceiver: Source of the buffers (``get_iobuf``).
        :param provenance_file_path: Directory the files are written in.
        :param error_entries: List extended in place with error lines.
        :param warn_entries: List extended in place with warning lines.
        """
        replacer = Replacer(binary)

        # extract iobuf
        if self._recovery_mode:
            io_buffers = self.__recover_iobufs(transceiver, core_subsets)
        else:
            io_buffers = list(transceiver.get_iobuf(core_subsets))

        # write iobuf
        for iobuf in io_buffers:
            file_name = os.path.join(
                provenance_file_path,
                self._filename_template.format(iobuf.x, iobuf.y, iobuf.p))

            # Append mode creates the file when it does not exist yet, so
            # no separate existence check is needed.
            with open(file_name, "a") as f:
                # write iobuf to file and call out errors and warnings.
                for line in iobuf.iobuf.split("\n"):
                    replaced = replacer.replace(line)
                    f.write(replaced)
                    f.write("\n")
                    self.__add_value_if_match(
                        ERROR_ENTRY, replaced, error_entries, iobuf)
                    self.__add_value_if_match(
                        WARNING_ENTRY, replaced, warn_entries, iobuf)

    def __recover_iobufs(self, transceiver, core_subsets):
        """ Read the IOBUFs one core at a time so that a failure on one
            core does not prevent the others from being read.

        :return: list of buffers; a core whose read raised is represented
            by an ``IOBuffer`` holding a failure message instead.
        """
        io_buffers = []
        for core_subset in core_subsets:
            for p in core_subset.processor_ids:
                cs = CoreSubsets()
                cs.add_processor(core_subset.x, core_subset.y, p)
                try:
                    io_buffers.extend(transceiver.get_iobuf(cs))
                except Exception as e:  # pylint: disable=broad-except
                    # Deliberate best effort: keep going and record the
                    # failure in place of the missing buffer.
                    io_buffers.append(IOBuffer(
                        core_subset.x, core_subset.y, p,
                        "failed to retrieve iobufs from {},{},{}; {}".format(
                            core_subset.x, core_subset.y, p, str(e))))
        return io_buffers

    @staticmethod
    def __add_value_if_match(regex, line, entries, place):
        """ Append a formatted entry to ``entries`` when ``line`` matches.

        :param regex: Compiled pattern with location and text groups.
        :param line: The line to test (matched from its start).
        :param entries: List extended in place on a match.
        :param place: Object with ``x``, ``y`` and ``p`` attributes naming
            the core the line came from.
        """
        match = regex.match(line)
        if match:
            entries.append("{}, {}, {}: {} ({})".format(
                place.x, place.y, place.p, match.group(ENTRY_TEXT),
                match.group(ENTRY_FILE)))