From 0efca3cadbad5517a59884576ddb90cfe7ac30f8 Mon Sep 17 00:00:00 2001 From: Diego Russo Date: Mon, 30 May 2022 13:34:14 +0100 Subject: Add MLIA codebase Add MLIA codebase including sources and tests. Change-Id: Id41707559bd721edd114793618d12ccd188d8dbd --- src/mlia/devices/__init__.py | 3 + src/mlia/devices/config.py | 11 + src/mlia/devices/ethosu/__init__.py | 3 + src/mlia/devices/ethosu/advice_generation.py | 209 ++++++++++++++ src/mlia/devices/ethosu/advisor.py | 151 ++++++++++ src/mlia/devices/ethosu/config.py | 89 ++++++ src/mlia/devices/ethosu/data_analysis.py | 154 +++++++++++ src/mlia/devices/ethosu/data_collection.py | 188 +++++++++++++ src/mlia/devices/ethosu/events.py | 24 ++ src/mlia/devices/ethosu/handlers.py | 146 ++++++++++ src/mlia/devices/ethosu/operators.py | 14 + src/mlia/devices/ethosu/performance.py | 257 +++++++++++++++++ src/mlia/devices/ethosu/reporters.py | 398 +++++++++++++++++++++++++++ 13 files changed, 1647 insertions(+) create mode 100644 src/mlia/devices/__init__.py create mode 100644 src/mlia/devices/config.py create mode 100644 src/mlia/devices/ethosu/__init__.py create mode 100644 src/mlia/devices/ethosu/advice_generation.py create mode 100644 src/mlia/devices/ethosu/advisor.py create mode 100644 src/mlia/devices/ethosu/config.py create mode 100644 src/mlia/devices/ethosu/data_analysis.py create mode 100644 src/mlia/devices/ethosu/data_collection.py create mode 100644 src/mlia/devices/ethosu/events.py create mode 100644 src/mlia/devices/ethosu/handlers.py create mode 100644 src/mlia/devices/ethosu/operators.py create mode 100644 src/mlia/devices/ethosu/performance.py create mode 100644 src/mlia/devices/ethosu/reporters.py (limited to 'src/mlia/devices') diff --git a/src/mlia/devices/__init__.py b/src/mlia/devices/__init__.py new file mode 100644 index 0000000..d533f4a --- /dev/null +++ b/src/mlia/devices/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Devices module.""" diff --git a/src/mlia/devices/config.py b/src/mlia/devices/config.py new file mode 100644 index 0000000..7ab6b43 --- /dev/null +++ b/src/mlia/devices/config.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""IP configuration module.""" + + +class IPConfiguration: # pylint: disable=too-few-public-methods + """Base class for IP configuration.""" + + def __init__(self, target: str) -> None: + """Init IP configuration instance.""" + self.target = target diff --git a/src/mlia/devices/ethosu/__init__.py b/src/mlia/devices/ethosu/__init__.py new file mode 100644 index 0000000..73925e1 --- /dev/null +++ b/src/mlia/devices/ethosu/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U devices module.""" diff --git a/src/mlia/devices/ethosu/advice_generation.py b/src/mlia/devices/ethosu/advice_generation.py new file mode 100644 index 0000000..7a818c9 --- /dev/null +++ b/src/mlia/devices/ethosu/advice_generation.py @@ -0,0 +1,209 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U advice generation.""" +from functools import singledispatchmethod +from typing import List +from typing import Union + +from mlia.core.advice_generation import Advice +from mlia.core.advice_generation import advice_category +from mlia.core.advice_generation import ContextAwareAdviceProducer +from mlia.core.advice_generation import FactBasedAdviceProducer +from mlia.core.common import AdviceCategory +from mlia.core.common import DataItem +from mlia.devices.ethosu.data_analysis import AllOperatorsSupportedOnNPU +from mlia.devices.ethosu.data_analysis import HasCPUOnlyOperators +from mlia.devices.ethosu.data_analysis import HasUnsupportedOnNPUOperators +from mlia.devices.ethosu.data_analysis import OptimizationResults +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings + + +class EthosUAdviceProducer(FactBasedAdviceProducer): + """Ethos-U advice producer.""" + + @singledispatchmethod + def produce_advice(self, data_item: DataItem) -> None: + """Produce advice.""" + + @produce_advice.register + @advice_category(AdviceCategory.OPERATORS, AdviceCategory.ALL) + def handle_cpu_only_ops(self, data_item: HasCPUOnlyOperators) -> None: + """Advice for CPU only operators.""" + cpu_only_ops = ",".join(sorted(set(data_item.cpu_only_ops))) + cpu_only_ops_num = len(data_item.cpu_only_ops) + + self.add_advice( + [ + f"You have at least {cpu_only_ops_num} " + f"operator{'s' if cpu_only_ops_num > 1 else ''} that is CPU " + f"only: {cpu_only_ops}.", + "Using operators that are supported by the NPU will " + "improve performance.", + ] + + self.context.action_resolver.supported_operators_info() + ) + + @produce_advice.register + @advice_category(AdviceCategory.OPERATORS, AdviceCategory.ALL) + def handle_unsupported_operators( + self, data_item: HasUnsupportedOnNPUOperators + ) -> None: + """Advice for the unsupported operators.""" + self.add_advice( + [ + f"You have {data_item.npu_unsupported_ratio*100:.0f}% of operators " + "that cannot be placed on the NPU.", + "For better performance, please review the reasons reported " + "in the table, and adjust the model accordingly " + "where possible.", + ] + ) + + @produce_advice.register + @advice_category(AdviceCategory.OPERATORS, AdviceCategory.ALL) + def handle_all_operators_supported( + self, _data_item: AllOperatorsSupportedOnNPU + ) -> None: + """Advice if all operators supported.""" + self.add_advice( + [ + "You don't have any unsupported operators, your model will " + "run completely on NPU." + ] + + self.context.action_resolver.check_performance() + ) + + @produce_advice.register + @advice_category(AdviceCategory.OPTIMIZATION, AdviceCategory.ALL) + def handle_optimization_results(self, data_item: OptimizationResults) -> None: + """Advice based on optimization results.""" + if not data_item.diffs or len(data_item.diffs) != 1: + return + + optim_details = data_item.diffs[0] + metrics = [ + (metric_name, optim_details.opt_diffs[metric_key]) + for (metric_name, metric_key) in ( + ("DRAM used (KB)", "dram"), + ("SRAM used (KB)", "sram"), + ("On chip flash used (KB)", "on_chip_flash"), + ("Off chip flash used (KB)", "off_chip_flash"), + ("NPU total cycles", "npu_total_cycles"), + ) + if metric_key in optim_details.opt_diffs + and not optim_details.opt_diffs[metric_key].same + ] + + improved = [ + f"- You have achieved {abs(metric_value.diff):.2f}% performance " + f"improvement in {metric_name}" + for metric_name, metric_value in metrics + if metric_value.improved + ] + + degraded = [ + f"- {metric_name} have degraded by {abs(metric_value.diff):.2f}%" + for metric_name, metric_value in metrics + if metric_value.degraded + ] + + opts = ", ".join(str(s) for s in optim_details.opt_type) + messages = [f"With the selected optimization ({opts})", *improved, *degraded] + + if improved: + if next_optimization_target := self.get_next_optimization_targets( + optim_details.opt_type + ): + next_optimization_target_as_str = " and/or ".join( + str(item) for item in next_optimization_target + ) + + messages.append( + "You can try to push the optimization target higher " + f"(e.g. {next_optimization_target_as_str}) " + "to check if those results can be further improved." + ) + messages += self.context.action_resolver.apply_optimizations( + opt_settings=next_optimization_target + ) + + elif degraded: + messages.append( + "The performance seems to have degraded after " + "applying the selected optimizations, " + "try exploring different optimization types/targets." + ) + + self.add_advice(messages) + + self.add_advice( + [ + "The applied tooling techniques have an impact " + "on accuracy. Additional hyperparameter tuning may be required " + "after any optimization." + ] + ) + + @staticmethod + def get_next_optimization_targets( + opt_type: List[OptimizationSettings], + ) -> List[OptimizationSettings]: + """Get next optimization targets.""" + next_targets = (item.next_target() for item in opt_type) + + # filter out targets that have not been changed + valid_targets = [ + next_ + for next_, old in zip(next_targets, opt_type) + if ( + old.optimization_type == "pruning" + and old.optimization_target < next_.optimization_target + ) + or ( + old.optimization_type == "clustering" + and old.optimization_target > next_.optimization_target + ) + ] + return valid_targets + + +class EthosUStaticAdviceProducer(ContextAwareAdviceProducer): + """Advice producer that not depends on input data.""" + + def produce_advice(self, data_item: DataItem) -> None: + """Do not process passed data items.""" + + def get_advice(self) -> Union[Advice, List[Advice]]: + """Return predefined advice based on category.""" + if self.context.advice_category is None: + return [] + + advice_per_category = { + AdviceCategory.PERFORMANCE: [ + Advice( + [ + "You can improve the inference time by using only operators " + "that are supported by the NPU.", + ] + + self.context.action_resolver.check_operator_compatibility() + ), + Advice( + [ + "Check if you can improve the performance by applying " + "tooling techniques to your model." + ] + + self.context.action_resolver.apply_optimizations() + ), + ], + AdviceCategory.OPTIMIZATION: [ + Advice( + [ + "For better performance, make sure that all the operators " + "of your final TFLite model are supported by the NPU.", + ] + + self.context.action_resolver.operator_compatibility_details() + ) + ], + } + + return advice_per_category.get(self.context.advice_category, []) diff --git a/src/mlia/devices/ethosu/advisor.py b/src/mlia/devices/ethosu/advisor.py new file mode 100644 index 0000000..802826b --- /dev/null +++ b/src/mlia/devices/ethosu/advisor.py @@ -0,0 +1,151 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U MLIA module.""" +from pathlib import Path +from typing import List +from typing import Optional + +from mlia.core.advice_generation import AdviceProducer +from mlia.core.advisor import InferenceAdvisor +from mlia.core.common import AdviceCategory +from mlia.core.context import Context +from mlia.core.data_analysis import DataAnalyzer +from mlia.core.data_collection import DataCollector +from mlia.core.mixins import ParameterResolverMixin +from mlia.core.workflow import DefaultWorkflowExecutor +from mlia.core.workflow import WorkflowExecutor +from mlia.devices.ethosu.advice_generation import EthosUAdviceProducer +from mlia.devices.ethosu.advice_generation import EthosUStaticAdviceProducer +from mlia.devices.ethosu.config import EthosUConfiguration +from mlia.devices.ethosu.config import get_target +from mlia.devices.ethosu.data_analysis import EthosUDataAnalyzer +from mlia.devices.ethosu.data_collection import EthosUOperatorCompatibility +from mlia.devices.ethosu.data_collection import EthosUOptimizationPerformance +from mlia.devices.ethosu.data_collection import EthosUPerformance +from mlia.devices.ethosu.events import EthosUAdvisorStartedEvent + + +class EthosUInferenceAdvisor(InferenceAdvisor, ParameterResolverMixin): + """Ethos-U Inference Advisor.""" + + @classmethod + def name(cls) -> str: + """Return name of the advisor.""" + return "ethos_u_inference_advisor" + + def configure(self, context: Context) -> WorkflowExecutor: + """Configure advisor execution.""" + model = self._get_model(context) + device = self._get_device(context) + backends = self._get_backends(context) + + collectors = self._get_collectors(context, model, device, backends) + analyzers = self._get_analyzers() + producers = self._get_advice_producers() + + return DefaultWorkflowExecutor( + context, + collectors, + analyzers, + producers, + before_start_events=[ + EthosUAdvisorStartedEvent(device=device, model=model), + ], + ) + + def _get_collectors( + self, + context: Context, + model: Path, + device: EthosUConfiguration, + backends: Optional[List[str]], + ) -> List[DataCollector]: + """Get collectors.""" + collectors: List[DataCollector] = [] + + if context.any_category_enabled( + AdviceCategory.OPERATORS, + AdviceCategory.ALL, + ): + collectors.append(EthosUOperatorCompatibility(model, device)) + + if context.category_enabled(AdviceCategory.PERFORMANCE): + collectors.append(EthosUPerformance(model, device, backends)) + + if context.any_category_enabled( + AdviceCategory.OPTIMIZATION, + AdviceCategory.ALL, + ): + optimization_settings = self._get_optimization_settings(context) + collectors.append( + EthosUOptimizationPerformance( + model, device, optimization_settings, backends + ) + ) + + return collectors + + @staticmethod + def _get_analyzers() -> List[DataAnalyzer]: + """Return data analyzers.""" + return [ + EthosUDataAnalyzer(), + ] + + @staticmethod + def _get_advice_producers() -> List[AdviceProducer]: + """Return advice producers.""" + return [ + EthosUAdviceProducer(), + EthosUStaticAdviceProducer(), + ] + + def _get_device(self, context: Context) -> EthosUConfiguration: + """Get device.""" + device_params = self.get_parameter( + self.name(), + "device", + expected_type=dict, + context=context, + ) + + try: + target_profile = device_params["target_profile"] + except KeyError as err: + raise Exception("Unable to get device details") from err + + return get_target(target_profile) + + def _get_model(self, context: Context) -> Path: + """Get path to the model.""" + model_param = self.get_parameter( + self.name(), + "model", + expected_type=str, + context=context, + ) + + if not (model := Path(model_param)).exists(): + raise Exception(f"Path {model} does not exist") + + return model + + def _get_optimization_settings(self, context: Context) -> List[List[dict]]: + """Get optimization settings.""" + return self.get_parameter( # type: ignore + EthosUOptimizationPerformance.name(), + "optimizations", + expected_type=list, + expected=False, + context=context, + ) + + def _get_backends(self, context: Context) -> Optional[List[str]]: + """Get list of backends.""" + return self.get_parameter( # type: ignore + self.name(), + "backends", + expected_type=list, + expected=False, + context=context, + ) diff --git a/src/mlia/devices/ethosu/config.py b/src/mlia/devices/ethosu/config.py new file mode 100644 index 0000000..cecbb27 --- /dev/null +++ b/src/mlia/devices/ethosu/config.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U configuration.""" +import logging +from typing import Any +from typing import Dict + +from mlia.devices.config import IPConfiguration +from mlia.tools.vela_wrapper import resolve_compiler_config +from mlia.tools.vela_wrapper import VelaCompilerOptions +from mlia.utils.filesystem import get_profile +from mlia.utils.filesystem import get_vela_config + + +logger = logging.getLogger(__name__) + + +class EthosUConfiguration(IPConfiguration): + """Ethos-U configuration.""" + + def __init__(self, target_profile: str) -> None: + """Init Ethos-U target configuration.""" + target_data = get_profile(target_profile) + _check_target_data_complete(target_data) + + target = target_data["target"] + super().__init__(target) + + mac = target_data["mac"] + _check_device_options_valid(target, mac) + + self.mac = mac + self.compiler_options = VelaCompilerOptions( + system_config=target_data["system_config"], + memory_mode=target_data["memory_mode"], + config_files=str(get_vela_config()), + accelerator_config=f"{self.target}-{mac}", # type: ignore + ) + + @property + def resolved_compiler_config(self) -> Dict[str, Any]: + """Resolve compiler configuration.""" + return resolve_compiler_config(self.compiler_options) + + def __str__(self) -> str: + """Return string representation.""" + return ( + f"Ethos-U target={self.target} " + f"mac={self.mac} " + f"compiler_options={self.compiler_options}" + ) + + def __repr__(self) -> str: + """Return string representation.""" + return f"" + + +def get_target(target_profile: str) -> EthosUConfiguration: + """Get target instance based on provided params.""" + if not target_profile: + raise Exception("No target profile given") + + return EthosUConfiguration(target_profile) + + +def _check_target_data_complete(target_data: Dict[str, Any]) -> None: + """Check if profile contains all needed data.""" + mandatory_keys = {"target", "mac", "system_config", "memory_mode"} + missing_keys = sorted(mandatory_keys - target_data.keys()) + + if missing_keys: + raise Exception(f"Mandatory fields missing from target profile: {missing_keys}") + + +def _check_device_options_valid(target: str, mac: int) -> None: + """Check if mac is valid for selected device.""" + target_mac_ranges = { + "ethos-u55": [32, 64, 128, 256], + "ethos-u65": [256, 512], + } + + if target not in target_mac_ranges: + raise Exception(f"Unsupported target: {target}") + + target_mac_range = target_mac_ranges[target] + if mac not in target_mac_range: + raise Exception( + f"Mac value for selected device should be in {target_mac_range}" + ) diff --git a/src/mlia/devices/ethosu/data_analysis.py b/src/mlia/devices/ethosu/data_analysis.py new file mode 100644 index 0000000..9ed32ff --- /dev/null +++ b/src/mlia/devices/ethosu/data_analysis.py @@ -0,0 +1,154 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U data analysis module.""" +from dataclasses import dataclass +from functools import singledispatchmethod +from typing import Dict +from typing import List +from typing import Union + +from mlia.core.common import DataItem +from mlia.core.data_analysis import Fact +from mlia.core.data_analysis import FactExtractor +from mlia.devices.ethosu.performance import OptimizationPerformanceMetrics +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings +from mlia.tools.vela_wrapper import Operators + + +@dataclass +class HasCPUOnlyOperators(Fact): + """Model has CPU only operators.""" + + cpu_only_ops: List[str] + + +@dataclass +class HasUnsupportedOnNPUOperators(Fact): + """Model has unsupported on NPU operators.""" + + npu_unsupported_ratio: float + + +@dataclass +class AllOperatorsSupportedOnNPU(Fact): + """All model's operators supported on NPU.""" + + +@dataclass +class PerfMetricDiff: + """Performance metric difference.""" + + original_value: Union[int, float] + optimized_value: Union[int, float] + + @property + def diff(self) -> float: + """Difference between metrics.""" + if self.original_value == 0: + return 0 + + return 100 - ((self.optimized_value / self.original_value) * 100) + + @property + def improved(self) -> bool: + """Return true if metric improved.""" + return self.diff > 0 + + @property + def degraded(self) -> bool: + """Return true if metric degraded.""" + return self.diff < 0 + + @property + def same(self) -> bool: + """Return true if metric stays the same.""" + return self.diff == 0 + + +@dataclass +class OptimizationDiff: + """Optimization performance impact.""" + + opt_type: List[OptimizationSettings] + opt_diffs: Dict[str, PerfMetricDiff] + + +@dataclass +class OptimizationResults(Fact): + """Optimization results.""" + + diffs: List[OptimizationDiff] + + +class EthosUDataAnalyzer(FactExtractor): + """Ethos-U data analyzer.""" + + @singledispatchmethod + def analyze_data(self, data_item: DataItem) -> None: + """Analyse the data.""" + + @analyze_data.register + def analyze_operator_compatibility(self, operators: Operators) -> None: + """Analyse operator compatibility information.""" + cpu_only = [op.op_type for op in operators.ops if op.cpu_only] + if cpu_only: + self.add_fact(HasCPUOnlyOperators(cpu_only)) + + if operators.npu_unsupported_ratio != 0: + self.add_fact(HasUnsupportedOnNPUOperators(operators.npu_unsupported_ratio)) + + if operators.npu_unsupported_ratio == 0: + self.add_fact(AllOperatorsSupportedOnNPU()) + + @analyze_data.register + def analyze_optimization_results( + self, optimization_results: OptimizationPerformanceMetrics + ) -> None: + """Analyse optimization performance metrics.""" + optimizations = optimization_results.optimizations_perf_metrics + if not optimizations: + return + + orig = optimization_results.original_perf_metrics.in_kilobytes() + orig_memory = orig.memory_usage + orig_cycles = orig.npu_cycles + + diffs: List[OptimizationDiff] = [] + for opt_type, opt_perf_metrics in optimizations: + opt = opt_perf_metrics.in_kilobytes() + opt_memory = opt.memory_usage + opt_cycles = opt.npu_cycles + + opt_diffs: Dict[str, PerfMetricDiff] = {} + + if orig_memory and opt_memory: + opt_diffs.update( + { + "sram": PerfMetricDiff( + orig_memory.sram_memory_area_size, + opt_memory.sram_memory_area_size, + ), + "dram": PerfMetricDiff( + orig_memory.dram_memory_area_size, + opt_memory.dram_memory_area_size, + ), + "on_chip_flash": PerfMetricDiff( + orig_memory.on_chip_flash_memory_area_size, + opt_memory.on_chip_flash_memory_area_size, + ), + "off_chip_flash": PerfMetricDiff( + orig_memory.off_chip_flash_memory_area_size, + opt_memory.off_chip_flash_memory_area_size, + ), + } + ) + if orig_cycles and opt_cycles: + opt_diffs["npu_total_cycles"] = PerfMetricDiff( + orig_cycles.npu_total_cycles, + opt_cycles.npu_total_cycles, + ) + + diff = OptimizationDiff(opt_type=opt_type, opt_diffs=opt_diffs) + diffs.append(diff) + + self.add_fact(OptimizationResults(diffs)) diff --git a/src/mlia/devices/ethosu/data_collection.py b/src/mlia/devices/ethosu/data_collection.py new file mode 100644 index 0000000..291f1b8 --- /dev/null +++ b/src/mlia/devices/ethosu/data_collection.py @@ -0,0 +1,188 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Data collection module for Ethos-U.""" +import logging +from pathlib import Path +from typing import List +from typing import Optional + +from mlia.core.context import Context +from mlia.core.data_collection import ContextAwareDataCollector +from mlia.core.errors import FunctionalityNotSupportedError +from mlia.core.performance import estimate_performance +from mlia.devices.ethosu.config import EthosUConfiguration +from mlia.devices.ethosu.performance import EthosUPerformanceEstimator +from mlia.devices.ethosu.performance import OptimizationPerformanceMetrics +from mlia.devices.ethosu.performance import PerformanceMetrics +from mlia.nn.tensorflow.config import get_keras_model +from mlia.nn.tensorflow.config import get_tflite_model +from mlia.nn.tensorflow.config import KerasModel +from mlia.nn.tensorflow.optimizations.select import get_optimizer +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings +from mlia.nn.tensorflow.utils import save_keras_model +from mlia.tools.vela_wrapper import Operators +from mlia.tools.vela_wrapper import supported_operators +from mlia.utils.types import is_list_of + +logger = logging.getLogger(__name__) + + +class EthosUOperatorCompatibility(ContextAwareDataCollector): + """Collect operator compatibility information.""" + + def __init__(self, model: Path, device: EthosUConfiguration) -> None: + """Init operator compatibility data collector.""" + self.model = model + self.device = device + + def collect_data(self) -> Operators: + """Collect operator compatibility information.""" + tflite_model = get_tflite_model(self.model, self.context) + + logger.info("Checking operator compatibility ...") + ops = supported_operators( + Path(tflite_model.model_path), self.device.compiler_options + ) + logger.info("Done\n") + return ops + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "ethos_u_operator_compatibility" + + +class EthosUPerformance(ContextAwareDataCollector): + """Collect performance metrics.""" + + def __init__( + self, + model: Path, + device: EthosUConfiguration, + backends: Optional[List[str]] = None, + ) -> None: + """Init performance data collector.""" + self.model = model + self.device = device + self.backends = backends + + def collect_data(self) -> PerformanceMetrics: + """Collect model performance metrics.""" + tflite_model = get_tflite_model(self.model, self.context) + estimator = EthosUPerformanceEstimator( + self.context, + self.device, + self.backends, + ) + + return estimator.estimate(tflite_model) + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "ethos_u_performance" + + +class OptimizeModel: + """Helper class for model optimization.""" + + def __init__( + self, context: Context, opt_settings: List[OptimizationSettings] + ) -> None: + """Init helper.""" + self.context = context + self.opt_settings = opt_settings + + def __call__(self, keras_model: KerasModel) -> KerasModel: + """Run optimization.""" + optimizer = get_optimizer(keras_model, self.opt_settings) + + opts_as_str = ", ".join(str(opt) for opt in self.opt_settings) + logger.info("Applying model optimizations - [%s]", opts_as_str) + optimizer.apply_optimization() + + model = optimizer.get_model() + model_path = self.context.get_model_path("optimized_model.h5") + save_keras_model(model, model_path) + + return KerasModel(model_path) + + +class EthosUOptimizationPerformance(ContextAwareDataCollector): + """Collect performance metrics for the optimizations.""" + + def __init__( + self, + model: Path, + device: EthosUConfiguration, + optimizations: List[List[dict]], + backends: Optional[List[str]] = None, + ) -> None: + """Init performance optimizations data collector.""" + self.model = model + self.device = device + self.optimizations = optimizations + self.backends = backends + + def collect_data(self) -> Optional[OptimizationPerformanceMetrics]: + """Collect performance metrics for the optimizations.""" + logger.info("Estimate performance ...") + + if not self.optimizations: + raise FunctionalityNotSupportedError( + reason="Unable to estimate model optimizations impact", + description="No optimization targets provided", + ) + + opt_settings = self._parse_optimization_params(self.optimizations) + + try: + keras_model = get_keras_model(self.model, self.context) + except NotImplementedError as err: + raise FunctionalityNotSupportedError( + reason="Unable to run model optimizations", + description=f"{self.model} is not a Keras model and " + "could not be converted to a Keras model", + ) from err + + optimizers = [OptimizeModel(self.context, opts) for opts in opt_settings] + + estimator = EthosUPerformanceEstimator( + self.context, + self.device, + self.backends, + ) + original_metrics, *optimized_metrics = estimate_performance( + keras_model, estimator, optimizers # type: ignore + ) + + result = OptimizationPerformanceMetrics( + original_perf_metrics=original_metrics, + optimizations_perf_metrics=list(zip(opt_settings, optimized_metrics)), + ) + return result + + @staticmethod + def _parse_optimization_params( + optimizations: List[List[dict]], + ) -> List[List[OptimizationSettings]]: + """Parse optimization parameters.""" + if not is_list_of(optimizations, list): + raise Exception("Optimization parameters expected to be a list") + + return [ + [ + OptimizationSettings( + item.get("optimization_type"), # type: ignore + item.get("optimization_target"), # type: ignore + item.get("layers_to_optimized"), + ) + for item in opt_configuration + ] + for opt_configuration in optimizations + ] + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "ethos_u_model_optimizations" diff --git a/src/mlia/devices/ethosu/events.py b/src/mlia/devices/ethosu/events.py new file mode 100644 index 0000000..d5408b0 --- /dev/null +++ b/src/mlia/devices/ethosu/events.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U MLIA module events.""" +from dataclasses import dataclass +from pathlib import Path + +from mlia.core.events import Event +from mlia.core.events import EventDispatcher +from mlia.devices.ethosu.config import EthosUConfiguration + + +@dataclass +class EthosUAdvisorStartedEvent(Event): + """Event with Ethos-U advisor parameters.""" + + model: Path + device: EthosUConfiguration + + +class EthosUAdvisorEventHandler(EventDispatcher): + """Event handler for the Ethos-U inference advisor.""" + + def on_ethos_u_advisor_started(self, event: EthosUAdvisorStartedEvent) -> None: + """Handle EthosUAdvisorStarted event.""" diff --git a/src/mlia/devices/ethosu/handlers.py b/src/mlia/devices/ethosu/handlers.py new file mode 100644 index 0000000..7a0c31c --- /dev/null +++ b/src/mlia/devices/ethosu/handlers.py @@ -0,0 +1,146 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Event handler.""" +import logging +from pathlib import Path +from typing import Dict +from typing import List +from typing import Optional + +from mlia.core._typing import OutputFormat +from mlia.core._typing import PathOrFileLike +from mlia.core.advice_generation import Advice +from mlia.core.advice_generation import AdviceEvent +from mlia.core.events import AdviceStageFinishedEvent +from mlia.core.events import AdviceStageStartedEvent +from mlia.core.events import CollectedDataEvent +from mlia.core.events import DataAnalysisStageFinishedEvent +from mlia.core.events import DataCollectionStageStartedEvent +from mlia.core.events import DataCollectorSkippedEvent +from mlia.core.events import ExecutionFailedEvent +from mlia.core.events import ExecutionStartedEvent +from mlia.core.events import SystemEventsHandler +from mlia.core.reporting import Reporter +from mlia.devices.ethosu.events import EthosUAdvisorEventHandler +from mlia.devices.ethosu.events import EthosUAdvisorStartedEvent +from mlia.devices.ethosu.performance import OptimizationPerformanceMetrics +from mlia.devices.ethosu.performance import PerformanceMetrics +from mlia.devices.ethosu.reporters import find_appropriate_formatter +from mlia.tools.vela_wrapper import Operators +from mlia.utils.console import create_section_header + +logger = logging.getLogger(__name__) + +ADV_EXECUTION_STARTED = create_section_header("ML Inference Advisor started") +MODEL_ANALYSIS_MSG = create_section_header("Model Analysis") +MODEL_ANALYSIS_RESULTS_MSG = create_section_header("Model Analysis Results") +ADV_GENERATION_MSG = create_section_header("Advice Generation") +REPORT_GENERATION_MSG = create_section_header("Report Generation") + + +class WorkflowEventsHandler(SystemEventsHandler): + """Event handler for the system events.""" + + def on_execution_started(self, event: ExecutionStartedEvent) -> None: + """Handle ExecutionStarted event.""" + logger.info(ADV_EXECUTION_STARTED) + + def on_execution_failed(self, event: ExecutionFailedEvent) -> None: + """Handle ExecutionFailed event.""" + raise event.err + + def on_data_collection_stage_started( + self, event: DataCollectionStageStartedEvent + ) -> None: + """Handle DataCollectionStageStarted event.""" + logger.info(MODEL_ANALYSIS_MSG) + + def on_advice_stage_started(self, event: AdviceStageStartedEvent) -> None: + """Handle AdviceStageStarted event.""" + logger.info(ADV_GENERATION_MSG) + + def on_data_collector_skipped(self, event: DataCollectorSkippedEvent) -> None: + """Handle DataCollectorSkipped event.""" + logger.info("Skipped: %s", event.reason) + + +class EthosUEventHandler(WorkflowEventsHandler, EthosUAdvisorEventHandler): + """CLI event handler.""" + + def __init__(self, output: Optional[PathOrFileLike] = None) -> None: + """Init event handler.""" + output_format = self.resolve_output_format(output) + + self.reporter = Reporter(find_appropriate_formatter, output_format) + self.output = output + self.advice: List[Advice] = [] + + def on_advice_stage_finished(self, event: AdviceStageFinishedEvent) -> None: + """Handle AdviceStageFinishedEvent event.""" + self.reporter.submit( + self.advice, + show_title=False, + show_headers=False, + space="between", + table_style="no_borders", + ) + + self.reporter.generate_report(self.output) + + if self.output is not None: + logger.info(REPORT_GENERATION_MSG) + logger.info("Report(s) and advice list saved to: %s", self.output) + + def on_data_analysis_stage_finished( + self, event: DataAnalysisStageFinishedEvent + ) -> None: + """Handle DataAnalysisStageFinished event.""" + logger.info(MODEL_ANALYSIS_RESULTS_MSG) + self.reporter.print_delayed() + + def on_collected_data(self, event: CollectedDataEvent) -> None: + """Handle CollectedDataEvent event.""" + data_item = event.data_item + + if isinstance(data_item, Operators): + self.reporter.submit([data_item.ops, data_item], delay_print=True) + + if isinstance(data_item, PerformanceMetrics): + self.reporter.submit(data_item, delay_print=True) + + if isinstance(data_item, OptimizationPerformanceMetrics): + original_metrics = data_item.original_perf_metrics + if not data_item.optimizations_perf_metrics: + return + + _opt_settings, optimized_metrics = data_item.optimizations_perf_metrics[0] + + self.reporter.submit( + [original_metrics, optimized_metrics], + delay_print=True, + columns_name="Metrics", + title="Performance metrics", + space=True, + ) + + def on_advice_event(self, event: AdviceEvent) -> None: + """Handle Advice event.""" + self.advice.append(event.advice) + + def on_ethos_u_advisor_started(self, event: EthosUAdvisorStartedEvent) -> None: + """Handle EthosUAdvisorStarted event.""" + self.reporter.submit(event.device) + + @staticmethod + def resolve_output_format(output: Optional[PathOrFileLike]) -> OutputFormat: + """Resolve output format based on the output name.""" + output_format: OutputFormat = "plain_text" + + if isinstance(output, str): + output_path = Path(output) + output_formats: Dict[str, OutputFormat] = {".csv": "csv", ".json": "json"} + + if (suffix := output_path.suffix) in output_formats: + return output_formats[suffix] + + return output_format diff --git a/src/mlia/devices/ethosu/operators.py b/src/mlia/devices/ethosu/operators.py new file mode 100644 index 0000000..ff0d99f --- /dev/null +++ b/src/mlia/devices/ethosu/operators.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Operators module.""" +import logging + +from mlia.tools import vela_wrapper + + +logger = logging.getLogger(__name__) + + +def generate_supported_operators_report() -> None: + """Generate supported operators report.""" + vela_wrapper.generate_supported_operators_report() diff --git a/src/mlia/devices/ethosu/performance.py b/src/mlia/devices/ethosu/performance.py new file mode 100644 index 0000000..b0718a5 --- /dev/null +++ b/src/mlia/devices/ethosu/performance.py @@ -0,0 +1,257 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Performance estimation.""" +import logging +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import List +from typing import Optional +from typing import Tuple +from typing import Union + +import mlia.tools.aiet_wrapper as aiet +import mlia.tools.vela_wrapper as vela +from mlia.core.context import Context +from mlia.core.performance import PerformanceEstimator +from mlia.devices.ethosu.config import EthosUConfiguration +from mlia.nn.tensorflow.config import get_tflite_model +from mlia.nn.tensorflow.config import ModelConfiguration +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings + + +logger = logging.getLogger(__name__) + + +@dataclass +class NPUCycles: + """NPU cycles metrics.""" + + npu_active_cycles: int + npu_idle_cycles: int + npu_total_cycles: int + npu_axi0_rd_data_beat_received: int + npu_axi0_wr_data_beat_written: int + npu_axi1_rd_data_beat_received: int + + +BYTES_PER_KILOBYTE = 1024 + + +class MemorySizeType(Enum): + """Memory size type enumeration.""" + + BYTES = 0 + KILOBYTES = 1 + + +@dataclass +class MemoryUsage: + """Memory usage metrics.""" + + sram_memory_area_size: Union[int, float] + dram_memory_area_size: Union[int, float] + unknown_memory_area_size: Union[int, float] + on_chip_flash_memory_area_size: Union[int, float] + off_chip_flash_memory_area_size: Union[int, float] + memory_size_type: MemorySizeType = MemorySizeType.BYTES + + _default_columns = [ + "SRAM used", + "DRAM used", + "Unknown memory used", + "On chip flash used", + "Off chip flash used", + ] + + def in_kilobytes(self) -> "MemoryUsage": + """Return memory usage with values in kilobytes.""" + if self.memory_size_type == MemorySizeType.KILOBYTES: + return self + + kilobytes = [ + value / BYTES_PER_KILOBYTE + for value in [ + self.sram_memory_area_size, + self.dram_memory_area_size, + self.unknown_memory_area_size, + self.on_chip_flash_memory_area_size, + self.off_chip_flash_memory_area_size, + ] + ] + + return MemoryUsage( + *kilobytes, # type: ignore + memory_size_type=MemorySizeType.KILOBYTES, + ) + + +@dataclass +class PerformanceMetrics: + """Performance metrics.""" + + device: EthosUConfiguration + npu_cycles: Optional[NPUCycles] + memory_usage: Optional[MemoryUsage] + + def in_kilobytes(self) -> "PerformanceMetrics": + """Return metrics with memory usage in KiB.""" + if self.memory_usage is None: + return PerformanceMetrics(self.device, self.npu_cycles, self.memory_usage) + + return PerformanceMetrics( + self.device, self.npu_cycles, self.memory_usage.in_kilobytes() + ) + + +@dataclass +class OptimizationPerformanceMetrics: + """Optimization performance metrics.""" + + original_perf_metrics: PerformanceMetrics + optimizations_perf_metrics: List[ + Tuple[List[OptimizationSettings], PerformanceMetrics] + ] + + +class VelaPerformanceEstimator( + PerformanceEstimator[Union[Path, ModelConfiguration], MemoryUsage] +): + """Vela based performance estimator.""" + + def __init__(self, context: Context, device: EthosUConfiguration) -> None: + """Init Vela based performance estimator.""" + self.context = context + self.device = device + + def estimate(self, model: Union[Path, ModelConfiguration]) -> MemoryUsage: + """Estimate performance.""" + logger.info("Getting the memory usage metrics ...") + + model_path = ( + Path(model.model_path) if isinstance(model, ModelConfiguration) else model + ) + + vela_perf_metrics = vela.estimate_performance( + model_path, self.device.compiler_options + ) + + memory_usage = MemoryUsage( + vela_perf_metrics.sram_memory_area_size, + vela_perf_metrics.dram_memory_area_size, + vela_perf_metrics.unknown_memory_area_size, + vela_perf_metrics.on_chip_flash_memory_area_size, + vela_perf_metrics.off_chip_flash_memory_area_size, + ) + logger.info("Done\n") + return memory_usage + + +class AIETPerformanceEstimator( + PerformanceEstimator[Union[Path, ModelConfiguration], NPUCycles] +): + """AIET based performance estimator.""" + + def __init__( + self, context: Context, device: EthosUConfiguration, backend: str + ) -> None: + """Init AIET based performance estimator.""" + self.context = context + self.device = device + self.backend = backend + + def estimate(self, model: Union[Path, ModelConfiguration]) -> NPUCycles: + """Estimate performance.""" + logger.info("Getting the performance metrics for '%s' ...", self.backend) + logger.info( + "WARNING: This task may require several minutes (press ctrl-c to interrupt)" + ) + + model_path = ( + Path(model.model_path) if isinstance(model, ModelConfiguration) else model + ) + + optimized_model_path = self.context.get_model_path( + f"{model_path.stem}_vela.tflite" + ) + + vela.optimize_model( + model_path, self.device.compiler_options, optimized_model_path + ) + + model_info = aiet.ModelInfo(model_path=optimized_model_path) + device_info = aiet.DeviceInfo( + device_type=self.device.target, # type: ignore + mac=self.device.mac, + memory_mode=self.device.compiler_options.memory_mode, # type: ignore + ) + + aiet_perf_metrics = aiet.estimate_performance( + model_info, device_info, self.backend + ) + + npu_cycles = NPUCycles( + aiet_perf_metrics.npu_active_cycles, + aiet_perf_metrics.npu_idle_cycles, + aiet_perf_metrics.npu_total_cycles, + aiet_perf_metrics.npu_axi0_rd_data_beat_received, + aiet_perf_metrics.npu_axi0_wr_data_beat_written, + aiet_perf_metrics.npu_axi1_rd_data_beat_received, + ) + + logger.info("Done\n") + return npu_cycles + + +class EthosUPerformanceEstimator( + PerformanceEstimator[Union[Path, ModelConfiguration], PerformanceMetrics] +): + """Ethos-U performance estimator.""" + + def __init__( + self, + context: Context, + device: EthosUConfiguration, + backends: Optional[List[str]] = None, + ) -> None: + """Init performance estimator.""" + self.context = context + self.device = device + if backends is None: + backends = ["Vela"] # Only Vela is always available as default + for backend in backends: + if backend != "Vela" and not aiet.is_supported(backend): + raise ValueError( + f"Unsupported backend '{backend}'. " + f"Only 'Vela' and {aiet.supported_backends()} are supported." + ) + self.backends = set(backends) + + def estimate(self, model: Union[Path, ModelConfiguration]) -> PerformanceMetrics: + """Estimate performance.""" + model_path = ( + Path(model.model_path) if isinstance(model, ModelConfiguration) else model + ) + + tflite_model = get_tflite_model(model_path, self.context) + + memory_usage = None + npu_cycles = None + + for backend in self.backends: + if backend == "Vela": + vela_estimator = VelaPerformanceEstimator(self.context, self.device) + memory_usage = vela_estimator.estimate(tflite_model) + elif backend in aiet.supported_backends(): + aiet_estimator = AIETPerformanceEstimator( + self.context, self.device, backend + ) + npu_cycles = aiet_estimator.estimate(tflite_model) + else: + logger.warning( + "Backend '%s' is not supported for Ethos-U performance " + "estimation.", + backend, + ) + + return PerformanceMetrics(self.device, npu_cycles, memory_usage) diff --git a/src/mlia/devices/ethosu/reporters.py b/src/mlia/devices/ethosu/reporters.py new file mode 100644 index 0000000..d28c68f --- /dev/null +++ b/src/mlia/devices/ethosu/reporters.py @@ -0,0 +1,398 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Reports module.""" +from collections import defaultdict +from typing import Any +from typing import Callable +from typing import List +from typing import Tuple +from typing import Union + +from mlia.core.advice_generation import Advice +from mlia.core.reporting import BytesCell +from mlia.core.reporting import Cell +from mlia.core.reporting import ClockCell +from mlia.core.reporting import Column +from mlia.core.reporting import CompoundFormatter +from mlia.core.reporting import CyclesCell +from mlia.core.reporting import Format +from mlia.core.reporting import NestedReport +from mlia.core.reporting import Report +from mlia.core.reporting import ReportItem +from mlia.core.reporting import SingleRow +from mlia.core.reporting import Table +from mlia.devices.ethosu.config import EthosUConfiguration +from mlia.devices.ethosu.performance import PerformanceMetrics +from mlia.tools.vela_wrapper import Operator +from mlia.tools.vela_wrapper import Operators +from mlia.utils.console import style_improvement +from mlia.utils.types import is_list_of + + +def report_operators_stat(operators: Operators) -> Report: + """Return table representation for the ops stats.""" + columns = [ + Column("Number of operators", alias="num_of_operators"), + Column("Number of NPU supported operators", "num_of_npu_supported_operators"), + Column("Unsupported ops ratio", "npu_unsupported_ratio"), + ] + rows = [ + ( + operators.total_number, + operators.npu_supported_number, + Cell( + operators.npu_unsupported_ratio * 100, + fmt=Format(str_fmt="{0:.0f}%".format), + ), + ) + ] + + return SingleRow( + columns, rows, name="Operators statistics", alias="operators_stats" + ) + + +def report_operators(ops: List[Operator]) -> Report: + """Return table representation for the list of operators.""" + columns = [ + Column("#", only_for=["plain_text"]), + Column( + "Operator name", + alias="operator_name", + fmt=Format(wrap_width=30), + ), + Column( + "Operator type", + alias="operator_type", + fmt=Format(wrap_width=25), + ), + Column( + "Placement", + alias="placement", + fmt=Format(wrap_width=20), + ), + Column( + "Notes", + alias="notes", + fmt=Format(wrap_width=35), + ), + ] + + rows = [ + ( + i + 1, + op.name, + op.op_type, + Cell( + "NPU" if (npu := op.run_on_npu.supported) else "CPU", + Format(style=style_improvement(npu)), + ), + Table( + columns=[ + Column( + "Note", + alias="note", + fmt=Format(wrap_width=35), + ) + ], + rows=[ + (Cell(item, Format(str_fmt=lambda x: f"* {x}")),) + for reason in op.run_on_npu.reasons + for item in reason + if item + ], + name="Notes", + ), + ) + for i, op in enumerate(ops) + ] + + return Table(columns, rows, name="Operators", alias="operators") + + +def report_device_details(device: EthosUConfiguration) -> Report: + """Return table representation for the device.""" + compiler_config = device.resolved_compiler_config + + memory_settings = [ + ReportItem( + "Const mem area", + "const_mem_area", + compiler_config["const_mem_area"], + ), + ReportItem( + "Arena mem area", + "arena_mem_area", + compiler_config["arena_mem_area"], + ), + ReportItem( + "Cache mem area", + "cache_mem_area", + compiler_config["cache_mem_area"], + ), + ReportItem( + "Arena cache size", + "arena_cache_size", + BytesCell(compiler_config["arena_cache_size"]), + ), + ] + + mem_areas_settings = [ + ReportItem( + f"{mem_area_name}", + mem_area_name, + None, + nested_items=[ + ReportItem( + "Clock scales", + "clock_scales", + mem_area_settings["clock_scales"], + ), + ReportItem( + "Burst length", + "burst_length", + BytesCell(mem_area_settings["burst_length"]), + ), + ReportItem( + "Read latency", + "read_latency", + CyclesCell(mem_area_settings["read_latency"]), + ), + ReportItem( + "Write latency", + "write_latency", + CyclesCell(mem_area_settings["write_latency"]), + ), + ], + ) + for mem_area_name, mem_area_settings in compiler_config["memory_area"].items() + ] + + system_settings = [ + ReportItem( + "Accelerator clock", + "accelerator_clock", + ClockCell(compiler_config["core_clock"]), + ), + ReportItem( + "AXI0 port", + "axi0_port", + compiler_config["axi0_port"], + ), + ReportItem( + "AXI1 port", + "axi1_port", + compiler_config["axi1_port"], + ), + ReportItem( + "Memory area settings", "memory_area", None, nested_items=mem_areas_settings + ), + ] + + arch_settings = [ + ReportItem( + "Permanent storage mem area", + "permanent_storage_mem_area", + compiler_config["permanent_storage_mem_area"], + ), + ReportItem( + "Feature map storage mem area", + "feature_map_storage_mem_area", + compiler_config["feature_map_storage_mem_area"], + ), + ReportItem( + "Fast storage mem area", + "fast_storage_mem_area", + compiler_config["fast_storage_mem_area"], + ), + ] + + return NestedReport( + "Device information", + "device", + [ + ReportItem("Target", alias="target", value=device.target), + ReportItem("MAC", alias="mac", value=device.mac), + ReportItem( + "Memory mode", + alias="memory_mode", + value=compiler_config["memory_mode"], + nested_items=memory_settings, + ), + ReportItem( + "System config", + alias="system_config", + value=compiler_config["system_config"], + nested_items=system_settings, + ), + ReportItem( + "Architecture settings", + "arch_settings", + None, + nested_items=arch_settings, + ), + ], + ) + + +def metrics_as_records(perf_metrics: List[PerformanceMetrics]) -> List[Tuple]: + """Convert perf metrics object into list of records.""" + perf_metrics = [item.in_kilobytes() for item in perf_metrics] + + def _cycles_as_records(perf_metrics: List[PerformanceMetrics]) -> List[Tuple]: + metric_map = defaultdict(list) + for metrics in perf_metrics: + if not metrics.npu_cycles: + return [] + metric_map["NPU active cycles"].append(metrics.npu_cycles.npu_active_cycles) + metric_map["NPU idle cycles"].append(metrics.npu_cycles.npu_idle_cycles) + metric_map["NPU total cycles"].append(metrics.npu_cycles.npu_total_cycles) + + return [ + (name, *(Cell(value, Format(str_fmt="12,d")) for value in values), "cycles") + for name, values in metric_map.items() + ] + + def _memory_usage_as_records(perf_metrics: List[PerformanceMetrics]) -> List[Tuple]: + metric_map = defaultdict(list) + for metrics in perf_metrics: + if not metrics.memory_usage: + return [] + metric_map["SRAM used"].append(metrics.memory_usage.sram_memory_area_size) + metric_map["DRAM used"].append(metrics.memory_usage.dram_memory_area_size) + metric_map["Unknown memory area used"].append( + metrics.memory_usage.unknown_memory_area_size + ) + metric_map["On-chip flash used"].append( + metrics.memory_usage.on_chip_flash_memory_area_size + ) + metric_map["Off-chip flash used"].append( + metrics.memory_usage.off_chip_flash_memory_area_size + ) + + return [ + (name, *(Cell(value, Format(str_fmt="12.2f")) for value in values), "KiB") + for name, values in metric_map.items() + if all(val > 0 for val in values) + ] + + def _data_beats_as_records(perf_metrics: List[PerformanceMetrics]) -> List[Tuple]: + metric_map = defaultdict(list) + for metrics in perf_metrics: + if not metrics.npu_cycles: + return [] + metric_map["NPU AXI0 RD data beat received"].append( + metrics.npu_cycles.npu_axi0_rd_data_beat_received + ) + metric_map["NPU AXI0 WR data beat written"].append( + metrics.npu_cycles.npu_axi0_wr_data_beat_written + ) + metric_map["NPU AXI1 RD data beat received"].append( + metrics.npu_cycles.npu_axi1_rd_data_beat_received + ) + + return [ + (name, *(Cell(value, Format(str_fmt="12,d")) for value in values), "beats") + for name, values in metric_map.items() + ] + + return [ + metrics + for metrics_func in ( + _memory_usage_as_records, + _cycles_as_records, + _data_beats_as_records, + ) + for metrics in metrics_func(perf_metrics) + ] + + +def report_perf_metrics( + perf_metrics: Union[PerformanceMetrics, List[PerformanceMetrics]] +) -> Report: + """Return comparison table for the performance metrics.""" + if isinstance(perf_metrics, PerformanceMetrics): + perf_metrics = [perf_metrics] + + rows = metrics_as_records(perf_metrics) + + if len(perf_metrics) == 2: + return Table( + columns=[ + Column("Metric", alias="metric", fmt=Format(wrap_width=30)), + Column("Original", alias="original", fmt=Format(wrap_width=15)), + Column("Optimized", alias="optimized", fmt=Format(wrap_width=15)), + Column("Unit", alias="unit", fmt=Format(wrap_width=15)), + Column("Improvement (%)", alias="improvement"), + ], + rows=[ + ( + metric, + original_value, + optimized_value, + unit, + Cell( + ( + diff := 100 + - (optimized_value.value / original_value.value * 100) + ), + Format(str_fmt="15.2f", style=style_improvement(diff > 0)), + ) + if original_value.value != 0 + else None, + ) + for metric, original_value, optimized_value, unit in rows + ], + name="Performance metrics", + alias="performance_metrics", + notes="IMPORTANT: The performance figures above refer to NPU only", + ) + + return Table( + columns=[ + Column("Metric", alias="metric", fmt=Format(wrap_width=30)), + Column("Value", alias="value", fmt=Format(wrap_width=15)), + Column("Unit", alias="unit", fmt=Format(wrap_width=15)), + ], + rows=rows, + name="Performance metrics", + alias="performance_metrics", + notes="IMPORTANT: The performance figures above refer to NPU only", + ) + + +def report_advice(advice: List[Advice]) -> Report: + """Generate report for the advice.""" + return Table( + columns=[ + Column("#", only_for=["plain_text"]), + Column("Advice", alias="advice_message"), + ], + rows=[(i + 1, a.messages) for i, a in enumerate(advice)], + name="Advice", + alias="advice", + ) + + +def find_appropriate_formatter(data: Any) -> Callable[[Any], Report]: + """Find appropriate formatter for the provided data.""" + if isinstance(data, PerformanceMetrics) or is_list_of(data, PerformanceMetrics, 2): + return report_perf_metrics + + if is_list_of(data, Advice): + return report_advice + + if is_list_of(data, Operator): + return report_operators + + if isinstance(data, Operators): + return report_operators_stat + + if isinstance(data, EthosUConfiguration): + return report_device_details + + if isinstance(data, (list, tuple)): + formatters = [find_appropriate_formatter(item) for item in data] + return CompoundFormatter(formatters) + + raise Exception(f"Unable to find appropriate formatter for {data}") -- cgit v1.2.1