aboutsummaryrefslogtreecommitdiff
path: root/scripts/run_platform.py
blob: bb2f06fa4c77c026b18a1fb5d58ad9a4f1c46fd0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3

#
# Copyright (c) 2021 Arm Limited. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the License); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an AS IS BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import multiprocessing
import numpy
import os
import pathlib
import re
import shutil
import subprocess
import sys

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
from tensorflow.lite.python.interpreter import Interpreter, OpResolverType

CORE_PLATFORM_PATH = pathlib.Path(__file__).resolve().parents[1]

def run_cmd(cmd, **kwargs):
    # str() is called to handle pathlib.Path objects
    cmd_str = " ".join([str(arg) for arg in cmd])
    print(f"Running command: {cmd_str}")
    return subprocess.run(cmd, check=True, **kwargs)

def build_core_platform(output_folder, target, toolchain, memory_model, memory_arena, pmu):
    build_folder = output_folder/"model"/"build"
    cmake_cmd = ["cmake",
                 CORE_PLATFORM_PATH/"targets"/target,
                 f"-B{build_folder}",
                 f"-DCMAKE_TOOLCHAIN_FILE={CORE_PLATFORM_PATH/'cmake'/'toolchain'/(toolchain + '.cmake')}",
                 f"-DBAREMETAL_PATH={output_folder}",
                 f"-DMEMORY_MODEL={memory_model}",
                 f"-DMEMORY_ARENA={memory_arena}"]
    if pmu:
        for i in range(len(pmu)):
            cmake_cmd += [f"-DETHOSU_PMU_EVENT_{i}={pmu[i]}"]

    run_cmd(cmake_cmd)

    make_cmd = ["make", "-C", build_folder, f"-j{multiprocessing.cpu_count()}", "baremetal_custom"]
    run_cmd(make_cmd)

def generate_reference_data(output_folder, non_optimized_model_path, input_path, expected_output_path):
    interpreter = Interpreter(model_path=str(non_optimized_model_path.resolve()), experimental_op_resolver_type=OpResolverType.BUILTIN_REF)

    interpreter.allocate_tensors()
    input_detail  = interpreter.get_input_details()[0]
    output_detail = interpreter.get_output_details()[0]

    input_data = None
    if input_path is None:
        # Randomly generate input data
        dtype = input_detail["dtype"]
        if dtype is numpy.float32:
            rand = numpy.random.default_rng()
            input_data = rand.random(size=input_detail["shape"], dtype=numpy.float32)
        else:
            input_data = numpy.random.randint(low=numpy.iinfo(dtype).min, high=numpy.iinfo(dtype).max, size=input_detail["shape"], dtype=dtype)
    else:
        # Load user provided input data
        input_data = numpy.load(input_path)

    output_data = None
    if expected_output_path is None:
        # Run the network with input_data to get reference output
        interpreter.set_tensor(input_detail["index"], input_data)
        interpreter.invoke()
        output_data = interpreter.get_tensor(output_detail["index"])
    else:
        # Load user provided output data
        output_data = numpy.load(expected_output_path)

    network_input_path  = output_folder/"ref_input.bin"
    network_output_path = output_folder/"ref_output.bin"

    with network_input_path.open("wb") as fp:
        fp.write(input_data.tobytes())
    with network_output_path.open("wb") as fp:
        fp.write(output_data.tobytes())

    output_folder = pathlib.Path(output_folder)
    dump_c_header(network_input_path, output_folder/"input.h", "inputData", "input_data_sec", 4)
    dump_c_header(network_output_path, output_folder/"output.h", "expectedOutputData", "expected_output_data_sec", 4)

def dump_c_header(input_path, output_path, array_name, section, alignment, extra_data=""):
    byte_array = []
    with open(input_path, "rb") as fp:
        byte_string = fp.read()
        byte_array = [f"0x{format(byte, '02x')}" for byte in byte_string]

        last = byte_array[-1]
        byte_array = [byte + "," for byte in byte_array[:-1]] + [last]

        byte_array = ["  " + byte if idx % 12 == 0 else byte
                      for idx, byte in enumerate(byte_array)]

        byte_array = [byte + "\n" if (idx + 1) % 12 == 0 else byte + " "
                      for idx, byte in enumerate(byte_array)]

    with open(output_path, "w") as carray:
        header = f"uint8_t {array_name}[] __attribute__((section(\"{section}\"), aligned({alignment}))) = {{\n"
        carray.write(extra_data)
        carray.write(header)
        carray.write("".join(byte_array))
        carray.write("\n};\n")

def optimize_network(output_folder, network_path, accelerator_conf):
    vela_cmd  = ["vela",
                 network_path,
                 "--output-dir", output_folder,
                 "--accelerator-config", accelerator_conf]
    res = run_cmd(vela_cmd)
    optimized_model_path = output_folder/(network_path.stem + "_vela.tflite")
    model_name = network_path.stem
    dump_c_header(optimized_model_path, output_folder/"model.h", "networkModelData", "network_model_sec", 16, extra_data=f"const char *modelName=\"{model_name}\";\n")

def run_model(output_folder):
    build_folder = output_folder/"model"/"build"
    model_cmd = ["ctest", "-V", "-R", "^baremetal_custom$" ]
    res = run_cmd(model_cmd, cwd=build_folder)

def main():
    target_mapping = {
        "corstone-300": "ethos-u55-128"
    }
    parser = argparse.ArgumentParser()
    parser.add_argument("-o", "--output-folder", type=pathlib.Path, default="output", help="Output folder for build and generated files")
    parser.add_argument("--network-path", type=pathlib.Path, required=True, help="Path to .tflite file")
    parser.add_argument("--target", choices=target_mapping, default="corstone-300", help=f"Configure target")
    parser.add_argument("--toolchain", choices=["armclang", "arm-none-eabi-gcc"], default="armclang", help=f"Configure toolchain")
    parser.add_argument("--memory_model", choices=["sram", "dram"], default="dram", help=f"Configure memory_model")
    parser.add_argument("--memory_arena", choices=["sram", "dram"], default="sram", help=f"Configure memory_arena")
    parser.add_argument("--pmu", type=int, action='append', help="PMU Event Counters")
    parser.add_argument("--custom-input", type=pathlib.Path, help="Custom input to network")
    parser.add_argument("--custom-output", type=pathlib.Path, help="Custom expected output data for network")

    args = parser.parse_args()
    args.output_folder.mkdir(exist_ok=True)

    try:
        optimize_network(args.output_folder, args.network_path, target_mapping[args.target])
        generate_reference_data(args.output_folder, args.network_path, args.custom_input, args.custom_output)
        build_core_platform(args.output_folder, args.target, args.toolchain, args.memory_model, args.memory_arena, args.pmu)
        run_model(args.output_folder)
    except subprocess.CalledProcessError as err:
        print(f"Command: '{err.cmd}' failed", file=sys.stderr)
        return 1
    return 0

if __name__ == "__main__":
    sys.exit(main())