diff options
author | Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-11-18 17:21:09 +0000 |
---|---|---|
committer | Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-11-29 14:44:13 +0000 |
commit | 6a88ee5315b4ce5b023370c1e55e48bf9f2b6f67 (patch) | |
tree | 88edabf90228724f4fe2944b0ab23859d824a880 /src/mlia/target | |
parent | a34163c9d9a5cc0416bcaea2ebf8383bda9d505c (diff) | |
download | mlia-6a88ee5315b4ce5b023370c1e55e48bf9f2b6f67.tar.gz |
Rename modules
- Rename module "mlia.devices" into "mlia.target"
- Rename module "mlia.target.ethosu" into "mlia.target.ethos_u"
- Rename module "mlia.target.cortexa" into "mlia.target.cortex_a"
- Rename and update tests
Change-Id: I6dca7c8646d881f739fb6b5914d1cc7e45e63dc2
Diffstat (limited to 'src/mlia/target')
34 files changed, 2944 insertions, 0 deletions
diff --git a/src/mlia/target/__init__.py b/src/mlia/target/__init__.py new file mode 100644 index 0000000..2370221 --- /dev/null +++ b/src/mlia/target/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Target module.""" diff --git a/src/mlia/target/config.py b/src/mlia/target/config.py new file mode 100644 index 0000000..7ab6b43 --- /dev/null +++ b/src/mlia/target/config.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""IP configuration module.""" + + +class IPConfiguration: # pylint: disable=too-few-public-methods + """Base class for IP configuration.""" + + def __init__(self, target: str) -> None: + """Init IP configuration instance.""" + self.target = target diff --git a/src/mlia/target/cortex_a/__init__.py b/src/mlia/target/cortex_a/__init__.py new file mode 100644 index 0000000..fe01835 --- /dev/null +++ b/src/mlia/target/cortex_a/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Cortex-A target module.""" diff --git a/src/mlia/target/cortex_a/advice_generation.py b/src/mlia/target/cortex_a/advice_generation.py new file mode 100644 index 0000000..b68106e --- /dev/null +++ b/src/mlia/target/cortex_a/advice_generation.py @@ -0,0 +1,153 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Cortex-A advice generation.""" +from functools import singledispatchmethod + +from mlia.core.advice_generation import advice_category +from mlia.core.advice_generation import FactBasedAdviceProducer +from mlia.core.common import AdviceCategory +from mlia.core.common import DataItem +from mlia.target.cortex_a.data_analysis import ModelHasCustomOperators +from mlia.target.cortex_a.data_analysis import ModelIsCortexACompatible +from mlia.target.cortex_a.data_analysis import ModelIsNotCortexACompatible +from mlia.target.cortex_a.data_analysis import ModelIsNotTFLiteCompatible +from mlia.target.cortex_a.data_analysis import TFLiteCompatibilityCheckFailed + + +class CortexAAdviceProducer(FactBasedAdviceProducer): + """Cortex-A advice producer.""" + + cortex_a_disclaimer = ( + "Note that the provided compatibility information is general. " + "At runtime individual operators in the given model might fall back to " + "the TensorFlow Lite reference or might produce errors based on the " + "specific parameters." 
+ ) + + @singledispatchmethod + def produce_advice(self, _data_item: DataItem) -> None: # type: ignore + """Produce advice.""" + + @produce_advice.register + @advice_category(AdviceCategory.ALL, AdviceCategory.OPERATORS) + def handle_model_is_cortex_a_compatible( + self, data_item: ModelIsCortexACompatible + ) -> None: + """Advice for Cortex-A compatibility.""" + self.add_advice( + [ + f"Model is fully compatible with {data_item.backend_info} for " + "Cortex-A.", + self.cortex_a_disclaimer, + ] + ) + + @produce_advice.register + @advice_category(AdviceCategory.ALL, AdviceCategory.OPERATORS) + def handle_model_is_not_cortex_a_compatible( + self, data_item: ModelIsNotCortexACompatible + ) -> None: + """Advice for Cortex-A compatibility.""" + if data_item.unsupported_ops: + self.add_advice( + [ + "The following operators are not supported by " + f"{data_item.backend_info} and will fall back to the " + "TensorFlow Lite runtime:", + "\n".join(f" - {op}" for op in data_item.unsupported_ops), + ] + ) + + if data_item.activation_func_support: + self.add_advice( + [ + "The fused activation functions of the following operators " + f"are not supported by {data_item.backend_info}. 
Please " + "consider using one of the supported activation functions " + "instead:", + "\n".join( + f" - {op}\n" + f" - Used unsupported: {act.used_unsupported}\n" + f" - Supported: {act.supported}" + for op, act in data_item.activation_func_support.items() + ), + ] + ) + + self.add_advice( + [ + "Please, refer to the full table of operators above for more " + "information.", + self.cortex_a_disclaimer, + ] + ) + + @produce_advice.register + @advice_category(AdviceCategory.ALL, AdviceCategory.OPERATORS) + def handle_model_is_not_tflite_compatible( + self, data_item: ModelIsNotTFLiteCompatible + ) -> None: + """Advice for TensorFlow Lite compatibility.""" + if data_item.flex_ops: + self.add_advice( + [ + "The following operators are not natively " + "supported by TensorFlow Lite: " + f"{', '.join(data_item.flex_ops)}.", + "Using select TensorFlow operators in TensorFlow Lite model " + "requires special initialization of TFLiteConverter and " + "TensorFlow Lite run-time.", + "Please refer to the TensorFlow documentation for more " + "details: https://www.tensorflow.org/lite/guide/ops_select", + "Note, such models are not supported by the ML Inference Advisor.", + ] + ) + + if data_item.custom_ops: + self.add_advice( + [ + "The following operators appear to be custom and not natively " + "supported by TensorFlow Lite: " + f"{', '.join(data_item.custom_ops)}.", + "Using custom operators in TensorFlow Lite model " + "requires special initialization of TFLiteConverter and " + "TensorFlow Lite run-time.", + "Please refer to the TensorFlow documentation for more " + "details: https://www.tensorflow.org/lite/guide/ops_custom", + "Note, such models are not supported by the ML Inference Advisor.", + ] + ) + + if not data_item.flex_ops and not data_item.custom_ops: + self.add_advice( + [ + "Model could not be converted into TensorFlow Lite format.", + "Please refer to the table for more details.", + ] + ) + + @produce_advice.register + @advice_category(AdviceCategory.ALL, 
AdviceCategory.OPERATORS) + def handle_tflite_check_failed( + self, _data_item: TFLiteCompatibilityCheckFailed + ) -> None: + """Advice for the failed TensorFlow Lite compatibility checks.""" + self.add_advice( + [ + "Model could not be converted into TensorFlow Lite format.", + "Please refer to the table for more details.", + ] + ) + + @produce_advice.register + @advice_category(AdviceCategory.ALL, AdviceCategory.OPERATORS) + def handle_model_has_custom_operators( + self, _data_item: ModelHasCustomOperators + ) -> None: + """Advice for the models with custom operators.""" + self.add_advice( + [ + "Models with custom operators require special initialization " + "and currently are not supported by the ML Inference Advisor.", + ] + ) diff --git a/src/mlia/target/cortex_a/advisor.py b/src/mlia/target/cortex_a/advisor.py new file mode 100644 index 0000000..5912e38 --- /dev/null +++ b/src/mlia/target/cortex_a/advisor.py @@ -0,0 +1,92 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Cortex-A MLIA module.""" +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from mlia.core.advice_generation import AdviceProducer +from mlia.core.advisor import DefaultInferenceAdvisor +from mlia.core.advisor import InferenceAdvisor +from mlia.core.common import AdviceCategory +from mlia.core.context import Context +from mlia.core.context import ExecutionContext +from mlia.core.data_analysis import DataAnalyzer +from mlia.core.data_collection import DataCollector +from mlia.core.events import Event +from mlia.core.typing import PathOrFileLike +from mlia.target.cortex_a.advice_generation import CortexAAdviceProducer +from mlia.target.cortex_a.config import CortexAConfiguration +from mlia.target.cortex_a.data_analysis import CortexADataAnalyzer +from mlia.target.cortex_a.data_collection import CortexAOperatorCompatibility +from mlia.target.cortex_a.events import CortexAAdvisorStartedEvent +from mlia.target.cortex_a.handlers import CortexAEventHandler + + +class CortexAInferenceAdvisor(DefaultInferenceAdvisor): + """Cortex-A Inference Advisor.""" + + @classmethod + def name(cls) -> str: + """Return name of the advisor.""" + return "cortex_a_inference_advisor" + + def get_collectors(self, context: Context) -> list[DataCollector]: + """Return list of the data collectors.""" + model = self.get_model(context) + + collectors: list[DataCollector] = [] + + if AdviceCategory.OPERATORS in context.advice_category: + collectors.append(CortexAOperatorCompatibility(model)) + + return collectors + + def get_analyzers(self, context: Context) -> list[DataAnalyzer]: + """Return list of the data analyzers.""" + return [ + CortexADataAnalyzer(), + ] + + def get_producers(self, context: Context) -> list[AdviceProducer]: + """Return list of the advice producers.""" + return [CortexAAdviceProducer()] + + def get_events(self, context: Context) -> list[Event]: + """Return list of the startup events.""" + 
model = self.get_model(context) + target_profile = self.get_target_profile(context) + + return [ + CortexAAdvisorStartedEvent(model, CortexAConfiguration(target_profile)), + ] + + +def configure_and_get_cortexa_advisor( + context: ExecutionContext, + target_profile: str, + model: str | Path, + output: PathOrFileLike | None = None, + **_extra_args: Any, +) -> InferenceAdvisor: + """Create and configure Cortex-A advisor.""" + if context.event_handlers is None: + context.event_handlers = [CortexAEventHandler(output)] + + if context.config_parameters is None: + context.config_parameters = _get_config_parameters(model, target_profile) + + return CortexAInferenceAdvisor() + + +def _get_config_parameters(model: str | Path, target_profile: str) -> dict[str, Any]: + """Get configuration parameters for the advisor.""" + advisor_parameters: dict[str, Any] = { + "cortex_a_inference_advisor": { + "model": str(model), + "target_profile": target_profile, + }, + } + + return advisor_parameters diff --git a/src/mlia/target/cortex_a/config.py b/src/mlia/target/cortex_a/config.py new file mode 100644 index 0000000..b2b51ea --- /dev/null +++ b/src/mlia/target/cortex_a/config.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Cortex-A configuration.""" +from __future__ import annotations + +from mlia.target.config import IPConfiguration +from mlia.utils.filesystem import get_profile + + +class CortexAConfiguration(IPConfiguration): # pylint: disable=too-few-public-methods + """Cortex-A configuration.""" + + def __init__(self, target_profile: str) -> None: + """Init Cortex-A target configuration.""" + target_data = get_profile(target_profile) + + target = target_data["target"] + if target != "cortex-a": + raise Exception(f"Wrong target {target} for Cortex-A configuration") + super().__init__(target) diff --git a/src/mlia/target/cortex_a/data_analysis.py b/src/mlia/target/cortex_a/data_analysis.py new file mode 100644 index 0000000..4a3a068 --- /dev/null +++ b/src/mlia/target/cortex_a/data_analysis.py @@ -0,0 +1,128 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Cortex-A data analysis module.""" +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass +from dataclasses import field +from functools import singledispatchmethod + +from mlia.core.common import DataItem +from mlia.core.data_analysis import Fact +from mlia.core.data_analysis import FactExtractor +from mlia.nn.tensorflow.tflite_compat import TFLiteCompatibilityInfo +from mlia.target.cortex_a.operators import CortexACompatibilityInfo +from mlia.target.cortex_a.operators import Operator + + +class CortexADataAnalyzer(FactExtractor): + """Cortex-A data analyzer.""" + + @singledispatchmethod + def analyze_data(self, data_item: DataItem) -> None: # type: ignore + """Analyse the data.""" + + @analyze_data.register + def analyze_operator_compatibility( + self, data_item: CortexACompatibilityInfo + ) -> None: + """Analyse operator compatibility information.""" + if data_item.cortex_a_compatible: + self.add_fact(ModelIsCortexACompatible(data_item.backend_info)) + 
else: + unsupported_ops = set() + activation_func_support: defaultdict[ + str, ModelIsNotCortexACompatible.ActivationFunctionSupport + ] = defaultdict(ModelIsNotCortexACompatible.ActivationFunctionSupport) + for oper in data_item.operators: + if oper.support_type == Operator.SupportType.OP_NOT_SUPPORTED: + unsupported_ops.add(oper.full_name) + + if oper.support_type == Operator.SupportType.ACTIVATION_NOT_SUPPORTED: + # Add used but unsupported actication functions + activation_func_support[oper.full_name].used_unsupported.add( + oper.activation_func.name + ) + # Add supported activation functions + activation_func_support[oper.full_name].supported.update( + oper.supported_activation_functions + ) + + assert ( + unsupported_ops or activation_func_support or not data_item.operators + ), ( + "The model is marked as not compatible with Cortex-A but there " + "are no unsupported ops activation functions listed." + ) + + self.add_fact( + ModelIsNotCortexACompatible( + data_item.backend_info, unsupported_ops, activation_func_support + ) + ) + + @analyze_data.register + def analyze_tflite_compatibility(self, data_item: TFLiteCompatibilityInfo) -> None: + """Analyze TensorFlow Lite compatibility information.""" + if data_item.compatible: + return + + if data_item.conversion_failed_with_errors: + self.add_fact( + ModelIsNotTFLiteCompatible( + custom_ops=data_item.required_custom_ops, + flex_ops=data_item.required_flex_ops, + ) + ) + + if data_item.check_failed_with_unknown_error: + self.add_fact(TFLiteCompatibilityCheckFailed()) + + if data_item.conversion_failed_for_model_with_custom_ops: + self.add_fact(ModelHasCustomOperators()) + + +@dataclass +class CortexACompatibility(Fact): + """Base class for Cortex-A compatibility providing backend info.""" + + backend_info: str + + +@dataclass +class ModelIsCortexACompatible(CortexACompatibility): + """Model is completely compatible with Cortex-A.""" + + +@dataclass +class ModelIsNotCortexACompatible(CortexACompatibility): + 
"""Model is not compatible with Cortex-A.""" + + @dataclass + class ActivationFunctionSupport: + """Activation function support per operator.""" + + used_unsupported: set[str] = field(default_factory=set) + supported: set[str] = field(default_factory=set) + + unsupported_ops: set[str] + activation_func_support: dict[str, ActivationFunctionSupport] + + +@dataclass +class ModelIsNotTFLiteCompatible(Fact): + """Model could not be converted into TensorFlow Lite format.""" + + custom_ops: list[str] | None = None + flex_ops: list[str] | None = None + + +@dataclass +class TFLiteCompatibilityCheckFailed(Fact): + """TensorFlow Lite compatibility check failed by unknown reason.""" + + +@dataclass +class ModelHasCustomOperators(Fact): + """Model could not be loaded because it contains custom ops.""" diff --git a/src/mlia/target/cortex_a/data_collection.py b/src/mlia/target/cortex_a/data_collection.py new file mode 100644 index 0000000..3ec63e2 --- /dev/null +++ b/src/mlia/target/cortex_a/data_collection.py @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Data collection module for Cortex-A.""" +from __future__ import annotations + +import logging +from pathlib import Path + +from mlia.core.data_collection import ContextAwareDataCollector +from mlia.nn.tensorflow.config import get_tflite_model +from mlia.nn.tensorflow.tflite_compat import TFLiteChecker +from mlia.nn.tensorflow.tflite_compat import TFLiteCompatibilityInfo +from mlia.nn.tensorflow.utils import is_tflite_model +from mlia.target.cortex_a.operators import CortexACompatibilityInfo +from mlia.target.cortex_a.operators import get_cortex_a_compatibility_info +from mlia.utils.logging import log_action + + +logger = logging.getLogger(__name__) + + +class CortexAOperatorCompatibility(ContextAwareDataCollector): + """Collect operator compatibility information.""" + + def __init__(self, model: Path) -> None: + """Init operator compatibility data collector.""" + self.model = model + + def collect_data(self) -> TFLiteCompatibilityInfo | CortexACompatibilityInfo | None: + """Collect operator compatibility information.""" + if not is_tflite_model(self.model): + with log_action("Checking TensorFlow Lite compatibility ..."): + tflite_checker = TFLiteChecker() + tflite_compat = tflite_checker.check_compatibility(self.model) + + if not tflite_compat.compatible: + return tflite_compat + + tflite_model = get_tflite_model(self.model, self.context) + + with log_action("Checking operator compatibility ..."): + return ( + get_cortex_a_compatibility_info( # pylint: disable=assignment-from-none + Path(tflite_model.model_path) + ) + ) + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "cortex_a_operator_compatibility" diff --git a/src/mlia/target/cortex_a/events.py b/src/mlia/target/cortex_a/events.py new file mode 100644 index 0000000..a172d0d --- /dev/null +++ b/src/mlia/target/cortex_a/events.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Cortex-A MLIA module events.""" +from dataclasses import dataclass +from pathlib import Path + +from mlia.core.events import Event +from mlia.core.events import EventDispatcher +from mlia.target.cortex_a.config import CortexAConfiguration + + +@dataclass +class CortexAAdvisorStartedEvent(Event): + """Event with Cortex-A advisor parameters.""" + + model: Path + device: CortexAConfiguration + + +class CortexAAdvisorEventHandler(EventDispatcher): + """Event handler for the Cortex-A inference advisor.""" + + def on_cortex_a_advisor_started(self, event: CortexAAdvisorStartedEvent) -> None: + """Handle CortexAAdvisorStarted event.""" diff --git a/src/mlia/target/cortex_a/handlers.py b/src/mlia/target/cortex_a/handlers.py new file mode 100644 index 0000000..b2d5faa --- /dev/null +++ b/src/mlia/target/cortex_a/handlers.py @@ -0,0 +1,39 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Event handler.""" +from __future__ import annotations + +import logging + +from mlia.core.events import CollectedDataEvent +from mlia.core.handlers import WorkflowEventsHandler +from mlia.core.typing import PathOrFileLike +from mlia.nn.tensorflow.tflite_compat import TFLiteCompatibilityInfo +from mlia.target.cortex_a.events import CortexAAdvisorEventHandler +from mlia.target.cortex_a.events import CortexAAdvisorStartedEvent +from mlia.target.cortex_a.operators import CortexACompatibilityInfo +from mlia.target.cortex_a.reporters import cortex_a_formatters + +logger = logging.getLogger(__name__) + + +class CortexAEventHandler(WorkflowEventsHandler, CortexAAdvisorEventHandler): + """CLI event handler.""" + + def __init__(self, output: PathOrFileLike | None = None) -> None: + """Init event handler.""" + super().__init__(cortex_a_formatters, output) + + def on_collected_data(self, event: CollectedDataEvent) -> None: + """Handle CollectedDataEvent event.""" + data_item = 
event.data_item + + if isinstance(data_item, CortexACompatibilityInfo): + self.reporter.submit(data_item.operators, delay_print=True) + + if isinstance(data_item, TFLiteCompatibilityInfo) and not data_item.compatible: + self.reporter.submit(data_item, delay_print=True) + + def on_cortex_a_advisor_started(self, event: CortexAAdvisorStartedEvent) -> None: + """Handle CortexAAdvisorStarted event.""" + self.reporter.submit(event.device) diff --git a/src/mlia/target/cortex_a/operator_compatibility.py b/src/mlia/target/cortex_a/operator_compatibility.py new file mode 100644 index 0000000..c474e75 --- /dev/null +++ b/src/mlia/target/cortex_a/operator_compatibility.py @@ -0,0 +1,184 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Collection of Cortex-A operator compatibility information.""" +from __future__ import annotations + +from typing import Any + +ARMNN_TFLITE_DELEGATE: dict[str, dict[str, Any]] = { + "metadata": { + "backend": "Arm NN TensorFlow Lite delegate", + "version": "22.08", + }, + # BUILTIN OPERATORS + "builtin_ops": { + "ABS": {}, + "ADD": {}, + "ARG_MAX": {}, + "ARG_MIN": {}, + "AVERAGE_POOL_2D": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "TANH", + "NONE", + ] + }, + "BATCH_TO_SPACE_ND": {}, + "CAST": {}, + "CONCATENATION": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "TANH", + "NONE", + ] + }, + "CONV_2D": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "TANH", + "NONE", + ] + }, + "CONV_3D": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "TANH", + "NONE", + ] + }, + "DEPTH_TO_SPACE": {}, + "DEPTHWISE_CONV_2D": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "TANH", + "NONE", + ] + }, + "DEQUANTIZE": {}, + "DIV": {}, + "EQUAL": {}, + "ELU": {}, + "EXP": {}, + 
"EXPAND_DIMS": {}, + "FILL": {}, + "FLOOR": {}, + "FLOOR_DIV": {}, + "FULLY_CONNECTED": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "TANH", + "NONE", + ] + }, + "GATHER": {}, + "GATHER_ND": {}, + "GREATER": {}, + "GREATER_EQUAL": {}, + "HARD_SWISH": {}, + "L2_NORMALIZATION": {}, + "L2_POOL_2D": {}, + "LESS": {}, + "LESS_EQUAL": {}, + "LOCAL_RESPONSE_NORMALIZATION": {}, + "LOG": {}, + "LOGICAL_AND": {}, + "LOGICAL_NOT": {}, + "LOGICAL_OR": {}, + "LOGISTIC": {}, + "LOG_SOFTMAX": {}, + "LSTM": {}, + "MAXIMUM": {}, + "MAX_POOL_2D": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "TANH", + "NONE", + ] + }, + "MEAN": {}, + "MINIMUM": {}, + "MIRROR_PAD": {}, + "MUL": {}, + "NEG": {}, + "NOT_EQUAL": {}, + "PACK": {}, + "PAD": {}, + "PADV2": {}, + "PRELU": {}, + "QUANTIZE": {}, + "RANK": {}, + "REDUCE_MAX": {}, + "REDUCE_MIN": {}, + "REDUCE_PROD": {}, + "RELU": {}, + "RELU6": {}, + "RELU_N1_TO_1": {}, + "RESHAPE": {}, + "RESIZE_BILINEAR": {}, + "RESIZE_NEAREST_NEIGHBOR": {}, + "RSQRT": {}, + "SHAPE": {}, + "SIN": {}, + "SOFTMAX": {}, + "SPACE_TO_BATCH_ND": {}, + "SPACE_TO_DEPTH": {}, + "SPLIT": {}, + "SPLIT_V": {}, + "SQRT": {}, + "SQUEEZE": {}, + "STRIDED_SLICE": {}, + "SUB": {}, + "SUM": {}, + "TANH": {}, + "TRANSPOSE": {}, + "TRANSPOSE_CONV": {}, + "UNIDIRECTIONAL_SEQUENCE_LSTM": {}, + "UNPACK": {}, + }, + # CUSTOM OPERATORS + "custom_ops": { + "AveragePool3D": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "SIGN_BIT", + "TANH", + "NONE", + ] + }, + "MaxPool3D": { + "supported_fused_activation": [ + "RELU", + "RELU6", + "RELU_N1_TO_1", + "SIGMOID", + "SIGN_BIT", + "TANH", + "NONE", + ] + }, + }, +} diff --git a/src/mlia/target/cortex_a/operators.py b/src/mlia/target/cortex_a/operators.py new file mode 100644 index 0000000..91f1886 --- /dev/null +++ b/src/mlia/target/cortex_a/operators.py @@ -0,0 +1,148 @@ +# SPDX-FileCopyrightText: Copyright 
2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Cortex-A tools module.""" +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Any +from typing import ClassVar + +from mlia.nn.tensorflow.tflite_graph import Op +from mlia.nn.tensorflow.tflite_graph import parse_subgraphs +from mlia.nn.tensorflow.tflite_graph import TFL_ACTIVATION_FUNCTION +from mlia.target.cortex_a.operator_compatibility import ( + ARMNN_TFLITE_DELEGATE as TFLITE_DELEGATE_COMPAT, +) + + +@dataclass +class Operator: + """Cortex-A compatibility information of the operator.""" + + BUILTIN_COMPATIBILITY = TFLITE_DELEGATE_COMPAT["builtin_ops"] + CUSTOM_COMPATIBILITY = TFLITE_DELEGATE_COMPAT["custom_ops"] + + class SupportType(Enum): + """Type of operator support.""" + + COMPATIBLE = "Compatible" + OP_NOT_SUPPORTED = "Operator not supported" + ACTIVATION_NOT_SUPPORTED = "Activation not supported" + + name: str + location: str + support_type: SupportType + activation_func: TFL_ACTIVATION_FUNCTION + custom_name: str | None = None + + @property + def is_cortex_a_compatible(self) -> bool: + """Check if this operator is compatible.""" + return self.support_type == Operator.SupportType.COMPATIBLE + + @property + def full_name(self) -> str: + """Returun the full name including the custom name if applicable.""" + return self.name + (f" - '{self.custom_name}'" if self.custom_name else "") + + @property + def is_custom(self) -> bool: + """Check if this is a custom operator.""" + return bool(self.custom_name) + + @property + def compatibility_data(self) -> dict[str, dict[str, Any]]: + """Get the compatibility data (builtin or custom ops).""" + return ( + Operator.CUSTOM_COMPATIBILITY + if self.is_custom + else Operator.BUILTIN_COMPATIBILITY + ) + + @property + def supported_activation_functions(self) -> list[str]: + """Return a list of fused activation functions supported by this op.""" + 
op_name = self.custom_name if self.custom_name else self.name + return self.compatibility_data[op_name].get("supported_fused_activation", []) + + @classmethod + def from_tflite_op(cls, tfl_op: Op, location: str) -> Operator: + """Create a new instance from TensorFlow Lite operator and location.""" + support_type = cls._get_support_type(tfl_op) + activation_func = ( + tfl_op.builtin_options["fused_activation_function"] + if ( + tfl_op.builtin_options + and "fused_activation_function" in tfl_op.builtin_options + ) + else TFL_ACTIVATION_FUNCTION.NONE + ) + return Operator( + tfl_op.type, + location, + support_type, + activation_func=activation_func, + custom_name=(tfl_op.custom_type if tfl_op.is_custom else None), + ) + + @staticmethod + def _get_support_type(tfl_op: Op) -> Operator.SupportType: + """Get the support type from the TensorFlow Lite operator.""" + compat_data = ( + Operator.CUSTOM_COMPATIBILITY + if tfl_op.is_custom + else Operator.BUILTIN_COMPATIBILITY + ) + op_type = tfl_op.custom_type if tfl_op.is_custom else tfl_op.type + + if op_type not in compat_data: + return Operator.SupportType.OP_NOT_SUPPORTED + + compat_op = compat_data[op_type] + if "supported_fused_activation" in compat_op: + assert tfl_op.builtin_options + assert "fused_activation_function" in tfl_op.builtin_options + if ( + tfl_op.builtin_options["fused_activation_function"] + not in compat_op["supported_fused_activation"] + ): + return Operator.SupportType.ACTIVATION_NOT_SUPPORTED + + return Operator.SupportType.COMPATIBLE + + +@dataclass +class CortexACompatibilityInfo: + """Model's operators.""" + + cortex_a_compatible: bool + operators: list[Operator] + backend_info: ClassVar[str] = ( + f"{TFLITE_DELEGATE_COMPAT['metadata']['backend']} " + f"{TFLITE_DELEGATE_COMPAT['metadata']['version']}" + ) + + +def get_cortex_a_compatibility_info(model_path: Path) -> CortexACompatibilityInfo: + """Return list of model's operators.""" + model = parse_subgraphs(model_path) + + op_list = [ + 
Operator.from_tflite_op(oper, f"subgraph:{g_idx},oper:{op_idx}") + for g_idx, g in enumerate(model) + for op_idx, oper in enumerate(g) + ] + all_compatible = all(oper.is_cortex_a_compatible for oper in op_list) + compat_info = CortexACompatibilityInfo(all_compatible, op_list) + + return compat_info + + +def report() -> None: + """Generate supported operators report.""" + raise Exception( + "Generating a supported operators report is not " + "currently supported with Cortex-A target profile." + ) diff --git a/src/mlia/target/cortex_a/reporters.py b/src/mlia/target/cortex_a/reporters.py new file mode 100644 index 0000000..d43d6c3 --- /dev/null +++ b/src/mlia/target/cortex_a/reporters.py @@ -0,0 +1,140 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Reports module.""" +from __future__ import annotations + +from typing import Any +from typing import Callable +from typing import cast + +from mlia.core.advice_generation import Advice +from mlia.core.reporters import report_advice +from mlia.core.reporting import Cell +from mlia.core.reporting import Column +from mlia.core.reporting import Format +from mlia.core.reporting import NestedReport +from mlia.core.reporting import Report +from mlia.core.reporting import ReportItem +from mlia.core.reporting import Table +from mlia.nn.tensorflow.tflite_compat import TFLiteCompatibilityInfo +from mlia.target.cortex_a.config import CortexAConfiguration +from mlia.target.cortex_a.operators import Operator +from mlia.utils.console import style_improvement +from mlia.utils.types import is_list_of + + +def report_device(device: CortexAConfiguration) -> Report: + """Generate report for the device.""" + return NestedReport( + "Device information", + "device", + [ + ReportItem("Target", alias="target", value=device.target), + ], + ) + + +def report_tflite_compatiblity(compat_info: TFLiteCompatibilityInfo) -> Report: + """Generate report for the TensorFlow Lite 
compatibility information.""" + if compat_info.conversion_errors: + return Table( + [ + Column("#", only_for=["plain_text"]), + Column("Operator", alias="operator"), + Column( + "Operator location", + alias="operator_location", + fmt=Format(wrap_width=25), + ), + Column("Error code", alias="error_code"), + Column( + "Error message", alias="error_message", fmt=Format(wrap_width=25) + ), + ], + [ + ( + index + 1, + err.operator, + ", ".join(err.location), + err.code.name, + err.message, + ) + for index, err in enumerate(compat_info.conversion_errors) + ], + name="TensorFlow Lite conversion errors", + alias="tensorflow_lite_conversion_errors", + ) + + return Table( + columns=[ + Column("Reason", alias="reason"), + Column( + "Exception details", + alias="exception_details", + fmt=Format(wrap_width=40), + ), + ], + rows=[ + ( + "TensorFlow Lite compatibility check failed with exception", + str(compat_info.conversion_exception), + ), + ], + name="TensorFlow Lite compatibility errors", + alias="tflite_compatibility", + ) + + +def report_cortex_a_operators(ops: list[Operator]) -> Report: + """Generate report for the operators.""" + return Table( + [ + Column("#", only_for=["plain_text"]), + Column( + "Operator location", + alias="operator_location", + fmt=Format(wrap_width=30), + ), + Column("Operator name", alias="operator_name", fmt=Format(wrap_width=20)), + Column( + "Arm NN TFLite Delegate compatibility", + alias="cortex_a_compatible", + fmt=Format(wrap_width=40), + ), + ], + [ + ( + index + 1, + op.location, + op.full_name, + Cell( + op.support_type, + Format( + wrap_width=30, + style=style_improvement(op.is_cortex_a_compatible), + str_fmt=lambda v: cast(str, v.value), + ), + ), + ) + for index, op in enumerate(ops) + ], + name="Operators", + alias="operators", + ) + + +def cortex_a_formatters(data: Any) -> Callable[[Any], Report]: + """Find appropriate formatter for the provided data.""" + if is_list_of(data, Advice): + return report_advice + + if isinstance(data, 
CortexAConfiguration): + return report_device + + if isinstance(data, TFLiteCompatibilityInfo): + return report_tflite_compatiblity + + if is_list_of(data, Operator): + return report_cortex_a_operators + + raise Exception(f"Unable to find appropriate formatter for {data}") diff --git a/src/mlia/target/ethos_u/__init__.py b/src/mlia/target/ethos_u/__init__.py new file mode 100644 index 0000000..503919d --- /dev/null +++ b/src/mlia/target/ethos_u/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U target module.""" diff --git a/src/mlia/target/ethos_u/advice_generation.py b/src/mlia/target/ethos_u/advice_generation.py new file mode 100644 index 0000000..edd78fd --- /dev/null +++ b/src/mlia/target/ethos_u/advice_generation.py @@ -0,0 +1,206 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U advice generation.""" +from __future__ import annotations + +from functools import singledispatchmethod + +from mlia.core.advice_generation import Advice +from mlia.core.advice_generation import advice_category +from mlia.core.advice_generation import ContextAwareAdviceProducer +from mlia.core.advice_generation import FactBasedAdviceProducer +from mlia.core.common import AdviceCategory +from mlia.core.common import DataItem +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings +from mlia.target.ethos_u.data_analysis import AllOperatorsSupportedOnNPU +from mlia.target.ethos_u.data_analysis import HasCPUOnlyOperators +from mlia.target.ethos_u.data_analysis import HasUnsupportedOnNPUOperators +from mlia.target.ethos_u.data_analysis import OptimizationResults + + +class EthosUAdviceProducer(FactBasedAdviceProducer): + """Ethos-U advice producer.""" + + @singledispatchmethod + def produce_advice(self, data_item: DataItem) -> None: # type: ignore + """Produce advice.""" + + 
@produce_advice.register + @advice_category(AdviceCategory.OPERATORS, AdviceCategory.ALL) + def handle_cpu_only_ops(self, data_item: HasCPUOnlyOperators) -> None: + """Advice for CPU only operators.""" + cpu_only_ops = ",".join(sorted(set(data_item.cpu_only_ops))) + cpu_only_ops_num = len(data_item.cpu_only_ops) + + self.add_advice( + [ + f"You have at least {cpu_only_ops_num} " + f"operator{'s' if cpu_only_ops_num > 1 else ''} that is CPU " + f"only: {cpu_only_ops}.", + "Using operators that are supported by the NPU will " + "improve performance.", + ] + + self.context.action_resolver.supported_operators_info() + ) + + @produce_advice.register + @advice_category(AdviceCategory.OPERATORS, AdviceCategory.ALL) + def handle_unsupported_operators( + self, data_item: HasUnsupportedOnNPUOperators + ) -> None: + """Advice for the unsupported operators.""" + self.add_advice( + [ + f"You have {data_item.npu_unsupported_ratio*100:.0f}% of operators " + "that cannot be placed on the NPU.", + "For better performance, please review the reasons reported " + "in the table, and adjust the model accordingly " + "where possible.", + ] + ) + + @produce_advice.register + @advice_category(AdviceCategory.OPERATORS, AdviceCategory.ALL) + def handle_all_operators_supported( + self, _data_item: AllOperatorsSupportedOnNPU + ) -> None: + """Advice if all operators supported.""" + self.add_advice( + [ + "You don't have any unsupported operators, your model will " + "run completely on NPU." 
+ ] + + self.context.action_resolver.check_performance() + ) + + @produce_advice.register + @advice_category(AdviceCategory.OPTIMIZATION, AdviceCategory.ALL) + def handle_optimization_results(self, data_item: OptimizationResults) -> None: + """Advice based on optimization results.""" + if not data_item.diffs or len(data_item.diffs) != 1: + return + + optim_details = data_item.diffs[0] + metrics = [ + (metric_name, optim_details.opt_diffs[metric_key]) + for (metric_name, metric_key) in ( + ("DRAM used (KB)", "dram"), + ("SRAM used (KB)", "sram"), + ("On chip flash used (KB)", "on_chip_flash"), + ("Off chip flash used (KB)", "off_chip_flash"), + ("NPU total cycles", "npu_total_cycles"), + ) + if metric_key in optim_details.opt_diffs + and not optim_details.opt_diffs[metric_key].same + ] + + improved = [ + f"- You have achieved {abs(metric_value.diff):.2f}% performance " + f"improvement in {metric_name}" + for metric_name, metric_value in metrics + if metric_value.improved + ] + + degraded = [ + f"- {metric_name} have degraded by {abs(metric_value.diff):.2f}%" + for metric_name, metric_value in metrics + if metric_value.degraded + ] + + opts = ", ".join(str(s) for s in optim_details.opt_type) + messages = [f"With the selected optimization ({opts})", *improved, *degraded] + + if improved: + if next_optimization_target := self.get_next_optimization_targets( + optim_details.opt_type + ): + next_optimization_target_as_str = " and/or ".join( + str(item) for item in next_optimization_target + ) + + messages.append( + "You can try to push the optimization target higher " + f"(e.g. {next_optimization_target_as_str}) " + "to check if those results can be further improved." + ) + messages += self.context.action_resolver.apply_optimizations( + opt_settings=next_optimization_target + ) + + elif degraded: + messages.append( + "The performance seems to have degraded after " + "applying the selected optimizations, " + "try exploring different optimization types/targets." 
+ ) + + self.add_advice(messages) + + self.add_advice( + [ + "The applied tooling techniques have an impact " + "on accuracy. Additional hyperparameter tuning may be required " + "after any optimization." + ] + ) + + @staticmethod + def get_next_optimization_targets( + opt_type: list[OptimizationSettings], + ) -> list[OptimizationSettings]: + """Get next optimization targets.""" + next_targets = (item.next_target() for item in opt_type) + + # filter out targets that have not been changed + valid_targets = [ + next_ + for next_, old in zip(next_targets, opt_type) + if ( + old.optimization_type == "pruning" + and old.optimization_target < next_.optimization_target + ) + or ( + old.optimization_type == "clustering" + and old.optimization_target > next_.optimization_target + ) + ] + return valid_targets + + +class EthosUStaticAdviceProducer(ContextAwareAdviceProducer): + """Advice producer that not depends on input data.""" + + def produce_advice(self, data_item: DataItem) -> None: + """Do not process passed data items.""" + + def get_advice(self) -> Advice | list[Advice]: + """Return predefined advice based on category.""" + advice_per_category = { + AdviceCategory.PERFORMANCE: [ + Advice( + [ + "You can improve the inference time by using only operators " + "that are supported by the NPU.", + ] + + self.context.action_resolver.check_operator_compatibility() + ), + Advice( + [ + "Check if you can improve the performance by applying " + "tooling techniques to your model." 
+ ] + + self.context.action_resolver.apply_optimizations() + ), + ], + AdviceCategory.OPTIMIZATION: [ + Advice( + [ + "For better performance, make sure that all the operators " + "of your final TensorFlow Lite model are supported by the NPU.", + ] + + self.context.action_resolver.operator_compatibility_details() + ) + ], + } + + return advice_per_category.get(self.context.advice_category, []) diff --git a/src/mlia/target/ethos_u/advisor.py b/src/mlia/target/ethos_u/advisor.py new file mode 100644 index 0000000..b9d64ff --- /dev/null +++ b/src/mlia/target/ethos_u/advisor.py @@ -0,0 +1,194 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U MLIA module.""" +from __future__ import annotations + +from pathlib import Path +from typing import Any + +from mlia.core.advice_generation import AdviceProducer +from mlia.core.advisor import DefaultInferenceAdvisor +from mlia.core.advisor import InferenceAdvisor +from mlia.core.common import AdviceCategory +from mlia.core.context import Context +from mlia.core.context import ExecutionContext +from mlia.core.data_analysis import DataAnalyzer +from mlia.core.data_collection import DataCollector +from mlia.core.events import Event +from mlia.core.typing import PathOrFileLike +from mlia.nn.tensorflow.utils import is_tflite_model +from mlia.target.ethos_u.advice_generation import EthosUAdviceProducer +from mlia.target.ethos_u.advice_generation import EthosUStaticAdviceProducer +from mlia.target.ethos_u.config import EthosUConfiguration +from mlia.target.ethos_u.config import get_target +from mlia.target.ethos_u.data_analysis import EthosUDataAnalyzer +from mlia.target.ethos_u.data_collection import EthosUOperatorCompatibility +from mlia.target.ethos_u.data_collection import EthosUOptimizationPerformance +from mlia.target.ethos_u.data_collection import EthosUPerformance +from mlia.target.ethos_u.events import EthosUAdvisorStartedEvent +from 
mlia.target.ethos_u.handlers import EthosUEventHandler +from mlia.utils.types import is_list_of + + +class EthosUInferenceAdvisor(DefaultInferenceAdvisor): + """Ethos-U Inference Advisor.""" + + @classmethod + def name(cls) -> str: + """Return name of the advisor.""" + return "ethos_u_inference_advisor" + + def get_collectors(self, context: Context) -> list[DataCollector]: + """Return list of the data collectors.""" + model = self.get_model(context) + device = self._get_device(context) + backends = self._get_backends(context) + + collectors: list[DataCollector] = [] + + if AdviceCategory.OPERATORS in context.advice_category: + collectors.append(EthosUOperatorCompatibility(model, device)) + + # Performance and optimization are mutually exclusive. + # Decide which one to use (taking into account the model format). + if is_tflite_model(model): + # TensorFlow Lite models do not support optimization (only performance)! + if context.advice_category == AdviceCategory.OPTIMIZATION: + raise Exception( + "Command 'optimization' is not supported for TensorFlow Lite files." 
+ ) + if AdviceCategory.PERFORMANCE in context.advice_category: + collectors.append(EthosUPerformance(model, device, backends)) + else: + # Keras/SavedModel: Prefer optimization + if AdviceCategory.OPTIMIZATION in context.advice_category: + optimization_settings = self._get_optimization_settings(context) + collectors.append( + EthosUOptimizationPerformance( + model, device, optimization_settings, backends + ) + ) + elif AdviceCategory.PERFORMANCE in context.advice_category: + collectors.append(EthosUPerformance(model, device, backends)) + + return collectors + + def get_analyzers(self, context: Context) -> list[DataAnalyzer]: + """Return list of the data analyzers.""" + return [ + EthosUDataAnalyzer(), + ] + + def get_producers(self, context: Context) -> list[AdviceProducer]: + """Return list of the advice producers.""" + return [ + EthosUAdviceProducer(), + EthosUStaticAdviceProducer(), + ] + + def get_events(self, context: Context) -> list[Event]: + """Return list of the startup events.""" + model = self.get_model(context) + device = self._get_device(context) + + return [ + EthosUAdvisorStartedEvent(device=device, model=model), + ] + + def _get_device(self, context: Context) -> EthosUConfiguration: + """Get device.""" + target_profile = self.get_target_profile(context) + + return get_target(target_profile) + + def _get_optimization_settings(self, context: Context) -> list[list[dict]]: + """Get optimization settings.""" + return self.get_parameter( # type: ignore + EthosUOptimizationPerformance.name(), + "optimizations", + expected_type=list, + expected=False, + context=context, + ) + + def _get_backends(self, context: Context) -> list[str] | None: + """Get list of backends.""" + return self.get_parameter( # type: ignore + self.name(), + "backends", + expected_type=list, + expected=False, + context=context, + ) + + +def configure_and_get_ethosu_advisor( + context: ExecutionContext, + target_profile: str, + model: str | Path, + output: PathOrFileLike | None = None, 
+ **extra_args: Any, +) -> InferenceAdvisor: + """Create and configure Ethos-U advisor.""" + if context.event_handlers is None: + context.event_handlers = [EthosUEventHandler(output)] + + if context.config_parameters is None: + context.config_parameters = _get_config_parameters( + model, target_profile, **extra_args + ) + + return EthosUInferenceAdvisor() + + +_DEFAULT_OPTIMIZATION_TARGETS = [ + { + "optimization_type": "pruning", + "optimization_target": 0.5, + "layers_to_optimize": None, + }, + { + "optimization_type": "clustering", + "optimization_target": 32, + "layers_to_optimize": None, + }, +] + + +def _get_config_parameters( + model: str | Path, + target_profile: str, + **extra_args: Any, +) -> dict[str, Any]: + """Get configuration parameters for the advisor.""" + advisor_parameters: dict[str, Any] = { + "ethos_u_inference_advisor": { + "model": model, + "target_profile": target_profile, + }, + } + + # Specifying backends is optional (default is used) + backends = extra_args.get("backends") + if backends is not None: + if not is_list_of(backends, str): + raise Exception("Backends value has wrong format") + + advisor_parameters["ethos_u_inference_advisor"]["backends"] = backends + + optimization_targets = extra_args.get("optimization_targets") + if not optimization_targets: + optimization_targets = _DEFAULT_OPTIMIZATION_TARGETS + + if not is_list_of(optimization_targets, dict): + raise Exception("Optimization targets value has wrong format") + + advisor_parameters.update( + { + "ethos_u_model_optimizations": { + "optimizations": [optimization_targets], + }, + } + ) + + return advisor_parameters diff --git a/src/mlia/target/ethos_u/config.py b/src/mlia/target/ethos_u/config.py new file mode 100644 index 0000000..8d8f481 --- /dev/null +++ b/src/mlia/target/ethos_u/config.py @@ -0,0 +1,90 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U configuration.""" +from __future__ import annotations + +import logging +from typing import Any + +from mlia.backend.vela.compiler import resolve_compiler_config +from mlia.backend.vela.compiler import VelaCompilerOptions +from mlia.target.config import IPConfiguration +from mlia.utils.filesystem import get_profile +from mlia.utils.filesystem import get_vela_config + + +logger = logging.getLogger(__name__) + + +class EthosUConfiguration(IPConfiguration): + """Ethos-U configuration.""" + + def __init__(self, target_profile: str) -> None: + """Init Ethos-U target configuration.""" + target_data = get_profile(target_profile) + _check_target_data_complete(target_data) + + target = target_data["target"] + super().__init__(target) + + mac = target_data["mac"] + _check_device_options_valid(target, mac) + + self.mac = mac + self.compiler_options = VelaCompilerOptions( + system_config=target_data["system_config"], + memory_mode=target_data["memory_mode"], + config_files=str(get_vela_config()), + accelerator_config=f"{self.target}-{mac}", # type: ignore + ) + + @property + def resolved_compiler_config(self) -> dict[str, Any]: + """Resolve compiler configuration.""" + return resolve_compiler_config(self.compiler_options) + + def __str__(self) -> str: + """Return string representation.""" + return ( + f"Ethos-U target={self.target} " + f"mac={self.mac} " + f"compiler_options={self.compiler_options}" + ) + + def __repr__(self) -> str: + """Return string representation.""" + return f"<Ethos-U configuration target={self.target}>" + + +def get_target(target_profile: str) -> EthosUConfiguration: + """Get target instance based on provided params.""" + if not target_profile: + raise Exception("No target profile given") + + return EthosUConfiguration(target_profile) + + +def _check_target_data_complete(target_data: dict[str, Any]) -> None: + """Check if profile contains all needed data.""" + mandatory_keys = {"target", "mac", 
"system_config", "memory_mode"} + missing_keys = sorted(mandatory_keys - target_data.keys()) + + if missing_keys: + raise Exception(f"Mandatory fields missing from target profile: {missing_keys}") + + +def _check_device_options_valid(target: str, mac: int) -> None: + """Check if mac is valid for selected device.""" + target_mac_ranges = { + "ethos-u55": [32, 64, 128, 256], + "ethos-u65": [256, 512], + } + + if target not in target_mac_ranges: + raise Exception(f"Unsupported target: {target}") + + target_mac_range = target_mac_ranges[target] + if mac not in target_mac_range: + raise Exception( + f"Mac value for selected device should be in {target_mac_range}" + ) diff --git a/src/mlia/target/ethos_u/data_analysis.py b/src/mlia/target/ethos_u/data_analysis.py new file mode 100644 index 0000000..6b66734 --- /dev/null +++ b/src/mlia/target/ethos_u/data_analysis.py @@ -0,0 +1,153 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U data analysis module.""" +from __future__ import annotations + +from dataclasses import dataclass +from functools import singledispatchmethod + +from mlia.backend.vela.compat import Operators +from mlia.core.common import DataItem +from mlia.core.data_analysis import Fact +from mlia.core.data_analysis import FactExtractor +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings +from mlia.target.ethos_u.performance import OptimizationPerformanceMetrics + + +@dataclass +class HasCPUOnlyOperators(Fact): + """Model has CPU only operators.""" + + cpu_only_ops: list[str] + + +@dataclass +class HasUnsupportedOnNPUOperators(Fact): + """Model has unsupported on NPU operators.""" + + npu_unsupported_ratio: float + + +@dataclass +class AllOperatorsSupportedOnNPU(Fact): + """All model's operators supported on NPU.""" + + +@dataclass +class PerfMetricDiff: + """Performance metric difference.""" + + original_value: int | float + optimized_value: int | float + + 
@property + def diff(self) -> float: + """Difference between metrics.""" + if self.original_value == 0: + return 0 + + return 100 - ((self.optimized_value / self.original_value) * 100) + + @property + def improved(self) -> bool: + """Return true if metric improved.""" + return self.diff > 0 + + @property + def degraded(self) -> bool: + """Return true if metric degraded.""" + return self.diff < 0 + + @property + def same(self) -> bool: + """Return true if metric stays the same.""" + return self.diff == 0 + + +@dataclass +class OptimizationDiff: + """Optimization performance impact.""" + + opt_type: list[OptimizationSettings] + opt_diffs: dict[str, PerfMetricDiff] + + +@dataclass +class OptimizationResults(Fact): + """Optimization results.""" + + diffs: list[OptimizationDiff] + + +class EthosUDataAnalyzer(FactExtractor): + """Ethos-U data analyzer.""" + + @singledispatchmethod + def analyze_data(self, data_item: DataItem) -> None: # type: ignore + """Analyse the data.""" + + @analyze_data.register + def analyze_operator_compatibility(self, operators: Operators) -> None: + """Analyse operator compatibility information.""" + cpu_only = [op.op_type for op in operators.ops if op.cpu_only] + if cpu_only: + self.add_fact(HasCPUOnlyOperators(cpu_only)) + + if operators.npu_unsupported_ratio != 0: + self.add_fact(HasUnsupportedOnNPUOperators(operators.npu_unsupported_ratio)) + + if operators.npu_unsupported_ratio == 0: + self.add_fact(AllOperatorsSupportedOnNPU()) + + @analyze_data.register + def analyze_optimization_results( + self, optimization_results: OptimizationPerformanceMetrics + ) -> None: + """Analyse optimization performance metrics.""" + optimizations = optimization_results.optimizations_perf_metrics + if not optimizations: + return + + orig = optimization_results.original_perf_metrics.in_kilobytes() + orig_memory = orig.memory_usage + orig_cycles = orig.npu_cycles + + diffs: list[OptimizationDiff] = [] + for opt_type, opt_perf_metrics in optimizations: + opt = 
opt_perf_metrics.in_kilobytes() + opt_memory = opt.memory_usage + opt_cycles = opt.npu_cycles + + opt_diffs: dict[str, PerfMetricDiff] = {} + + if orig_memory and opt_memory: + opt_diffs.update( + { + "sram": PerfMetricDiff( + orig_memory.sram_memory_area_size, + opt_memory.sram_memory_area_size, + ), + "dram": PerfMetricDiff( + orig_memory.dram_memory_area_size, + opt_memory.dram_memory_area_size, + ), + "on_chip_flash": PerfMetricDiff( + orig_memory.on_chip_flash_memory_area_size, + opt_memory.on_chip_flash_memory_area_size, + ), + "off_chip_flash": PerfMetricDiff( + orig_memory.off_chip_flash_memory_area_size, + opt_memory.off_chip_flash_memory_area_size, + ), + } + ) + if orig_cycles and opt_cycles: + opt_diffs["npu_total_cycles"] = PerfMetricDiff( + orig_cycles.npu_total_cycles, + opt_cycles.npu_total_cycles, + ) + + diff = OptimizationDiff(opt_type=opt_type, opt_diffs=opt_diffs) + diffs.append(diff) + + self.add_fact(OptimizationResults(diffs)) diff --git a/src/mlia/target/ethos_u/data_collection.py b/src/mlia/target/ethos_u/data_collection.py new file mode 100644 index 0000000..258876d --- /dev/null +++ b/src/mlia/target/ethos_u/data_collection.py @@ -0,0 +1,187 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Data collection module for Ethos-U.""" +from __future__ import annotations + +import logging +from pathlib import Path + +from mlia.backend.vela.compat import Operators +from mlia.backend.vela.compat import supported_operators +from mlia.core.context import Context +from mlia.core.data_collection import ContextAwareDataCollector +from mlia.core.errors import FunctionalityNotSupportedError +from mlia.core.performance import estimate_performance +from mlia.nn.tensorflow.config import get_keras_model +from mlia.nn.tensorflow.config import get_tflite_model +from mlia.nn.tensorflow.config import KerasModel +from mlia.nn.tensorflow.optimizations.select import get_optimizer +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings +from mlia.nn.tensorflow.utils import save_keras_model +from mlia.target.ethos_u.config import EthosUConfiguration +from mlia.target.ethos_u.performance import EthosUPerformanceEstimator +from mlia.target.ethos_u.performance import OptimizationPerformanceMetrics +from mlia.target.ethos_u.performance import PerformanceMetrics +from mlia.utils.logging import log_action +from mlia.utils.types import is_list_of + +logger = logging.getLogger(__name__) + + +class EthosUOperatorCompatibility(ContextAwareDataCollector): + """Collect operator compatibility information.""" + + def __init__(self, model: Path, device: EthosUConfiguration) -> None: + """Init operator compatibility data collector.""" + self.model = model + self.device = device + + def collect_data(self) -> Operators: + """Collect operator compatibility information.""" + tflite_model = get_tflite_model(self.model, self.context) + + with log_action("Checking operator compatibility ..."): + return supported_operators( + Path(tflite_model.model_path), self.device.compiler_options + ) + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "ethos_u_operator_compatibility" + + +class 
EthosUPerformance(ContextAwareDataCollector): + """Collect performance metrics.""" + + def __init__( + self, + model: Path, + device: EthosUConfiguration, + backends: list[str] | None = None, + ) -> None: + """Init performance data collector.""" + self.model = model + self.device = device + self.backends = backends + + def collect_data(self) -> PerformanceMetrics: + """Collect model performance metrics.""" + tflite_model = get_tflite_model(self.model, self.context) + estimator = EthosUPerformanceEstimator( + self.context, + self.device, + self.backends, + ) + + return estimator.estimate(tflite_model) + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "ethos_u_performance" + + +class OptimizeModel: + """Helper class for model optimization.""" + + def __init__( + self, context: Context, opt_settings: list[OptimizationSettings] + ) -> None: + """Init helper.""" + self.context = context + self.opt_settings = opt_settings + + def __call__(self, keras_model: KerasModel) -> KerasModel: + """Run optimization.""" + optimizer = get_optimizer(keras_model, self.opt_settings) + + opts_as_str = ", ".join(str(opt) for opt in self.opt_settings) + logger.info("Applying model optimizations - [%s]", opts_as_str) + optimizer.apply_optimization() + + model = optimizer.get_model() + model_path = self.context.get_model_path("optimized_model.h5") + save_keras_model(model, model_path) + + return KerasModel(model_path) + + +class EthosUOptimizationPerformance(ContextAwareDataCollector): + """Collect performance metrics for the optimizations.""" + + def __init__( + self, + model: Path, + device: EthosUConfiguration, + optimizations: list[list[dict]], + backends: list[str] | None = None, + ) -> None: + """Init performance optimizations data collector.""" + self.model = model + self.device = device + self.optimizations = optimizations + self.backends = backends + + def collect_data(self) -> OptimizationPerformanceMetrics | None: + """Collect performance 
metrics for the optimizations.""" + logger.info("Estimate performance ...") + + if not self.optimizations: + raise FunctionalityNotSupportedError( + reason="Unable to estimate model optimizations impact", + description="No optimization targets provided", + ) + + opt_settings = self._parse_optimization_params(self.optimizations) + + try: + keras_model = get_keras_model(self.model, self.context) + except NotImplementedError as err: + raise FunctionalityNotSupportedError( + reason="Unable to run model optimizations", + description=f"{self.model} is not a Keras model and " + "could not be converted to a Keras model", + ) from err + + optimizers = [OptimizeModel(self.context, opts) for opts in opt_settings] + + estimator = EthosUPerformanceEstimator( + self.context, + self.device, + self.backends, + ) + original_metrics, *optimized_metrics = estimate_performance( + keras_model, estimator, optimizers # type: ignore + ) + + result = OptimizationPerformanceMetrics( + original_perf_metrics=original_metrics, + optimizations_perf_metrics=list(zip(opt_settings, optimized_metrics)), + ) + return result + + @staticmethod + def _parse_optimization_params( + optimizations: list[list[dict]], + ) -> list[list[OptimizationSettings]]: + """Parse optimization parameters.""" + if not is_list_of(optimizations, list): + raise Exception("Optimization parameters expected to be a list") + + return [ + [ + OptimizationSettings( + item.get("optimization_type"), # type: ignore + item.get("optimization_target"), # type: ignore + item.get("layers_to_optimized"), + ) + for item in opt_configuration + ] + for opt_configuration in optimizations + ] + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "ethos_u_model_optimizations" diff --git a/src/mlia/target/ethos_u/events.py b/src/mlia/target/ethos_u/events.py new file mode 100644 index 0000000..37cc1a9 --- /dev/null +++ b/src/mlia/target/ethos_u/events.py @@ -0,0 +1,24 @@ +# SPDX-FileCopyrightText: Copyright 
2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Ethos-U MLIA module events.""" +from dataclasses import dataclass +from pathlib import Path + +from mlia.core.events import Event +from mlia.core.events import EventDispatcher +from mlia.target.ethos_u.config import EthosUConfiguration + + +@dataclass +class EthosUAdvisorStartedEvent(Event): + """Event with Ethos-U advisor parameters.""" + + model: Path + device: EthosUConfiguration + + +class EthosUAdvisorEventHandler(EventDispatcher): + """Event handler for the Ethos-U inference advisor.""" + + def on_ethos_u_advisor_started(self, event: EthosUAdvisorStartedEvent) -> None: + """Handle EthosUAdvisorStarted event.""" diff --git a/src/mlia/target/ethos_u/handlers.py b/src/mlia/target/ethos_u/handlers.py new file mode 100644 index 0000000..84a9554 --- /dev/null +++ b/src/mlia/target/ethos_u/handlers.py @@ -0,0 +1,55 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Event handler.""" +from __future__ import annotations + +import logging + +from mlia.backend.vela.compat import Operators +from mlia.core.events import CollectedDataEvent +from mlia.core.handlers import WorkflowEventsHandler +from mlia.core.typing import PathOrFileLike +from mlia.target.ethos_u.events import EthosUAdvisorEventHandler +from mlia.target.ethos_u.events import EthosUAdvisorStartedEvent +from mlia.target.ethos_u.performance import OptimizationPerformanceMetrics +from mlia.target.ethos_u.performance import PerformanceMetrics +from mlia.target.ethos_u.reporters import ethos_u_formatters + +logger = logging.getLogger(__name__) + + +class EthosUEventHandler(WorkflowEventsHandler, EthosUAdvisorEventHandler): + """CLI event handler.""" + + def __init__(self, output: PathOrFileLike | None = None) -> None: + """Init event handler.""" + super().__init__(ethos_u_formatters, output) + + def on_collected_data(self, event: CollectedDataEvent) -> None: 
+ """Handle CollectedDataEvent event.""" + data_item = event.data_item + + if isinstance(data_item, Operators): + self.reporter.submit([data_item.ops, data_item], delay_print=True) + + if isinstance(data_item, PerformanceMetrics): + self.reporter.submit(data_item, delay_print=True, space=True) + + if isinstance(data_item, OptimizationPerformanceMetrics): + original_metrics = data_item.original_perf_metrics + if not data_item.optimizations_perf_metrics: + return + + _opt_settings, optimized_metrics = data_item.optimizations_perf_metrics[0] + + self.reporter.submit( + [original_metrics, optimized_metrics], + delay_print=True, + columns_name="Metrics", + title="Performance metrics", + space=True, + ) + + def on_ethos_u_advisor_started(self, event: EthosUAdvisorStartedEvent) -> None: + """Handle EthosUAdvisorStarted event.""" + self.reporter.submit(event.device) diff --git a/src/mlia/target/ethos_u/operators.py b/src/mlia/target/ethos_u/operators.py new file mode 100644 index 0000000..97c2b17 --- /dev/null +++ b/src/mlia/target/ethos_u/operators.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Operators module.""" +import logging + +from mlia.backend.vela.compat import generate_supported_operators_report + + +logger = logging.getLogger(__name__) + + +def report() -> None: + """Generate supported operators report.""" + generate_supported_operators_report() diff --git a/src/mlia/target/ethos_u/performance.py b/src/mlia/target/ethos_u/performance.py new file mode 100644 index 0000000..e39f4d9 --- /dev/null +++ b/src/mlia/target/ethos_u/performance.py @@ -0,0 +1,261 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Performance estimation.""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Union + +import mlia.backend.vela.compiler as vela_comp +import mlia.backend.vela.performance as vela_perf +from mlia.backend.corstone.performance import DeviceInfo +from mlia.backend.corstone.performance import estimate_performance +from mlia.backend.corstone.performance import ModelInfo +from mlia.backend.install import is_supported +from mlia.backend.install import supported_backends +from mlia.core.context import Context +from mlia.core.performance import PerformanceEstimator +from mlia.nn.tensorflow.config import get_tflite_model +from mlia.nn.tensorflow.config import ModelConfiguration +from mlia.nn.tensorflow.optimizations.select import OptimizationSettings +from mlia.target.ethos_u.config import EthosUConfiguration +from mlia.utils.logging import log_action + + +logger = logging.getLogger(__name__) + + +@dataclass +class NPUCycles: + """NPU cycles metrics.""" + + npu_active_cycles: int + npu_idle_cycles: int + npu_total_cycles: int + npu_axi0_rd_data_beat_received: int + npu_axi0_wr_data_beat_written: int + npu_axi1_rd_data_beat_received: int + + +BYTES_PER_KILOBYTE = 1024 + + +class MemorySizeType(Enum): + """Memory size type enumeration.""" + + BYTES = 0 + KILOBYTES = 1 + + +@dataclass +class MemoryUsage: + """Memory usage metrics.""" + + sram_memory_area_size: int | float + dram_memory_area_size: int | float + unknown_memory_area_size: int | float + on_chip_flash_memory_area_size: int | float + off_chip_flash_memory_area_size: int | float + memory_size_type: MemorySizeType = MemorySizeType.BYTES + + _default_columns = [ + "SRAM used", + "DRAM used", + "Unknown memory used", + "On chip flash used", + "Off chip flash used", + ] + + def in_kilobytes(self) -> MemoryUsage: + """Return memory usage with values in kilobytes.""" + if 
self.memory_size_type == MemorySizeType.KILOBYTES: + return self + + kilobytes = [ + value / BYTES_PER_KILOBYTE + for value in [ + self.sram_memory_area_size, + self.dram_memory_area_size, + self.unknown_memory_area_size, + self.on_chip_flash_memory_area_size, + self.off_chip_flash_memory_area_size, + ] + ] + + return MemoryUsage( + *kilobytes, # type: ignore + memory_size_type=MemorySizeType.KILOBYTES, + ) + + +@dataclass +class PerformanceMetrics: + """Performance metrics.""" + + device: EthosUConfiguration + npu_cycles: NPUCycles | None + memory_usage: MemoryUsage | None + + def in_kilobytes(self) -> PerformanceMetrics: + """Return metrics with memory usage in KiB.""" + if self.memory_usage is None: + return PerformanceMetrics(self.device, self.npu_cycles, self.memory_usage) + + return PerformanceMetrics( + self.device, self.npu_cycles, self.memory_usage.in_kilobytes() + ) + + +@dataclass +class OptimizationPerformanceMetrics: + """Optimization performance metrics.""" + + original_perf_metrics: PerformanceMetrics + optimizations_perf_metrics: list[ + tuple[list[OptimizationSettings], PerformanceMetrics] + ] + + +class VelaPerformanceEstimator( + PerformanceEstimator[Union[Path, ModelConfiguration], MemoryUsage] +): + """Vela based performance estimator.""" + + def __init__(self, context: Context, device: EthosUConfiguration) -> None: + """Init Vela based performance estimator.""" + self.context = context + self.device = device + + def estimate(self, model: Path | ModelConfiguration) -> MemoryUsage: + """Estimate performance.""" + with log_action("Getting the memory usage metrics ..."): + model_path = ( + Path(model.model_path) + if isinstance(model, ModelConfiguration) + else model + ) + + vela_perf_metrics = vela_perf.estimate_performance( + model_path, self.device.compiler_options + ) + + return MemoryUsage( + vela_perf_metrics.sram_memory_area_size, + vela_perf_metrics.dram_memory_area_size, + vela_perf_metrics.unknown_memory_area_size, + 
class VelaPerformanceEstimator(
    PerformanceEstimator[Union[Path, ModelConfiguration], MemoryUsage]
):
    """Vela based performance estimator."""

    def __init__(self, context: Context, device: EthosUConfiguration) -> None:
        """Init Vela based performance estimator."""
        self.context = context
        self.device = device

    def estimate(self, model: Path | ModelConfiguration) -> MemoryUsage:
        """Estimate memory usage of the model via the Vela compiler."""
        with log_action("Getting the memory usage metrics ..."):
            if isinstance(model, ModelConfiguration):
                model_path = Path(model.model_path)
            else:
                model_path = model

            metrics = vela_perf.estimate_performance(
                model_path, self.device.compiler_options
            )

            return MemoryUsage(
                metrics.sram_memory_area_size,
                metrics.dram_memory_area_size,
                metrics.unknown_memory_area_size,
                metrics.on_chip_flash_memory_area_size,
                metrics.off_chip_flash_memory_area_size,
            )


class CorstonePerformanceEstimator(
    PerformanceEstimator[Union[Path, ModelConfiguration], NPUCycles]
):
    """Corstone-based performance estimator."""

    def __init__(
        self, context: Context, device: EthosUConfiguration, backend: str
    ) -> None:
        """Init Corstone-based performance estimator."""
        self.context = context
        self.device = device
        self.backend = backend

    def estimate(self, model: Path | ModelConfiguration) -> NPUCycles:
        """Estimate NPU cycle counts by running the model on a Corstone FVP."""
        with log_action(f"Getting the performance metrics for '{self.backend}' ..."):
            logger.info(
                "WARNING: This task may require several minutes "
                "(press ctrl-c to interrupt)"
            )

            if isinstance(model, ModelConfiguration):
                model_path = Path(model.model_path)
            else:
                model_path = model

            # The model must be compiled by Vela before it can run on
            # a Corstone backend.
            optimized_model_path = self.context.get_model_path(
                f"{model_path.stem}_vela.tflite"
            )
            vela_comp.optimize_model(
                model_path, self.device.compiler_options, optimized_model_path
            )

            result = estimate_performance(
                ModelInfo(model_path=optimized_model_path),
                DeviceInfo(
                    device_type=self.device.target,  # type: ignore
                    mac=self.device.mac,
                ),
                self.backend,
            )

            return NPUCycles(
                result.npu_active_cycles,
                result.npu_idle_cycles,
                result.npu_total_cycles,
                result.npu_axi0_rd_data_beat_received,
                result.npu_axi0_wr_data_beat_written,
                result.npu_axi1_rd_data_beat_received,
            )


class EthosUPerformanceEstimator(
    PerformanceEstimator[Union[Path, ModelConfiguration], PerformanceMetrics]
):
    """Ethos-U performance estimator."""

    def __init__(
        self,
        context: Context,
        device: EthosUConfiguration,
        backends: list[str] | None = None,
    ) -> None:
        """Init performance estimator.

        Raises ValueError if any requested backend is not supported.
        """
        self.context = context
        self.device = device

        if backends is None:
            backends = ["Vela"]  # Only Vela is always available as default

        # Validate before storing: fail fast on an unknown backend name.
        for candidate in backends:
            if candidate != "Vela" and not is_supported(candidate):
                raise ValueError(
                    f"Unsupported backend '{candidate}'. "
                    f"Only 'Vela' and {supported_backends()} "
                    "are supported."
                )

        self.backends = set(backends)

    def estimate(self, model: Path | ModelConfiguration) -> PerformanceMetrics:
        """Estimate performance using all configured backends."""
        if isinstance(model, ModelConfiguration):
            model_path = Path(model.model_path)
        else:
            model_path = model

        tflite_model = get_tflite_model(model_path, self.context)

        memory_usage = None
        npu_cycles = None

        for backend in self.backends:
            if backend == "Vela":
                memory_usage = VelaPerformanceEstimator(
                    self.context, self.device
                ).estimate(tflite_model)
            elif backend in supported_backends():
                npu_cycles = CorstonePerformanceEstimator(
                    self.context, self.device, backend
                ).estimate(tflite_model)
            else:
                logger.warning(
                    "Backend '%s' is not supported for Ethos-U performance "
                    "estimation.",
                    backend,
                )

        return PerformanceMetrics(self.device, npu_cycles, memory_usage)
def report_operators_stat(operators: Operators) -> Report:
    """Return table representation for the ops stats."""
    columns = [
        Column("Number of operators", alias="num_of_operators"),
        Column("Number of NPU supported operators", "num_of_npu_supported_operators"),
        Column("Unsupported ops ratio", "npu_unsupported_ratio"),
    ]

    # Ratio is rendered as a whole percentage, e.g. "25%".
    unsupported_pct = Cell(
        operators.npu_unsupported_ratio * 100,
        fmt=Format(str_fmt="{:.0f}%".format),
    )
    rows = [
        (operators.total_number, operators.npu_supported_number, unsupported_pct)
    ]

    return SingleRow(
        columns, rows, name="Operators statistics", alias="operators_stats"
    )


def report_operators(ops: list[Operator]) -> Report:
    """Return table representation for the list of operators."""
    columns = [
        Column("#", only_for=["plain_text"]),
        Column("Operator name", alias="operator_name", fmt=Format(wrap_width=30)),
        Column("Operator type", alias="operator_type", fmt=Format(wrap_width=25)),
        Column("Placement", alias="placement", fmt=Format(wrap_width=20)),
        Column("Notes", alias="notes", fmt=Format(wrap_width=35)),
    ]

    def placement_cell(op: Operator) -> Cell:
        # NPU placement is highlighted as an improvement over CPU fallback.
        supported = op.run_on_npu.supported
        return Cell(
            "NPU" if supported else "CPU",
            Format(style=style_improvement(supported)),
        )

    def notes_table(op: Operator) -> Table:
        # Flatten the (possibly nested) reasons into bullet-point rows.
        notes = [
            (Cell(item, Format(str_fmt=lambda x: f"* {x}")),)
            for reason in op.run_on_npu.reasons
            for item in reason
            if item
        ]
        return Table(
            columns=[Column("Note", alias="note", fmt=Format(wrap_width=35))],
            rows=notes,
            name="Notes",
        )

    rows = [
        (index + 1, op.name, op.op_type, placement_cell(op), notes_table(op))
        for index, op in enumerate(ops)
    ]

    return Table(columns, rows, name="Operators", alias="operators")


def report_device_details(device: EthosUConfiguration) -> Report:
    """Return table representation for the device."""
    cfg = device.resolved_compiler_config

    memory_settings = [
        ReportItem("Const mem area", "const_mem_area", cfg["const_mem_area"]),
        ReportItem("Arena mem area", "arena_mem_area", cfg["arena_mem_area"]),
        ReportItem("Cache mem area", "cache_mem_area", cfg["cache_mem_area"]),
        ReportItem(
            "Arena cache size",
            "arena_cache_size",
            BytesCell(cfg["arena_cache_size"]),
        ),
    ]

    def mem_area_item(mem_area_name: str, mem_area_settings: dict) -> ReportItem:
        # One nested entry per configured memory area.
        return ReportItem(
            f"{mem_area_name}",
            mem_area_name,
            None,
            nested_items=[
                ReportItem(
                    "Clock scales",
                    "clock_scales",
                    mem_area_settings["clock_scales"],
                ),
                ReportItem(
                    "Burst length",
                    "burst_length",
                    BytesCell(mem_area_settings["burst_length"]),
                ),
                ReportItem(
                    "Read latency",
                    "read_latency",
                    CyclesCell(mem_area_settings["read_latency"]),
                ),
                ReportItem(
                    "Write latency",
                    "write_latency",
                    CyclesCell(mem_area_settings["write_latency"]),
                ),
            ],
        )

    mem_areas_settings = [
        mem_area_item(name, settings)
        for name, settings in cfg["memory_area"].items()
    ]

    system_settings = [
        ReportItem(
            "Accelerator clock",
            "accelerator_clock",
            ClockCell(cfg["core_clock"]),
        ),
        ReportItem("AXI0 port", "axi0_port", cfg["axi0_port"]),
        ReportItem("AXI1 port", "axi1_port", cfg["axi1_port"]),
        ReportItem(
            "Memory area settings", "memory_area", None, nested_items=mem_areas_settings
        ),
    ]

    arch_settings = [
        ReportItem(
            "Permanent storage mem area",
            "permanent_storage_mem_area",
            cfg["permanent_storage_mem_area"],
        ),
        ReportItem(
            "Feature map storage mem area",
            "feature_map_storage_mem_area",
            cfg["feature_map_storage_mem_area"],
        ),
        ReportItem(
            "Fast storage mem area",
            "fast_storage_mem_area",
            cfg["fast_storage_mem_area"],
        ),
    ]

    return NestedReport(
        "Device information",
        "device",
        [
            ReportItem("Target", alias="target", value=device.target),
            ReportItem("MAC", alias="mac", value=device.mac),
            ReportItem(
                "Memory mode",
                alias="memory_mode",
                value=cfg["memory_mode"],
                nested_items=memory_settings,
            ),
            ReportItem(
                "System config",
                alias="system_config",
                value=cfg["system_config"],
                nested_items=system_settings,
            ),
            ReportItem(
                "Architecture settings",
                "arch_settings",
                None,
                nested_items=arch_settings,
            ),
        ],
    )
def metrics_as_records(perf_metrics: list[PerformanceMetrics]) -> list[tuple]:
    """Convert perf metrics object into list of records."""
    perf_metrics = [item.in_kilobytes() for item in perf_metrics]

    def _cycles_as_records(metrics_list: list[PerformanceMetrics]) -> list[tuple]:
        # Drop the whole section if any entry lacks cycle data.
        collected = defaultdict(list)
        for item in metrics_list:
            if not item.npu_cycles:
                return []
            collected["NPU active cycles"].append(item.npu_cycles.npu_active_cycles)
            collected["NPU idle cycles"].append(item.npu_cycles.npu_idle_cycles)
            collected["NPU total cycles"].append(item.npu_cycles.npu_total_cycles)

        return [
            (name, *(Cell(value, Format(str_fmt="12,d")) for value in values), "cycles")
            for name, values in collected.items()
        ]

    def _memory_usage_as_records(
        metrics_list: list[PerformanceMetrics],
    ) -> list[tuple]:
        # Drop the whole section if any entry lacks memory usage data.
        collected = defaultdict(list)
        for item in metrics_list:
            usage = item.memory_usage
            if not usage:
                return []
            collected["SRAM used"].append(usage.sram_memory_area_size)
            collected["DRAM used"].append(usage.dram_memory_area_size)
            collected["Unknown memory area used"].append(
                usage.unknown_memory_area_size
            )
            collected["On-chip flash used"].append(
                usage.on_chip_flash_memory_area_size
            )
            collected["Off-chip flash used"].append(
                usage.off_chip_flash_memory_area_size
            )

        # Memory areas that are entirely unused are omitted from the report.
        return [
            (name, *(Cell(value, Format(str_fmt="12.2f")) for value in values), "KiB")
            for name, values in collected.items()
            if all(val > 0 for val in values)
        ]

    def _data_beats_as_records(
        metrics_list: list[PerformanceMetrics],
    ) -> list[tuple]:
        # Drop the whole section if any entry lacks cycle data.
        collected = defaultdict(list)
        for item in metrics_list:
            if not item.npu_cycles:
                return []
            collected["NPU AXI0 RD data beat received"].append(
                item.npu_cycles.npu_axi0_rd_data_beat_received
            )
            collected["NPU AXI0 WR data beat written"].append(
                item.npu_cycles.npu_axi0_wr_data_beat_written
            )
            collected["NPU AXI1 RD data beat received"].append(
                item.npu_cycles.npu_axi1_rd_data_beat_received
            )

        return [
            (name, *(Cell(value, Format(str_fmt="12,d")) for value in values), "beats")
            for name, values in collected.items()
        ]

    records: list[tuple] = []
    for records_func in (
        _memory_usage_as_records,
        _cycles_as_records,
        _data_beats_as_records,
    ):
        records.extend(records_func(perf_metrics))

    return records


def report_perf_metrics(
    perf_metrics: PerformanceMetrics | list[PerformanceMetrics],
) -> Report:
    """Return comparison table for the performance metrics."""
    if isinstance(perf_metrics, PerformanceMetrics):
        perf_metrics = [perf_metrics]

    rows = metrics_as_records(perf_metrics)

    # Exactly two sets of metrics means "original vs optimized":
    # add a percentage-improvement column.
    if len(perf_metrics) == 2:

        def improvement_cell(original: Cell, optimized: Cell) -> Cell | None:
            if original.value == 0:
                return None
            diff = 100 - (optimized.value / original.value * 100)
            return Cell(
                diff,
                Format(str_fmt="15.2f", style=style_improvement(diff > 0)),
            )

        return Table(
            columns=[
                Column("Metric", alias="metric", fmt=Format(wrap_width=30)),
                Column("Original", alias="original", fmt=Format(wrap_width=15)),
                Column("Optimized", alias="optimized", fmt=Format(wrap_width=15)),
                Column("Unit", alias="unit", fmt=Format(wrap_width=15)),
                Column("Improvement (%)", alias="improvement"),
            ],
            rows=[
                (
                    metric,
                    original_value,
                    optimized_value,
                    unit,
                    improvement_cell(original_value, optimized_value),
                )
                for metric, original_value, optimized_value, unit in rows
            ],
            name="Performance metrics",
            alias="performance_metrics",
            notes="IMPORTANT: The performance figures above refer to NPU only",
        )

    return Table(
        columns=[
            Column("Metric", alias="metric", fmt=Format(wrap_width=30)),
            Column("Value", alias="value", fmt=Format(wrap_width=15)),
            Column("Unit", alias="unit", fmt=Format(wrap_width=15)),
        ],
        rows=rows,
        name="Performance metrics",
        alias="performance_metrics",
        notes="IMPORTANT: The performance figures above refer to NPU only",
    )


def ethos_u_formatters(data: Any) -> Callable[[Any], Report]:
    """Find appropriate formatter for the provided data."""
    # Order matters: a pair of PerformanceMetrics must be handled by the
    # comparison formatter before the generic list/tuple branch below.
    if isinstance(data, PerformanceMetrics) or is_list_of(data, PerformanceMetrics, 2):
        return report_perf_metrics

    if is_list_of(data, Advice):
        return report_advice

    if is_list_of(data, Operator):
        return report_operators

    if isinstance(data, Operators):
        return report_operators_stat

    if isinstance(data, EthosUConfiguration):
        return report_device_details

    if isinstance(data, (list, tuple)):
        return CompoundFormatter([ethos_u_formatters(item) for item in data])

    raise Exception(f"Unable to find appropriate formatter for {data}")
class TOSAAdviceProducer(FactBasedAdviceProducer):
    """Produce advice from TOSA compatibility facts."""

    @singledispatchmethod
    def produce_advice(self, _data_item: DataItem) -> None:  # type: ignore
        """Produce advice."""
        # Default no-op; concrete fact types are handled by the
        # registered overloads below.

    @produce_advice.register
    @advice_category(AdviceCategory.ALL, AdviceCategory.OPERATORS)
    def handle_model_is_tosa_compatible(
        self, _data_item: ModelIsTOSACompatible
    ) -> None:
        """Advice for TOSA compatibility."""
        self.add_advice(["Model is fully TOSA compatible."])

    @produce_advice.register
    @advice_category(AdviceCategory.ALL, AdviceCategory.OPERATORS)
    def handle_model_is_not_tosa_compatible(
        self, _data_item: ModelIsNotTOSACompatible
    ) -> None:
        """Advice for TOSA compatibility."""
        message = (
            "Some operators in the model are not TOSA compatible. "
            "Please, refer to the operators table for more information."
        )
        self.add_advice([message])
class TOSAInferenceAdvisor(DefaultInferenceAdvisor):
    """TOSA inference advisor."""

    @classmethod
    def name(cls) -> str:
        """Return name of the advisor."""
        return "tosa_inference_advisor"

    def get_collectors(self, context: Context) -> list[DataCollector]:
        """Return list of the data collectors."""
        model = self.get_model(context)

        collectors: list[DataCollector] = []
        # Operator compatibility data is only collected for the
        # operators advice category.
        if AdviceCategory.OPERATORS in context.advice_category:
            collectors.append(TOSAOperatorCompatibility(model))

        return collectors

    def get_analyzers(self, context: Context) -> list[DataAnalyzer]:
        """Return list of the data analyzers."""
        return [TOSADataAnalyzer()]

    def get_producers(self, context: Context) -> list[AdviceProducer]:
        """Return list of the advice producers."""
        return [TOSAAdviceProducer()]

    def get_events(self, context: Context) -> list[Event]:
        """Return list of the startup events."""
        model = self.get_model(context)
        target_profile = self.get_target_profile(context)

        return [TOSAAdvisorStartedEvent(model, TOSAConfiguration(target_profile))]
def configure_and_get_tosa_advisor(
    context: ExecutionContext,
    target_profile: str,
    model: str | Path,
    output: PathOrFileLike | None = None,
    **_extra_args: Any,
) -> InferenceAdvisor:
    """Create and configure TOSA advisor.

    Event handlers and configuration parameters are only installed when
    the execution context does not already provide them.
    """
    if context.event_handlers is None:
        context.event_handlers = [TOSAEventHandler(output)]

    if context.config_parameters is None:
        context.config_parameters = _get_config_parameters(model, target_profile)

    return TOSAInferenceAdvisor()


def _get_config_parameters(model: str | Path, target_profile: str) -> dict[str, Any]:
    """Get configuration parameters for the advisor."""
    return {
        "tosa_inference_advisor": {
            "model": str(model),
            "target_profile": target_profile,
        }
    }
class TOSAConfiguration(IPConfiguration):  # pylint: disable=too-few-public-methods
    """TOSA configuration."""

    def __init__(self, target_profile: str) -> None:
        """Init configuration from the named target profile.

        Raises ValueError if the profile's target is not "tosa".
        """
        target_data = get_profile(target_profile)
        target = target_data["target"]

        if target != "tosa":
            # ValueError (a subclass of Exception, so existing broad
            # handlers still catch it) is the precise type for an
            # invalid configuration value; raising bare Exception is
            # flagged by pylint (broad-exception-raised).
            raise ValueError(f"Wrong target {target} for TOSA configuration")

        super().__init__(target)


@dataclass
class ModelIsTOSACompatible(Fact):
    """Model is completely TOSA compatible."""


@dataclass
class ModelIsNotTOSACompatible(Fact):
    """Model is not TOSA compatible."""


class TOSADataAnalyzer(FactExtractor):
    """TOSA data analyzer."""

    @singledispatchmethod
    def analyze_data(self, data_item: DataItem) -> None:  # type: ignore
        """Analyse the data."""
        # Default no-op for data items without a registered handler.

    @analyze_data.register
    def analyze_tosa_compatibility(self, data_item: TOSACompatibilityInfo) -> None:
        """Analyse TOSA compatibility information."""
        if data_item.tosa_compatible:
            self.add_fact(ModelIsTOSACompatible())
        else:
            self.add_fact(ModelIsNotTOSACompatible())
class TOSAOperatorCompatibility(ContextAwareDataCollector):
    """Collect operator compatibility information."""

    def __init__(self, model: Path) -> None:
        """Init the data collector."""
        self.model = model

    def collect_data(self) -> TOSACompatibilityInfo:
        """Collect TOSA compatibility information.

        The model is first converted to TFLite; the TOSA checker
        operates on the TFLite representation.
        """
        tflite_model = get_tflite_model(self.model, self.context)

        with log_action("Checking operator compatibility ..."):
            return get_tosa_compatibility_info(tflite_model.model_path)

    @classmethod
    def name(cls) -> str:
        """Return name of the collector."""
        return "tosa_operator_compatibility"
@dataclass
class TOSAAdvisorStartedEvent(Event):
    """Event with TOSA advisor parameters."""

    # Path to the model under analysis.
    model: Path
    # Resolved TOSA target configuration.
    device: TOSAConfiguration


class TOSAAdvisorEventHandler(EventDispatcher):
    """Event handler for the TOSA inference advisor."""

    def on_tosa_advisor_started(self, event: TOSAAdvisorStartedEvent) -> None:
        """Handle TOSAAdvisorStartedEvent event."""
        # Intentionally empty: concrete handlers override this hook.
"""Handle CollectedDataEvent event.""" + data_item = event.data_item + + if isinstance(data_item, TOSACompatibilityInfo): + self.reporter.submit(data_item.operators, delay_print=True) diff --git a/src/mlia/target/tosa/operators.py b/src/mlia/target/tosa/operators.py new file mode 100644 index 0000000..b75ceb0 --- /dev/null +++ b/src/mlia/target/tosa/operators.py @@ -0,0 +1,11 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Operators module.""" + + +def report() -> None: + """Generate supported operators report.""" + raise Exception( + "Generating a supported operators report is not " + "currently supported with TOSA target profile." + ) diff --git a/src/mlia/target/tosa/reporters.py b/src/mlia/target/tosa/reporters.py new file mode 100644 index 0000000..01fbb97 --- /dev/null +++ b/src/mlia/target/tosa/reporters.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Reports module.""" +from __future__ import annotations + +from typing import Any +from typing import Callable + +from mlia.backend.tosa_checker.compat import Operator +from mlia.core.advice_generation import Advice +from mlia.core.reporters import report_advice +from mlia.core.reporting import Cell +from mlia.core.reporting import Column +from mlia.core.reporting import Format +from mlia.core.reporting import NestedReport +from mlia.core.reporting import Report +from mlia.core.reporting import ReportItem +from mlia.core.reporting import Table +from mlia.target.tosa.config import TOSAConfiguration +from mlia.utils.console import style_improvement +from mlia.utils.types import is_list_of + + +def report_device(device: TOSAConfiguration) -> Report: + """Generate report for the device.""" + return NestedReport( + "Device information", + "device", + [ + ReportItem("Target", alias="target", value=device.target), + ], + ) + + +def 
def report_tosa_operators(ops: list[Operator]) -> Report:
    """Generate report for the operators."""
    columns = [
        Column("#", only_for=["plain_text"]),
        Column(
            "Operator location",
            alias="operator_location",
            fmt=Format(wrap_width=30),
        ),
        Column("Operator name", alias="operator_name", fmt=Format(wrap_width=20)),
        Column(
            "TOSA compatibility",
            alias="is_tosa_compatible",
            fmt=Format(wrap_width=25),
        ),
    ]

    def compatibility_cell(op: Operator) -> Cell:
        # Render the boolean flag as styled, human-readable text.
        return Cell(
            op.is_tosa_compatible,
            Format(
                style=style_improvement(op.is_tosa_compatible),
                str_fmt=lambda v: "Compatible" if v else "Not compatible",
            ),
        )

    rows = [
        (pos + 1, op.location, op.name, compatibility_cell(op))
        for pos, op in enumerate(ops)
    ]

    return Table(columns, rows, name="Operators", alias="operators")


def tosa_formatters(data: Any) -> Callable[[Any], Report]:
    """Find appropriate formatter for the provided data."""
    if is_list_of(data, Advice):
        return report_advice

    if isinstance(data, TOSAConfiguration):
        return report_device

    if is_list_of(data, Operator):
        return report_tosa_operators

    raise Exception(f"Unable to find appropriate formatter for {data}")