Source code for spinn_front_end_common.interface.provenance.global_provenance

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timedelta
import logging
import os
import re
from sqlite3 import Row
from typing import Iterable, List, Optional, Union

from spinn_utilities.config_holder import get_timestamp_path
from spinn_utilities.log import FormatAdapter

from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.constants import (
    MICRO_TO_MILLISECOND_CONVERSION)
from spinn_front_end_common.utilities.sqlite_db import SQLiteDB
from spinn_front_end_common.interface.provenance.timer_work import TimerWork

from .timer_category import TimerCategory

logger = FormatAdapter(logging.getLogger(__name__))

_DDL_FILE = os.path.join(os.path.dirname(__file__), "global.sql")
_RE = re.compile(r"(\d+)([_,:])(\d+)(?:\2(\d+))?")


class GlobalProvenance(SQLiteDB):
    """
    Specific implementation of the Database for SQLite 3.

    .. note::
        *Not thread safe on the same database file.*
        Threads can access different DBs just fine.

    .. note::
        This totally relies on the way SQLite's type affinities function.
        You can't port to a different database engine without a lot of work.
    """

    __slots__ = ("_database_file", )

[docs] @classmethod def get_global_provenace_path(cls) -> str: """ Get the path of the current provenance database of the last run Used the cfg setting tpath_global_provenance placing this in the timestamp directory .. warning:: Calling this method between start/reset and run may result in a path to a database not yet created. :raises ValueError: if the system is in a state where path can't be retrieved, for example before run is called :returns: Directory for this database based on the cfg setting """ return get_timestamp_path("tpath_global_provenance")
def __init__( self, database_file: Optional[str] = None, memory: bool = False): """ :param database_file: The name of a file that contains (or will contain) an SQLite database holding the data. If omitted, either the default file path or an unshared in-memory database will be used (suitable only for testing). :param memory: Flag to say unshared in-memory can be used. Otherwise a `None` file will mean the default should be used """ if database_file is None and not memory: database_file = self.get_global_provenace_path() self._database_file = database_file SQLiteDB.__init__(self, database_file, ddl_file=_DDL_FILE, row_factory=None, text_factory=None)
[docs] def insert_version(self, description: str, the_value: str) -> None: """ Inserts data into the version_provenance table :param description: The package for which the version applies :param the_value: The version to be recorded """ self.cursor().execute( """ INSERT INTO version_provenance( description, the_value) VALUES(?, ?) """, [description, the_value])
[docs] def insert_category( self, category: TimerCategory, machine_on: bool) -> int: """ Inserts category into the category_timer_provenance returning id :param category: Name of Category starting :param machine_on: If the machine was done during all or some of the time :returns: ID of the inserted category """ self.cursor().execute( """ INSERT INTO category_timer_provenance( category, machine_on, n_reset, n_run, n_loop) VALUES(?, ?, ?, ?, ?) """, [category.category_name, machine_on, FecDataView.get_reset_number(), FecDataView.get_run_number(), FecDataView.get_run_step()]) return self.lastrowid
[docs] def insert_category_timing( self, category_id: int, delta: timedelta) -> None: """ Inserts run time into the category :param category_id: id of the Category finished :param delta: Time to be recorded """ time_taken = ( (delta.seconds * MICRO_TO_MILLISECOND_CONVERSION) + (delta.microseconds / MICRO_TO_MILLISECOND_CONVERSION)) self.cursor().execute( """ UPDATE category_timer_provenance SET time_taken = ? WHERE category_id = ? """, (time_taken, category_id))
[docs] def insert_timing( self, category: int, algorithm: str, work: TimerWork, delta: timedelta, skip_reason: Optional[str]) -> None: """ Inserts algorithms run times into the timer_provenance table :param category: Category Id of the Algorithm :param algorithm: Algorithm name :param work: Type of work being done :param delta: Time to be recorded :param skip_reason: The reason the algorithm was skipped or `None` if it was not skipped """ time_taken = ( (delta.seconds * MICRO_TO_MILLISECOND_CONVERSION) + (delta.microseconds / MICRO_TO_MILLISECOND_CONVERSION)) self.cursor().execute( """ INSERT INTO timer_provenance( category_id, algorithm, work, time_taken, skip_reason) VALUES(?, ?, ?, ?, ?) """, [category, algorithm, work.work_name, time_taken, skip_reason])
[docs] def store_log(self, level: int, message: str, timestamp: Optional[datetime] = None) -> None: """ Stores log messages into the database """ if timestamp is None: timestamp = datetime.now() self.cursor().execute( """ INSERT INTO p_log_provenance( timestamp, level, message) VALUES(?, ?, ?) """, [timestamp, level, message])
def _test_log_locked(self, text: str) -> None: """ THIS IS A TESTING METHOD. This will lock the database and then try to do a log """ # lock the database self.cursor().execute( """ INSERT INTO version_provenance( description, the_value) VALUES("foo", "bar") """) # try logging and storing while locked. logger.warning(text)
[docs] def run_query(self, query: str, params: Iterable[Union[str, int, float, None, bytes]] = () ) -> List[Row]: """ Opens a connection to the database, runs a query, extracts the results and closes the connection The return type depends on the use_sqlite_rows parameter. By default this method returns tuples (lookup by index) but the advanced tuple type can be used instead, which supports lookup by name used in the query (use ``AS name`` in the query to set). This method will not allow queries that change the database unless the read_only flag is set to False. .. note:: This method is mainly provided as a support method for the later methods that return specific data. For new IntergationTests please add a specific method rather than call this directly. :param query: The SQL query to be run. May include ``?`` wildcards :param params: The values to replace the ``?`` wildcards with. The number and types must match what the query expects :return: A list possibly empty of tuples/rows (one for each row in the database) where the number and type of the values corresponds to the where statement """ results = [] for row in self.cursor().execute(query, list(params)): results.append(row) return results
[docs] def get_timer_provenance(self, algorithm: str) -> str: """ Gets the timer provenance item(s) from the last run :param algorithm: The value to LIKE search for in the algorithm column. Can be the full name, or have ``%`` and ``_`` wildcards. :return: A possibly multi line string with for each row which matches the like a line ``algorithm: value`` """ query = """ SELECT algorithm, time_taken FROM timer_provenance WHERE algorithm LIKE ? """ return "\n".join( f"{row[0]}: {row[1]}" for row in self.run_query(query, [algorithm]))
[docs] def get_run_time_of_buffer_extractor(self) -> str: """ Gets the buffer extractor provenance item(s) from the last run :return: A possibly multi line string with for each row which matches the ``LIKE %BufferExtractor`` """ return self.get_timer_provenance("%BufferExtractor")
[docs] def get_machine_on_by_reset(self, n_reset: Optional[int] = None) -> int: """ Get the total time the machine was on for this reset :param n_reset: :return: """ if n_reset is None: n_reset = FecDataView.get_reset_number() query = """ SELECT sum(time_taken) FROM category_timer_provenance WHERE machine_on = TRUE """ data = self.run_query(query) try: info = data[0][0] if info is None: return 0 return info except IndexError: return 0
[docs] def get_category_timer_sum(self, category: TimerCategory, n_reset: Optional[int] = None) -> int: """ Get the total runtime for one category of algorithms :param category: What to get the sum of :param n_reset: Which reset to sum or None for all :return: total off all run times with this category """ if n_reset is None: query = """ SELECT sum(time_taken) FROM category_timer_provenance WHERE category = ? """ data = self.run_query(query, [category.category_name]) else: query = """ SELECT sum(time_taken) FROM category_timer_provenance WHERE category = ? AND n_reset = ? """ data = self.run_query(query, [category.category_name, n_reset]) try: info = data[0][0] if info is None: return 0 return info except IndexError: return 0
[docs] def get_timer_sum_by_category(self, category: TimerCategory) -> int: """ Get the total runtime for one category of algorithms :param category: :return: total of all run times with this category """ query = """ SELECT sum(time_taken) FROM full_timer_view WHERE category = ? """ data = self.run_query(query, [category.category_name]) try: info = data[0][0] if info is None: return 0 return info except IndexError: return 0
[docs] def get_timer_sum_by_category_and_reset( self, category: TimerCategory, n_reset: Optional[int] = None) -> int: """ Get the total runtime for one category of algorithms :return: total of all run times with this category """ if n_reset is None: n_reset = FecDataView.get_reset_number() query = """ SELECT sum(time_taken) FROM full_timer_view WHERE category = ? AND n_reset = ? """ data = self.run_query(query, [category.category_name, n_reset]) try: info = data[0][0] if info is None: return 0 return info except IndexError: return 0
[docs] def get_timer_sum_by_work(self, work: TimerWork) -> int: """ Get the total runtime for one work type of algorithms :param work: :return: total off all run times with this category """ query = """ SELECT sum(time_taken) FROM full_timer_view WHERE work = ? """ data = self.run_query(query, [work.work_name]) try: info = data[0][0] if info is None: return 0 return info except IndexError: return 0
[docs] def get_timer_sum_by_algorithm(self, algorithm: str) -> int: """ Get the total runtime for one algorithm :param algorithm: :return: total off all run times with this algorithm """ query = """ SELECT sum(time_taken) FROM timer_provenance WHERE algorithm = ? """ data = self.run_query(query, [algorithm]) try: info = data[0][0] if info is None: return 0 return info except IndexError: return 0
[docs] def retreive_log_messages( self, min_level: int = 0) -> List[str]: """ :returns: All log messages at or above the min_level """ query = """ SELECT message FROM p_log_provenance WHERE level >= ? """ messages = self.run_query(query, [min_level]) return list(map(lambda x: x[0], messages))