diff options
Diffstat (limited to 'src/mlia/target/common/optimization.py')
-rw-r--r-- | src/mlia/target/common/optimization.py | 219 |
1 files changed, 219 insertions, 0 deletions
diff --git a/src/mlia/target/common/optimization.py b/src/mlia/target/common/optimization.py new file mode 100644 index 0000000..5f359c5 --- /dev/null +++ b/src/mlia/target/common/optimization.py @@ -0,0 +1,219 @@ +# SPDX-FileCopyrightText: Copyright 2022-2023, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Data collector support for performance optimizations.""" +from __future__ import annotations + +import logging +from abc import abstractmethod +from functools import partial +from pathlib import Path +from typing import Any +from typing import Callable + +from mlia.core.common import DataItem +from mlia.core.context import Context +from mlia.core.data_collection import ContextAwareDataCollector +from mlia.core.errors import FunctionalityNotSupportedError +from mlia.core.performance import estimate_performance +from mlia.core.performance import P +from mlia.core.performance import PerformanceEstimator +from mlia.nn.select import get_optimizer +from mlia.nn.select import OptimizationSettings +from mlia.nn.tensorflow.config import get_keras_model +from mlia.nn.tensorflow.config import KerasModel +from mlia.nn.tensorflow.config import TFLiteModel +from mlia.nn.tensorflow.utils import save_keras_model +from mlia.nn.tensorflow.utils import save_tflite_model +from mlia.target.config import TargetProfile +from mlia.utils.types import is_list_of + +logger = logging.getLogger(__name__) + + +class OptimizingDataCollector(ContextAwareDataCollector): + """Collect performance metrics for the optimizations.""" + + def __init__( + self, + model: Path, + target_config: TargetProfile, + backends: list[str] | None = None, + ) -> None: + """Init performance optimizations data collector.""" + self.model = model + self.target = target_config + self.backends = backends + + def collect_data(self) -> DataItem: + """Collect performance metrics for the optimizations.""" + logger.info("Estimate performance ...") + + optimizations = self._get_optimization_settings(self.context) + + if not optimizations or optimizations == [[]]: + raise FunctionalityNotSupportedError( + reason="No optimization targets provided", + description="Unable to estimate model optimizations impact", + ) + + opt_settings = self._parse_optimization_params(optimizations) + + optimization_types = { + setting.optimization_type for opt in opt_settings for setting in opt + } + + if optimization_types != {"rewrite"}: + try: + model = get_keras_model(self.model, self.context) + except NotImplementedError as err: + raise FunctionalityNotSupportedError( + reason=f"{self.model} is not a Keras model and " + "could not be converted to a Keras model", + description="Unable to run model optimizations", + ) from err + else: + model = self.model # type: ignore + + optimizers: list[Callable] = [ + partial(self.optimize_model, opts) for opts in opt_settings + ] + + return self.optimize_and_estimate_performance(model, optimizers, opt_settings) + + def optimize_model( + self, opt_settings: list[OptimizationSettings], model: KerasModel | TFLiteModel + ) -> Any: + """Run optimization.""" + optimizer = get_optimizer(model, opt_settings) + + opts_as_str = ", ".join(str(opt) for opt in opt_settings) + logger.info("Applying model optimizations - [%s]", opts_as_str) + optimizer.apply_optimization() + + model = optimizer.get_model() # type: ignore + + if isinstance(model, Path): + return model + + if isinstance(model, TFLiteModel): + model_path = self.context.get_model_path("optimized_model.tflite") + with open(model.model_path, "rb") as file_handle: + model_data = bytearray(file_handle.read()) + save_tflite_model(model_data, model_path) + return TFLiteModel(model_path) + + model_path = self.context.get_model_path("optimized_model.h5") + save_keras_model(model, model_path) + return KerasModel(model_path) + + def _get_optimization_settings(self, context: Context) -> list[list[dict]]: + """Get optimization settings.""" + return self.get_parameter( # type: ignore + OptimizingDataCollector.name(), + "optimizations", + expected_type=list, + expected=False, + context=context, + ) + + @staticmethod + def _parse_optimization_params( + optimizations: list[list[dict]], + ) -> list[list[OptimizationSettings]]: + """Parse optimization parameters.""" + if not is_list_of(optimizations, list): + raise TypeError("Optimization parameters expected to be a list.") + + return [ + [ + OptimizationSettings( + item.get("optimization_type"), # type: ignore + item.get("optimization_target"), # type: ignore + item.get("layers_to_optimize"), + item.get("dataset"), + ) + for item in opt_configuration + ] + for opt_configuration in optimizations + ] + + def optimize_and_estimate_performance( + self, + model: KerasModel | Path, + optimizers: list[Callable], + _: list[list[OptimizationSettings]], + ) -> DataItem: + """Run optimizers and estimate perfomance on the results.""" + for optimizer in optimizers: + optimizer(model) + + return {} + + @classmethod + def name(cls) -> str: + """Return name of the collector.""" + return "common_optimizations" + + +class OptimizingPerformaceDataCollector(OptimizingDataCollector): + """Collect performance metrics for the optimizations.""" + + @abstractmethod + def create_estimator(self) -> PerformanceEstimator: + """Create a PerformanceEstimator, to be overridden in subclasses.""" + + @abstractmethod + def create_optimization_performance_metrics( + self, original_metrics: P, optimizations_perf_metrics: list[P] + ) -> Any: + """Create an optimization metrics object.""" + + def optimize_and_estimate_performance( + self, + model: KerasModel | Path, + optimizers: list[Callable], + opt_settings: list[list[OptimizationSettings]], + ) -> Any: + """Run optimizers and estimate perfomance on the results.""" + estimator = self.create_estimator() + + original_metrics, *optimized_metrics = estimate_performance( + model, estimator, optimizers + ) + + return self.create_optimization_performance_metrics( + original_metrics, + list(zip(opt_settings, optimized_metrics)), + ) + + +_DEFAULT_OPTIMIZATION_TARGETS = [ + { + "optimization_type": "pruning", + "optimization_target": 0.5, + "layers_to_optimize": None, + }, + { + "optimization_type": "clustering", + "optimization_target": 32, + "layers_to_optimize": None, + }, +] + + +def add_common_optimization_params(advisor_parameters: dict, extra_args: dict) -> None: + """Add common optimization parameters.""" + optimization_targets = extra_args.get("optimization_targets") + if not optimization_targets: + optimization_targets = _DEFAULT_OPTIMIZATION_TARGETS + + if not is_list_of(optimization_targets, dict): + raise TypeError("Optimization targets value has wrong format.") + + advisor_parameters.update( + { + "common_optimizations": { + "optimizations": [optimization_targets], + }, + } + ) |