aboutsummaryrefslogtreecommitdiff
path: root/tests/mlia/test_nn_tensorflow_optimizations_clustering.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/mlia/test_nn_tensorflow_optimizations_clustering.py')
-rw-r--r--tests/mlia/test_nn_tensorflow_optimizations_clustering.py131
1 files changed, 131 insertions, 0 deletions
diff --git a/tests/mlia/test_nn_tensorflow_optimizations_clustering.py b/tests/mlia/test_nn_tensorflow_optimizations_clustering.py
new file mode 100644
index 0000000..9bcf918
--- /dev/null
+++ b/tests/mlia/test_nn_tensorflow_optimizations_clustering.py
@@ -0,0 +1,131 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Test for module optimizations/clustering."""
+from pathlib import Path
+from typing import List
+from typing import Optional
+
+import pytest
+import tensorflow as tf
+
+from mlia.nn.tensorflow.optimizations.clustering import Clusterer
+from mlia.nn.tensorflow.optimizations.clustering import ClusteringConfiguration
+from mlia.nn.tensorflow.optimizations.pruning import Pruner
+from mlia.nn.tensorflow.optimizations.pruning import PruningConfiguration
+from mlia.nn.tensorflow.tflite_metrics import ReportClusterMode
+from mlia.nn.tensorflow.tflite_metrics import TFLiteMetrics
+from mlia.nn.tensorflow.utils import convert_to_tflite
+from mlia.nn.tensorflow.utils import save_tflite_model
+from tests.mlia.utils.common import get_dataset
+from tests.mlia.utils.common import train_model
+
+
+def _prune_model(
+ model: tf.keras.Model, target_sparsity: float, layers_to_prune: Optional[List[str]]
+) -> tf.keras.Model:
+ x_train, y_train = get_dataset()
+ batch_size = 1
+ num_epochs = 1
+
+ pruner = Pruner(
+ model,
+ PruningConfiguration(
+ target_sparsity,
+ layers_to_prune,
+ x_train,
+ y_train,
+ batch_size,
+ num_epochs,
+ ),
+ )
+ pruner.apply_optimization()
+ pruned_model = pruner.get_model()
+
+ return pruned_model
+
+
+def _test_num_unique_weights(
+ metrics: TFLiteMetrics,
+ target_num_clusters: int,
+ layers_to_cluster: Optional[List[str]],
+) -> None:
+ clustered_uniqueness_dict = metrics.num_unique_weights(
+ ReportClusterMode.NUM_CLUSTERS_PER_AXIS
+ )
+ num_clustered_layers = 0
+ num_optimizable_layers = len(clustered_uniqueness_dict)
+ if layers_to_cluster:
+ expected_num_clustered_layers = len(layers_to_cluster)
+ else:
+ expected_num_clustered_layers = num_optimizable_layers
+ for layer_name in clustered_uniqueness_dict:
+ # the +1 is there temporarily because of a bug that's been fixed
+ # but the fix hasn't been merged yet.
+ # Will need to be removed in the future.
+ if clustered_uniqueness_dict[layer_name][0] <= (target_num_clusters + 1):
+ num_clustered_layers = num_clustered_layers + 1
+ # make sure we are having exactly as many clustered layers as we wanted
+ assert num_clustered_layers == expected_num_clustered_layers
+
+
+def _test_sparsity(
+ metrics: TFLiteMetrics,
+ target_sparsity: float,
+ layers_to_cluster: Optional[List[str]],
+) -> None:
+ pruned_sparsity_dict = metrics.sparsity_per_layer()
+ num_sparse_layers = 0
+ num_optimizable_layers = len(pruned_sparsity_dict)
+ error_margin = 0.03
+ if layers_to_cluster:
+ expected_num_sparse_layers = len(layers_to_cluster)
+ else:
+ expected_num_sparse_layers = num_optimizable_layers
+ for layer_name in pruned_sparsity_dict:
+ if abs(pruned_sparsity_dict[layer_name] - target_sparsity) < error_margin:
+ num_sparse_layers = num_sparse_layers + 1
+ # make sure we are having exactly as many sparse layers as we wanted
+ assert num_sparse_layers == expected_num_sparse_layers
+
+
+@pytest.mark.skip(reason="Test fails randomly, further investigation is needed")
+@pytest.mark.parametrize("target_num_clusters", (32, 4))
+@pytest.mark.parametrize("sparsity_aware", (False, True))
+@pytest.mark.parametrize("layers_to_cluster", (["conv1"], ["conv1", "conv2"], None))
+def test_cluster_simple_model_fully(
+ target_num_clusters: int,
+ sparsity_aware: bool,
+ layers_to_cluster: Optional[List[str]],
+ tmp_path: Path,
+ test_keras_model: Path,
+) -> None:
+ """Simple MNIST test to see if clustering works correctly."""
+ target_sparsity = 0.5
+
+ base_model = tf.keras.models.load_model(str(test_keras_model))
+ train_model(base_model)
+
+ if sparsity_aware:
+ base_model = _prune_model(base_model, target_sparsity, layers_to_cluster)
+
+ clusterer = Clusterer(
+ base_model,
+ ClusteringConfiguration(
+ target_num_clusters,
+ layers_to_cluster,
+ ),
+ )
+ clusterer.apply_optimization()
+ clustered_model = clusterer.get_model()
+
+ temp_file = tmp_path / "test_cluster_simple_model_fully_after.tflite"
+ tflite_clustered_model = convert_to_tflite(clustered_model)
+ save_tflite_model(tflite_clustered_model, temp_file)
+ clustered_tflite_metrics = TFLiteMetrics(str(temp_file))
+
+ _test_num_unique_weights(
+ clustered_tflite_metrics, target_num_clusters, layers_to_cluster
+ )
+
+ if sparsity_aware:
+ _test_sparsity(clustered_tflite_metrics, target_sparsity, layers_to_cluster)