aboutsummaryrefslogtreecommitdiff
path: root/driver_library/python/src/ethosu_driver/_utilities/driver_utilities.py
blob: fcea91f963844ff2a534a81fa985a23917c32443 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# SPDX-FileCopyrightText: Copyright 2021-2023 Arm Limited and/or its affiliates <open-source-office@arm.com>
# SPDX-License-Identifier: Apache-2.0
import logging
import time
from typing import List
from .._generated.driver import Device, Inference, Network, Buffer, InferenceStatus_OK


def open_device(device: str) -> Device:
    """Opens the Ethos-U device file descriptor.

        Args:
            device: device name.

        Returns:
            `Device`: Return the  object that represents Ethos-U device file descriptor and manages Ethos-U device lifecycle.
    """
    device = Device("/dev/{}".format(device))
    return device


def load_model(device: Device, model: str) -> Network:
    """Create a `Network` when providing `Device` object and a string containing tflite file path.

        Args:
            device: `Device` object that Ethos-U device file descriptor.
            model: tflite model file path .

        Returns:
            `Network`: Return the object that represent the neural __network file descriptor received from the Ethos-U device.
    """
    logging.info("Creating network")
    network_buffer = Buffer(device, model)
    return Network(device, network_buffer)


def populate_buffers(input_data: List[bytearray], buffers: List[Buffer]):
    """Set the feature maps associated memory buffer with the given data.

        Args:
            input_data: list of input feature maps data.
            buffers: list of already initialized ifm buffers.
        Raises:
             RuntimeError: if input data size is incorrect.
    """
    number_of_buffers = len(buffers)

    if number_of_buffers != len(input_data):
        raise RuntimeError("Incorrect number of inputs, expected {}, got {}.".format(number_of_buffers, len(input_data)))

    for index, (buffer, data_chunk) in enumerate(zip(buffers, input_data)):
        size = buffer.size()
        logging.info("Copying data to a buffer {} of {} with size = {}".format(index + 1, number_of_buffers, size))

        if len(data_chunk) > size:
            raise RuntimeError("Buffer expects {} bytes, got {} bytes.".format(size, len(data_chunk)))
        buffer.from_buffer(data_chunk)


def allocate_buffers(device: Device, dimensions: List) -> List[Buffer]:
    """Returns output feature maps associated with memory buffers.

        Args:
            device: `Device` object that Ethos-U device file descriptor.
            dimensions: `Network` object that represent the neural __network file descriptor.

        Returns:
            list: output feature map buffers.
    """
    buffers = []
    total = len(dimensions)
    for index, size in enumerate(dimensions):
        logging.info("Allocating {} of {} buffer with size = {}".format(index + 1, total, size))
        buffer = Buffer(device, size)
        buffers.append(buffer)

    return buffers


def get_results(inference: Inference) -> List[Buffer]:
    """Retrieves output inference buffers

        Args:
            inference: `Inference` object that represents the inference file descriptor.

        Returns:
            list: list of buffer objects
        Raises:
            RuntimeError: in case of inference returned failure status.

    """
    if InferenceStatus_OK != inference.status():
        raise RuntimeError("Inference failed!")
    else:
        logging.info("Inference succeeded!")
        return inference.getOfmBuffers()


class InferenceRunner:
    """Helper class to execute inference."""

    def __init__(self, device_name: str, model: str):
        """Initialises instance to execute inferences on the given model with given device

            Device is opened with the name '/dev/<device_name>'.
            Input/Output feature maps memory is allocated.

            Args:
                device_name: npu device name
                model: Tflite model file path
        """
        self.__device = open_device(device_name)
        if not InferenceRunner.wait_for_ping(self.__device, 3):
            raise RuntimeError("Failed to communicate with device {}".format(device_name))

        self.__network = load_model(self.__device, model)
        # it is important to have a reference to current inference object to have access to OFMs.
        self.__inf = None
        self.__enabled_counters = ()

    @staticmethod
    def wait_for_ping(device: Device, count: int) -> bool:
        if count == 0:
            return False
        try:
            device.ping()
            return True
        except:
            logging.info("Waiting for device: {}".format(count))
            time.sleep(0.5)
            return InferenceRunner.wait_for_ping(device,  count-1)

    def set_enabled_counters(self, enabled_counters: List[int] = ()):
        """Set the enabled performance counter to use during inference.

            Args:
                enabled_counters: list of integer counter to enable.
            Raises:
                ValueError: in case of inference returned failure status or the Pmu counter requests exceed the maximum supported.
        """
        max_pmu_events = Inference.getMaxPmuEventCounters()
        if len(enabled_counters) > max_pmu_events:
            raise ValueError("Number of PMU counters requested exceed the maximum supported ({}).".format(max_pmu_events))
        self.__enabled_counters = enabled_counters

    def run(self, input_data: List[bytearray], timeout: int) -> List[Buffer]:
        """Run a inference with the given input feature maps data.

            Args:
                input_data: data list containing input data as binary arrays
                timeout: inference timout in nano seconds

            Returns:
                list: list of buffer objects
        """
        ofms = allocate_buffers(self.__device, self.__network.getOfmDims())
        ifms = allocate_buffers(self.__device, self.__network.getIfmDims())
        populate_buffers(input_data, ifms)

        self.__inf = Inference(
            self.__network,
            ifms,
            ofms,
            self.__enabled_counters,
            True)

        self.__inf.wait(int(timeout))
        return get_results(self.__inf)

    def get_pmu_counters(self) -> List:
        """Return the PMU data for the inference run.

            Returns:
                list: pairs of PMU type and cycle count value
        """
        return list(zip(self.__enabled_counters, self.__inf.getPmuCounters()))

    def get_pmu_total_cycles(self) -> int:
        """
        Returns the total cycle count, including idle cycles, as reported by
        the PMU

        Returns: total cycle count
        """
        return self.__inf.getCycleCounter()