# Source code for spinn_front_end_common.utilities.scp.get_current_time_process

# Copyright (c) 2016 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from functools import partial
import struct
import logging
from typing import Optional
from spinn_utilities.overrides import overrides
from spinn_utilities.progress_bar import ProgressBar
from spinn_utilities.log import FormatAdapter
from spinn_machine import CoreSubsets
from spinnman.messages.sdp import SDPHeader, SDPFlag
from spinnman.messages.scp.abstract_messages import (
    AbstractSCPRequest, AbstractSCPResponse)
from spinnman.messages.scp import SCPRequestHeader
from spinnman.processes import (
    AbstractMultiConnectionProcess, MostDirectConnectionSelector)
from spinnman.model.enums import (
    SDP_PORTS, SDP_RUNNING_MESSAGE_CODES)
from spinnman.messages.scp.enums import SCPResult
from spinnman.exceptions import SpinnmanUnexpectedResponseCodeException

# Module-level logger for this file, wrapped in the spinn_utilities
# FormatAdapter imported above.
logger = FormatAdapter(logging.getLogger(__name__))


class _GetCurrentTimeResponse(AbstractSCPResponse):
    """
    The response to a :py:class:`_GetCurrentTimeRequest`, holding the
    current time read from the body of the reply.
    """

    __slots__ = (
        "__current_time",
    )

    def __init__(self) -> None:
        super().__init__()
        # Stays None until read_data_bytestring has parsed a reply
        self.__current_time: Optional[int] = None

    @overrides(AbstractSCPResponse.read_data_bytestring)
    def read_data_bytestring(self, data: bytes, offset: int) -> None:
        """
        Check the result code and read the current time from the data.

        :param data: The bytes of the received message
        :param offset: Where in ``data`` the response body starts
        :raises SpinnmanUnexpectedResponseCodeException:
            If the result is neither RC_OK nor RC_P2P_NOREPLY
        """
        result = self.scp_response_header.result
        # RC_P2P_NOREPLY is tolerated as well as RC_OK.
        # NOTE(review): this tolerance looks inherited from the count-state
        # response this class was modelled on; confirm it is wanted here.
        if result != SCPResult.RC_OK and result != SCPResult.RC_P2P_NOREPLY:
            # Report the operation and command this response actually
            # belongs to, so the error does not point at CMD_COUNT.
            raise SpinnmanUnexpectedResponseCodeException(
                "GetCurrentTime",
                SDP_RUNNING_MESSAGE_CODES.SDP_GET_CURRENT_TIME_CODE.name,
                result.name)
        # The time is a little-endian unsigned 32-bit word at the offset
        self.__current_time = struct.unpack_from("<I", data, offset)[0]

    @property
    def current_time(self) -> int:
        """ Get the current time from the response
        """
        assert self.__current_time is not None
        return self.__current_time


class _GetCurrentTimeRequest(AbstractSCPRequest[_GetCurrentTimeResponse]):
    """
    An SCP request, sent to a single core, using the
    ``SDP_GET_CURRENT_TIME_CODE`` command.
    """

    def __init__(self, x: int, y: int, p: int):
        """
        :param x: Used as the destination chip x of the SDP header
        :param y: Used as the destination chip y of the SDP header
        :param p: Used as the destination cpu of the SDP header
        """
        # A reply is expected, as it is the reply that carries the time
        sdp_header = SDPHeader(
            flags=SDPFlag.REPLY_EXPECTED,
            destination_port=SDP_PORTS.RUNNING_COMMAND_SDP_PORT.value,
            destination_chip_x=x, destination_chip_y=y, destination_cpu=p)
        scp_header = SCPRequestHeader(
            command=SDP_RUNNING_MESSAGE_CODES.SDP_GET_CURRENT_TIME_CODE)
        super().__init__(sdp_header, scp_header)

    @overrides(AbstractSCPRequest.get_scp_response)
    def get_scp_response(self) -> _GetCurrentTimeResponse:
        """
        :returns: A new, empty response object to be filled in by the reply
        """
        return _GetCurrentTimeResponse()


class GetCurrentTimeProcess(
        AbstractMultiConnectionProcess[_GetCurrentTimeResponse]):
    """
    How to read the current time from a set of cores.

    .. note::
        The cores must be using the simulation interface.
    """
    __slots__ = (
        "__latest_time",
        "__earliest_time"
    )

    def __init__(self, connection_selector: MostDirectConnectionSelector):
        """
         :param connection_selector:
            Connection to send the request over.
        """
        super().__init__(connection_selector)
        # Largest and smallest times seen so far; None until a core replies
        self.__latest_time: Optional[int] = None
        self.__earliest_time: Optional[int] = None

    def __receive_response(self, progress: ProgressBar,
                           response: _GetCurrentTimeResponse) -> None:
        """
        Callback for each reply; folds its time into the earliest/latest.

        :param progress: The progress bar to advance by one step
        :param response: The response holding one core's current time
        """
        progress.update()
        current_time = response.current_time
        if self.__latest_time is None or current_time > self.__latest_time:
            self.__latest_time = current_time
        if self.__earliest_time is None or current_time < self.__earliest_time:
            self.__earliest_time = current_time

    def get_latest_runtime(
            self, n_cores: int, core_subsets: CoreSubsets) -> Optional[int]:
        """
        Reads the runtime off all cores in the subset

        Logger warns if not all cores reported the same runtime

        :param n_cores: Number of cores being read; sizes the progress bar
        :param core_subsets: The cores to read the current time from
        :returns: The latest time found, or None if no core replied
        """
        # Reset both ends of the range, so that an earlier call on this
        # same process cannot leave a stale earliest time behind and
        # trigger a false "did not all stop on the same time-step" warning.
        self.__latest_time = None
        self.__earliest_time = None
        with ProgressBar(n_cores, "Getting current time") as progress, \
                self._collect_responses():
            for core_subset in core_subsets:
                for processor_id in core_subset.processor_ids:
                    self._send_request(
                        _GetCurrentTimeRequest(
                            core_subset.x, core_subset.y, processor_id),
                        callback=partial(self.__receive_response, progress))
        self._finish()
        self.check_for_error()
        if self.__earliest_time != self.__latest_time:
            logger.warning(
                "The cores did not all stop on the same time-step; on the "
                "next run, the simulation will start at the latest time of "
                f"{self.__latest_time}. For information, the earliest time "
                f"was {self.__earliest_time}.")
        return self.__latest_time