Source code for spinn_front_end_common.utilities.iobuf_extractor

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Sized
import logging
import os
import re
from typing import (
    Iterable, List, Optional, Pattern, Sequence, Set, Tuple, TypeVar, Union)

from spinn_utilities.config_holder import get_config_str_or_none
from spinn_utilities.log import FormatAdapter
from spinn_utilities.make_tools.replacer import Replacer
from spinn_utilities.progress_bar import ProgressBar

from spinn_machine.core_subsets import CoreSubsets

from spinnman.model import ExecutableTargets
from spinnman.model.enums import ExecutableType
from spinnman.model.io_buffer import IOBuffer

from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.helpful_functions import (
    convert_string_into_chip_and_core_subset)


logger = FormatAdapter(logging.getLogger(__name__))
ERROR_ENTRY = re.compile(r"\[ERROR\]\s+\((.*)\):\s+(.*)")
WARNING_ENTRY = re.compile(r"\[WARNING\]\s+\((.*)\):\s+(.*)")
ENTRY_FILE = 1
ENTRY_TEXT = 2

#: :meta private:
T = TypeVar("T")


class _DummyProgress(object):
    """
    An alternative to the Progress bar so the over can be called.
    """

    def over(self, values: Iterable[T]) -> Iterable[T]:
        """
        Simple wrapper for the cases where a progress bar is being used
        to show progress through the iteration over a single collection.

        :param ~collections.abc.Iterable values:
            The base collection (any iterable) being iterated over
        :return: The passed in collection unchanged.
        :rtype: ~collections.abc.Iterable
        """
        return values


[docs] class IOBufExtractor(object): """ Extract the logging output buffers from the machine, and separates lines based on their prefix. """ __slots__ = ( "_filename_template", "_recovery_mode", "__system_binaries", "__app_path", "__sys_path", "__suppress_progress", "__from_cores", "__binary_types", "__executable_targets") def __init__( self, executable_targets: Optional[ExecutableTargets] = None, *, recovery_mode: bool = False, filename_template: str = ( "iobuf_for_chip_{}_{}_processor_id_{}.txt"), suppress_progress: bool = False): """ :param executable_targets: Which Binaries and core to extract from. `None` to extract from all. :type executable_targets: ~spinnman.model.ExecutableTargets or None :param bool recovery_mode: :param str filename_template: :param bool suppress_progress: """ self._filename_template = filename_template self._recovery_mode = bool(recovery_mode) self.__suppress_progress = bool(suppress_progress) self.__app_path = FecDataView.get_app_provenance_dir_path() self.__sys_path = FecDataView.get_system_provenance_dir_path() self.__from_cores = get_config_str_or_none( "Reports", "extract_iobuf_from_cores") self.__binary_types = get_config_str_or_none( "Reports", "extract_iobuf_from_binary_types") if executable_targets is None: self.__executable_targets = FecDataView.get_executable_targets() else: self.__executable_targets = executable_targets self.__system_binaries: Set[str] = set() try: self.__system_binaries.update( self.__executable_targets.get_binaries_of_executable_type( ExecutableType.SYSTEM)) except KeyError: pass
[docs] def extract_iobuf(self) -> Tuple[Sequence[str], Sequence[str]]: """ Perform the extraction of IOBUF. :return: error_entries, warn_entries :rtype: tuple(list(str),list(str)) """ if self.__from_cores == "ALL": return self.__extract_all_cores() elif self.__from_cores and self.__binary_types: return self.__extract_selected_cores_and_types() elif self.__from_cores: return self.__extract_selected_cores() elif self.__binary_types: return self.__extract_selected_types() else: # nothing return [], []
def __progress(self, bins: Sized) -> Union[ProgressBar, _DummyProgress]: """ :param list bins: :rtype: ~.ProgressBar """ if self.__suppress_progress: return _DummyProgress() label = (("Recovering" if self._recovery_mode else "Extracting") + " IOBUF from the machine") return ProgressBar(len(bins), label) def __prov_path(self, binary: str) -> str: """ :param str binary: :return: provenance directory path :rtype: str """ return (self.__sys_path if binary in self.__system_binaries else self.__app_path) def __extract_all_cores(self) -> Tuple[List[str], List[str]]: """ :rtype: tuple(list(str), list(str)) """ error_entries: List[str] = list() warn_entries: List[str] = list() # all the cores progress = self.__progress(self.__executable_targets.binaries) for binary in progress.over(self.__executable_targets.binaries): core_subsets = self.__executable_targets.get_cores_for_binary( binary) self.__extract_iobufs_for_binary( core_subsets, binary, error_entries, warn_entries) return error_entries, warn_entries def __extract_selected_cores_and_types( self) -> Tuple[List[str], List[str]]: """ :rtype: tuple(list(str), list(str)) """ error_entries: List[str] = list() warn_entries: List[str] = list() # bit of both assert self.__binary_types is not None progress = self.__progress(self.__executable_targets.binaries) binaries = FecDataView.get_executable_paths(self.__binary_types) iocores = convert_string_into_chip_and_core_subset(self.__from_cores) for binary in progress.over(self.__executable_targets.binaries): if binary in binaries: core_subsets = self.__executable_targets.get_cores_for_binary( binary) else: core_subsets = iocores.intersect( self.__executable_targets.get_cores_for_binary(binary)) if core_subsets: self.__extract_iobufs_for_binary( core_subsets, binary, error_entries, warn_entries) return error_entries, warn_entries def __extract_selected_cores(self) -> Tuple[List[str], List[str]]: """ :rtype: tuple(list(str), list(str)) """ error_entries: List[str] = list() warn_entries: List[str] = list() # some hard coded cores progress = self.__progress(self.__executable_targets.binaries) iocores = convert_string_into_chip_and_core_subset(self.__from_cores) for binary in progress.over(self.__executable_targets.binaries): core_subsets = iocores.intersect( self.__executable_targets.get_cores_for_binary(binary)) if core_subsets: self.__extract_iobufs_for_binary( core_subsets, binary, error_entries, warn_entries) return error_entries, warn_entries def __extract_selected_types(self) -> Tuple[List[str], List[str]]: """ :rtype: tuple(list(str), list(str)) """ error_entries: List[str] = list() warn_entries: List[str] = list() # some binaries assert self.__binary_types is not None binaries = FecDataView.get_executable_paths(self.__binary_types) progress = self.__progress(binaries) for binary in progress.over(binaries): core_subsets = self.__executable_targets.get_cores_for_binary( binary) if core_subsets is not None: self.__extract_iobufs_for_binary( core_subsets, binary, error_entries, warn_entries) return error_entries, warn_entries def __extract_iobufs_for_binary( self, core_subsets: CoreSubsets, binary: str, error_entries: List[str], warn_entries: List[str]): """ :param ~.CoreSubsets core_subsets: Where the binary is deployed :param str binary: What binary was deployed there. This is used to determine how to decompress the IOBUF output. :param list(str) error_entries: :param list(str) warn_entries: """ replacer = Replacer() prov_path = self.__prov_path(binary) # extract iobuf if self._recovery_mode: io_buffers = self.__recover_iobufs(core_subsets) else: io_buffers = list(FecDataView.get_transceiver().get_iobuf( core_subsets)) # write iobuf for iobuf in io_buffers: self.__process_one_iobuf(iobuf, prov_path, replacer, error_entries, warn_entries) def __process_one_iobuf( self, iobuf: IOBuffer, file_path: str, replacer: Replacer, error_entries: List[str], warn_entries: List[str]): """ :param ~.IOBuffer iobuf: :param str file_path: :param ~.Replacer replacer: :param list(str) error_entries: :param list(str) warn_entries: """ file_name = os.path.join( file_path, self._filename_template.format( iobuf.x, iobuf.y, iobuf.p)) # set mode of the file based off if the file already exists mode = "a" if os.path.exists(file_name) else "w" # write iobuf to file and call out errors and warnings. with open(file_name, mode, encoding="utf-8") as f: for line in iobuf.iobuf.split("\n"): replaced = replacer.replace(line) f.write(replaced) f.write("\n") self.__add_value_if_match( ERROR_ENTRY, replaced, error_entries, iobuf) self.__add_value_if_match( WARNING_ENTRY, replaced, warn_entries, iobuf) def __recover_iobufs(self, core_subsets: CoreSubsets) -> List[IOBuffer]: """ :param ~.CoreSubsets core_subsets: :rtype: list(~.IOBuffer) """ io_buffers: List[IOBuffer] = [] for core_subset in core_subsets: for p in core_subset.processor_ids: cs = CoreSubsets() cs.add_processor(core_subset.x, core_subset.y, p) try: transceiver = FecDataView.get_transceiver() io_buffers.extend(transceiver.get_iobuf(cs)) except Exception as e: # pylint: disable=broad-except io_buffers.append(IOBuffer( core_subset.x, core_subset.y, p, "failed to retrieve iobufs from " f"{core_subset.x},{core_subset.y},{p}; " f"{str(e)}")) return io_buffers @staticmethod def __add_value_if_match( regex: Pattern, line: str, entries: List[str], iobuf: IOBuffer): """ :param ~typing.Pattern regex: :param str line: :param list(str) entries: :param ~.IOBuffer iobuf: """ match = regex.match(line) if match: entries.append(f"{iobuf.x}, {iobuf.y}, {iobuf.p}: " f"{match.group(ENTRY_TEXT)} " f"({match.group(ENTRY_FILE)})")