Source code for spinn_front_end_common.interface.provenance.router_prov_mapper

# Copyright (c) 2020 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os
from typing import (
    Any, ContextManager, FrozenSet, Iterable, List, Optional, Tuple, cast)
import sqlite3
from types import ModuleType

import numpy
from typing_extensions import Literal

from spinn_front_end_common.utilities.sqlite_db import SQLiteDB
from spinn_front_end_common.utilities.exceptions import ConfigurationException

# The types of router provenance that we'll plot
ROUTER_PLOTTABLES = (
    "Default_Routed_External_Multicast_Packets",
    "Dropped_FR_Packets",
    "Dropped_Multicast_Packets",
    "Dropped_Multicast_Packets_via_Local_Transmission",
    "Dropped_NN_Packets",
    "Dropped_P2P_Packets",
    "Dumped_from_a_Link",
    "Dumped_from_a_Processor",
    "Error",
    "External_FR_Packets",
    "External_Multicast_Packets",
    "External_NN_Packets",
    "External_P2P_Packets",
    "Local_Multicast_Packets",
    "Local_P2P_Packets",
    "Local_NN_Packets",
    "Local_FR_Packets",
    "Missed_for_Reinjection",
    "Received_for_Reinjection",
    "Reinjected",
    "Reinjection_Overflows")
SINGLE_PLOTNAME = "Plot.png"


[docs] class Plotter(ContextManager[SQLiteDB]): """ Code to plot provenance data from the database """ __slots__ = ("cmap", "_db", "__have_insertion_order", "__verbose") __pyplot: Optional[ModuleType] = None __seaborn: Optional[ModuleType] = None def __init__(self, db_filename: str, verbose: bool = False): self._db = SQLiteDB(db_filename, read_only=True, text_factory=str) self.__have_insertion_order = True self.__verbose = verbose self.cmap = "plasma" def __enter__(self) -> SQLiteDB: return self._db.__enter__() def __exit__(self, *args) -> Literal[False]: return self._db.__exit__(*args) def __do_chip_query(self, description: str) -> Iterable[sqlite3.Row]: # Does the query in one of two ways, depending on schema version if self.__have_insertion_order: try: return self._db.execute(""" SELECT source_name AS "source", x, y, description_name AS "description", the_value AS "value" FROM provenance_view WHERE description LIKE ? GROUP BY x, y, p HAVING insertion_order = MAX(insertion_order) """, (description, )) except sqlite3.OperationalError as e: if "no such column: insertion_order" != str(e): raise self.__have_insertion_order = False return self._db.execute(""" SELECT source_name AS "source", x, y, description_name AS "description", MAX(the_value) AS "value" FROM provenance_view WHERE description LIKE ? GROUP BY x, y, p """, (description, ))
[docs] def get_per_chip_prov_types(self) -> FrozenSet[str]: """ Get a set of the descriptions available at chip level :rtype: set(str) """ query = """ SELECT DISTINCT description_name AS "description" FROM provenance_view WHERE x IS NOT NULL AND p IS NULL AND "description" IS NOT NULL """ return frozenset(row["description"] for row in self._db.execute( query))
[docs] def get_per_chip_prov_details(self, info: str) -> Tuple[ str, int, int, numpy.ndarray]: """ Gets the provenance of a per chip basis :param str info: The name of the metadata to sum :return: name, max x, max y and data :rtype: tuple(str, int, int, numpy.ndarray) """ data = [] xs = [] ys = [] src: Optional[str] = None name: Optional[str] = None for row in self.__do_chip_query("%" + info + "%"): if src is None: src = row["source"] if name is None: name = row["description"] data.append((row["x"], row["y"], row["value"])) xs.append(row["x"]) ys.append(row["y"]) ary = numpy.full((max(ys) + 1, max(xs) + 1), float("NaN")) for (x, y, value) in data: ary[y, x] = value assert src is not None and name is not None, "no such chip" return (f"{src}/{name}".replace("_", " "), max(xs) + 1, max(ys) + 1, ary)
def __do_sum_query(self, description: str) -> Iterable[sqlite3.Row]: # Does the query in one of two ways, depending on schema version if self.__have_insertion_order: try: return self._db.execute(""" SELECT "source", x, y, "description", SUM("value") AS "value" FROM ( SELECT source_name AS "source", x, y, p, description_name AS "description", the_value AS "value" FROM provenance_view WHERE description LIKE ? AND p IS NOT NULL GROUP BY x, y, p HAVING insertion_order = MAX(insertion_order)) GROUP BY x, y """, (description, )) except sqlite3.OperationalError as e: if "no such column: insertion_order" != str(e): raise self.__have_insertion_order = False return self._db.execute(""" SELECT "source", x, y, "description", SUM("value") AS "value" FROM ( SELECT source_name AS "source", x, y, description_name AS "description", MAX(the_value) AS "value" FROM provenance_view WHERE description LIKE ? AND p IS NOT NULL GROUP BY x, y, p) GROUP BY x, y """, (description, ))
[docs] def get_per_core_prov_types(self) -> FrozenSet[str]: """ Get a set of the descriptions available at core level :rtype: set(str) """ query = """ SELECT DISTINCT description_name AS "description" FROM provenance_view WHERE x IS NOT NULL AND p IS NOT NULL AND "description" IS NOT NULL """ return frozenset( cast(str, row["description"]) for row in self._db.execute(query))
[docs] def get_sum_chip_prov_details(self, info: str) -> Tuple[ str, int, int, numpy.ndarray]: """ Gets the sum of the provenance :param str info: The name of the metadata to sum :return: name, max x, max y and data :rtype: tuple(str, int, int, numpy.ndarray) """ data: List[Tuple[int, int, Any]] = [] xs: List[int] = [] ys: List[int] = [] name: Optional[str] = None for row in self.__do_sum_query("%" + info + "%"): if name is None: name = row["description"] data.append((row["x"], row["y"], row["value"])) xs.append(row["x"]) ys.append(row["y"]) assert name is not None, "no chips match query" ary = numpy.full((max(ys) + 1, max(xs) + 1), float("NaN")) for (x, y, value) in data: ary[y, x] = value return name.replace("_", " "), max(xs) + 1, max(ys) + 1, ary
@classmethod def __plotter_apis(cls) -> Tuple[ModuleType, ModuleType]: # Import here because otherwise CI fails # pylint: disable=import-error,import-outside-toplevel if not cls.__pyplot: import matplotlib.pyplot as plot # type: ignore[import] cls.__pyplot = plot if not cls.__seaborn: import seaborn # type: ignore[import] cls.__seaborn = seaborn if cls.__pyplot is None or cls.__seaborn is None: raise ConfigurationException( "no plotting APIs present; please install " "matplotlib and seaborn to plot router provenance") return cls.__pyplot, cls.__seaborn
[docs] def plot_per_core_data(self, key: str, output_filename: str): """ Plots the metadata for this key/term to the file at a core level :param str key: The name of the metadata to plot, or a unique fragment of it :param str output_filename: """ plot, seaborn = self.__plotter_apis() if self.__verbose: print("creating " + output_filename) (title, width, height, data) = self.get_sum_chip_prov_details(key) _fig, ax = plot.subplots(figsize=(width, height)) plot.title(title) ax.set_xticks([]) ax.set_yticks([]) ax.axis("off") labels = data.astype(int) seaborn.heatmap( data, annot=labels, fmt="", square=True, cmap=self.cmap).invert_yaxis() plot.savefig(output_filename, bbox_inches='tight') plot.close()
[docs] def plot_per_chip_data(self, key: str, output_filename: str): """ Plots the metadata for this key/term to the file at a chip level :param str key: The name of the metadata to plot, or a unique fragment of it :param str output_filename: """ plot, seaborn = self.__plotter_apis() if self.__verbose: print("creating " + output_filename) (title, width, height, data) = self.get_per_chip_prov_details(key) _fig, ax = plot.subplots(figsize=(width, height)) plot.title(title) ax.set_xticks([]) ax.set_yticks([]) ax.axis("off") labels = data.astype(int) seaborn.heatmap( data, annot=labels, fmt="", square=True, cmap=self.cmap).invert_yaxis() plot.savefig(output_filename, bbox_inches='tight') plot.close()
[docs] def main() -> None: """ Generate heat maps from SpiNNaker provenance databases """ ap = argparse.ArgumentParser( description="Generate heat maps from SpiNNaker provenance databases.") ap.add_argument("-c", "--colourmap", nargs="?", default="plasma", help="colour map rule for plot; default 'plasma'") ap.add_argument("-l", "--list", action="store_true", default=False, help="list the types of metadata available") ap.add_argument("-q", "--quiet", action="store_true", default=False, help="don't print progress information") ap.add_argument("-s", "--sumcores", action="store_true", default=False, help="compute information by summing data from the cores " "of each chip; needs a metadata_name as well unless the " "--list option is also given") ap.add_argument("dbfile", metavar="database_file", help="the provenance database to extract data from; " "usually called 'provenance.sqlite3'") ap.add_argument("term", metavar="metadata_name", nargs="?", default=None, help="the name of the metadata to plot, or a unique " "fragment of it; if omitted, maps will be produced for " "all the router provenance categories") args = ap.parse_args() plotter = Plotter(args.dbfile, not args.quiet) plotter.cmap = args.colourmap with plotter: if args.list: if args.sumcores: for term in plotter.get_per_core_prov_types(): print(term) else: for term in plotter.get_per_chip_prov_types(): print(term) elif args.term: if args.sumcores: plotter.plot_per_core_data( args.term, os.path.abspath(SINGLE_PLOTNAME)) else: plotter.plot_per_chip_data( args.term, os.path.abspath(SINGLE_PLOTNAME)) else: if args.sumcores: raise ValueError( "cannot use --sumcores with default router provenance") for term in ROUTER_PLOTTABLES: plotter.plot_per_chip_data( term, os.path.abspath(term + ".png"))
if __name__ == "__main__": main()