# Copyright © 2020 Arm Ltd. All rights reserved.
# SPDX-License-Identifier: MIT
"""Runtime tests for PyArmNN (ArmNN 20.02 Python bindings).

Reconstructed from patch dc0c6ed9 ("Add PyArmNN to work with ArmNN API
of 20.02", Richard Burton, 2020-04-08). Exercises network optimization,
loading/unloading, inference via EnqueueWorkload, and the profiler
extension against the CpuRef backend using the shared mock TFLite model.

NOTE(review): the original patch asserted
``output_vectors[i].all() == expected_results[i].all()`` — that compares
two scalar booleans and passes for almost any output, so the inference
result tests could never fail on wrong values. Fixed below with
``np.array_equal``.
"""
import os

import pytest
import numpy as np

import pyarmnn as ann


@pytest.fixture(scope="function")
def random_runtime(shared_data_folder):
    """Parse the mock TFLite model and prepare random uint8 input tensors.

    Yields:
        tuple: ``(preferred_backends, network, runtime, input_tensors,
        output_tensors)`` ready for Optimize / LoadNetwork /
        EnqueueWorkload. The network is parsed but NOT yet optimized or
        loaded, so tests can exercise those steps themselves.
    """
    parser = ann.ITfLiteParser()
    network = parser.CreateNetworkFromBinaryFile(
        os.path.join(shared_data_folder, 'mock_model.tflite'))
    preferred_backends = [ann.BackendId('CpuRef')]
    options = ann.CreationOptions()
    runtime = ann.IRuntime(options)

    # The mock model contains a single subgraph; index the last (only) one.
    graphs_count = parser.GetSubgraphCount()
    graph_id = graphs_count - 1

    input_names = parser.GetSubgraphInputTensorNames(graph_id)
    input_binding_info = parser.GetNetworkInputBindingInfo(graph_id, input_names[0])
    input_tensor_id = input_binding_info[0]
    input_tensor_info = input_binding_info[1]

    output_names = parser.GetSubgraphOutputTensorNames(graph_id)

    # Random uint8 payload sized to match the network's input tensor.
    input_data = np.random.randint(255, size=input_tensor_info.GetNumElements(),
                                   dtype=np.uint8)
    input_tensors = [(input_tensor_id, ann.ConstTensor(input_tensor_info, input_data))]

    output_tensors = []
    for output_name in output_names:
        out_bind_info = parser.GetNetworkOutputBindingInfo(graph_id, output_name)
        out_tensor_id = out_bind_info[0]
        out_tensor_info = out_bind_info[1]
        output_tensors.append((out_tensor_id, ann.Tensor(out_tensor_info)))

    yield preferred_backends, network, runtime, input_tensors, output_tensors


@pytest.fixture(scope='function')
def mock_model_runtime(shared_data_folder):
    """Optimize and load the mock model; yield a ready-to-run network.

    Unlike ``random_runtime`` this fixture completes the full
    Optimize/LoadNetwork pipeline and feeds a fixed, known input
    (``tflite_parser/input_lite.npy``) so inference results are
    deterministic.

    Yields:
        tuple: ``(runtime, net_id, input_tensors, output_tensors)``.
    """
    parser = ann.ITfLiteParser()
    network = parser.CreateNetworkFromBinaryFile(
        os.path.join(shared_data_folder, 'mock_model.tflite'))
    graph_id = 0

    input_binding_info = parser.GetNetworkInputBindingInfo(graph_id, "input_1")
    input_tensor_data = np.load(
        os.path.join(shared_data_folder, 'tflite_parser/input_lite.npy'))

    preferred_backends = [ann.BackendId('CpuRef')]
    options = ann.CreationOptions()
    runtime = ann.IRuntime(options)

    opt_network, messages = ann.Optimize(network, preferred_backends,
                                         runtime.GetDeviceSpec(),
                                         ann.OptimizerOptions())
    print(messages)

    net_id, messages = runtime.LoadNetwork(opt_network)
    print(messages)

    input_tensors = ann.make_input_tensors([input_binding_info], [input_tensor_data])

    output_names = parser.GetSubgraphOutputTensorNames(graph_id)
    outputs_binding_info = [parser.GetNetworkOutputBindingInfo(graph_id, output_name)
                            for output_name in output_names]
    output_tensors = ann.make_output_tensors(outputs_binding_info)

    yield runtime, net_id, input_tensors, output_tensors


def test_python_disowns_network(random_runtime):
    """LoadNetwork must transfer ownership of the optimized network to C++."""
    preferred_backends = random_runtime[0]
    network = random_runtime[1]
    runtime = random_runtime[2]
    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    runtime.LoadNetwork(opt_network)

    # SWIG's thisown flag flips to False once C++ owns the object.
    assert not opt_network.thisown


def test_load_network(random_runtime):
    """Loading an optimized network succeeds with no error messages."""
    preferred_backends = random_runtime[0]
    network = random_runtime[1]
    runtime = random_runtime[2]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, messages = runtime.LoadNetwork(opt_network)
    assert "" == messages
    assert net_id == 0


def test_load_network_properties_provided(random_runtime):
    """LoadNetwork also accepts explicit INetworkProperties."""
    preferred_backends = random_runtime[0]
    network = random_runtime[1]
    runtime = random_runtime[2]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    # (importEnabled, exportEnabled) — both on for this smoke test.
    properties = ann.INetworkProperties(True, True)
    net_id, messages = runtime.LoadNetwork(opt_network, properties)
    assert "" == messages
    assert net_id == 0


def test_unload_network_fails_for_invalid_net_id(random_runtime):
    """Unloading a never-loaded network id raises a RuntimeError."""
    preferred_backends = random_runtime[0]
    network = random_runtime[1]
    runtime = random_runtime[2]

    ann.Optimize(network, preferred_backends, runtime.GetDeviceSpec(),
                 ann.OptimizerOptions())

    with pytest.raises(RuntimeError) as err:
        runtime.UnloadNetwork(9)

    expected_error_message = "Failed to unload network."
    assert expected_error_message in str(err.value)


def test_enqueue_workload(random_runtime):
    """A loaded network runs inference without raising."""
    preferred_backends = random_runtime[0]
    network = random_runtime[1]
    runtime = random_runtime[2]
    input_tensors = random_runtime[3]
    output_tensors = random_runtime[4]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, _ = runtime.LoadNetwork(opt_network)
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)


def test_enqueue_workload_fails_with_empty_input_tensors(random_runtime):
    """EnqueueWorkload rejects an input-tensor list of the wrong length."""
    preferred_backends = random_runtime[0]
    network = random_runtime[1]
    runtime = random_runtime[2]
    input_tensors = []
    output_tensors = random_runtime[4]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())

    net_id, _ = runtime.LoadNetwork(opt_network)
    with pytest.raises(RuntimeError) as err:
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    expected_error_message = "Number of inputs provided does not match network."
    assert expected_error_message in str(err.value)


@pytest.mark.x86_64
@pytest.mark.parametrize('count', [5])
def test_multiple_inference_runs_yield_same_result(count, mock_model_runtime):
    """
    Test that results remain consistent among multiple runs of the same inference.
    """
    runtime = mock_model_runtime[0]
    net_id = mock_model_runtime[1]
    input_tensors = mock_model_runtime[2]
    output_tensors = mock_model_runtime[3]

    expected_results = np.array([[4, 85, 108, 29, 8, 16, 0, 2, 5, 0]])

    for _ in range(count):
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

        output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

        # Element-wise comparison; the original `.all() == .all()` form
        # compared two booleans and could not detect wrong values.
        for i in range(len(expected_results)):
            assert np.array_equal(output_vectors[i], expected_results[i])


@pytest.mark.aarch64
def test_aarch64_inference_results(mock_model_runtime):
    """CpuRef inference on aarch64 produces the known-good output vector."""
    runtime = mock_model_runtime[0]
    net_id = mock_model_runtime[1]
    input_tensors = mock_model_runtime[2]
    output_tensors = mock_model_runtime[3]

    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

    expected_results = np.array([[4, 85, 108, 29, 8, 16, 0, 2, 5, 0]])

    for i in range(len(expected_results)):
        assert np.array_equal(output_vectors[i], expected_results[i])


def test_enqueue_workload_with_profiler(random_runtime):
    """
    Tests ArmNN's profiling extension
    """
    preferred_backends = random_runtime[0]
    network = random_runtime[1]
    runtime = random_runtime[2]
    input_tensors = random_runtime[3]
    output_tensors = random_runtime[4]

    opt_network, _ = ann.Optimize(network, preferred_backends,
                                  runtime.GetDeviceSpec(), ann.OptimizerOptions())
    net_id, _ = runtime.LoadNetwork(opt_network)

    profiler = runtime.GetProfiler(net_id)
    # By default profiling should be turned off:
    assert profiler.IsProfilingEnabled() is False

    # Enable profiling:
    profiler.EnableProfiling(True)
    assert profiler.IsProfilingEnabled() is True

    # Run the inference:
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    # Get profile output as a string:
    str_profile = profiler.as_json()

    # Verify that certain markers are present:
    assert len(str_profile) != 0
    assert str_profile.find('\"ArmNN\": {') > 0

    # Get events analysis output as a string:
    str_events_analysis = profiler.event_log()

    assert "Event Sequence - Name | Duration (ms) | Start (ms) | Stop (ms) | Device" in str_events_analysis

    # The profiler is owned by the runtime, not by Python.
    assert profiler.thisown == 0


def test_check_runtime_swig_ownership(random_runtime):
    # Check to see that SWIG has ownership for runtime. This instructs SWIG to take
    # ownership of the return value. This allows the value to be automatically
    # garbage-collected when it is no longer in use
    runtime = random_runtime[2]
    assert runtime.thisown