aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/tools/vela_wrapper.py
blob: 7225797de19810cf2fbaf380f928ed00d670dad5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
"""Vela wrapper module."""
import itertools
import logging
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional
from typing import Tuple
from typing import Union

import numpy as np
from ethosu.vela.architecture_features import ArchitectureFeatures
from ethosu.vela.compiler_driver import compiler_driver
from ethosu.vela.compiler_driver import CompilerOptions
from ethosu.vela.compiler_driver import TensorAllocator
from ethosu.vela.model_reader import ModelReaderOptions
from ethosu.vela.model_reader import read_model
from ethosu.vela.nn_graph import Graph
from ethosu.vela.nn_graph import NetworkType
from ethosu.vela.npu_performance import PassCycles
from ethosu.vela.operation import CustomType
from ethosu.vela.operation import Op
from ethosu.vela.scheduler import OptimizationStrategy
from ethosu.vela.scheduler import SchedulerOptions
from ethosu.vela.tensor import BandwidthDirection
from ethosu.vela.tensor import MemArea
from ethosu.vela.tensor import Tensor
from ethosu.vela.tflite_mapping import optype_to_builtintype
from ethosu.vela.tflite_model_semantic import TFLiteSemantic
from ethosu.vela.tflite_supported_operators import TFLiteSupportedOperators
from ethosu.vela.tflite_writer import write_tflite
from ethosu.vela.vela import generate_supported_ops

from mlia.utils.logging import redirect_output


# Module-level logger; it is also handed to redirect_output() around the
# calls into Vela made in this module.
logger = logging.getLogger(__name__)

# Operator types that supported_operators() leaves out of its result and
# that run_on_npu() never reports with the "CPU only operator" reason.
VELA_INTERNAL_OPS = (Op.Placeholder, Op.SubgraphInput, Op.Const)


@dataclass
class PerformanceMetrics:  # pylint: disable=too-many-instance-attributes
    """Contains all the performance metrics Vela generates in a run.

    Instances are created by ``_performance_metrics`` from the cycle counts,
    memory usage and batch size stored on the compiled Vela graph.
    """

    # Cycle counts, converted to int from the graph's ``cycles`` table.
    npu_cycles: int
    sram_access_cycles: int
    dram_access_cycles: int
    on_chip_flash_access_cycles: int
    off_chip_flash_access_cycles: int
    total_cycles: int
    # Total cycles divided by the core clock, multiplied by 1000
    # (presumably milliseconds, assuming the clock is in Hz - TODO confirm).
    batch_inference_time: float
    # Reciprocal of the inference time; NaN when that time is not positive.
    inferences_per_second: float
    batch_size: int
    # Memory used per memory area; reported as 0 when the summed bandwidth
    # recorded for the area is not positive.
    unknown_memory_area_size: int
    sram_memory_area_size: int
    dram_memory_area_size: int
    on_chip_flash_memory_area_size: int
    off_chip_flash_memory_area_size: int


@dataclass
class NpuSupported:
    """Operator's npu supported attribute."""

    # True when the operator passed every check performed by run_on_npu().
    supported: bool
    # (constraint description, operator specific detail) pairs explaining
    # why the operator is not supported. Empty when the operator is
    # supported, and also for unsupported Vela internal operators.
    reasons: List[Tuple[str, str]]


@dataclass
class Operator:
    """Description of a single operator of the model."""

    name: str
    op_type: str
    run_on_npu: NpuSupported

    @property
    def cpu_only(self) -> bool:
        """Return True if the operator can only be executed on the CPU.

        That is the case when the operator is not supported on the NPU and
        the one and only reason given is "CPU only operator".
        """
        if self.run_on_npu.supported:
            return False

        return self.run_on_npu.reasons == [("CPU only operator", "")]


@dataclass
class Operators:
    """Collection of the model's operators with NPU support statistics."""

    ops: List[Operator]

    @property
    def npu_supported_ratio(self) -> float:
        """Return the share of operators that are supported on the NPU.

        The result is 0 for an empty collection and when none of the
        operators is supported.
        """
        overall_count = self.total_number
        supported_count = self.npu_supported_number

        if not overall_count or not supported_count:
            return 0

        return supported_count / overall_count

    @property
    def npu_unsupported_ratio(self) -> float:
        """Return the share of operators that are not supported on the NPU."""
        supported_share = self.npu_supported_ratio
        return 1 - supported_share

    @property
    def total_number(self) -> int:
        """Return how many operators the collection holds."""
        return len(self.ops)

    @property
    def npu_supported_number(self) -> int:
        """Return how many operators are flagged as supported on the NPU."""
        support_flags = (item.run_on_npu.supported for item in self.ops)
        return sum(support_flags)


@dataclass
class Model:
    """Vela graph of a model together with its network type."""

    nng: Graph
    network_type: NetworkType

    @property
    def optimized(self) -> bool:
        """Return True if any operator has the ExistingNpuOp custom type.

        Every operator of every subgraph is inspected; the search stops
        at the first match.
        """
        for subgraph in self.nng.subgraphs:
            for operation in subgraph.get_all_ops():
                if operation.attrs.get("custom_type") == CustomType.ExistingNpuOp:
                    return True

        return False


@dataclass
class OptimizedModel:
    """Instance of the Vela optimized model.

    Produced by ``VelaCompiler.compile_model`` after the graph has been
    passed through Vela's ``compiler_driver``.
    """

    # Graph that was handed to the Vela compiler driver.
    nng: Graph
    # Architecture and option objects that were used for that compilation.
    arch: ArchitectureFeatures
    compiler_options: CompilerOptions
    scheduler_options: SchedulerOptions

    def save(self, output_filename: Union[str, Path]) -> None:
        """Save instance of the optimized model to the file.

        The graph is written to ``output_filename`` with Vela's
        ``write_tflite``.
        """
        write_tflite(self.nng, output_filename)


# Accelerator configuration names accepted in VelaCompilerOptions.
AcceleratorConfigType = Literal[
    "ethos-u55-32",
    "ethos-u55-64",
    "ethos-u55-128",
    "ethos-u55-256",
    "ethos-u65-256",
    "ethos-u65-512",
]

# Names that VelaCompiler uses for the ``TensorAllocator[...]`` lookup.
TensorAllocatorType = Literal["LinearAlloc", "Greedy", "HillClimb"]

# Names that VelaCompiler uses for the ``OptimizationStrategy[...]`` lookup.
OptimizationStrategyType = Literal["Performance", "Size"]


@dataclass
class VelaCompilerOptions:  # pylint: disable=too-many-instance-attributes
    """Vela compiler options.

    The values are copied by ``VelaCompiler`` and forwarded to the Vela
    ``ArchitectureFeatures``, ``CompilerOptions`` and ``SchedulerOptions``
    objects it creates.
    """

    # Forwarded to ArchitectureFeatures as ``vela_config_files``.
    config_files: Optional[Union[str, List[str]]] = None
    system_config: str = ArchitectureFeatures.DEFAULT_CONFIG
    memory_mode: str = ArchitectureFeatures.DEFAULT_CONFIG
    accelerator_config: Optional[AcceleratorConfigType] = None
    # Forwarded to ArchitectureFeatures as ``max_blockdep``.
    max_block_dependency: int = ArchitectureFeatures.MAX_BLOCKDEP
    arena_cache_size: Optional[int] = None
    # Name of the TensorAllocator member to use.
    tensor_allocator: TensorAllocatorType = "HillClimb"
    cpu_tensor_alignment: int = Tensor.AllocationQuantum
    # Name of the OptimizationStrategy member to use.
    optimization_strategy: OptimizationStrategyType = "Performance"
    output_dir: Optional[str] = None
    # Passed to sys.setrecursionlimit() whenever a VelaCompiler is created.
    recursion_limit: int = 1000


class VelaCompiler:  # pylint: disable=too-many-instance-attributes
    """Vela compiler wrapper.

    Keeps the settings taken from ``VelaCompilerOptions`` and builds the
    Vela option objects (architecture, compiler and scheduler options)
    from them on demand.
    """

    def __init__(self, compiler_options: VelaCompilerOptions):
        """Init Vela wrapper instance.

        Note that this sets the interpreter-wide recursion limit to
        ``compiler_options.recursion_limit``.
        """
        self.config_files = compiler_options.config_files
        self.system_config = compiler_options.system_config
        self.memory_mode = compiler_options.memory_mode
        self.accelerator_config = compiler_options.accelerator_config
        self.max_block_dependency = compiler_options.max_block_dependency
        self.arena_cache_size = compiler_options.arena_cache_size
        # The options carry names; resolve them to the Vela objects here.
        self.tensor_allocator = TensorAllocator[compiler_options.tensor_allocator]
        self.cpu_tensor_alignment = compiler_options.cpu_tensor_alignment
        self.optimization_strategy = OptimizationStrategy[
            compiler_options.optimization_strategy
        ]
        self.output_dir = compiler_options.output_dir
        self.recursion_limit = compiler_options.recursion_limit

        sys.setrecursionlimit(self.recursion_limit)

    def read_model(self, model: Union[str, Path]) -> Model:
        """Read model from the given path and wrap it into a Model."""
        logger.debug("Read model %s", model)

        nng, network_type = self._read_model(model)
        return Model(nng, network_type)

    def compile_model(self, model: Union[str, Path, Model]) -> OptimizedModel:
        """Compile the model.

        The model can be given as a path to the model file or as a Model
        that has been read before.

        Raises an Exception if the model cannot be read or if the Vela
        compiler fails; the original error is attached as the cause.
        """
        if isinstance(model, (str, Path)):
            nng, network_type = self._read_model(model)
        else:
            # Use the network type that was detected when the model was
            # read instead of assuming TFLite.
            nng, network_type = model.nng, model.network_type

        if not nng:
            raise Exception("Unable to read model")

        try:
            arch = self._architecture_features()
            compiler_options = self._compiler_options()
            # Reuse the architecture instead of building a second one.
            scheduler_options = self._scheduler_options(arch)

            with redirect_output(
                logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
            ):
                compiler_driver(
                    nng, arch, compiler_options, scheduler_options, network_type
                )

            return OptimizedModel(nng, arch, compiler_options, scheduler_options)
        # SystemExit is caught as well, presumably because Vela may
        # terminate the interpreter on errors.
        except (SystemExit, Exception) as err:
            raise Exception("Model could not be optimized with Vela compiler") from err

    def get_config(self) -> Dict[str, Any]:
        """Get compiler configuration.

        Returns a dictionary with the values of the ArchitectureFeatures
        instance created for the current settings, including the clock
        scale, burst length and read/write latency of every memory area.
        """
        arch = self._architecture_features()

        memory_area = {
            mem.name: {
                "clock_scales": arch.memory_clock_scales[mem],
                "burst_length": arch.memory_burst_length[mem],
                "read_latency": arch.memory_latency[mem][BandwidthDirection.Read],
                "write_latency": arch.memory_latency[mem][BandwidthDirection.Write],
            }
            for mem in (
                MemArea.Sram,
                MemArea.Dram,
                MemArea.OnChipFlash,
                MemArea.OffChipFlash,
            )
        }

        return {
            "accelerator_config": arch.accelerator_config.value,
            "system_config": arch.system_config,
            "core_clock": arch.core_clock,
            "axi0_port": arch.axi0_port.name,
            "axi1_port": arch.axi1_port.name,
            "memory_mode": arch.memory_mode,
            "const_mem_area": arch.const_mem_area.name,
            "arena_mem_area": arch.arena_mem_area.name,
            "cache_mem_area": arch.cache_mem_area.name,
            "arena_cache_size": arch.arena_cache_size,
            "permanent_storage_mem_area": arch.permanent_storage_mem_area.name,
            "feature_map_storage_mem_area": arch.feature_map_storage_mem_area.name,
            "fast_storage_mem_area": arch.fast_storage_mem_area.name,
            "memory_area": memory_area,
        }

    @staticmethod
    def _read_model(model: Union[str, Path]) -> Tuple[Graph, NetworkType]:
        """Read TFLite model.

        Returns the graph and the network type produced by Vela's
        ``read_model``. Raises an Exception if reading fails.
        """
        # Resolved before the try block so that the name is always bound
        # when the error message below is built.
        model_path = str(model) if isinstance(model, Path) else model

        try:
            with redirect_output(
                logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
            ):
                return read_model(model_path, ModelReaderOptions())  # type: ignore
        except (SystemExit, Exception) as err:
            raise Exception(f"Unable to read model {model_path}") from err

    def _architecture_features(self) -> ArchitectureFeatures:
        """Return ArchitectureFeatures instance built from the settings."""
        return ArchitectureFeatures(
            vela_config_files=self.config_files,
            accelerator_config=self.accelerator_config,
            system_config=self.system_config,
            memory_mode=self.memory_mode,
            max_blockdep=self.max_block_dependency,
            verbose_config=False,
            arena_cache_size=self.arena_cache_size,
        )

    def _scheduler_options(
        self, arch: Optional[ArchitectureFeatures] = None
    ) -> SchedulerOptions:
        """Return SchedulerOptions instance.

        The SRAM target is taken from the arena cache size of ``arch``.
        When ``arch`` is not given, a new ArchitectureFeatures instance is
        created for that purpose.
        """
        if arch is None:
            arch = self._architecture_features()

        return SchedulerOptions(
            optimization_strategy=self.optimization_strategy,
            sram_target=arch.arena_cache_size,
            verbose_schedule=False,
        )

    def _compiler_options(self) -> CompilerOptions:
        """Return CompilerOptions instance with all verbose output disabled."""
        return CompilerOptions(
            verbose_graph=False,
            verbose_quantization=False,
            verbose_packing=False,
            verbose_tensor_purpose=False,
            verbose_tensor_format=False,
            verbose_allocation=False,
            verbose_high_level_command_stream=False,
            verbose_register_command_stream=False,
            verbose_operators=False,
            verbose_weights=False,
            show_cpu_operations=False,
            tensor_allocator=self.tensor_allocator,
            timing=False,
            output_dir=self.output_dir,
            cpu_tensor_alignment=self.cpu_tensor_alignment,
        )


def resolve_compiler_config(
    vela_compiler_options: VelaCompilerOptions,
) -> Dict[str, Any]:
    """Return the configuration resolved from the passed compiler options.

    Vela resolves a number of configuration parameters while the
    compiler options are processed. For example, it reads the
    configuration parameters from vela.ini and fills its internal
    structures with the resolved values (memory mode, system mode, etc.).

    This information is only available through an instance of the Vela
    compiler, so one is created here and asked for its configuration.
    """
    return VelaCompiler(vela_compiler_options).get_config()


def estimate_performance(
    model_path: Path, compiler_options: VelaCompilerOptions
) -> PerformanceMetrics:
    """Estimate the performance of the model on the configured device.

    The model is read, compiled with Vela and the metrics are taken from
    the compiled graph. The logic follows the Vela module stats_writer.py.

    Raises an Exception if the given model is already optimized.
    """
    logger.debug(
        "Estimate performance for the model %s on %s",
        model_path,
        compiler_options.accelerator_config,
    )

    compiler = VelaCompiler(compiler_options)
    source_model = compiler.read_model(model_path)

    if not source_model.optimized:
        return _performance_metrics(compiler.compile_model(source_model))

    raise Exception("Unable to estimate performance for the given optimized model")


def optimize_model(
    model_path: Path, compiler_options: VelaCompilerOptions, output_model_path: Path
) -> None:
    """Compile the model with Vela and save the result.

    The optimized model is written to ``output_model_path``; nothing is
    returned.
    """
    logger.debug(
        "Optimize model %s for device %s",
        model_path,
        compiler_options.accelerator_config,
    )

    compiled = VelaCompiler(compiler_options).compile_model(model_path)

    logger.debug("Save optimized model into %s", output_model_path)
    compiled.save(output_model_path)


def _performance_metrics(optimized_model: OptimizedModel) -> PerformanceMetrics:
    """Collect the performance metrics stored on the compiled graph."""
    graph = optimized_model.nng
    cycle_counts = graph.cycles

    def memory_usage(mem_area: MemArea) -> int:
        """Return the memory used in the area, or 0 if it has no bandwidth."""
        used_by_area: Dict[MemArea, int] = graph.memory_used
        area_bandwidths = graph.bandwidths

        if np.sum(area_bandwidths[mem_area]) > 0:
            return used_by_area.get(mem_area, 0)

        return 0

    # Inference time is the total cycle count divided by the core clock;
    # the rate stays NaN when that time is not positive.
    inference_time = cycle_counts[PassCycles.Total] / optimized_model.arch.core_clock
    fps = 1 / inference_time if inference_time > 0 else np.nan

    return PerformanceMetrics(
        npu_cycles=int(cycle_counts[PassCycles.Npu]),
        sram_access_cycles=int(cycle_counts[PassCycles.SramAccess]),
        dram_access_cycles=int(cycle_counts[PassCycles.DramAccess]),
        on_chip_flash_access_cycles=int(cycle_counts[PassCycles.OnChipFlashAccess]),
        off_chip_flash_access_cycles=int(cycle_counts[PassCycles.OffChipFlashAccess]),
        total_cycles=int(cycle_counts[PassCycles.Total]),
        batch_inference_time=inference_time * 1000,
        inferences_per_second=fps,
        batch_size=graph.batch_size,
        unknown_memory_area_size=memory_usage(MemArea.Unknown),
        sram_memory_area_size=memory_usage(MemArea.Sram),
        dram_memory_area_size=memory_usage(MemArea.Dram),
        on_chip_flash_memory_area_size=memory_usage(MemArea.OnChipFlash),
        off_chip_flash_memory_area_size=memory_usage(MemArea.OffChipFlash),
    )


def supported_operators(
    model_path: Path, compiler_options: VelaCompilerOptions
) -> Operators:
    """Read the model and describe each of its operators.

    Every operator of every subgraph is reported together with the
    information whether it can run on the NPU. Vela internal operators
    are left out.
    """
    logger.debug("Check supported operators for the model %s", model_path)

    model = VelaCompiler(compiler_options).read_model(model_path)

    collected: List[Operator] = []
    for subgraph in model.nng.subgraphs:
        for operation in subgraph.get_all_ops():
            if operation.type in VELA_INTERNAL_OPS:
                continue

            collected.append(
                Operator(
                    operation.name,
                    optype_to_builtintype(operation.type),
                    run_on_npu(operation),
                )
            )

    return Operators(collected)


def run_on_npu(operator: Op) -> NpuSupported:
    """Tell whether the operator can run on NPU and, if not, why.

    The decision is based on the checks Vela provides. They are applied
    in this order:
      - TFLite semantic constraints (generic, then operator specific)
      - membership in the set of operators supported by Vela
      - supported operator constraints (generic, then operator specific)

    The first failing check determines the result. A failed constraint
    is reported as a pair of:
      - general description of the constraint (its docstring)
      - details on the particular operator

    An operator type unknown to Vela's supported operators is reported
    as "CPU only operator", unless it is a Vela internal operator, in
    which case no reason is given.
    """
    op_type = operator.type

    def first_failure(constraints: Any) -> Optional[NpuSupported]:
        """Return the result for the first failed constraint, if any."""
        for check in constraints:
            is_valid, details = check(operator)
            if not is_valid:
                return NpuSupported(False, [(check.__doc__, details)])

        return None

    semantic = TFLiteSemantic()
    failure = first_failure(
        itertools.chain(
            semantic.generic_constraints,
            semantic.specific_constraints[op_type],
        )
    )
    if failure is not None:
        return failure

    if op_type not in TFLiteSupportedOperators.supported_operators:
        if op_type in VELA_INTERNAL_OPS:
            return NpuSupported(False, [])

        return NpuSupported(False, [("CPU only operator", "")])

    supported_ops_checker = TFLiteSupportedOperators()
    failure = first_failure(
        itertools.chain(
            supported_ops_checker.generic_constraints,
            supported_ops_checker.specific_constraints[op_type],
        )
    )
    if failure is not None:
        return failure

    return NpuSupported(True, [])


def generate_supported_operators_report() -> None:
    """Generate supported operators report in current working directory.

    The work is delegated to Vela's ``generate_supported_ops``, which is
    run inside ``redirect_output(logger)``.
    """
    with redirect_output(logger):
        generate_supported_ops()