# Copyright (c) 2017-2019 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
import sys
import sqlite3
from spinn_utilities.log import FormatAdapter
from pacman.model.graphs.application.application_vertex import (
ApplicationVertex)
from pacman.model.graphs.common import EdgeTrafficType
from spinn_front_end_common.abstract_models import (
AbstractProvidesKeyToAtomMapping, AbstractRecordable,
AbstractSupportsDatabaseInjection)
logger = FormatAdapter(logging.getLogger(__name__))
DB_NAME = "input_output_database.db"
INIT_SQL = "db.sql"
def _extract_int(x):
return None if x is None else int(x)
[docs]class DatabaseWriter(object):
""" The interface for the database system for main front ends.\
Any special tables needed from a front end should be done\
by sub classes of this interface.
"""
__slots__ = [
# boolean flag for when the database writer has finished
"_done",
# the path of the database
"_database_path",
# the identifier for the SpiNNaker machine
"_machine_id",
# the database connection itself
"_connection",
# Mappings used to accelerate inserts
"_machine_to_id", "_vertex_to_id", "_edge_to_id"
]
def __init__(self, database_directory):
self._done = False
self._database_path = os.path.join(database_directory, DB_NAME)
self._connection = None
self._machine_to_id = dict()
self._vertex_to_id = dict()
self._edge_to_id = dict()
# delete any old database
if os.path.isfile(self._database_path):
os.remove(self._database_path)
# set up checks
self._machine_id = 0
def __enter__(self):
self._connection = sqlite3.connect(self._database_path)
self.__create_schema()
return self
def __exit__(self, exc_type, exc_val, exc_tb): # @UnusedVariable
self._connection.close()
self._connection = None
return False
[docs] @staticmethod
def auto_detect_database(machine_graph):
""" Auto detects if there is a need to activate the database system
:param machine_graph: the machine graph of the application\
problem space.
:return: a bool which represents if the database is needed
"""
return any(isinstance(vertex, AbstractSupportsDatabaseInjection)
and vertex.is_in_injection_mode
for vertex in machine_graph.vertices)
@property
def database_path(self):
return self._database_path
def __insert(self, sql, *args):
c = self._connection.cursor()
try:
c.execute(sql, args)
return c.lastrowid
except Exception:
logger.exception("problem with insertion; argument types are {}",
str(map(type, args)))
raise
def __create_schema(self):
init_sql_path = os.path.join(os.path.dirname(__file__), INIT_SQL)
with self._connection, open(init_sql_path) as f:
sql = f.read()
self._connection.executescript(sql)
def __insert_machine_layout(self, x_dimension, y_dimension):
return self.__insert(
"INSERT INTO Machine_layout("
" x_dimension, y_dimension) "
"VALUES(?, ?)",
int(x_dimension), int(y_dimension))
def __insert_machine_chip(self, no_processors, chip, machine_id):
if not chip.virtual:
return self.__insert(
"INSERT INTO Machine_chip("
" no_processors, chip_x, chip_y, machine_id,"
" ip_address, nearest_ethernet_x, nearest_ethernet_y) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
int(no_processors), int(chip.x), int(chip.y), int(machine_id),
chip.ip_address,
int(chip.nearest_ethernet_x), int(chip.nearest_ethernet_y))
else:
return self.__insert(
"INSERT INTO Machine_chip("
" no_processors, chip_x, chip_y, machine_id) "
"VALUES (?, ?, ?, ?)",
int(no_processors), int(chip.x), int(chip.y), int(machine_id))
def __insert_processor(self, chip, machine_id, available_DTCM,
available_CPU, physical_id):
# pylint: disable=too-many-arguments
return self.__insert(
"INSERT INTO Processor("
" chip_x, chip_y, machine_id, available_DTCM, "
" available_CPU, physical_id) "
"VALUES(?, ?, ?, ?, ?, ?)",
int(chip.x), int(chip.y), int(machine_id),
int(available_DTCM), int(available_CPU), int(physical_id))
def __insert_app_vertex(self, vertex, max_atoms, is_recording):
v_id = self.__insert(
"INSERT INTO Application_vertices("
" vertex_label, vertex_class, no_atoms, max_atom_constrant,"
" recorded) "
"VALUES(?, ?, ?, ?, ?)",
str(vertex.label), vertex.__class__.__name__,
int(vertex.n_atoms), int(max_atoms), int(is_recording))
self._vertex_to_id[vertex] = v_id
return v_id
def __insert_app_edge(self, edge):
e_id = self.__insert(
"INSERT INTO Application_edges ("
" pre_vertex, post_vertex, edge_label, edge_class) "
"VALUES(?, ?, ?, ?)",
int(self._vertex_to_id[edge.pre_vertex]),
int(self._vertex_to_id[edge.post_vertex]),
str(edge.label), edge.__class__.__name__)
self._edge_to_id[edge] = e_id
return e_id
def __insert_app_graph_element(self, vertex, edge):
return self.__insert(
"INSERT INTO Application_graph ("
" vertex_id, edge_id) "
"VALUES(?, ?)",
int(self._vertex_to_id[vertex]),
int(self._edge_to_id[edge]))
def __insert_cfg(self, parameter_id, value):
# NB: No type constraints on value; this is SQLite (not Sparta!)
return self.__insert(
"INSERT INTO configuration_parameters ("
" parameter_id, value) "
"VALUES (?, ?)",
str(parameter_id), value)
def __insert_machine_vertex(self, vertex, cpu_used, sdram_used, dtcm_used):
v_id = self.__insert(
"INSERT INTO Machine_vertices ("
" label, class, cpu_used, sdram_used, dtcm_used) "
"VALUES(?, ?, ?, ?, ?)",
str(vertex.label), vertex.__class__.__name__,
_extract_int(cpu_used.get_value()),
_extract_int(sdram_used),
_extract_int(dtcm_used))
self._vertex_to_id[vertex] = v_id
return v_id
def __insert_machine_edge(self, edge):
e_id = self.__insert(
"INSERT INTO Machine_edges ("
" pre_vertex, post_vertex, label, class) "
"VALUES(?, ?, ?, ?)",
int(self._vertex_to_id[edge.pre_vertex]),
int(self._vertex_to_id[edge.post_vertex]),
str(edge.label), edge.__class__.__name__)
self._edge_to_id[edge] = e_id
return e_id
def __insert_machine_graph_element(self, vertex, edge):
return self.__insert(
"INSERT INTO Machine_graph ("
" vertex_id, edge_id) "
"VALUES(?, ?)",
int(self._vertex_to_id[vertex]), int(self._edge_to_id[edge]))
def __insert_graph_mapper_vertex(
self, app_vertex, machine_vertex, vertex_slice):
return self.__insert(
"INSERT INTO graph_mapper_vertex ("
" application_vertex_id, machine_vertex_id, "
" lo_atom, hi_atom) "
"VALUES(?, ?, ?, ?)",
int(self._vertex_to_id[app_vertex]),
int(self._vertex_to_id[machine_vertex]),
int(vertex_slice.lo_atom), int(vertex_slice.hi_atom))
def __insert_graph_mapper_edge(self, app_edge, machine_edge):
return self.__insert(
"INSERT INTO graph_mapper_edges ("
" application_edge_id, machine_edge_id) "
"VALUES(?, ?)",
int(self._edge_to_id[app_edge]),
int(self._edge_to_id[machine_edge]))
def __insert_placement(self, placement, machine_id):
return self.__insert(
"INSERT INTO Placements("
" vertex_id, chip_x, chip_y, chip_p, machine_id) "
"VALUES(?, ?, ?, ?, ?)",
int(self._vertex_to_id[placement.vertex]),
int(placement.x), int(placement.y), int(placement.p),
int(machine_id))
def __insert_routing_info(self, edge, key_mask):
return self.__insert(
"INSERT INTO Routing_info("
" edge_id, \"key\", mask) "
"VALUES(?, ?, ?)",
int(self._edge_to_id[edge]), int(key_mask.key), int(key_mask.mask))
def __insert_routing_entry(self, routing_table, counter, entry, route):
return self.__insert(
"INSERT INTO Routing_table("
" chip_x, chip_y, position, key_combo, mask, route) "
"VALUES(?, ?, ?, ?, ?, ?)",
int(routing_table.x), int(routing_table.y), int(counter),
int(entry.routing_entry_key), int(entry.mask), int(route))
def __insert_ip_tag(self, vertex, ip_tag):
port = ip_tag.port
if port is None:
port = 0
return self.__insert(
"INSERT INTO IP_tags("
" vertex_id, tag, board_address, ip_address,"
" port, strip_sdp) "
"VALUES (?, ?, ?, ?, ?, ?)",
int(self._vertex_to_id[vertex]),
int(ip_tag.tag), str(ip_tag.board_address),
str(ip_tag.ip_address), int(port),
1 if ip_tag.strip_sdp else 0)
def __insert_reverse_ip_tag(self, vertex, reverse_ip_tag):
port = reverse_ip_tag.port
if port is None:
port = 0
return self.__insert(
"INSERT INTO Reverse_IP_tags("
" vertex_id, tag, board_address, port) "
"VALUES (?, ?, ?, ?)",
int(self._vertex_to_id[vertex]), int(reverse_ip_tag.tag),
str(reverse_ip_tag.board_address), int(port))
def __insert_event_atom_mapping(self, vertex, event_id, atom_id):
return self.__insert(
"INSERT INTO event_to_atom_mapping("
" vertex_id, event_id, atom_id) "
"VALUES (?, ?, ?)",
int(self._vertex_to_id[vertex]), int(event_id), int(atom_id))
[docs] def add_machine_objects(self, machine):
""" Store the machine object into the database
:param machine: the machine object.
:rtype: None
"""
with self._connection:
self._machine_to_id[machine] = self.__insert_machine_layout(
machine.max_chip_x + 1, machine.max_chip_y + 1)
self._machine_id += 1
for chip in machine.chips:
self.__insert_machine_chip(
len(list(chip.processors)), chip, self._machine_id)
for processor in chip.processors:
self.__insert_processor(
chip, self._machine_id,
processor.dtcm_available,
processor.cpu_cycles_available,
processor.processor_id)
[docs] def add_application_vertices(self, application_graph):
"""
:param application_graph:
:rtype: None
"""
with self._connection:
# add vertices
for vertex in application_graph.vertices:
if isinstance(vertex, AbstractRecordable):
self.__insert_app_vertex(
vertex, vertex.get_max_atoms_per_core(),
vertex.is_recording_spikes())
elif isinstance(vertex, ApplicationVertex):
self.__insert_app_vertex(
vertex, vertex.get_max_atoms_per_core(), 0)
else:
self.__insert_app_vertex(vertex, sys.maxsize, 0)
# add edges
for vertex in application_graph.vertices:
for edge in application_graph.\
get_edges_starting_at_vertex(vertex):
self.__insert_app_edge(edge)
# update graph
for vertex in application_graph.vertices:
for edge in application_graph.\
get_edges_starting_at_vertex(vertex):
self.__insert_app_graph_element(vertex, edge)
[docs] def add_system_params(self, time_scale_factor, machine_time_step, runtime):
""" Write system params into the database
:param time_scale_factor: the time scale factor used in timing
:param machine_time_step: the machine time step used in timing
:param runtime: the amount of time the application is to run for
"""
with self._connection:
self.__insert_cfg("machine_time_step", machine_time_step)
self.__insert_cfg("time_scale_factor", time_scale_factor)
if runtime is not None:
self.__insert_cfg("infinite_run", "False")
self.__insert_cfg("runtime", runtime)
else:
self.__insert_cfg("infinite_run", "True")
self.__insert_cfg("runtime", -1)
[docs] def add_vertices(self, machine_graph, data_n_timesteps, graph_mapper,
application_graph):
""" Add the machine graph, graph mapper and application graph \
into the database.
:param machine_graph: the machine graph object
:param data_n_timesteps: The number of timesteps for which data space\
will been reserved
:param graph_mapper: the graph mapper object
:param application_graph: the application graph object
:rtype: None
"""
with self._connection:
for vertex in machine_graph.vertices:
req = vertex.resources_required
self.__insert_machine_vertex(
vertex, req.cpu_cycles,
req.sdram.get_total_sdram(data_n_timesteps),
req.dtcm.get_value())
# add machine edges
for edge in machine_graph.edges:
self.__insert_machine_edge(edge)
# add to machine graph
for vertex in machine_graph.vertices:
for edge in machine_graph.get_edges_starting_at_vertex(vertex):
self.__insert_machine_graph_element(vertex, edge)
if application_graph is not None:
for machine_vertex in machine_graph.vertices:
app_vertex = graph_mapper.get_application_vertex(
machine_vertex)
self.__insert_graph_mapper_vertex(
app_vertex, machine_vertex,
graph_mapper.get_slice(machine_vertex))
# add graph_mapper edges
for edge in machine_graph.edges:
self.__insert_graph_mapper_edge(
graph_mapper.get_application_edge(edge), edge)
[docs] def add_placements(self, placements):
""" Adds the placements objects into the database
:param placements: the placements object
:param machine_graph: the machine graph object
:rtype: None
"""
with self._connection:
# add records
for placement in placements.placements:
self.__insert_placement(placement, self._machine_id)
[docs] def add_routing_infos(self, routing_infos, machine_graph):
""" Adds the routing information (key masks etc) into the database
:param routing_infos: the routing information object
:param machine_graph: the machine graph object
:rtype: None:
"""
with self._connection:
for partition in machine_graph.outgoing_edge_partitions:
if partition.traffic_type == EdgeTrafficType.MULTICAST:
rinfo = routing_infos.get_routing_info_from_partition(
partition)
for edge in partition.edges:
for key_mask in rinfo.keys_and_masks:
self.__insert_routing_info(edge, key_mask)
[docs] def add_routing_tables(self, routing_tables):
""" Adds the routing tables into the database
:param routing_tables: the routing tables object
:rtype: None
"""
with self._connection:
for routing_table in routing_tables.routing_tables:
for counter, entry in \
enumerate(routing_table.multicast_routing_entries):
route_entry = 0
for processor_id in entry.processor_ids:
route_entry |= 1 << (6 + processor_id)
for link_id in entry.link_ids:
route_entry |= 1 << link_id
self.__insert_routing_entry(
routing_table, counter, entry, route_entry)
[docs] def create_atom_to_event_id_mapping(
self, application_graph, machine_graph, routing_infos,
graph_mapper):
"""
:param application_graph:
:param machine_graph:
:param routing_infos:
:param graph_mapper:
:rtype: None
"""
have_app_graph = (application_graph is not None and
application_graph.n_vertices != 0)
with self._connection:
for vertex in machine_graph.vertices:
for partition in machine_graph.\
get_outgoing_edge_partitions_starting_at_vertex(
vertex):
if have_app_graph:
self._insert_vertex_atom_to_key_map(
graph_mapper.get_application_vertex(vertex),
partition, routing_infos)
else:
self._insert_vertex_atom_to_key_map(
vertex, partition, routing_infos)
def _insert_vertex_atom_to_key_map(
self, vertex, partition, routing_infos):
"""
:param vertex:
:param partition:
:param routing_infos:
:rtype: None
"""
if isinstance(vertex, AbstractProvidesKeyToAtomMapping):
routing_info = routing_infos.get_routing_info_from_partition(
partition)
for atom_id, key in vertex.routing_key_partition_atom_mapping(
routing_info, partition):
self.__insert_event_atom_mapping(vertex, key, atom_id)