diff options
Diffstat (limited to 'tests/test_nn_tensorflow_optimizations_clustering.py')
-rw-r--r-- | tests/test_nn_tensorflow_optimizations_clustering.py | 131 |
1 files changed, 131 insertions, 0 deletions
diff --git a/tests/test_nn_tensorflow_optimizations_clustering.py b/tests/test_nn_tensorflow_optimizations_clustering.py new file mode 100644 index 0000000..c12a1e8 --- /dev/null +++ b/tests/test_nn_tensorflow_optimizations_clustering.py @@ -0,0 +1,131 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Test for module optimizations/clustering.""" +from pathlib import Path +from typing import List +from typing import Optional + +import pytest +import tensorflow as tf + +from mlia.nn.tensorflow.optimizations.clustering import Clusterer +from mlia.nn.tensorflow.optimizations.clustering import ClusteringConfiguration +from mlia.nn.tensorflow.optimizations.pruning import Pruner +from mlia.nn.tensorflow.optimizations.pruning import PruningConfiguration +from mlia.nn.tensorflow.tflite_metrics import ReportClusterMode +from mlia.nn.tensorflow.tflite_metrics import TFLiteMetrics +from mlia.nn.tensorflow.utils import convert_to_tflite +from mlia.nn.tensorflow.utils import save_tflite_model +from tests.utils.common import get_dataset +from tests.utils.common import train_model + + +def _prune_model( + model: tf.keras.Model, target_sparsity: float, layers_to_prune: Optional[List[str]] +) -> tf.keras.Model: + x_train, y_train = get_dataset() + batch_size = 1 + num_epochs = 1 + + pruner = Pruner( + model, + PruningConfiguration( + target_sparsity, + layers_to_prune, + x_train, + y_train, + batch_size, + num_epochs, + ), + ) + pruner.apply_optimization() + pruned_model = pruner.get_model() + + return pruned_model + + +def _test_num_unique_weights( + metrics: TFLiteMetrics, + target_num_clusters: int, + layers_to_cluster: Optional[List[str]], +) -> None: + clustered_uniqueness_dict = metrics.num_unique_weights( + ReportClusterMode.NUM_CLUSTERS_PER_AXIS + ) + num_clustered_layers = 0 + num_optimizable_layers = len(clustered_uniqueness_dict) + if layers_to_cluster: + expected_num_clustered_layers = len(layers_to_cluster) + else: + expected_num_clustered_layers = num_optimizable_layers + for layer_name in clustered_uniqueness_dict: + # the +1 is there temporarily because of a bug that's been fixed + # but the fix hasn't been merged yet. + # Will need to be removed in the future. + if clustered_uniqueness_dict[layer_name][0] <= (target_num_clusters + 1): + num_clustered_layers = num_clustered_layers + 1 + # make sure we are having exactly as many clustered layers as we wanted + assert num_clustered_layers == expected_num_clustered_layers + + +def _test_sparsity( + metrics: TFLiteMetrics, + target_sparsity: float, + layers_to_cluster: Optional[List[str]], +) -> None: + pruned_sparsity_dict = metrics.sparsity_per_layer() + num_sparse_layers = 0 + num_optimizable_layers = len(pruned_sparsity_dict) + error_margin = 0.03 + if layers_to_cluster: + expected_num_sparse_layers = len(layers_to_cluster) + else: + expected_num_sparse_layers = num_optimizable_layers + for layer_name in pruned_sparsity_dict: + if abs(pruned_sparsity_dict[layer_name] - target_sparsity) < error_margin: + num_sparse_layers = num_sparse_layers + 1 + # make sure we are having exactly as many sparse layers as we wanted + assert num_sparse_layers == expected_num_sparse_layers + + +@pytest.mark.skip(reason="Test fails randomly, further investigation is needed") +@pytest.mark.parametrize("target_num_clusters", (32, 4)) +@pytest.mark.parametrize("sparsity_aware", (False, True)) +@pytest.mark.parametrize("layers_to_cluster", (["conv1"], ["conv1", "conv2"], None)) +def test_cluster_simple_model_fully( + target_num_clusters: int, + sparsity_aware: bool, + layers_to_cluster: Optional[List[str]], + tmp_path: Path, + test_keras_model: Path, +) -> None: + """Simple MNIST test to see if clustering works correctly.""" + target_sparsity = 0.5 + + base_model = tf.keras.models.load_model(str(test_keras_model)) + train_model(base_model) + + if sparsity_aware: + base_model = _prune_model(base_model, target_sparsity, layers_to_cluster) + + clusterer = Clusterer( + base_model, + ClusteringConfiguration( + target_num_clusters, + layers_to_cluster, + ), + ) + clusterer.apply_optimization() + clustered_model = clusterer.get_model() + + temp_file = tmp_path / "test_cluster_simple_model_fully_after.tflite" + tflite_clustered_model = convert_to_tflite(clustered_model) + save_tflite_model(tflite_clustered_model, temp_file) + clustered_tflite_metrics = TFLiteMetrics(str(temp_file)) + + _test_num_unique_weights( + clustered_tflite_metrics, target_num_clusters, layers_to_cluster + ) + + if sparsity_aware: + _test_sparsity(clustered_tflite_metrics, target_sparsity, layers_to_cluster) |