# SPDX-FileCopyrightText: Copyright 2021-2022 Arm Limited and/or its affiliates
# SPDX-License-Identifier: Apache-2.0
import logging
import time
from typing import List

from .._generated.driver import Device, Inference, Network, Buffer, InferenceStatus_OK


def open_device(device: str) -> Device:
    """Opens the Ethos-U device file descriptor.

    Args:
        device: device name (e.g. "ethosu0"); opened as "/dev/<device>".

    Returns:
        `Device`: Object that represents the Ethos-U device file descriptor
        and manages the Ethos-U device lifecycle.
    """
    return Device("/dev/{}".format(device))


def load_model(device: Device, model: str) -> Network:
    """Creates a `Network` from a `Device` and a tflite model file path.

    Args:
        device: `Device` object wrapping the Ethos-U device file descriptor.
        model: tflite model file path.

    Returns:
        `Network`: Object that represents the neural network file descriptor
        received from the Ethos-U device.
    """
    logging.info("Creating network")
    network_buffer = Buffer(device, model)
    return Network(device, network_buffer)


def populate_buffers(input_data: List[bytearray], buffers: List[Buffer]):
    """Copies the given input feature map data into the associated buffers.

    Args:
        input_data: list of input feature maps data.
        buffers: list of already initialized ifm buffers.

    Raises:
        RuntimeError: if the number of inputs does not match the number of
            buffers, or a data chunk exceeds its buffer's capacity.
    """
    number_of_buffers = len(buffers)
    if number_of_buffers != len(input_data):
        raise RuntimeError(
            "Incorrect number of inputs, expected {}, got {}.".format(
                number_of_buffers, len(input_data)))
    for index, (buffer, data_chunk) in enumerate(zip(buffers, input_data)):
        cap = buffer.capacity()
        logging.info(
            "Copying data to a buffer %s of %s with size = %s",
            index + 1, number_of_buffers, cap)
        if len(data_chunk) > cap:
            raise RuntimeError(
                "Buffer expects {} bytes, got {} bytes.".format(cap, len(data_chunk)))
        # Shrink the buffer to the actual payload size before copying.
        buffer.resize(len(data_chunk))
        buffer.from_buffer(data_chunk)


def allocate_buffers(device: Device, dimensions: List) -> List[Buffer]:
    """Allocates memory buffers for feature maps of the given sizes.

    Args:
        device: `Device` object wrapping the Ethos-U device file descriptor.
        dimensions: list of buffer sizes (one per feature map).

    Returns:
        list: allocated feature map buffers.
    """
    buffers = []
    total = len(dimensions)
    for index, size in enumerate(dimensions):
        logging.info("Allocating %s of %s buffer with size = %s", index + 1, total, size)
        buffers.append(Buffer(device, size))
    return buffers


def get_results(inference: Inference) -> List[Buffer]:
    """Retrieves output inference buffers.

    Args:
        inference: `Inference` object that represents the inference
            file descriptor.

    Returns:
        list: list of output feature map buffer objects.

    Raises:
        RuntimeError: if the inference returned a failure status.
    """
    if InferenceStatus_OK != inference.status():
        raise RuntimeError("Inference failed!")
    logging.info("Inference succeeded!")
    return inference.getOfmBuffers()


class InferenceRunner:
    """Helper class to execute inference."""

    def __init__(self, device_name: str, model: str):
        """Initialises the instance to execute inferences on the given model
        with the given device.

        The device is opened with the name '/dev/<device_name>'.
        Input/output feature map memory is allocated lazily per run.

        Args:
            device_name: npu device name.
            model: tflite model file path.

        Raises:
            RuntimeError: if the device does not respond to a ping.
        """
        self.__device = open_device(device_name)
        if not InferenceRunner.wait_for_ping(self.__device, 3):
            raise RuntimeError("Failed to communicate with device {}".format(device_name))

        self.__network = load_model(self.__device, model)
        # It is important to keep a reference to the current inference
        # object to retain access to the OFMs after run() returns.
        self.__inf = None
        self.__enabled_counters = ()

    @staticmethod
    def wait_for_ping(device: Device, count: int) -> bool:
        """Pings the device up to `count` times, waiting 0.5s between tries.

        Args:
            device: `Device` object to ping.
            count: maximum number of attempts; 0 returns False immediately.

        Returns:
            bool: True as soon as a ping succeeds, False if all attempts fail.
        """
        # Iterative equivalent of the retry; catch Exception (not a bare
        # except) so KeyboardInterrupt/SystemExit propagate.
        while count > 0:
            try:
                device.ping()
                return True
            except Exception:
                logging.info("Waiting for device: %s", count)
                time.sleep(0.5)
                count -= 1
        return False

    def set_enabled_counters(self, enabled_counters: List[int] = ()):
        """Set the enabled performance counters to use during inference.

        Args:
            enabled_counters: list of integer counter ids to enable.

        Raises:
            ValueError: if the number of requested PMU counters exceeds
                the maximum supported by the device.
        """
        max_pmu_events = Inference.getMaxPmuEventCounters()
        if len(enabled_counters) > max_pmu_events:
            raise ValueError(
                "Number of PMU counters requested exceed the maximum supported ({}).".format(
                    max_pmu_events))
        self.__enabled_counters = enabled_counters

    def run(self, input_data: List[bytearray], timeout: int) -> List[Buffer]:
        """Runs an inference with the given input feature map data.

        Args:
            input_data: data list containing input data as binary arrays.
            timeout: inference timeout in nanoseconds.

        Returns:
            list: list of output feature map buffer objects.

        Raises:
            RuntimeError: on input size mismatch or inference failure.
        """
        ofms = allocate_buffers(self.__device, self.__network.getOfmDims())
        ifms = allocate_buffers(self.__device, self.__network.getIfmDims())
        populate_buffers(input_data, ifms)

        self.__inf = Inference(
            self.__network,
            ifms,
            ofms,
            self.__enabled_counters,
            True)

        self.__inf.wait(int(timeout))
        return get_results(self.__inf)

    def get_pmu_counters(self) -> List:
        """Returns the PMU data for the last inference run.

        Must be called after run(); otherwise no inference object exists.

        Returns:
            list: pairs of PMU type and cycle count value.
        """
        return list(zip(self.__enabled_counters, self.__inf.getPmuCounters()))

    def get_pmu_total_cycles(self) -> int:
        """Returns the total cycle count, including idle cycles, as
        reported by the PMU for the last inference run.

        Returns:
            int: total cycle count.
        """
        return self.__inf.getCycleCounter()