aboutsummaryrefslogtreecommitdiff
path: root/python/pyarmnn/src/pyarmnn/_utilities/profiling_helper.py
blob: d10c28915e75a60ce1cef1a2871fae73066df886 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# Copyright © 2019 Arm Ltd. All rights reserved.
# SPDX-License-Identifier: MIT
import json
from collections import namedtuple

ProfilerData = namedtuple('ProfilerData', ['inference_data', 'per_workload_execution_data'])
ProfilerData.__doc__ = """Container to hold the profiling inference data, and the profiling data per workload.

Contains:
    inference_data (dict): holds end-to-end inference performance data. Keys:
                           'time_unit' - timer units.
                           'execution_time' - list of total inference execution times for each inference run.                            
    per_workload_execution_data (dict): holds per operation performance data, key is a operation name
                                        Each operation has 
                                        'time_unit' - timer units.   
                                        'execution_time' - list of total execution times for each inference run.
                                        'backend' - backend used for this operation.
                                        
Example:
    
    >>> data = get_profiling_data(profiler)
    >>> print(data)
    >>> ProfilerData(inference_data={'time_unit': 'us', 
                                     'execution_time': [8901372.972]}, 
                    per_workload_execution_data={'CopyMemGeneric_Execute_#3': {'time_unit': 'us', 
                                                                               'execution_time': [28.941], 
                                                                               'backend': 'Unknown'}, 
                                                 'RefConvolution2dWorkload_Execute_#5': {'time_unit': 'us', 
                                                                                         'execution_time': [126838.071], 
                                                                                         'backend': 'CpuRef'}, 
                                                 'RefDepthwiseConvolution2dWorkload_Execute_#6': {'time_unit': 'us', 
                                                                                                  'execution_time': [49886.208], 
                                                                                                  'backend': 'CpuRef'}
                                                 ...etc
                                                 }
                    )  
"""


def get_profiling_data(profiler: 'IProfiler') -> ProfilerData:
    """Reads IProfiler object passed in, extracts the relevant data
        and returns it in a ProfilerData container.

        Args:
            profile_log (IProfiler): The IProfiler object to be parsed.

        Returns:
            ProfilerData: A container containing the relevant data extracted from the Profiler output.
    """

    top_level_dict = json.loads(profiler.as_json())
    armnn_data = top_level_dict["ArmNN"]
    inference_measurements = armnn_data["inference_measurements_#1"]
    execution_data = inference_measurements["Execute_#2"]

    workload_data = {}
    inference_data = {}
    for exec_key, exec_value in execution_data.items():
        # Check all items with a type.
        if "type" in exec_value and exec_value["type"] == "Event":
            for event_key, event_value in exec_value.items():
                if event_key.startswith("Wall clock time_#") and event_value["type"] == "Measurement":
                    time_data = __get_wall_clock_times__(event_value)
                    time_data["backend"] = __get_backend(exec_key)
                    workload_data[exec_key] = time_data
        # This is the total inference time map
        if exec_key.startswith("Wall clock time_#") and exec_value["type"] == "Measurement":
            time_data = __get_wall_clock_times__(exec_value)
            inference_data.update(time_data)
    return ProfilerData(inference_data=inference_data, per_workload_execution_data=workload_data)


def __get_wall_clock_times__(wall_clock_item):
    execution_times = wall_clock_item["raw"]
    time_data = {}
    raw_data = []
    for time in execution_times:
        raw_data.append(time)
    time_data["time_unit"] = wall_clock_item["unit"]
    time_data["execution_time"] = raw_data
    return time_data


def __get_backend(exec_key):
    if "ref" in exec_key.lower():
        return "CpuRef"
    elif "neon" in exec_key.lower():
        return "CpuAcc"
    elif "cl" in exec_key.lower():
        return "GpuAcc"
    elif "npu" in exec_key.lower():
        return "NpuAcc"
    else:
        return "Unknown"