diff options
author:    Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-09-08 14:24:39 +0100
committer: Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-09-09 17:21:48 +0100
commit:    f5b293d0927506c2a979a091bf0d07ecc78fa181 (patch)
tree:      4de585b7cb6ed34da8237063752270189a730a41 /src/mlia/nn
parent:    cde0c6ee140bd108849bff40467d8f18ffc332ef (diff)
download:  mlia-f5b293d0927506c2a979a091bf0d07ecc78fa181.tar.gz
MLIA-386 Simplify typing in the source code
- Enable deferred annotations evaluation
- Use builtin types for type hints whenever possible
- Use | syntax for union types
- Rename mlia.core._typing into mlia.core.typing
Change-Id: I3f6ffc02fa069c589bdd9e8bddbccd504285427a
Diffstat (limited to 'src/mlia/nn')
-rw-r--r--  src/mlia/nn/tensorflow/config.py                    | 36
-rw-r--r--  src/mlia/nn/tensorflow/optimizations/clustering.py  |  9
-rw-r--r--  src/mlia/nn/tensorflow/optimizations/pruning.py     | 13
-rw-r--r--  src/mlia/nn/tensorflow/optimizations/select.py      | 34
-rw-r--r--  src/mlia/nn/tensorflow/tflite_metrics.py            | 20
-rw-r--r--  src/mlia/nn/tensorflow/utils.py                     | 15
6 files changed, 59 insertions, 68 deletions
diff --git a/src/mlia/nn/tensorflow/config.py b/src/mlia/nn/tensorflow/config.py index d3235d7..6ee32e7 100644 --- a/src/mlia/nn/tensorflow/config.py +++ b/src/mlia/nn/tensorflow/config.py @@ -1,12 +1,12 @@ # SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 """Model configuration.""" +from __future__ import annotations + import logging from pathlib import Path from typing import cast -from typing import Dict from typing import List -from typing import Union import tensorflow as tf @@ -24,17 +24,17 @@ logger = logging.getLogger(__name__) class ModelConfiguration: """Base class for model configuration.""" - def __init__(self, model_path: Union[str, Path]) -> None: + def __init__(self, model_path: str | Path) -> None: """Init model configuration instance.""" self.model_path = str(model_path) def convert_to_tflite( - self, tflite_model_path: Union[str, Path], quantized: bool = False - ) -> "TFLiteModel": + self, tflite_model_path: str | Path, quantized: bool = False + ) -> TFLiteModel: """Convert model to TFLite format.""" raise NotImplementedError() - def convert_to_keras(self, keras_model_path: Union[str, Path]) -> "KerasModel": + def convert_to_keras(self, keras_model_path: str | Path) -> KerasModel: """Convert model to Keras format.""" raise NotImplementedError() @@ -50,8 +50,8 @@ class KerasModel(ModelConfiguration): return tf.keras.models.load_model(self.model_path) def convert_to_tflite( - self, tflite_model_path: Union[str, Path], quantized: bool = False - ) -> "TFLiteModel": + self, tflite_model_path: str | Path, quantized: bool = False + ) -> TFLiteModel: """Convert model to TFLite format.""" logger.info("Converting Keras to TFLite ...") @@ -65,7 +65,7 @@ class KerasModel(ModelConfiguration): return TFLiteModel(tflite_model_path) - def convert_to_keras(self, keras_model_path: Union[str, Path]) -> "KerasModel": + def convert_to_keras(self, keras_model_path: str | Path) -> KerasModel: """Convert 
model to Keras format.""" return self @@ -73,14 +73,14 @@ class KerasModel(ModelConfiguration): class TFLiteModel(ModelConfiguration): # pylint: disable=abstract-method """TFLite model configuration.""" - def input_details(self) -> List[Dict]: + def input_details(self) -> list[dict]: """Get model's input details.""" interpreter = tf.lite.Interpreter(model_path=self.model_path) - return cast(List[Dict], interpreter.get_input_details()) + return cast(List[dict], interpreter.get_input_details()) def convert_to_tflite( - self, tflite_model_path: Union[str, Path], quantized: bool = False - ) -> "TFLiteModel": + self, tflite_model_path: str | Path, quantized: bool = False + ) -> TFLiteModel: """Convert model to TFLite format.""" return self @@ -92,8 +92,8 @@ class TfModel(ModelConfiguration): # pylint: disable=abstract-method """ def convert_to_tflite( - self, tflite_model_path: Union[str, Path], quantized: bool = False - ) -> "TFLiteModel": + self, tflite_model_path: str | Path, quantized: bool = False + ) -> TFLiteModel: """Convert model to TFLite format.""" converted_model = convert_tf_to_tflite(self.model_path, quantized) save_tflite_model(converted_model, tflite_model_path) @@ -101,7 +101,7 @@ class TfModel(ModelConfiguration): # pylint: disable=abstract-method return TFLiteModel(tflite_model_path) -def get_model(model: Union[Path, str]) -> "ModelConfiguration": +def get_model(model: str | Path) -> ModelConfiguration: """Return the model object.""" if is_tflite_model(model): return TFLiteModel(model) @@ -118,7 +118,7 @@ def get_model(model: Union[Path, str]) -> "ModelConfiguration": ) -def get_tflite_model(model: Union[str, Path], ctx: Context) -> "TFLiteModel": +def get_tflite_model(model: str | Path, ctx: Context) -> TFLiteModel: """Convert input model to TFLite and returns TFLiteModel object.""" tflite_model_path = ctx.get_model_path("converted_model.tflite") converted_model = get_model(model) @@ -126,7 +126,7 @@ def get_tflite_model(model: Union[str, Path], ctx: 
Context) -> "TFLiteModel": return converted_model.convert_to_tflite(tflite_model_path, True) -def get_keras_model(model: Union[str, Path], ctx: Context) -> "KerasModel": +def get_keras_model(model: str | Path, ctx: Context) -> KerasModel: """Convert input model to Keras and returns KerasModel object.""" keras_model_path = ctx.get_model_path("converted_model.h5") converted_model = get_model(model) diff --git a/src/mlia/nn/tensorflow/optimizations/clustering.py b/src/mlia/nn/tensorflow/optimizations/clustering.py index 16d9e4b..4aaa33e 100644 --- a/src/mlia/nn/tensorflow/optimizations/clustering.py +++ b/src/mlia/nn/tensorflow/optimizations/clustering.py @@ -7,11 +7,10 @@ In order to do this, we need to have a base model and corresponding training dat We also have to specify a subset of layers we want to cluster. For more details, please refer to the documentation for TensorFlow Model Optimization Toolkit. """ +from __future__ import annotations + from dataclasses import dataclass from typing import Any -from typing import Dict -from typing import List -from typing import Optional import tensorflow as tf import tensorflow_model_optimization as tfmot @@ -28,7 +27,7 @@ class ClusteringConfiguration(OptimizerConfiguration): """Clustering configuration.""" optimization_target: int - layers_to_optimize: Optional[List[str]] = None + layers_to_optimize: list[str] | None = None def __str__(self) -> str: """Return string representation of the configuration.""" @@ -61,7 +60,7 @@ class Clusterer(Optimizer): """Return string representation of the optimization config.""" return str(self.optimizer_configuration) - def _setup_clustering_params(self) -> Dict[str, Any]: + def _setup_clustering_params(self) -> dict[str, Any]: CentroidInitialization = tfmot.clustering.keras.CentroidInitialization return { "number_of_clusters": self.optimizer_configuration.optimization_target, diff --git a/src/mlia/nn/tensorflow/optimizations/pruning.py b/src/mlia/nn/tensorflow/optimizations/pruning.py 
index 0a3fda5..41954b9 100644 --- a/src/mlia/nn/tensorflow/optimizations/pruning.py +++ b/src/mlia/nn/tensorflow/optimizations/pruning.py @@ -7,11 +7,10 @@ In order to do this, we need to have a base model and corresponding training dat We also have to specify a subset of layers we want to prune. For more details, please refer to the documentation for TensorFlow Model Optimization Toolkit. """ +from __future__ import annotations + import typing from dataclasses import dataclass -from typing import List -from typing import Optional -from typing import Tuple import numpy as np import tensorflow as tf @@ -29,9 +28,9 @@ class PruningConfiguration(OptimizerConfiguration): """Pruning configuration.""" optimization_target: float - layers_to_optimize: Optional[List[str]] = None - x_train: Optional[np.ndarray] = None - y_train: Optional[np.ndarray] = None + layers_to_optimize: list[str] | None = None + x_train: np.ndarray | None = None + y_train: np.ndarray | None = None batch_size: int = 1 num_epochs: int = 1 @@ -74,7 +73,7 @@ class Pruner(Optimizer): """Return string representation of the optimization config.""" return str(self.optimizer_configuration) - def _mock_train_data(self) -> Tuple[np.ndarray, np.ndarray]: + def _mock_train_data(self) -> tuple[np.ndarray, np.ndarray]: # get rid of the batch_size dimension in input and output shape input_shape = tuple(x for x in self.model.input_shape if x is not None) output_shape = tuple(x for x in self.model.output_shape if x is not None) diff --git a/src/mlia/nn/tensorflow/optimizations/select.py b/src/mlia/nn/tensorflow/optimizations/select.py index 1b0c755..d4a8ea4 100644 --- a/src/mlia/nn/tensorflow/optimizations/select.py +++ b/src/mlia/nn/tensorflow/optimizations/select.py @@ -1,12 +1,10 @@ # SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
# SPDX-License-Identifier: Apache-2.0 """Module for optimization selection.""" +from __future__ import annotations + import math -from typing import List from typing import NamedTuple -from typing import Optional -from typing import Tuple -from typing import Union import tensorflow as tf @@ -25,14 +23,14 @@ class OptimizationSettings(NamedTuple): """Optimization settings.""" optimization_type: str - optimization_target: Union[int, float] - layers_to_optimize: Optional[List[str]] + optimization_target: int | float + layers_to_optimize: list[str] | None @staticmethod def create_from( - optimizer_params: List[Tuple[str, float]], - layers_to_optimize: Optional[List[str]] = None, - ) -> List["OptimizationSettings"]: + optimizer_params: list[tuple[str, float]], + layers_to_optimize: list[str] | None = None, + ) -> list[OptimizationSettings]: """Create optimization settings from the provided parameters.""" return [ OptimizationSettings( @@ -47,7 +45,7 @@ class OptimizationSettings(NamedTuple): """Return string representation.""" return f"{self.optimization_type}: {self.optimization_target}" - def next_target(self) -> "OptimizationSettings": + def next_target(self) -> OptimizationSettings: """Return next optimization target.""" if self.optimization_type == "pruning": next_target = round(min(self.optimization_target + 0.1, 0.9), 2) @@ -75,7 +73,7 @@ class MultiStageOptimizer(Optimizer): def __init__( self, model: tf.keras.Model, - optimizations: List[OptimizerConfiguration], + optimizations: list[OptimizerConfiguration], ) -> None: """Init MultiStageOptimizer instance.""" self.model = model @@ -98,10 +96,8 @@ class MultiStageOptimizer(Optimizer): def get_optimizer( - model: Union[tf.keras.Model, KerasModel], - config: Union[ - OptimizerConfiguration, OptimizationSettings, List[OptimizationSettings] - ], + model: tf.keras.Model | KerasModel, + config: OptimizerConfiguration | OptimizationSettings | list[OptimizationSettings], ) -> Optimizer: """Get optimizer for provided 
configuration.""" if isinstance(model, KerasModel): @@ -123,7 +119,7 @@ def get_optimizer( def _get_optimizer( model: tf.keras.Model, - optimization_settings: Union[OptimizationSettings, List[OptimizationSettings]], + optimization_settings: OptimizationSettings | list[OptimizationSettings], ) -> Optimizer: if isinstance(optimization_settings, OptimizationSettings): optimization_settings = [optimization_settings] @@ -145,8 +141,8 @@ def _get_optimizer( def _get_optimizer_configuration( optimization_type: str, - optimization_target: Union[int, float], - layers_to_optimize: Optional[List[str]] = None, + optimization_target: int | float, + layers_to_optimize: list[str] | None = None, ) -> OptimizerConfiguration: """Get optimizer configuration for provided parameters.""" _check_optimizer_params(optimization_type, optimization_target) @@ -169,7 +165,7 @@ def _get_optimizer_configuration( def _check_optimizer_params( - optimization_type: str, optimization_target: Union[int, float] + optimization_type: str, optimization_target: int | float ) -> None: """Check optimizer params.""" if not optimization_target: diff --git a/src/mlia/nn/tensorflow/tflite_metrics.py b/src/mlia/nn/tensorflow/tflite_metrics.py index 3f41487..0af7500 100644 --- a/src/mlia/nn/tensorflow/tflite_metrics.py +++ b/src/mlia/nn/tensorflow/tflite_metrics.py @@ -8,13 +8,13 @@ These metrics include: * Unique weights (clusters) (per layer) * gzip compression ratio """ +from __future__ import annotations + import os import typing from enum import Enum from pprint import pprint from typing import Any -from typing import List -from typing import Optional import numpy as np import tensorflow as tf @@ -42,7 +42,7 @@ def calculate_num_unique_weights(weights: np.ndarray) -> int: return num_unique_weights -def calculate_num_unique_weights_per_axis(weights: np.ndarray, axis: int) -> List[int]: +def calculate_num_unique_weights_per_axis(weights: np.ndarray, axis: int) -> list[int]: """Calculate unique weights per 
quantization axis.""" # Make quantized dimension the first dimension weights_trans = np.swapaxes(weights, 0, axis) @@ -74,7 +74,7 @@ class SparsityAccumulator: def calculate_sparsity( - weights: np.ndarray, accumulator: Optional[SparsityAccumulator] = None + weights: np.ndarray, accumulator: SparsityAccumulator | None = None ) -> float: """ Calculate the sparsity for the given weights. @@ -110,9 +110,7 @@ class TFLiteMetrics: * File compression via gzip """ - def __init__( - self, tflite_file: str, ignore_list: Optional[List[str]] = None - ) -> None: + def __init__(self, tflite_file: str, ignore_list: list[str] | None = None) -> None: """Load the TFLite file and filter layers.""" self.tflite_file = tflite_file if ignore_list is None: @@ -159,7 +157,7 @@ class TFLiteMetrics: acc(self.get_tensor(details)) return acc.sparsity() - def calc_num_clusters_per_axis(self, details: dict) -> List[int]: + def calc_num_clusters_per_axis(self, details: dict) -> list[int]: """Calculate number of clusters per axis.""" quant_params = details["quantization_parameters"] per_axis = len(quant_params["zero_points"]) > 1 @@ -178,14 +176,14 @@ class TFLiteMetrics: aggregation_func = self.calc_num_clusters_per_axis elif mode == ReportClusterMode.NUM_CLUSTERS_MIN_MAX: - def cluster_min_max(details: dict) -> List[int]: + def cluster_min_max(details: dict) -> list[int]: num_clusters = self.calc_num_clusters_per_axis(details) return [min(num_clusters), max(num_clusters)] aggregation_func = cluster_min_max elif mode == ReportClusterMode.NUM_CLUSTERS_HISTOGRAM: - def cluster_hist(details: dict) -> List[int]: + def cluster_hist(details: dict) -> list[int]: num_clusters = self.calc_num_clusters_per_axis(details) max_num = max(num_clusters) hist = [0] * (max_num) @@ -289,7 +287,7 @@ class TFLiteMetrics: print(f"- {self._prettify_name(name)}: {nums}") @staticmethod - def _print_in_outs(ios: List[dict], verbose: bool = False) -> None: + def _print_in_outs(ios: list[dict], verbose: bool = False) -> 
None: for item in ios: if verbose: pprint(item) diff --git a/src/mlia/nn/tensorflow/utils.py b/src/mlia/nn/tensorflow/utils.py index b1034d9..6250f56 100644 --- a/src/mlia/nn/tensorflow/utils.py +++ b/src/mlia/nn/tensorflow/utils.py @@ -2,11 +2,12 @@ # SPDX-FileCopyrightText: Copyright The TensorFlow Authors. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 """Collection of useful functions for optimizations.""" +from __future__ import annotations + import logging from pathlib import Path from typing import Callable from typing import Iterable -from typing import Union import numpy as np import tensorflow as tf @@ -101,21 +102,19 @@ def convert_tf_to_tflite(model: str, quantized: bool = False) -> Interpreter: return tflite_model -def save_keras_model(model: tf.keras.Model, save_path: Union[str, Path]) -> None: +def save_keras_model(model: tf.keras.Model, save_path: str | Path) -> None: """Save Keras model at provided path.""" # Checkpoint: saving the optimizer is necessary. model.save(save_path, include_optimizer=True) -def save_tflite_model( - model: tf.lite.TFLiteConverter, save_path: Union[str, Path] -) -> None: +def save_tflite_model(model: tf.lite.TFLiteConverter, save_path: str | Path) -> None: """Save TFLite model at provided path.""" with open(save_path, "wb") as file: file.write(model) -def is_tflite_model(model: Union[Path, str]) -> bool: +def is_tflite_model(model: str | Path) -> bool: """Check if model type is supported by TFLite API. TFLite model is indicated by the model file extension .tflite @@ -124,7 +123,7 @@ def is_tflite_model(model: Union[Path, str]) -> bool: return model_path.suffix == ".tflite" -def is_keras_model(model: Union[Path, str]) -> bool: +def is_keras_model(model: str | Path) -> bool: """Check if model type is supported by Keras API. 
Keras model is indicated by: @@ -139,7 +138,7 @@ def is_keras_model(model: Union[Path, str]) -> bool: return model_path.suffix in (".h5", ".hdf5") -def is_tf_model(model: Union[Path, str]) -> bool: +def is_tf_model(model: str | Path) -> bool: """Check if model type is supported by TensorFlow API. TensorFlow model is indicated if its directory (meaning saved model) |