From 245d64c60d0ea30f5080ff53225b5169927e24d6 Mon Sep 17 00:00:00 2001
From: Matthew Bentham
Date: Mon, 2 Dec 2019 12:59:43 +0000
Subject: Work in progress of python bindings for Arm NN

Not built or tested in any way

Signed-off-by: Matthew Bentham
Change-Id: Ie7f92b529aa5087130f0c5cc8c17db1581373236
---
 python/pyarmnn/test/test_runtime.py | 275 ++++++++++++++++++++++++++++++++++++
 1 file changed, 275 insertions(+)
 create mode 100644 python/pyarmnn/test/test_runtime.py

diff --git a/python/pyarmnn/test/test_runtime.py b/python/pyarmnn/test/test_runtime.py
new file mode 100644
index 0000000000..c20d347785
--- /dev/null
+++ b/python/pyarmnn/test/test_runtime.py
@@ -0,0 +1,275 @@
+# Copyright © 2019 Arm Ltd. All rights reserved.
+# SPDX-License-Identifier: MIT
+import os
+
+import pytest
+import numpy as np
+from PIL import Image
+import pyarmnn as ann
+import platform
+
+
+@pytest.fixture(scope="function")
+def random_runtime(shared_data_folder):
+    parser = ann.ITfLiteParser()
+    network = parser.CreateNetworkFromBinaryFile(os.path.join(shared_data_folder, 'ssd_mobilenetv1.tflite'))
+    preferred_backends = [ann.BackendId('CpuRef')]
+    options = ann.CreationOptions()
+    runtime = ann.IRuntime(options)
+
+    graphs_count = parser.GetSubgraphCount()
+
+    graph_id = graphs_count - 1
+    input_names = parser.GetSubgraphInputTensorNames(graph_id)
+
+    input_binding_info = parser.GetNetworkInputBindingInfo(graph_id, input_names[0])
+    input_tensor_id = input_binding_info[0]
+
+    input_tensor_info = input_binding_info[1]
+
+    output_names = parser.GetSubgraphOutputTensorNames(graph_id)
+
+    input_data = np.random.randint(255, size=input_tensor_info.GetNumElements(), dtype=np.uint8)
+
+    const_tensor_pair = (input_tensor_id, ann.ConstTensor(input_tensor_info, input_data))
+
+    input_tensors = [const_tensor_pair]
+
+    output_tensors = []
+
+    for index, output_name in enumerate(output_names):
+        out_bind_info = parser.GetNetworkOutputBindingInfo(graph_id, output_name)
+
+        out_tensor_info = out_bind_info[1]
+        out_tensor_id = out_bind_info[0]
+
+        output_tensors.append((out_tensor_id,
+                               ann.Tensor(out_tensor_info)))
+
+    yield preferred_backends, network, runtime, input_tensors, output_tensors
+
+
+@pytest.fixture(scope='function')
+def mobilenet_ssd_runtime(shared_data_folder):
+    parser = ann.ITfLiteParser()
+    network = parser.CreateNetworkFromBinaryFile(os.path.join(shared_data_folder, 'ssd_mobilenetv1.tflite'))
+    graph_id = 0
+
+    input_binding_info = parser.GetNetworkInputBindingInfo(graph_id, "normalized_input_image_tensor")
+
+    input_tensor_data = np.array(Image.open(os.path.join(shared_data_folder, 'cococat.jpeg')).resize((300, 300)), dtype=np.uint8)
+
+    preferred_backends = [ann.BackendId('CpuRef')]
+
+    options = ann.CreationOptions()
+    runtime = ann.IRuntime(options)
+
+    opt_network, messages = ann.Optimize(network, preferred_backends, runtime.GetDeviceSpec(), ann.OptimizerOptions())
+
+    print(messages)
+
+    net_id, messages = runtime.LoadNetwork(opt_network)
+
+    print(messages)
+
+    input_tensors = ann.make_input_tensors([input_binding_info], [input_tensor_data])
+
+    output_names = parser.GetSubgraphOutputTensorNames(graph_id)
+    outputs_binding_info = []
+
+    for output_name in output_names:
+        outputs_binding_info.append(parser.GetNetworkOutputBindingInfo(graph_id, output_name))
+
+    output_tensors = ann.make_output_tensors(outputs_binding_info)
+
+    yield runtime, net_id, input_tensors, output_tensors
+
+
+def test_python_disowns_network(random_runtime):
+    preferred_backends = random_runtime[0]
+    network = random_runtime[1]
+    runtime = random_runtime[2]
+    opt_network, _ = ann.Optimize(network, preferred_backends,
+                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
+
+    runtime.LoadNetwork(opt_network)
+
+    assert not opt_network.thisown
+
+
+def test_load_network(random_runtime):
+    preferred_backends = random_runtime[0]
+    network = random_runtime[1]
+    runtime = random_runtime[2]
+
+    opt_network, _ = ann.Optimize(network, preferred_backends,
+                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
+
+    net_id, messages = runtime.LoadNetwork(opt_network)
+    assert "" == messages
+    assert net_id == 0
+
+
+def test_load_network_properties_provided(random_runtime):
+    preferred_backends = random_runtime[0]
+    network = random_runtime[1]
+    runtime = random_runtime[2]
+
+    opt_network, _ = ann.Optimize(network, preferred_backends,
+                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
+    properties = ann.INetworkProperties(True, True)
+    net_id, messages = runtime.LoadNetwork(opt_network, properties)
+    assert "" == messages
+    assert net_id == 0
+
+
+def test_unload_network_fails_for_invalid_net_id(random_runtime):
+    preferred_backends = random_runtime[0]
+    network = random_runtime[1]
+    runtime = random_runtime[2]
+
+    ann.Optimize(network, preferred_backends, runtime.GetDeviceSpec(), ann.OptimizerOptions())
+
+    with pytest.raises(RuntimeError) as err:
+        runtime.UnloadNetwork(9)
+
+    expected_error_message = "Failed to unload network."
+    assert expected_error_message in str(err.value)
+
+
+def test_enqueue_workload(random_runtime):
+    preferred_backends = random_runtime[0]
+    network = random_runtime[1]
+    runtime = random_runtime[2]
+    input_tensors = random_runtime[3]
+    output_tensors = random_runtime[4]
+
+    opt_network, _ = ann.Optimize(network, preferred_backends,
+                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
+
+    net_id, _ = runtime.LoadNetwork(opt_network)
+    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
+
+
+def test_enqueue_workload_fails_with_empty_input_tensors(random_runtime):
+    preferred_backends = random_runtime[0]
+    network = random_runtime[1]
+    runtime = random_runtime[2]
+    input_tensors = []
+    output_tensors = random_runtime[4]
+
+    opt_network, _ = ann.Optimize(network, preferred_backends,
+                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
+
+    net_id, _ = runtime.LoadNetwork(opt_network)
+    with pytest.raises(RuntimeError) as err:
+        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
+
+    expected_error_message = "Number of inputs provided does not match network."
+    assert expected_error_message in str(err.value)
+
+
+@pytest.mark.skipif(platform.processor() != 'x86_64', reason="Only run on x86, this is because these are exact results "
+                                                             "for x86 only. The Juno produces slightly different "
+                                                             "results meaning this test would fail.")
+@pytest.mark.parametrize('count', [5])
+def test_multiple_inference_runs_yield_same_result(count, mobilenet_ssd_runtime):
+    """
+    Test that results remain consistent among multiple runs of the same inference.
+    """
+    runtime = mobilenet_ssd_runtime[0]
+    net_id = mobilenet_ssd_runtime[1]
+    input_tensors = mobilenet_ssd_runtime[2]
+    output_tensors = mobilenet_ssd_runtime[3]
+
+    expected_results = [[0.17047899961471558, 0.22598055005073547, 0.8146906495094299, 0.7677907943725586,
+                         0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
+                         0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
+                         0.0, 0.0, 0.0, 0.0],
+                        [16.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                        [0.80078125, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                        [1.0]]
+
+    for _ in range(count):
+        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
+
+        output_vectors = ann.workload_tensors_to_ndarray(output_tensors)
+
+        for i in range(len(expected_results)):
+            assert all(output_vectors[i] == expected_results[i])
+
+
+@pytest.mark.juno
+def test_juno_inference_results(mobilenet_ssd_runtime):
+    """
+    Test inference results are sensible on a Juno.
+    For the Juno we allow +/-3% compared to the results on x86.
+    """
+    runtime = mobilenet_ssd_runtime[0]
+    net_id = mobilenet_ssd_runtime[1]
+    input_tensors = mobilenet_ssd_runtime[2]
+    output_tensors = mobilenet_ssd_runtime[3]
+
+    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
+
+    output_vectors = ann.workload_tensors_to_ndarray(output_tensors)
+
+    expected_outputs = [[pytest.approx(0.17047899961471558, 0.03), pytest.approx(0.22598055005073547, 0.03),
+                         pytest.approx(0.8146906495094299, 0.03), pytest.approx(0.7677907943725586, 0.03),
+                         0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
+                         0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
+                         0.0, 0.0, 0.0, 0.0],
+                        [16.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                        [0.80078125, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
+                        [1.0]]
+
+    for i in range(len(expected_outputs)):
+        assert all(output_vectors[i] == expected_outputs[i])
+
+
+def test_enqueue_workload_with_profiler(random_runtime):
+    """
+    Tests ArmNN's profiling extension
+    """
+    preferred_backends = random_runtime[0]
+    network = random_runtime[1]
+    runtime = random_runtime[2]
+    input_tensors = random_runtime[3]
+    output_tensors = random_runtime[4]
+
+    opt_network, _ = ann.Optimize(network, preferred_backends,
+                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
+    net_id, _ = runtime.LoadNetwork(opt_network)
+
+    profiler = runtime.GetProfiler(net_id)
+    # By default profiling should be turned off:
+    assert profiler.IsProfilingEnabled() is False
+
+    # Enable profiling:
+    profiler.EnableProfiling(True)
+    assert profiler.IsProfilingEnabled() is True
+
+    # Run the inference:
+    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
+
+    # Get profile output as a string:
+    str_profile = profiler.as_json()
+
+    # Verify that certain markers are present:
+    assert len(str_profile) != 0
+    assert str_profile.find('\"ArmNN\": {') > 0
+
+    # Get events analysis output as a string:
+    str_events_analysis = profiler.event_log()
+
+    assert "Event Sequence - Name | Duration (ms) | Start (ms) | Stop (ms) | Device" in str_events_analysis
+
+    assert profiler.thisown == 0
+
+
+def test_check_runtime_swig_ownership(random_runtime):
+    # Check to see that SWIG has ownership for runtime. This instructs SWIG to take
+    # ownership of the return value. This allows the value to be automatically
+    # garbage-collected when it is no longer in use
+    runtime = random_runtime[2]
+    assert runtime.thisown
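
Note: every test in this patch takes a shared_data_folder fixture that the patch
itself does not define; it presumably lives in a conftest.py elsewhere in the test
suite, pointing at a directory containing ssd_mobilenetv1.tflite and cococat.jpeg.
A minimal sketch of what such a fixture could look like (the 'testdata' directory
name is a hypothetical assumption, not taken from the patch):

    # conftest.py (sketch; the real fixture is defined outside this patch)
    import os

    import pytest


    @pytest.fixture(scope="session")
    def shared_data_folder():
        # Assumed layout: the model and image used by the tests sit in a
        # 'testdata' directory next to the test files.
        return os.path.join(os.path.dirname(__file__), 'testdata')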
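For context, the workflow these tests exercise reduces to the sequence below. The
sketch is assembled only from calls that appear in the patch; the model path, the
input tensor name, and the zero-filled stand-in image are assumptions for
illustration:

    import numpy as np
    import pyarmnn as ann

    # Parse the TfLite model into an Arm NN network.
    parser = ann.ITfLiteParser()
    network = parser.CreateNetworkFromBinaryFile('ssd_mobilenetv1.tflite')

    # Optimize for the reference CPU backend and load it into the runtime.
    runtime = ann.IRuntime(ann.CreationOptions())
    opt_network, _ = ann.Optimize(network, [ann.BackendId('CpuRef')],
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    net_id, _ = runtime.LoadNetwork(opt_network)

    # Bind inputs and outputs. The input name and 300x300 uint8 shape match
    # the mobilenet_ssd_runtime fixture in the patch.
    graph_id = 0
    input_binding_info = parser.GetNetworkInputBindingInfo(graph_id, "normalized_input_image_tensor")
    input_data = np.zeros((300, 300, 3), dtype=np.uint8)  # stand-in for a real image
    input_tensors = ann.make_input_tensors([input_binding_info], [input_data])

    output_names = parser.GetSubgraphOutputTensorNames(graph_id)
    outputs_binding_info = [parser.GetNetworkOutputBindingInfo(graph_id, name)
                            for name in output_names]
    output_tensors = ann.make_output_tensors(outputs_binding_info)

    # Run inference and read the results back as numpy ndarrays.
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)
    results = ann.workload_tensors_to_ndarray(output_tensors)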