diff options
Diffstat (limited to 'src/mlia/target')
-rw-r--r-- | src/mlia/target/common/optimization.py | 219 | ||||
-rw-r--r-- | src/mlia/target/cortex_a/advisor.py | 16 | ||||
-rw-r--r-- | src/mlia/target/ethos_u/advisor.py | 41 | ||||
-rw-r--r-- | src/mlia/target/ethos_u/data_collection.py | 135 | ||||
-rw-r--r-- | src/mlia/target/tosa/advisor.py | 18 |
5 files changed, 263 insertions, 166 deletions
diff --git a/src/mlia/target/common/optimization.py b/src/mlia/target/common/optimization.py new file mode 100644 index 0000000..5f359c5 --- /dev/null +++ b/src/mlia/target/common/optimization.py @@ -0,0 +1,219 @@ +# SPDX-FileCopyrightText: Copyright 2022-2023, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Data collector support for performance optimizations.""" +from __future__ import annotations + +import logging +from abc import abstractmethod +from functools import partial +from pathlib import Path +from typing import Any +from typing import Callable + +from mlia.core.common import DataItem +from mlia.core.context import Context +from mlia.core.data_collection import ContextAwareDataCollector +from mlia.core.errors import FunctionalityNotSupportedError +from mlia.core.performance import estimate_performance +from mlia.core.performance import P +from mlia.core.performance import PerformanceEstimator +from mlia.nn.select import get_optimizer +from mlia.nn.select import OptimizationSettings +from mlia.nn.tensorflow.config import get_keras_model +from mlia.nn.tensorflow.config import KerasModel +from mlia.nn.tensorflow.config import TFLiteModel +from mlia.nn.tensorflow.utils import save_keras_model +from mlia.nn.tensorflow.utils import save_tflite_model +from mlia.target.config import TargetProfile +from mlia.utils.types import is_list_of + +logger = logging.getLogger(__name__) + + +class OptimizingDataCollector(ContextAwareDataCollector): + """Collect performance metrics for the optimizations.""" + + def __init__( + self, + model: Path, + target_config: TargetProfile, + backends: list[str] | None = None, + ) -> None: + """Init performance optimizations data collector.""" + self.model = model + self.target = target_config + self.backends = backends + + def collect_data(self) -> DataItem: + """Collect performance metrics for the optimizations.""" + logger.info("Estimate performance ...") + + optimizations = self._get_optimization_settings(self.context) + + if not optimizations or optimizations == [[]]: + raise FunctionalityNotSupportedError( + reason="No optimization targets provided", + description="Unable to estimate model optimizations impact", + ) + + opt_settings = self._parse_optimization_params(optimizations) + + optimization_types = { + setting.optimization_type for opt in opt_settings for setting in opt + } + + if optimization_types != {"rewrite"}: + try: + model = get_keras_model(self.model, self.context) + except NotImplementedError as err: + raise FunctionalityNotSupportedError( + reason=f"{self.model} is not a Keras model and " + "could not be converted to a Keras model", + description="Unable to run model optimizations", + ) from err + else: + model = self.model # type: ignore + + optimizers: list[Callable] = [ + partial(self.optimize_model, opts) for opts in opt_settings + ] + + return self.optimize_and_estimate_performance(model, optimizers, opt_settings) + + def optimize_model( + self, opt_settings: list[OptimizationSettings], model: KerasModel | TFLiteModel + ) -> Any: + """Run optimization.""" + optimizer = get_optimizer(model, opt_settings) + + opts_as_str = ", ".join(str(opt) for opt in opt_settings) + logger.info("Applying model optimizations - [%s]", opts_as_str) + optimizer.apply_optimization() + + model = optimizer.get_model() # type: ignore + + if isinstance(model, Path): + return model + + if isinstance(model, TFLiteModel): + model_path = self.context.get_model_path("optimized_model.tflite") + with open(model.model_path, "rb") as file_handle: + model_data = bytearray(file_handle.read()) + save_tflite_model(model_data, model_path) + return TFLiteModel(model_path) + + model_path = self.context.get_model_path("optimized_model.h5") + save_keras_model(model, model_path) + return KerasModel(model_path) + + def _get_optimization_settings(self, context: Context) -> list[list[dict]]: + """Get optimization settings.""" + return self.get_parameter( # type: ignore + OptimizingDataCollector.name(), + "optimizations", + expected_type=list, + expected=False, + context=context, + ) + + @staticmethod + def _parse_optimization_params( + optimizations: list[list[dict]], + ) -> list[list[OptimizationSettings]]: + """Parse optimization parameters.""" + if not is_list_of(optimizations, list): + raise TypeError("Optimization parameters expected to be a list.") + + return [ + [ + OptimizationSettings( + item.get("optimization_type"), # type: ignore + item.get("optimization_target"), # type: ignore + item.get("layers_to_optimize"), + item.get("dataset"), + ) + for item in opt_configuration + ] + for opt_configuration in optimizations + ] + + def optimize_and_estimate_performance( + self, + model: KerasModel | Path, + optimizers: list[Callable], + _: list[list[OptimizationSettings]], + ) -> DataItem: + """Run optimizers and estimate perfomance on the results.""" + for optimizer in optimizers: + optimizer(model) + + return {} + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "common_optimizations" + + +class OptimizingPerformaceDataCollector(OptimizingDataCollector): + """Collect performance metrics for the optimizations.""" + + @abstractmethod + def create_estimator(self) -> PerformanceEstimator: + """Create a PerformanceEstimator, to be overridden in subclasses.""" + + @abstractmethod + def create_optimization_performance_metrics( + self, original_metrics: P, optimizations_perf_metrics: list[P] + ) -> Any: + """Create an optimization metrics object.""" + + def optimize_and_estimate_performance( + self, + model: KerasModel | Path, + optimizers: list[Callable], + opt_settings: list[list[OptimizationSettings]], + ) -> Any: + """Run optimizers and estimate perfomance on the results.""" + estimator = self.create_estimator() + + original_metrics, *optimized_metrics = estimate_performance( + model, estimator, optimizers + ) + + return self.create_optimization_performance_metrics( + original_metrics, + list(zip(opt_settings, optimized_metrics)), + ) + + +_DEFAULT_OPTIMIZATION_TARGETS = [ + { + "optimization_type": "pruning", + "optimization_target": 0.5, + "layers_to_optimize": None, + }, + { + "optimization_type": "clustering", + "optimization_target": 32, + "layers_to_optimize": None, + }, +] + + +def add_common_optimization_params(advisor_parameters: dict, extra_args: dict) -> None: + """Add common optimization parameters.""" + optimization_targets = extra_args.get("optimization_targets") + if not optimization_targets: + optimization_targets = _DEFAULT_OPTIMIZATION_TARGETS + + if not is_list_of(optimization_targets, dict): + raise TypeError("Optimization targets value has wrong format.") + + advisor_parameters.update( + { + "common_optimizations": { + "optimizations": [optimization_targets], + }, + } + ) diff --git a/src/mlia/target/cortex_a/advisor.py b/src/mlia/target/cortex_a/advisor.py index db07b96..eb7720a 100644 --- a/src/mlia/target/cortex_a/advisor.py +++ b/src/mlia/target/cortex_a/advisor.py @@ -16,6 +16,8 @@ from mlia.core.context import ExecutionContext from mlia.core.data_analysis import DataAnalyzer from mlia.core.data_collection import DataCollector from mlia.core.events import Event +from mlia.target.common.optimization import add_common_optimization_params +from mlia.target.common.optimization import OptimizingDataCollector from mlia.target.cortex_a.advice_generation import CortexAAdviceProducer from mlia.target.cortex_a.config import CortexAConfiguration from mlia.target.cortex_a.data_analysis import CortexADataAnalyzer @@ -50,9 +52,7 @@ class CortexAInferenceAdvisor(DefaultInferenceAdvisor): ) if context.category_enabled(AdviceCategory.OPTIMIZATION): - raise RuntimeError( - "Model optimizations are currently not supported for Cortex-A." - ) + collectors.append(OptimizingDataCollector(model, target_config)) return collectors @@ -82,20 +82,22 @@ def configure_and_get_cortexa_advisor( context: ExecutionContext, target_profile: str | Path, model: str | Path, - **_extra_args: Any, + **extra_args: Any, ) -> InferenceAdvisor: """Create and configure Cortex-A advisor.""" if context.event_handlers is None: context.event_handlers = [CortexAEventHandler()] if context.config_parameters is None: - context.config_parameters = _get_config_parameters(model, target_profile) + context.config_parameters = _get_config_parameters( + model, target_profile, **extra_args + ) return CortexAInferenceAdvisor() def _get_config_parameters( - model: str | Path, target_profile: str | Path + model: str | Path, target_profile: str | Path, **extra_args: Any ) -> dict[str, Any]: """Get configuration parameters for the advisor.""" advisor_parameters: dict[str, Any] = { @@ -104,5 +106,5 @@ def _get_config_parameters( "target_profile": target_profile, }, } - + add_common_optimization_params(advisor_parameters, extra_args) return advisor_parameters diff --git a/src/mlia/target/ethos_u/advisor.py b/src/mlia/target/ethos_u/advisor.py index 321734c..9f5b3a6 100644 --- a/src/mlia/target/ethos_u/advisor.py +++ b/src/mlia/target/ethos_u/advisor.py @@ -17,6 +17,8 @@ from mlia.core.data_analysis import DataAnalyzer from mlia.core.data_collection import DataCollector from mlia.core.events import Event from mlia.nn.tensorflow.utils import is_tflite_model +from mlia.target.common.optimization import add_common_optimization_params +from mlia.target.common.optimization import OptimizingDataCollector from mlia.target.ethos_u.advice_generation import EthosUAdviceProducer from mlia.target.ethos_u.advice_generation import EthosUStaticAdviceProducer from mlia.target.ethos_u.config import EthosUConfiguration @@ -65,9 +67,7 @@ class EthosUInferenceAdvisor(DefaultInferenceAdvisor): ) collectors.append( - EthosUOptimizationPerformance( - model, target_config, optimization_settings, backends - ) + EthosUOptimizationPerformance(model, target_config, backends) ) if context.category_enabled(AdviceCategory.PERFORMANCE): collectors.append(EthosUPerformance(model, target_config, backends)) @@ -76,9 +76,7 @@ class EthosUInferenceAdvisor(DefaultInferenceAdvisor): if context.category_enabled(AdviceCategory.OPTIMIZATION): optimization_settings = self._get_optimization_settings(context) collectors.append( - EthosUOptimizationPerformance( - model, target_config, optimization_settings, backends - ) + EthosUOptimizationPerformance(model, target_config, backends) ) elif context.category_enabled(AdviceCategory.PERFORMANCE): collectors.append(EthosUPerformance(model, target_config, backends)) @@ -115,7 +113,7 @@ class EthosUInferenceAdvisor(DefaultInferenceAdvisor): def _get_optimization_settings(self, context: Context) -> list[list[dict]]: """Get optimization settings.""" return self.get_parameter( # type: ignore - EthosUOptimizationPerformance.name(), + OptimizingDataCollector.name(), "optimizations", expected_type=list, expected=False, @@ -151,20 +149,6 @@ def configure_and_get_ethosu_advisor( return EthosUInferenceAdvisor() -_DEFAULT_OPTIMIZATION_TARGETS = [ - { - "optimization_type": "pruning", - "optimization_target": 0.5, - "layers_to_optimize": None, - }, - { - "optimization_type": "clustering", - "optimization_target": 32, - "layers_to_optimize": None, - }, -] - - def _get_config_parameters( model: str | Path, target_profile: str | Path, @@ -186,19 +170,6 @@ def _get_config_parameters( advisor_parameters["ethos_u_inference_advisor"]["backends"] = backends - optimization_targets = extra_args.get("optimization_targets") - if not optimization_targets: - optimization_targets = _DEFAULT_OPTIMIZATION_TARGETS - - if not is_list_of(optimization_targets, dict): - raise ValueError("Optimization targets value has wrong format.") - - advisor_parameters.update( - { - "ethos_u_model_optimizations": { - "optimizations": [optimization_targets], - }, - } - ) + add_common_optimization_params(advisor_parameters, extra_args) return advisor_parameters diff --git a/src/mlia/target/ethos_u/data_collection.py b/src/mlia/target/ethos_u/data_collection.py index 4ea6120..2b41d7d 100644 --- a/src/mlia/target/ethos_u/data_collection.py +++ b/src/mlia/target/ethos_u/data_collection.py @@ -6,30 +6,23 @@ from __future__ import annotations import logging from pathlib import Path from typing import Any +from typing import cast from mlia.backend.vela.compat import Operators from mlia.backend.vela.compat import supported_operators -from mlia.core.context import Context from mlia.core.data_collection import ContextAwareDataCollector -from mlia.core.errors import FunctionalityNotSupportedError -from mlia.core.performance import estimate_performance -from mlia.nn.select import get_optimizer -from mlia.nn.select import OptimizationSettings -from mlia.nn.tensorflow.config import get_keras_model +from mlia.core.performance import P +from mlia.core.performance import PerformanceEstimator from mlia.nn.tensorflow.config import get_tflite_model -from mlia.nn.tensorflow.config import KerasModel -from mlia.nn.tensorflow.config import TFLiteModel from mlia.nn.tensorflow.tflite_compat import TFLiteChecker from mlia.nn.tensorflow.tflite_compat import TFLiteCompatibilityInfo from mlia.nn.tensorflow.utils import is_tflite_model -from mlia.nn.tensorflow.utils import save_keras_model -from mlia.nn.tensorflow.utils import save_tflite_model +from mlia.target.common.optimization import OptimizingPerformaceDataCollector from mlia.target.ethos_u.config import EthosUConfiguration from mlia.target.ethos_u.performance import EthosUPerformanceEstimator from mlia.target.ethos_u.performance import OptimizationPerformanceMetrics from mlia.target.ethos_u.performance import PerformanceMetrics from mlia.utils.logging import log_action -from mlia.utils.types import is_list_of logger = logging.getLogger(__name__) @@ -96,118 +89,26 @@ class EthosUPerformance(ContextAwareDataCollector): return "ethos_u_performance" -class OptimizeModel: - """Helper class for model optimization.""" +# pylint: disable=too-many-ancestors +class EthosUOptimizationPerformance(OptimizingPerformaceDataCollector): + """Collect performance metrics for performance optimizations.""" - def __init__( - self, context: Context, opt_settings: list[OptimizationSettings] - ) -> None: - """Init helper.""" - self.context = context - self.opt_settings = opt_settings - - def __call__(self, model: KerasModel | TFLiteModel) -> Any: - """Run optimization.""" - optimizer = get_optimizer(model, self.opt_settings) - - opts_as_str = ", ".join(str(opt) for opt in self.opt_settings) - logger.info("Applying model optimizations - [%s]", opts_as_str) - optimizer.apply_optimization() - model = optimizer.get_model() # type: ignore - - if isinstance(model, Path): - return model - - if isinstance(model, TFLiteModel): - model_path = self.context.get_model_path("optimized_model.tflite") - with open(model.model_path, "rb") as file_handle: - model_data = bytearray(file_handle.read()) - save_tflite_model(model_data, model_path) - return TFLiteModel(model_path) - - model_path = self.context.get_model_path("optimized_model.h5") - save_keras_model(model, model_path) - return KerasModel(model_path) - - -class EthosUOptimizationPerformance(ContextAwareDataCollector): - """Collect performance metrics for the optimizations.""" - - def __init__( - self, - model: Path, - target_config: EthosUConfiguration, - optimizations: list[list[dict]], - backends: list[str] | None = None, - ) -> None: - """Init performance optimizations data collector.""" - self.model = model - self.target = target_config - self.optimizations = optimizations - self.backends = backends - - def collect_data(self) -> OptimizationPerformanceMetrics | None: - """Collect performance metrics for the optimizations.""" - logger.info("Estimate performance ...") - - if not self.optimizations: - raise FunctionalityNotSupportedError( - reason="Unable to estimate model optimizations impact", - description="No optimization targets provided", - ) - - opt_settings = self._parse_optimization_params(self.optimizations) - - if opt_settings[0][0].optimization_type != "rewrite": - try: - model = get_keras_model(self.model, self.context) - except NotImplementedError as err: - raise FunctionalityNotSupportedError( - reason="Unable to run model optimizations", - description=f"{self.model} is not a Keras model and " - "could not be converted to a Keras model", - ) from err - else: - model = self.model # type: ignore - - optimizers = [OptimizeModel(self.context, opts) for opts in opt_settings] - - estimator = EthosUPerformanceEstimator( + def create_estimator(self) -> PerformanceEstimator: + """Create a PerformanceEstimator, to be overridden in subclasses.""" + return EthosUPerformanceEstimator( self.context, - self.target, + cast(EthosUConfiguration, self.target), self.backends, ) - original_metrics, *optimized_metrics = estimate_performance( - model, estimator, optimizers # type: ignore - ) - - result = OptimizationPerformanceMetrics( - original_perf_metrics=original_metrics, - optimizations_perf_metrics=list(zip(opt_settings, optimized_metrics)), + def create_optimization_performance_metrics( + self, original_metrics: P, optimizations_perf_metrics: list[P] + ) -> Any: + """Create an optimization metrics object.""" + return OptimizationPerformanceMetrics( + original_perf_metrics=original_metrics, # type: ignore + optimizations_perf_metrics=optimizations_perf_metrics, # type: ignore ) - return result - - @staticmethod - def _parse_optimization_params( - optimizations: list[list[dict]], - ) -> list[list[OptimizationSettings]]: - """Parse optimization parameters.""" - if not is_list_of(optimizations, list): - raise ValueError("Optimization parameters expected to be a list.") - - return [ - [ - OptimizationSettings( - item.get("optimization_type"), # type: ignore - item.get("optimization_target"), # type: ignore - item.get("layers_to_optimize"), - item.get("dataset"), - ) - for item in opt_configuration - ] - for opt_configuration in optimizations - ] @classmethod def name(cls) -> str: diff --git a/src/mlia/target/tosa/advisor.py b/src/mlia/target/tosa/advisor.py index 2d5163e..3619a83 100644 --- a/src/mlia/target/tosa/advisor.py +++ b/src/mlia/target/tosa/advisor.py @@ -16,6 +16,8 @@ from mlia.core.context import ExecutionContext from mlia.core.data_analysis import DataAnalyzer from mlia.core.data_collection import DataCollector from mlia.core.events import Event +from mlia.target.common.optimization import add_common_optimization_params +from mlia.target.common.optimization import OptimizingDataCollector from mlia.target.registry import profile from mlia.target.tosa.advice_generation import TOSAAdviceProducer from mlia.target.tosa.config import TOSAConfiguration @@ -49,9 +51,9 @@ class TOSAInferenceAdvisor(DefaultInferenceAdvisor): ) if context.category_enabled(AdviceCategory.OPTIMIZATION): - raise RuntimeError( - "Model optimizations are currently not supported for TOSA." - ) + target_profile = self.get_target_profile(context) + target_config = profile(target_profile) + collectors.append(OptimizingDataCollector(model, target_config)) return collectors @@ -85,20 +87,22 @@ def configure_and_get_tosa_advisor( context: ExecutionContext, target_profile: str | Path, model: str | Path, - **_extra_args: Any, + **extra_args: Any, ) -> InferenceAdvisor: """Create and configure TOSA advisor.""" if context.event_handlers is None: context.event_handlers = [TOSAEventHandler()] if context.config_parameters is None: - context.config_parameters = _get_config_parameters(model, target_profile) + context.config_parameters = _get_config_parameters( + model, target_profile, **extra_args + ) return TOSAInferenceAdvisor() def _get_config_parameters( - model: str | Path, target_profile: str | Path + model: str | Path, target_profile: str | Path, **extra_args: Any ) -> dict[str, Any]: """Get configuration parameters for the advisor.""" advisor_parameters: dict[str, Any] = { @@ -107,5 +111,5 @@ def _get_config_parameters( "target_profile": target_profile, } } - + add_common_optimization_params(advisor_parameters, extra_args) return advisor_parameters |