# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
"""Vela wrapper module."""
import itertools
import logging
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional
from typing import Tuple
from typing import Union

import numpy as np
from ethosu.vela.architecture_features import ArchitectureFeatures
from ethosu.vela.compiler_driver import compiler_driver
from ethosu.vela.compiler_driver import CompilerOptions
from ethosu.vela.compiler_driver import TensorAllocator
from ethosu.vela.model_reader import ModelReaderOptions
from ethosu.vela.model_reader import read_model
from ethosu.vela.nn_graph import Graph
from ethosu.vela.nn_graph import NetworkType
from ethosu.vela.npu_performance import PassCycles
from ethosu.vela.operation import CustomType
from ethosu.vela.operation import Op
from ethosu.vela.scheduler import OptimizationStrategy
from ethosu.vela.scheduler import SchedulerOptions
from ethosu.vela.tensor import BandwidthDirection
from ethosu.vela.tensor import MemArea
from ethosu.vela.tensor import Tensor
from ethosu.vela.tflite_mapping import optype_to_builtintype
from ethosu.vela.tflite_model_semantic import TFLiteSemantic
from ethosu.vela.tflite_supported_operators import TFLiteSupportedOperators
from ethosu.vela.tflite_writer import write_tflite
from ethosu.vela.vela import generate_supported_ops

from mlia.utils.logging import redirect_output

logger = logging.getLogger(__name__)

VELA_INTERNAL_OPS = (Op.Placeholder, Op.SubgraphInput, Op.Const)


@dataclass
class PerformanceMetrics:  # pylint: disable=too-many-instance-attributes
    """Contains all the performance metrics Vela generates in a run."""

    npu_cycles: int
    sram_access_cycles: int
    dram_access_cycles: int
    on_chip_flash_access_cycles: int
    off_chip_flash_access_cycles: int
    total_cycles: int
    batch_inference_time: float
    inferences_per_second: float
    batch_size: int
    unknown_memory_area_size: int
    sram_memory_area_size: int
    dram_memory_area_size: int
    on_chip_flash_memory_area_size: int
    off_chip_flash_memory_area_size: int


@dataclass
class NpuSupported:
    """Operator's npu supported attribute."""

    supported: bool
    reasons: List[Tuple[str, str]]


@dataclass
class Operator:
    """Model operator."""

    name: str
    op_type: str
    run_on_npu: NpuSupported

    @property
    def cpu_only(self) -> bool:
        """Return true if operator is CPU only."""
        cpu_only_reasons = [("CPU only operator", "")]
        return (
            not self.run_on_npu.supported
            and self.run_on_npu.reasons == cpu_only_reasons
        )


@dataclass
class Operators:
    """Model's operators."""

    ops: List[Operator]

    @property
    def npu_supported_ratio(self) -> float:
        """Return NPU supported ratio."""
        total = self.total_number
        npu_supported = self.npu_supported_number

        if total == 0 or npu_supported == 0:
            return 0

        return npu_supported / total

    @property
    def npu_unsupported_ratio(self) -> float:
        """Return NPU unsupported ratio."""
        return 1 - self.npu_supported_ratio

    @property
    def total_number(self) -> int:
        """Return total number of operators."""
        return len(self.ops)

    @property
    def npu_supported_number(self) -> int:
        """Return number of npu supported operators."""
        return sum(op.run_on_npu.supported for op in self.ops)
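# A minimal sketch of how the dataclasses above compose. The operator names,
# types and support reasons below are made up purely for illustration:
#
#     ops = Operators(
#         ops=[
#             Operator("conv2d", "CONV_2D", NpuSupported(True, [])),
#             Operator(
#                 "argmax", "ARG_MAX", NpuSupported(False, [("CPU only operator", "")])
#             ),
#         ]
#     )
#     assert ops.total_number == 2
#     assert ops.npu_supported_ratio == 0.5
#     assert ops.ops[1].cpu_only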
@dataclass
class Model:
    """Model metadata."""

    nng: Graph
    network_type: NetworkType

    @property
    def optimized(self) -> bool:
        """Return true if model is already optimized."""
        return any(
            op.attrs.get("custom_type") == CustomType.ExistingNpuOp
            for sg in self.nng.subgraphs
            for op in sg.get_all_ops()
        )


@dataclass
class OptimizedModel:
    """Instance of the Vela optimized model."""

    nng: Graph
    arch: ArchitectureFeatures
    compiler_options: CompilerOptions
    scheduler_options: SchedulerOptions

    def save(self, output_filename: Union[str, Path]) -> None:
        """Save the optimized model to the given file."""
        write_tflite(self.nng, output_filename)


AcceleratorConfigType = Literal[
    "ethos-u55-32",
    "ethos-u55-64",
    "ethos-u55-128",
    "ethos-u55-256",
    "ethos-u65-256",
    "ethos-u65-512",
]

TensorAllocatorType = Literal["LinearAlloc", "Greedy", "HillClimb"]

OptimizationStrategyType = Literal["Performance", "Size"]


@dataclass
class VelaCompilerOptions:  # pylint: disable=too-many-instance-attributes
    """Vela compiler options."""

    config_files: Optional[Union[str, List[str]]] = None
    system_config: str = ArchitectureFeatures.DEFAULT_CONFIG
    memory_mode: str = ArchitectureFeatures.DEFAULT_CONFIG
    accelerator_config: Optional[AcceleratorConfigType] = None
    max_block_dependency: int = ArchitectureFeatures.MAX_BLOCKDEP
    arena_cache_size: Optional[int] = None
    tensor_allocator: TensorAllocatorType = "HillClimb"
    cpu_tensor_alignment: int = Tensor.AllocationQuantum
    optimization_strategy: OptimizationStrategyType = "Performance"
    output_dir: Optional[str] = None
    recursion_limit: int = 1000


class VelaCompiler:  # pylint: disable=too-many-instance-attributes
    """Vela compiler wrapper."""

    def __init__(self, compiler_options: VelaCompilerOptions):
        """Init Vela wrapper instance."""
        self.config_files = compiler_options.config_files
        self.system_config = compiler_options.system_config
        self.memory_mode = compiler_options.memory_mode
        self.accelerator_config = compiler_options.accelerator_config
        self.max_block_dependency = compiler_options.max_block_dependency
        self.arena_cache_size = compiler_options.arena_cache_size
        self.tensor_allocator = TensorAllocator[compiler_options.tensor_allocator]
        self.cpu_tensor_alignment = compiler_options.cpu_tensor_alignment
        self.optimization_strategy = OptimizationStrategy[
            compiler_options.optimization_strategy
        ]
        self.output_dir = compiler_options.output_dir
        self.recursion_limit = compiler_options.recursion_limit

        sys.setrecursionlimit(self.recursion_limit)

    def read_model(self, model: Union[str, Path]) -> Model:
        """Read model."""
        logger.debug("Read model %s", model)

        nng, network_type = self._read_model(model)
        return Model(nng, network_type)

    def compile_model(self, model: Union[str, Path, Model]) -> OptimizedModel:
        """Compile the model."""
        if isinstance(model, (str, Path)):
            nng, network_type = self._read_model(model)
        else:
            nng, network_type = model.nng, NetworkType.TFLite

        if not nng:
            raise Exception("Unable to read model")

        try:
            arch = self._architecture_features()
            compiler_options = self._compiler_options()
            scheduler_options = self._scheduler_options()

            with redirect_output(
                logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
            ):
                compiler_driver(
                    nng, arch, compiler_options, scheduler_options, network_type
                )

            return OptimizedModel(nng, arch, compiler_options, scheduler_options)
        except (SystemExit, Exception) as err:
            raise Exception("Model could not be optimized with Vela compiler") from err

    def get_config(self) -> Dict[str, Any]:
        """Get compiler configuration."""
        arch = self._architecture_features()

        memory_area = {
            mem.name: {
                "clock_scales": arch.memory_clock_scales[mem],
                "burst_length": arch.memory_burst_length[mem],
                "read_latency": arch.memory_latency[mem][BandwidthDirection.Read],
                "write_latency": arch.memory_latency[mem][BandwidthDirection.Write],
            }
            for mem in (
                MemArea.Sram,
                MemArea.Dram,
                MemArea.OnChipFlash,
                MemArea.OffChipFlash,
            )
        }

        return {
            "accelerator_config": arch.accelerator_config.value,
            "system_config": arch.system_config,
            "core_clock": arch.core_clock,
            "axi0_port": arch.axi0_port.name,
            "axi1_port": arch.axi1_port.name,
            "memory_mode": arch.memory_mode,
            "const_mem_area": arch.const_mem_area.name,
            "arena_mem_area": arch.arena_mem_area.name,
            "cache_mem_area": arch.cache_mem_area.name,
            "arena_cache_size": arch.arena_cache_size,
            "permanent_storage_mem_area": arch.permanent_storage_mem_area.name,
            "feature_map_storage_mem_area": arch.feature_map_storage_mem_area.name,
            "fast_storage_mem_area": arch.fast_storage_mem_area.name,
            "memory_area": memory_area,
        }

    @staticmethod
    def _read_model(model: Union[str, Path]) -> Tuple[Graph, NetworkType]:
        """Read TFLite model."""
        model_path = str(model) if isinstance(model, Path) else model
        try:
            with redirect_output(
                logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
            ):
                return read_model(model_path, ModelReaderOptions())  # type: ignore
        except (SystemExit, Exception) as err:
            raise Exception(f"Unable to read model {model_path}") from err

    def _architecture_features(self) -> ArchitectureFeatures:
        """Return ArchitectureFeatures instance."""
        return ArchitectureFeatures(
            vela_config_files=self.config_files,
            accelerator_config=self.accelerator_config,
            system_config=self.system_config,
            memory_mode=self.memory_mode,
            max_blockdep=self.max_block_dependency,
            verbose_config=False,
            arena_cache_size=self.arena_cache_size,
        )

    def _scheduler_options(self) -> SchedulerOptions:
        """Return SchedulerOptions instance."""
        arch = self._architecture_features()

        return SchedulerOptions(
            optimization_strategy=self.optimization_strategy,
            sram_target=arch.arena_cache_size,
            verbose_schedule=False,
        )

    def _compiler_options(self) -> CompilerOptions:
        """Return CompilerOptions instance."""
        return CompilerOptions(
            verbose_graph=False,
            verbose_quantization=False,
            verbose_packing=False,
            verbose_tensor_purpose=False,
            verbose_tensor_format=False,
            verbose_allocation=False,
            verbose_high_level_command_stream=False,
            verbose_register_command_stream=False,
            verbose_operators=False,
            verbose_weights=False,
            show_cpu_operations=False,
            tensor_allocator=self.tensor_allocator,
            timing=False,
            output_dir=self.output_dir,
            cpu_tensor_alignment=self.cpu_tensor_alignment,
        )
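# A usage sketch for the wrapper above. The model paths are placeholders and
# "ethos-u55-256" is one of the accelerator configs listed in
# AcceleratorConfigType; any of the others works the same way:
#
#     options = VelaCompilerOptions(accelerator_config="ethos-u55-256")
#     compiler = VelaCompiler(options)
#     optimized = compiler.compile_model("model.tflite")
#     optimized.save("model_vela.tflite")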
def resolve_compiler_config(
    vela_compiler_options: VelaCompilerOptions,
) -> Dict[str, Any]:
    """Resolve the passed compiler options.

    Vela has a number of configuration parameters that are resolved while
    the compiler options are being processed. For example, Vela reads
    configuration parameters from vela.ini and fills its internal structures
    with the resolved values (memory mode, system config, etc.).

    In order to get this information, an instance of the Vela compiler
    has to be created first.
    """
    vela_compiler = VelaCompiler(vela_compiler_options)
    return vela_compiler.get_config()
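# For example, the resolved configuration could be inspected like this
# (assuming the default Vela config files are available); the keys mirror
# those returned by VelaCompiler.get_config above:
#
#     config = resolve_compiler_config(
#         VelaCompilerOptions(accelerator_config="ethos-u55-128")
#     )
#     print(config["core_clock"], config["memory_mode"])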
def estimate_performance(
    model_path: Path, compiler_options: VelaCompilerOptions
) -> PerformanceMetrics:
    """Return performance estimations for the model/device.

    The logic of this function comes from the Vela module stats_writer.py.
    """
    logger.debug(
        "Estimate performance for the model %s on %s",
        model_path,
        compiler_options.accelerator_config,
    )

    vela_compiler = VelaCompiler(compiler_options)

    initial_model = vela_compiler.read_model(model_path)
    if initial_model.optimized:
        raise Exception("Unable to estimate performance for the given optimized model")

    optimized_model = vela_compiler.compile_model(initial_model)

    return _performance_metrics(optimized_model)


def optimize_model(
    model_path: Path, compiler_options: VelaCompilerOptions, output_model_path: Path
) -> None:
    """Optimize the model and save it to the provided output path."""
    logger.debug(
        "Optimize model %s for device %s",
        model_path,
        compiler_options.accelerator_config,
    )

    vela_compiler = VelaCompiler(compiler_options)
    optimized_model = vela_compiler.compile_model(model_path)

    logger.debug("Save optimized model into %s", output_model_path)
    optimized_model.save(output_model_path)


def _performance_metrics(optimized_model: OptimizedModel) -> PerformanceMetrics:
    """Return performance metrics for the optimized model."""
    cycles = optimized_model.nng.cycles

    def memory_usage(mem_area: MemArea) -> int:
        """Get memory usage for the provided memory area type."""
        memory_used: Dict[MemArea, int] = optimized_model.nng.memory_used
        bandwidths = optimized_model.nng.bandwidths

        return memory_used.get(mem_area, 0) if np.sum(bandwidths[mem_area]) > 0 else 0

    midpoint_fps = np.nan
    midpoint_inference_time = cycles[PassCycles.Total] / optimized_model.arch.core_clock
    if midpoint_inference_time > 0:
        midpoint_fps = 1 / midpoint_inference_time

    return PerformanceMetrics(
        npu_cycles=int(cycles[PassCycles.Npu]),
        sram_access_cycles=int(cycles[PassCycles.SramAccess]),
        dram_access_cycles=int(cycles[PassCycles.DramAccess]),
        on_chip_flash_access_cycles=int(cycles[PassCycles.OnChipFlashAccess]),
        off_chip_flash_access_cycles=int(cycles[PassCycles.OffChipFlashAccess]),
        total_cycles=int(cycles[PassCycles.Total]),
        batch_inference_time=midpoint_inference_time * 1000,
        inferences_per_second=midpoint_fps,
        batch_size=optimized_model.nng.batch_size,
        unknown_memory_area_size=memory_usage(MemArea.Unknown),
        sram_memory_area_size=memory_usage(MemArea.Sram),
        dram_memory_area_size=memory_usage(MemArea.Dram),
        on_chip_flash_memory_area_size=memory_usage(MemArea.OnChipFlash),
        off_chip_flash_memory_area_size=memory_usage(MemArea.OffChipFlash),
    )


def supported_operators(
    model_path: Path, compiler_options: VelaCompilerOptions
) -> Operators:
    """Return the list of the model's operators."""
    logger.debug("Check supported operators for the model %s", model_path)

    vela_compiler = VelaCompiler(compiler_options)
    initial_model = vela_compiler.read_model(model_path)

    return Operators(
        [
            Operator(op.name, optype_to_builtintype(op.type), run_on_npu(op))
            for sg in initial_model.nng.subgraphs
            for op in sg.get_all_ops()
            if op.type not in VELA_INTERNAL_OPS
        ]
    )
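# A sketch of inspecting operator compatibility with supported_operators;
# the model path is a placeholder:
#
#     operators = supported_operators(Path("model.tflite"), VelaCompilerOptions())
#     for op in operators.ops:
#         if not op.run_on_npu.supported:
#             print(op.name, op.op_type, op.run_on_npu.reasons)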
def run_on_npu(operator: Op) -> NpuSupported:
    """Return information about whether the operator can run on the NPU.

    Vela performs a number of checks that can help establish whether
    a particular operator is supported to run on the NPU.

    There are two groups of checks:
      - general TFLite constraints
      - operator specific constraints

    If an operator is not supported on the NPU, this function returns
    the reason why. The reason is split into two parts:
      - general description of why the operator cannot be placed on the NPU
      - details on the particular operator
    """
    semantic_checker = TFLiteSemantic()
    semantic_constraints = itertools.chain(
        semantic_checker.generic_constraints,
        semantic_checker.specific_constraints[operator.type],
    )

    for constraint in semantic_constraints:
        op_valid, op_reason = constraint(operator)
        if not op_valid:
            return NpuSupported(False, [(constraint.__doc__, op_reason)])

    if operator.type not in TFLiteSupportedOperators.supported_operators:
        reasons = (
            [("CPU only operator", "")]
            if operator.type not in VELA_INTERNAL_OPS
            else []
        )

        return NpuSupported(False, reasons)

    tflite_supported_operators = TFLiteSupportedOperators()
    operation_constraints = itertools.chain(
        tflite_supported_operators.generic_constraints,
        tflite_supported_operators.specific_constraints[operator.type],
    )
    for constraint in operation_constraints:
        op_valid, op_reason = constraint(operator)
        if not op_valid:
            return NpuSupported(False, [(constraint.__doc__, op_reason)])

    return NpuSupported(True, [])


def generate_supported_operators_report() -> None:
    """Generate supported operators report in the current working directory."""
    with redirect_output(logger):
        generate_supported_ops()
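# An end-to-end sketch combining the public helpers in this module; the paths
# and the accelerator config are placeholders:
#
#     options = VelaCompilerOptions(accelerator_config="ethos-u65-256")
#     metrics = estimate_performance(Path("model.tflite"), options)
#     logger.info("NPU cycles: %d", metrics.npu_cycles)
#     optimize_model(Path("model.tflite"), options, Path("model_vela.tflite"))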