# Copyright (c) 2015 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
import os
from typing import Dict, Iterable, List, Optional, Tuple, cast, TYPE_CHECKING
from spinn_utilities.log import FormatAdapter
from spinn_machine import Machine
from pacman.model.graphs import AbstractVertex
from pacman.model.graphs.machine import MachineVertex
from pacman.model.graphs.application.abstract import (
AbstractOneAppOneMachineVertex)
from pacman.utilities.utility_calls import get_keys
from pacman.model.graphs.abstract_edge_partition import AbstractEdgePartition
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.utilities.sqlite_db import SQLiteDB
from spinn_front_end_common.abstract_models import (
AbstractSupportsDatabaseInjection, HasCustomAtomKeyMap, LiveOutputDevice)
from spinn_front_end_common.utility_models import (
LivePacketGather, LivePacketGatherMachineVertex)
if TYPE_CHECKING:
    # Imported for type annotations only, so it is skipped at runtime
    from spinn_front_end_common.utility_models.live_packet_gather import (
        _LPGSplitter)

logger = FormatAdapter(logging.getLogger(__name__))

# File name of the database; joined onto the run directory path by
# DatabaseWriter.__init__
DB_NAME = "input_output_database.sqlite3"
# File name of the SQL script, looked up next to this module and handed to
# SQLiteDB as the ``ddl_file`` when the database is created
INIT_SQL = "db.sql"
def _extract_int(x):
return None if x is None else int(x)
class DatabaseWriter(SQLiteDB):
    """
    The interface for the database system for main front ends.
    Any special tables needed from a front end should be done
    by subclasses of this interface.
    """

    __slots__ = (
        # the path of the database
        "_database_path",
        # the identifier for the SpiNNaker machine
        "_machine_id",
        # Mappings used to accelerate inserts
        "__machine_to_id", "__vertex_to_id")

    def __init__(self) -> None:
        """
        Create a fresh database file in the run directory.

        Any file already present at the database path is deleted first.
        """
        self._database_path = os.path.join(
            FecDataView.get_run_dir_path(), DB_NAME)
        init_sql_path = os.path.join(os.path.dirname(__file__), INIT_SQL)

        # delete any old database
        if os.path.isfile(self._database_path):
            os.remove(self._database_path)

        super().__init__(self._database_path, ddl_file=init_sql_path)
        self.__machine_to_id: Dict[Machine, int] = dict()
        self.__vertex_to_id: Dict[AbstractVertex, int] = dict()

        # set up checks
        self._machine_id = 0

    @staticmethod
    def auto_detect_database() -> bool:
        """
        Auto detects if there is a need to activate the database system.

        The database is needed when there is any vertex of type
        ``LivePacketGather``, or any vertex of type
        ``AbstractSupportsDatabaseInjection`` that is in injection mode.

        :return: whether the database is needed for the application
        :rtype: bool
        """
        if FecDataView.get_vertices_by_type(LivePacketGather):
            return True
        for vertex in FecDataView.get_vertices_by_type(
                AbstractSupportsDatabaseInjection):
            if vertex.is_in_injection_mode:
                return True
        return False

    @property
    def database_path(self) -> str:
        """
        The path of the database file, as chosen at construction.

        :rtype: str
        """
        return self._database_path

    def __insert(self, sql: str, *args) -> int:
        """
        Run a single insert statement and report the row it created.

        :param str sql: the statement to execute
        :param args: the values passed along with the statement
        :return: the ``lastrowid`` reported after the statement ran
        :rtype: int
        :raises Exception:
            whatever the execution raised; it is logged and then re-raised
        """
        try:
            self.execute(sql, args)
            return self.lastrowid
        except Exception:
            # map() is lazy in Python 3, so str(map(...)) would only log
            # "<map object at ...>"; build the list of types explicitly
            logger.exception("problem with insertion; argument types are {}",
                             str([type(arg) for arg in args]))
            raise

    def add_machine_objects(self) -> None:
        """
        Store the machine object into the database.

        Writes one ``Machine_layout`` row holding the machine's width and
        height, remembers the identifier of that row, and then writes one
        ``Machine_chip`` row for each chip of the machine.
        """
        machine = FecDataView.get_machine()
        self.__machine_to_id[machine] = self._machine_id = self.__insert(
            """
            INSERT INTO Machine_layout(
                x_dimension, y_dimension)
            VALUES(?, ?)
            """, machine.width, machine.height)
        self.executemany(
            """
            INSERT INTO Machine_chip(
                no_processors, chip_x, chip_y, machine_id,
                ip_address, nearest_ethernet_x, nearest_ethernet_y)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                (chip.n_processors, chip.x, chip.y, self._machine_id,
                 chip.ip_address,
                 chip.nearest_ethernet_x, chip.nearest_ethernet_y)
                for chip in machine.chips))

    def add_application_vertices(self) -> None:
        """
        Stores the application vertices and their machine vertices.

        For every application vertex a row is added to
        ``Application_vertices``; each of its machine vertices is added to
        ``Machine_vertices`` and linked to it via ``graph_mapper_vertex``.
        """
        # add vertices
        for vertex in FecDataView.iterate_vertices():
            vertex_id = self.__insert(
                "INSERT INTO Application_vertices(vertex_label) VALUES(?)",
                vertex.label)
            self.__vertex_to_id[vertex] = vertex_id
            for m_vertex in vertex.machine_vertices:
                m_vertex_id = self.__add_machine_vertex(m_vertex)
                self.__insert(
                    """
                    INSERT INTO graph_mapper_vertex (
                        application_vertex_id, machine_vertex_id)
                    VALUES(?, ?)
                    """,
                    vertex_id, m_vertex_id)

    def __add_machine_vertex(self, m_vertex: MachineVertex) -> int:
        """
        Add one machine vertex to the database and remember its row id.

        :param ~pacman.model.graphs.machine.MachineVertex m_vertex:
            the vertex to record
        :return: the identifier of the row created for the vertex
        :rtype: int
        """
        m_vertex_id = self.__insert(
            "INSERT INTO Machine_vertices (label) VALUES(?)",
            str(m_vertex.label))
        self.__vertex_to_id[m_vertex] = m_vertex_id
        return m_vertex_id

    def add_system_params(self, runtime: Optional[float]) -> None:
        """
        Write system parameters into the database.

        :param runtime:
            the amount of time the application is to run for;
            `None` is recorded as an infinite run with a runtime of -1
        :type runtime: float or None
        """
        self.executemany(
            """
            INSERT INTO configuration_parameters (
                parameter_id, value)
            VALUES (?, ?)
            """, [
                ("machine_time_step",
                 FecDataView.get_simulation_time_step_us()),
                ("time_scale_factor",
                 FecDataView.get_time_scale_factor()),
                ("infinite_run", str(runtime is None)),
                ("runtime", -1 if runtime is None else runtime),
                ("app_id", FecDataView.get_app_id())])

    def add_proxy_configuration(self) -> None:
        """
        Store the proxy configuration.

        Nothing is written when there is no spalloc job.
        """
        # pylint: disable=protected-access
        job = FecDataView.get_spalloc_job()
        if job is not None:
            config = job.get_session_credentials_for_db()
            # config is keyed by pairs, which are split into two columns
            self.executemany(
                """
                INSERT INTO proxy_configuration(kind, name, value)
                VALUES(?, ?, ?)
                """, [(k1, k2, v) for (k1, k2), v in config.items()])

    def add_placements(self) -> None:
        """
        Adds the placements objects into the database.

        Placed vertices that have not been recorded yet are added as
        machine vertices first, so every placement row has a vertex id.
        """
        # NOTE(review): "iterate_placemements" (sic) is the spelling used by
        # both calls here; presumably it matches the FecDataView method name
        # -- confirm before renaming.
        # Make sure machine vertices are represented
        for placement in FecDataView.iterate_placemements():
            if placement.vertex not in self.__vertex_to_id:
                self.__add_machine_vertex(placement.vertex)

        # add records
        self.executemany(
            """
            INSERT INTO Placements(
                vertex_id, chip_x, chip_y, chip_p, machine_id)
            VALUES(?, ?, ?, ?, ?)
            """, (
                (self.__vertex_to_id[placement.vertex],
                 placement.x, placement.y, placement.p, self._machine_id)
                for placement in FecDataView.iterate_placemements()))

    def create_atom_to_event_id_mapping(
            self, machine_vertices: Optional[
                Iterable[Tuple[MachineVertex, str]]]) -> None:
        """
        Record which event id (key) corresponds to each atom of the given
        machine vertices.

        :param machine_vertices:
            pairs of (machine vertex, partition id) to record,
            or `None` in which case nothing is done
        :type machine_vertices:
            iterable(tuple(~pacman.model.graphs.machine.MachineVertex,str))
            or None
        """
        # This could happen if there are no LPGs
        if machine_vertices is None:
            return

        # Only fetched once there is definitely something to map
        routing_infos = FecDataView.get_routing_infos()
        for (m_vertex, partition_id) in machine_vertices:
            atom_keys: Iterable[Tuple[int, int]] = ()
            if isinstance(m_vertex.app_vertex, HasCustomAtomKeyMap):
                atom_keys = m_vertex.app_vertex.get_atom_key_map(
                    m_vertex, partition_id, routing_infos)
            else:
                r_info = routing_infos.get_routing_info_from_pre_vertex(
                    m_vertex, partition_id)
                # r_info could be None if there are no outgoing edges,
                # at which point there is nothing to do here anyway
                if r_info is not None:
                    vertex_slice = m_vertex.vertex_slice
                    keys = get_keys(r_info.key, vertex_slice)
                    start = vertex_slice.lo_atom
                    # pair each key with its atom, counting from lo_atom
                    atom_keys = [(i, k) for i, k in enumerate(keys, start)]
            m_vertex_id = self.__vertex_to_id[m_vertex]
            self.executemany(
                """
                INSERT INTO event_to_atom_mapping(
                    vertex_id, event_id, atom_id)
                VALUES (?, ?, ?)
                """, ((m_vertex_id, int(key), int(i))
                      for i, key in atom_keys)
            )

    def create_device_atom_event_id_mapping(
            self, devices: Iterable[LiveOutputDevice]) -> None:
        """
        Add output mappings for devices.

        :param devices:
            the devices whose output keys, as reported by
            ``get_device_output_keys``, are to be recorded
        :type devices: iterable(LiveOutputDevice)
        """
        for device in devices:
            for m_vertex, atom_keys in device.get_device_output_keys().items():
                m_vertex_id = self.__vertex_to_id[m_vertex]
                self.executemany(
                    """
                    INSERT INTO event_to_atom_mapping(
                        vertex_id, event_id, atom_id)
                    VALUES (?, ?, ?)
                    """, ((m_vertex_id, int(key), int(i))
                          for i, key in atom_keys)
                )

    def _get_machine_lpg_mappings(
            self, part: AbstractEdgePartition) -> Iterable[
                Tuple[MachineVertex, str, MachineVertex]]:
        """
        Get places where an LPG Machine vertex has been added to a graph
        "directly" (via SpiNNakerGraphFrontEnd);
        and so its application vertex *isn't* a LivePacketGather

        :param part: the partition whose edges are examined
        :type part:
            ~pacman.model.graphs.abstract_edge_partition.AbstractEdgePartition
        :return:
            (source machine vertex, partition id, LPG machine vertex)
            for each matching edge of the partition
        :rtype: iterable(tuple(~pacman.model.graphs.machine.MachineVertex,
            str, ~pacman.model.graphs.machine.MachineVertex))
        """
        for edge in part.edges:
            if (isinstance(edge.pre_vertex,
                           AbstractOneAppOneMachineVertex) and
                    isinstance(edge.post_vertex,
                               AbstractOneAppOneMachineVertex) and
                    isinstance(edge.post_vertex.machine_vertex,
                               LivePacketGatherMachineVertex) and
                    not isinstance(edge.post_vertex, LivePacketGather)):
                yield (edge.pre_vertex.machine_vertex, part.identifier,
                       edge.post_vertex.machine_vertex)

    @staticmethod
    def __lpg_splitter(vertex: LivePacketGather) -> _LPGSplitter:
        """
        Get the splitter of an LPG vertex, typed as an ``_LPGSplitter``.

        The cast only informs type checkers; it does no runtime check.
        """
        return cast('_LPGSplitter', vertex.splitter)

    def add_lpg_mapping(self) -> List[Tuple[MachineVertex, str]]:
        """
        Add mapping from machine vertex to LPG machine vertex.

        Targets come both from the splitters of ``LivePacketGather``
        application vertices and from LPG machine vertices that were
        added directly (see :py:meth:`_get_machine_lpg_mappings`).

        :return: A list of (source vertex, partition id)
        :rtype:
            list(tuple(~pacman.model.graphs.machine.MachineVertex, str))
        """
        # targeted_lpgs yields (lpg vertex, source vertex, partition id);
        # reorder so that the source vertex comes first
        targets: List[Tuple[MachineVertex, str, MachineVertex]] = [
            (m_vertex, part_id, lpg_m_vertex)
            for vertex in FecDataView.iterate_vertices()
            if isinstance(vertex, LivePacketGather)
            for lpg_m_vertex, m_vertex, part_id
            in self.__lpg_splitter(vertex).targeted_lpgs]
        targets.extend(
            (m_vertex, part_id, lpg_m_vertex)
            for part in FecDataView.iterate_partitions()
            for (m_vertex, part_id, lpg_m_vertex) in
            self._get_machine_lpg_mappings(part))

        self.executemany(
            """
            INSERT INTO m_vertex_to_lpg_vertex(
                pre_vertex_id, partition_id, post_vertex_id)
            VALUES(?, ?, ?)
            """, ((self.__vertex_to_id[m_vertex], part_id,
                   self.__vertex_to_id[lpg_m_vertex])
                  for m_vertex, part_id, lpg_m_vertex in targets))

        return [(source, part_id) for source, part_id, _target in targets]