aboutsummaryrefslogtreecommitdiff
path: root/python/pyarmnn/test/test_runtime.py
blob: a37772c5dfeb0eb4200e843150e90e3adb2ffb5b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
# Copyright © 2020 Arm Ltd. All rights reserved.
# SPDX-License-Identifier: MIT
import os

import pytest
import warnings
import numpy as np

import pyarmnn as ann


@pytest.fixture(scope="function")
def random_runtime(shared_data_folder):
    """Build a CpuRef runtime for mock_model.tflite with random uint8 input data.

    Yields (preferred_backends, network, runtime, input_tensors, output_tensors).
    """
    tflite_parser = ann.ITfLiteParser()
    network = tflite_parser.CreateNetworkFromBinaryFile(os.path.join(shared_data_folder, 'mock_model.tflite'))
    preferred_backends = [ann.BackendId('CpuRef')]

    runtime = ann.IRuntime(ann.CreationOptions())

    # Bind against the last subgraph in the model.
    graph_id = tflite_parser.GetSubgraphCount() - 1

    input_names = tflite_parser.GetSubgraphInputTensorNames(graph_id)
    input_binding_info = tflite_parser.GetNetworkInputBindingInfo(graph_id, input_names[0])
    input_tensor_id = input_binding_info[0]
    input_tensor_info = input_binding_info[1]
    input_tensor_info.SetConstant()

    # Random uint8 payload matching the input tensor's element count.
    input_data = np.random.randint(255, size=input_tensor_info.GetNumElements(), dtype=np.uint8)
    input_tensors = [(input_tensor_id, ann.ConstTensor(input_tensor_info, input_data))]

    output_tensors = []
    for output_name in tflite_parser.GetSubgraphOutputTensorNames(graph_id):
        out_bind_info = tflite_parser.GetNetworkOutputBindingInfo(graph_id, output_name)
        output_tensors.append((out_bind_info[0], ann.Tensor(out_bind_info[1])))

    yield preferred_backends, network, runtime, input_tensors, output_tensors


@pytest.fixture(scope='function')
def mock_model_runtime(shared_data_folder):
    """Load, optimize and register mock_model.tflite on CpuRef.

    Yields (runtime, net_id, input_tensors, output_tensors) ready for EnqueueWorkload.
    """
    tflite_parser = ann.ITfLiteParser()
    network = tflite_parser.CreateNetworkFromBinaryFile(os.path.join(shared_data_folder, 'mock_model.tflite'))
    graph_id = 0

    input_binding_info = tflite_parser.GetNetworkInputBindingInfo(graph_id, "input_1")
    input_tensor_data = np.load(os.path.join(shared_data_folder, 'tflite_parser/input_lite.npy'))

    preferred_backends = [ann.BackendId('CpuRef')]
    runtime = ann.IRuntime(ann.CreationOptions())

    opt_network, messages = ann.Optimize(network, preferred_backends,
                                         runtime.GetDeviceSpec(), ann.OptimizerOptions())
    print(messages)

    net_id, messages = runtime.LoadNetwork(opt_network)
    print(messages)

    input_tensors = ann.make_input_tensors([input_binding_info], [input_tensor_data])

    # Collect output binding info for every subgraph output.
    outputs_binding_info = [tflite_parser.GetNetworkOutputBindingInfo(graph_id, name)
                            for name in tflite_parser.GetSubgraphOutputTensorNames(graph_id)]
    output_tensors = ann.make_output_tensors(outputs_binding_info)

    yield runtime, net_id, input_tensors, output_tensors


def test_python_disowns_network(random_runtime):
    """After LoadNetwork, ownership of the optimized network moves to ArmNN (SWIG thisown cleared)."""
    preferred_backends, network, runtime = random_runtime[:3]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    runtime.LoadNetwork(opt_network)

    assert not opt_network.thisown


def test_load_network(random_runtime):
    """Loading an optimized network succeeds: no error messages, first id is 0."""
    preferred_backends, network, runtime = random_runtime[:3]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, messages = runtime.LoadNetwork(opt_network)
    assert messages == ""
    assert net_id == 0


def test_create_runtime_with_external_profiling_enabled():
    """A runtime can be constructed with a full, valid external-profiling configuration."""
    options = ann.CreationOptions()

    # Profiling master switches.
    options.m_ProfilingOptions.m_EnableProfiling = True
    options.m_ProfilingOptions.m_TimelineEnabled = True
    # File-based capture configuration.
    options.m_ProfilingOptions.m_FileOnly = True
    options.m_ProfilingOptions.m_FileFormat = "JSON"
    options.m_ProfilingOptions.m_OutgoingCaptureFile = "/tmp/outgoing.txt"
    options.m_ProfilingOptions.m_IncomingCaptureFile = "/tmp/incoming.txt"
    options.m_ProfilingOptions.m_CapturePeriod = 1000

    runtime = ann.IRuntime(options)

    assert runtime is not None


def test_create_runtime_with_external_profiling_enabled_invalid_options():
    """Timeline reporting without profiling enabled must raise at runtime creation."""
    options = ann.CreationOptions()

    # Invalid combination: timeline on while profiling itself is off.
    options.m_ProfilingOptions.m_EnableProfiling = False
    options.m_ProfilingOptions.m_TimelineEnabled = True
    options.m_ProfilingOptions.m_FileOnly = True
    options.m_ProfilingOptions.m_FileFormat = "JSON"
    options.m_ProfilingOptions.m_OutgoingCaptureFile = "/tmp/outgoing.txt"
    options.m_ProfilingOptions.m_IncomingCaptureFile = "/tmp/incoming.txt"
    options.m_ProfilingOptions.m_CapturePeriod = 1000

    with pytest.raises(RuntimeError) as err:
        runtime = ann.IRuntime(options)

    assert "It is not possible to enable timeline reporting without profiling being enabled" in str(err.value)


def test_load_network_properties_provided(random_runtime):
    """LoadNetwork also accepts explicit INetworkProperties (Malloc memory sources)."""
    preferred_backends, network, runtime = random_runtime[:3]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    properties = ann.INetworkProperties(False,
                                        ann.MemorySource_Malloc,
                                        ann.MemorySource_Malloc)

    net_id, messages = runtime.LoadNetwork(opt_network, properties)
    assert messages == ""
    assert net_id == 0


def test_network_properties_constructor(random_runtime):
    """INetworkProperties exposes the expected defaults and constructor arguments."""
    preferred_backends, network, runtime = random_runtime[:3]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    properties = ann.INetworkProperties(True,
                                        ann.MemorySource_Undefined,
                                        ann.MemorySource_Undefined)

    # Constructor arguments are reflected back...
    assert properties.m_AsyncEnabled
    assert properties.m_InputSource == ann.MemorySource_Undefined
    assert properties.m_OutputSource == ann.MemorySource_Undefined
    # ...and the remaining fields keep their defaults.
    assert not properties.m_ProfilingEnabled
    assert properties.m_OutputNetworkDetailsMethod == ann.ProfilingDetailsMethod_Undefined

    net_id, messages = runtime.LoadNetwork(opt_network, properties)
    assert messages == ""
    assert net_id == 0


def test_unload_network_fails_for_invalid_net_id(random_runtime):
    """Unloading a network id that was never loaded raises RuntimeError."""
    preferred_backends, network, runtime = random_runtime[:3]

    ann.Optimize(network, preferred_backends, runtime.GetDeviceSpec(), ann.OptimizerOptions())

    # Nothing was loaded, so id 9 cannot exist.
    with pytest.raises(RuntimeError) as err:
        runtime.UnloadNetwork(9)

    assert "Failed to unload network." in str(err.value)


def test_enqueue_workload(random_runtime):
    """A loaded network accepts an inference request without raising."""
    preferred_backends, network, runtime, input_tensors, output_tensors = random_runtime

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, _ = runtime.LoadNetwork(opt_network)
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)


def test_enqueue_workload_fails_with_empty_input_tensors(random_runtime):
    """EnqueueWorkload rejects an input-tensor list that doesn't match the network."""
    preferred_backends, network, runtime = random_runtime[:3]
    output_tensors = random_runtime[4]
    input_tensors = []  # deliberately wrong: the network expects at least one input

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, _ = runtime.LoadNetwork(opt_network)
    with pytest.raises(RuntimeError) as err:
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    assert "Number of inputs provided does not match network." in str(err.value)


@pytest.mark.x86_64
@pytest.mark.parametrize('count', [5])
def test_multiple_inference_runs_yield_same_result(count, mock_model_runtime):
    """
    Test that results remain consistent among multiple runs of the same inference.
    """
    runtime = mock_model_runtime[0]
    net_id = mock_model_runtime[1]
    input_tensors = mock_model_runtime[2]
    output_tensors = mock_model_runtime[3]

    expected_results = np.array([[4,  85, 108,  29,   8,  16,   0,   2,   5,   0]])

    for _ in range(count):
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

        output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

        for i in range(len(expected_results)):
            # Bug fix: the previous `a.all() == b.all()` compared two scalar
            # booleans (each "are all elements non-zero?"), so it could pass
            # even when the outputs differed. Compare element-wise instead.
            assert np.array_equal(output_vectors[i], expected_results[i])


@pytest.mark.aarch64
def test_aarch64_inference_results(mock_model_runtime):
    """Check the mock model produces the known-good output vector on aarch64."""
    runtime = mock_model_runtime[0]
    net_id = mock_model_runtime[1]
    input_tensors = mock_model_runtime[2]
    output_tensors = mock_model_runtime[3]

    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

    # Single reference array (the original chained `expected_outputs =
    # expected_results = ...` aliased the same array under two names).
    expected_results = np.array([[4,  85, 108,  29,   8,  16,   0,   2,   5,   0]])

    for i in range(len(expected_results)):
        # Bug fix: `a.all() == b.all()` compared two scalar booleans and could
        # pass for mismatched outputs. Compare the arrays element-wise.
        assert np.array_equal(output_vectors[i], expected_results[i])


def test_enqueue_workload_with_profiler(random_runtime):
    """
    Tests ArmNN's profiling extension
    """
    preferred_backends, network, runtime, input_tensors, output_tensors = random_runtime

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    net_id, _ = runtime.LoadNetwork(opt_network)

    profiler = runtime.GetProfiler(net_id)

    # Profiling starts disabled; flip it on before running.
    assert profiler.IsProfilingEnabled() is False
    profiler.EnableProfiling(True)
    assert profiler.IsProfilingEnabled() is True

    # Perform one inference so the profiler has events to report.
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    # JSON output must be non-empty and contain the ArmNN section marker.
    str_profile = profiler.as_json()
    assert len(str_profile) != 0
    assert str_profile.find('\"ArmNN\": {') > 0

    # The event-log analysis must carry its table header.
    str_events_analysis = profiler.event_log()
    assert "Event Sequence - Name | Duration (ms) | Start (ms) | Stop (ms) | Device" in str_events_analysis

    # The profiler proxy is owned by ArmNN, not by Python.
    assert profiler.thisown == 0


def test_check_runtime_swig_ownership(random_runtime):
    """SWIG must own the runtime proxy so Python garbage-collects it
    automatically once it is no longer referenced."""
    runtime = random_runtime[2]
    assert runtime.thisown