# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import sqlite3
from typing import Dict, Iterable, List, Optional, Tuple, cast, TYPE_CHECKING
import numpy
from spinn_utilities.log import FormatAdapter
from spinn_utilities.typing.coords import XYP
from spinnman.model.enums import ExecutableType
from spinnman.spalloc.spalloc_job import SpallocJob
from spinn_front_end_common.data import FecDataView
from spinn_front_end_common.abstract_models import AbstractHasAssociatedBinary
from spinn_front_end_common.utilities.constants import (
APP_PTR_TABLE_BYTE_SIZE)
from spinn_front_end_common.utilities.exceptions import DsDatabaseException
from spinn_front_end_common.utilities.sqlite_db import SQLiteDB
if TYPE_CHECKING:
from spinn_front_end_common.interface.interface_functions.\
spalloc_allocator import SpallocJobController # @UnusedImport
# Path of the DDL file (next to this module) used to create a fresh schema
_DDL_FILE = os.path.join(os.path.dirname(__file__), "dse.sql")
logger = FormatAdapter(logging.getLogger(__name__))
# Stop large numbers being written as blobs
# pylint: disable=unnecessary-lambda
sqlite3.register_adapter(numpy.int64, lambda val: int(val))
sqlite3.register_adapter(numpy.int32, lambda val: int(val))
class DsSqlliteDatabase(SQLiteDB):
    """
    A database for holding data specification details.
    """
    # Tracks whether the DDL still needs to be applied on first context entry
    __slots__ = ["_init_file"]

    def __init__(self, database_file: Optional[str] = None):
        """
        :param database_file:
            Path to the database file to use. If not specified, the path
            is obtained from ``FecDataView.get_ds_database_path()``.
            The DDL file is applied only when the file does not already
            exist, i.e. when a brand-new database is being created.
        :type database_file: str or None
        """
        if database_file is None:
            database_file = FecDataView.get_ds_database_path()
        # A missing file means we must build the schema from the DDL
        self._init_file = not os.path.exists(database_file)
        super().__init__(
            database_file, ddl_file=_DDL_FILE if self._init_file else None)
def _context_entered(self):
super()._context_entered()
if self._init_file:
self.__init_ethernets()
self._init_file = False
def __init_ethernets(self) -> None:
"""
Set up the database contents from the machine.
.. note:: Call of this method has to be delayed until inside the with
"""
eth_chips = FecDataView.get_machine().ethernet_connected_chips
self.executemany(
"""
INSERT INTO ethernet(
ethernet_x, ethernet_y, ip_address)
VALUES(?, ?, ?)
""", (
(ethernet.x, ethernet.y, ethernet.ip_address)
for ethernet in eth_chips))
[docs]
def set_core(self, x: int, y: int, p: int,
vertex: AbstractHasAssociatedBinary):
"""
Creates a database record for the core with this x,y,z
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:param vertex: Vertex to check if it is a system vertex.
if missing this method will not create a new record
:type vertex:
~spinn_front_end_common.abstract_models.AbstractHasAssociatedBinary
:rtype: int
:raises AttributeError:
If the vertex is not an AbstractHasAssociatedBinary
:raises KeyError:
If there is no Chip as x, y
:raises ~sqlite3.IntegrityError:
If this combination of x, y, p has already been used
Even if with the same vertex
"""
self._set_chip(x, y)
if vertex.get_binary_start_type() == ExecutableType.SYSTEM:
is_system = 1
else:
is_system = 0
self.execute(
"""
INSERT INTO core(x, y, p, is_system)
VALUES(?, ?, ?, ?)
""", (x, y, p, is_system))
[docs]
def get_core_infos(self, is_system: bool) -> List[
Tuple[int, int, int, int, int]]:
"""
Gets a list of id, x, y, p, ethernet_x, ethernet_y for all cores
according to is_system
:param bool is_system: if True returns system cores
otherwise application cores
:return:
(x, y, p, ethernet_x, ethernet_y)
for each system or app core
:rtype: list(int, int, int, int, int, int)
"""
core_infos: List[Tuple[int, int, int, int, int]] = []
for row in self.execute(
"""
SELECT x, y, p, ethernet_x, ethernet_y
FROM core_view
WHERE is_system = ?
ORDER BY x, y, p
""", (is_system,)):
core_infos.append(
(row["x"], row["y"], row["p"],
row["ethernet_x"], row["ethernet_y"]))
return core_infos
def _set_chip(self, x: int, y: int):
"""
:param int x:
:param int y:
"""
# skip if it already exists
for _ in self.execute(
"""
SELECT x
FROM chip
WHERE x = ? AND y = ?
LIMIT 1
""", (x, y)):
return
chip = FecDataView().get_chip_at(x, y)
self.execute(
"""
INSERT INTO chip(x, y, ethernet_x, ethernet_y) VALUES(?, ?, ?, ?)
""", (x, y, chip.nearest_ethernet_x, chip.nearest_ethernet_y))
[docs]
def set_memory_region(
self, x: int, y: int, p: int, region_num: int, size: int,
reference: Optional[int], label: Optional[str]):
"""
Writes the information to reserve a memory region into the database
Typically called after a DS.reserve_memory_region call
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core t
:param int region_num: The number of the region to reserve
:param int size: The size to reserve for the region, in bytes
:param label: An optional label for the region
:type label: str or None
:param reference: A globally unique reference for this region
:type reference: int or None
:return:
"""
self.execute(
"""
INSERT INTO region(
x, y, p, region_num, size, reference_num, region_label)
VALUES(?, ?, ?, ?, ?, ?, ?)
""", (x, y, p, region_num, size, reference, label))
return self.lastrowid
[docs]
def get_region_size(self, x: int, y: int, p: int, region_num: int) -> int:
"""
Gets the size for a region with this x, y, p and region
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:param int region_num: The region number
:return: The size of the region, in bytes
:rtype: int
"""
for row in self.execute(
"""
SELECT size
FROM region
WHERE x = ? AND y = ? AND p = ? AND region_num = ?
LIMIT 1
""", (x, y, p, region_num)):
return row["size"]
raise DsDatabaseException(f"Region {region_num} not set")
[docs]
def set_reference(self, x: int, y: int, p: int, region_num: int,
reference: int, ref_label: Optional[str]):
"""
Writes a outgoing region_reference into the database
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:param int region_num: The region number
:param int reference: The number of the reference on this core
:param ref_label: label for the referencing region
:type ref_label: str or None
"""
self.execute(
"""
INSERT INTO reference(
x, y, p, region_num, reference_num, ref_label)
VALUES(?, ?, ?, ?, ?, ?)
""", (x, y, p, region_num, reference, ref_label))
[docs]
def get_reference_pointers(self, x: int, y: int, p: int) -> Iterable[
Tuple[int, int]]:
"""
Yields the reference regions and where they point for this core
This may yield nothing if there are no reference pointers or
if the core is not known
.. note::
Do not use the database for anything else while iterating.
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:return: Yields the referencing vertex region number and the pointer
:rtype: iterable(tuple(int,int))
"""
for row in self.execute(
"""
SELECT ref_region, pointer
FROM linked_reference_view
WHERE x = ? AND y = ? AND ref_p = ?
""", (x, y, p)):
yield row["ref_region"], row["pointer"]
[docs]
def get_unlinked_references(self) -> Iterable[
Tuple[int, int, int, int, int, str]]:
"""
Finds and yields info on unreferenced links
If all is well this method yields nothing!
.. note::
Do not use the database for anything else while iterating.
:return: x, y, p, region, reference, label for all unlinked references
:rtype: iterable(tuple(int, int, int, int, int, str))
"""
for row in self.execute(
"""
SELECT x, y, ref_p, ref_region, reference_num,
COALESCE(ref_label, "") as ref_label
FROM linked_reference_view
WHERE act_region IS NULL
"""):
yield (row["x"], row["y"], row["ref_p"], row["ref_region"],
row["reference_num"], str(row["ref_label"], "utf8"))
[docs]
def get_double_region(self) -> Iterable[Tuple[int, int, int, int]]:
"""
Finds and yields any region that was used in both region definition
and a reference
If all is well this method yields nothing!
.. note::
Do not use the database for anything else while iterating.
:return: x, y, p, region
:rtype: iterable(tuple(int, int, int, int))
"""
for row in self.execute(
"""
SELECT x, y, p, region_num
FROM pointer_content_view
GROUP BY X, y, p, region_num
HAVING count(*) != 1
"""):
yield (row["x"], row["y"], row["p"], row["region_num"])
[docs]
def set_region_content(
self, x: int, y: int, p: int, region_num: int, content: bytes,
content_debug: Optional[str]):
"""
Sets the content for this region
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:param int region_num: The region number
:param bytes content: content to write
:param content_debug: debug text
:type content_debug: str or None
:raises DsDatabaseException: If the region already has content
"""
# check for previous content
for row in self.execute(
"""
SELECT content
FROM region
WHERE x = ? AND y = ? and p = ? and region_num = ?
LIMIT 1
""", (x, y, p, region_num)):
if row["content"]:
raise DsDatabaseException(
f"Illegal attempt to overwrite content for "
f"{x=} {y=} {p=} {region_num=}")
self.execute(
"""
UPDATE region
SET content = ?, content_debug = ?
WHERE x = ? AND y = ? and p = ? and region_num = ?
""", (content, content_debug, x, y, p, region_num))
if self.rowcount == 0:
raise DsDatabaseException(
f"No region {x=} {y=} {p=} {region_num=}")
[docs]
def get_region_pointer(
self, x: int, y: int, p: int, region_num: int) -> Optional[int]:
"""
Gets the pointer for this region as set during the original load
returns None if the region is known but for some reason the pointer
was not set
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:param int region_num: The Data Specification region number
:return: The pointer set during the original load
:rtype: int or None
:raises DsDatabaseException: if the region is not known
"""
for row in self.execute(
"""
SELECT pointer
FROM region
WHERE x = ? AND y = ? AND p = ? AND region_num = ?
LIMIT 1
""", (x, y, p, region_num)):
return row["pointer"]
raise DsDatabaseException(f"No region {x=} {y=} {p=} {region_num=}")
[docs]
def get_region_sizes(self, x: int, y: int, p: int) -> Dict[int, int]:
"""
Gets a dict of the regions and sizes reserved
The returned dict will be empty if there are no regions reserved
or if the core is not known.
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:return: dict of region_num to size but only for regions with a size
:rtype: dict(int, int)
"""
regions: Dict[int, int] = dict()
for row in self.execute(
"""
SELECT region_num, size
FROM region
WHERE x = ? AND y = ? AND p = ?
ORDER BY region_num
""", (x, y, p)):
regions[row["region_num"]] = row["size"]
return regions
[docs]
def get_total_regions_size(self, x: int, y: int, p: int) -> int:
"""
Gets the total size of the regions of this core
Does not include the size of the pointer table
Returns 0 even if the core is not known
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:return: The size of the regions
or 0 if there are no regions for this core
:rtype: int
"""
for row in self.execute(
"""
SELECT COALESCE(sum(size), 0) as total
FROM region
WHERE x = ? AND y = ? AND p = ?
LIMIT 1
""", (x, y, p)):
return row["total"]
raise DsDatabaseException("Query failed unexpectedly")
[docs]
def set_start_address(self, x: int, y: int, p: int, start_address: int):
"""
Sets the base address for a core and calculates pointers
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:param int start_address: The base address for the whole core
:raises DsDatabaseException: if the region is not known
"""
self.execute(
"""
UPDATE core
SET start_address = ?
WHERE x = ? AND y = ? AND p = ?
""", (start_address, x, y, p))
if self.rowcount == 0:
raise DsDatabaseException(
f"No core {x=} {y=} {p=}")
[docs]
def get_start_address(self, x: int, y: int, p: int) -> int:
"""
Gets the start_address for this core
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:return: The base address for the whole core
:rtype: int
"""
for row in self.execute(
"""
SELECT start_address
FROM core
WHERE x = ? AND y = ? and p = ?
LIMIT 1
""", (x, y, p)):
return row["start_address"]
raise DsDatabaseException(f"No core {x=} {y=} {p=}")
[docs]
def set_region_pointer(
self, x: int, y: int, p: int, region_num: int, pointer: int):
"""
Sets the pointer to the start of the address for this x, y, p region.
:param int x:
:param int y:
:param int p:
:param int region_num:
:param int pointer: start address
"""
self.execute(
"""
UPDATE region
SET pointer = ?
WHERE x = ? AND y = ? and p = ? and region_num = ?
""", (pointer, x, y, p, region_num))
if self.rowcount == 0:
raise DsDatabaseException(
f"No region {x=} {y=} {p=} {region_num=}")
[docs]
def get_region_pointers_and_content(
self, x: int, y: int, p: int) -> Iterable[Tuple[
int, int, Optional[bytes]]]:
"""
Yields the number, pointers and content for each reserved region
This includes regions with no content set where content will be None
Will yield nothing if there are no regions reserved or if the core if
not known
:param int x: X coordinate of the core
:param int y: Y coordinate of the core
:param int p: Processor ID of the core
:return: number, pointer and (content or None)
:rtype: iterable(tuple(int, int, bytearray or None))
"""
for row in self.execute(
"""
SELECT region_num, content, pointer
FROM pointer_content_view
WHERE x = ? AND y = ? AND p = ?
ORDER BY region_num
""", (x, y, p)):
if row["content"]:
content = bytearray(row["content"])
else:
content = None
yield row["region_num"], row["pointer"], content
[docs]
def get_ds_cores(self) -> Iterable[XYP]:
"""
Yields the x, y, p for the cores with possible Data Specifications
Includes cores where DataSpecs started even if no regions reserved
Yields nothing if there are no unknown cores
.. note::
Do not use the database for anything else while iterating.
:return: Yields the (x, y, p)
:rtype: iterable(tuple(int,int,int))
"""
for row in self.execute(
"""
SELECT x, y, p FROM core
"""):
yield (row["x"], row["y"], row["p"])
[docs]
def get_n_ds_cores(self) -> int:
"""
Returns the number for cores there is a data specification saved for.
Includes cores where DataSpecs started even if no regions reserved
:rtype: int
:raises DsDatabaseException:
"""
for row in self.execute(
"""
SELECT COUNT(*) as count FROM core
LIMIT 1
"""):
return row["count"]
raise DsDatabaseException("Count query failed")
[docs]
def get_memory_to_malloc(self, x: int, y: int, p: int) -> int:
"""
Gets the expected number of bytes to be written
:param int x: core X coordinate
:param int y: core Y coordinate
:param int p: core processor ID
:return: expected memory_written in bytes
:rtype: int
"""
to_malloc = APP_PTR_TABLE_BYTE_SIZE
# try the fast way using regions
for row in self.execute(
"""
SELECT regions_size
FROM region_size_view
WHERE x = ? AND y = ? AND p = ?
LIMIT 1
""", (x, y, p)):
to_malloc += row["regions_size"]
return to_malloc
[docs]
def get_memory_to_write(self, x: int, y: int, p: int) -> int:
"""
Gets the expected number of bytes to be written
:param int x: core X coordinate
:param int y: core Y coordinate
:param int p: core processor ID
:return: expected memory_written in bytes
:rtype: int
"""
to_write = APP_PTR_TABLE_BYTE_SIZE
# try the fast way using regions
for row in self.execute(
"""
SELECT contents_size
FROM content_size_view
WHERE x = ? AND y = ? AND p = ?
LIMIT 1
""", (x, y, p)):
to_write += row["contents_size"]
return to_write
[docs]
def get_info_for_cores(self) -> Iterable[Tuple[XYP, int, int, int]]:
"""
Yields the (x, y, p) and write info for each core
The sizes INCLUDE pointer table size
Yields nothing if no cores known
.. note::
A DB transaction may be held while this iterator is processing.
Reentrant use of this class is not supported.
:return: Yields the (x, y, p), start_address, memory_used
and memory_written
:rtype: iterable(tuple(tuple(int, int, int), int, int, int))
"""
for row in self.execute(
"""
SELECT x, y, p, start_address, to_write,malloc_size
FROM core_summary_view
"""):
yield ((row["x"], row["y"], row["p"]), row["start_address"],
row["malloc_size"], row["to_write"])
[docs]
def write_session_credentials_to_db(self) -> None:
"""
Write Spalloc session credentials to the database, if in use.
"""
# pylint: disable=protected-access
if not FecDataView.has_allocation_controller():
return
mac = FecDataView.get_allocation_controller()
if mac.proxying:
# This is now assumed to be a SpallocJobController;
# can't check that because of import circularity.
job = cast('SpallocJobController', mac)._job
if isinstance(job, SpallocJob):
config = job.get_session_credentials_for_db()
self.executemany(
"""
INSERT INTO proxy_configuration(kind, name, value)
VALUES(?, ?, ?)
""", [(k1, k2, v) for (k1, k2), v in config.items()])
[docs]
def set_app_id(self) -> None:
"""
Sets the app id
"""
# check for previous content
self.execute(
"""
INSERT INTO app_id(app_id)
VALUES(?)
""", (FecDataView.get_app_id(), ))
if self.rowcount == 0:
raise DsDatabaseException("Unable to set app id")