Diffstat (limited to 'src/mlia/nn/tensorflow')
-rw-r--r--   src/mlia/nn/tensorflow/config.py       | 100
-rw-r--r--   src/mlia/nn/tensorflow/tflite_graph.py |  27
2 files changed, 117 insertions, 10 deletions
diff --git a/src/mlia/nn/tensorflow/config.py b/src/mlia/nn/tensorflow/config.py
index d7d430f..c6a7c88 100644
--- a/src/mlia/nn/tensorflow/config.py
+++ b/src/mlia/nn/tensorflow/config.py
@@ -4,13 +4,16 @@
 from __future__ import annotations
 
 import logging
+import tempfile
+from collections import defaultdict
 from pathlib import Path
-from typing import cast
-from typing import List
 
+import numpy as np
 import tensorflow as tf
 
 from mlia.core.context import Context
+from mlia.nn.tensorflow.tflite_graph import load_fb
+from mlia.nn.tensorflow.tflite_graph import save_fb
 from mlia.nn.tensorflow.utils import convert_to_tflite
 from mlia.nn.tensorflow.utils import is_keras_model
 from mlia.nn.tensorflow.utils import is_saved_model
@@ -71,10 +74,89 @@ class KerasModel(ModelConfiguration):
 class TFLiteModel(ModelConfiguration):  # pylint: disable=abstract-method
     """TensorFlow Lite model configuration."""
 
-    def input_details(self) -> list[dict]:
-        """Get model's input details."""
-        interpreter = tf.lite.Interpreter(model_path=self.model_path)
-        return cast(List[dict], interpreter.get_input_details())
+    def __init__(
+        self,
+        model_path: str | Path,
+        batch_size: int | None = None,
+        num_threads: int | None = None,
+    ) -> None:
+        """Initiate a TFLite Model."""
+        super().__init__(model_path)
+        if not num_threads:
+            num_threads = None
+        if not batch_size:
+            self.interpreter = tf.lite.Interpreter(
+                model_path=self.model_path, num_threads=num_threads
+            )
+        else:  # if a batch size is specified, modify the TFLite model to use this size
+            with tempfile.TemporaryDirectory() as tmp:
+                flatbuffer = load_fb(self.model_path)
+                for subgraph in flatbuffer.subgraphs:
+                    for tensor in list(subgraph.inputs) + list(subgraph.outputs):
+                        subgraph.tensors[tensor].shape = np.array(
+                            [batch_size] + list(subgraph.tensors[tensor].shape[1:]),
+                            dtype=np.int32,
+                        )
+                tempname = Path(tmp, "rewrite_tmp.tflite")
+                save_fb(flatbuffer, tempname)
+                self.interpreter = tf.lite.Interpreter(
+                    model_path=str(tempname), num_threads=num_threads
+                )
+
+        try:
+            self.interpreter.allocate_tensors()
+        except RuntimeError:
+            self.interpreter = tf.lite.Interpreter(
+                model_path=self.model_path, num_threads=num_threads
+            )
+            self.interpreter.allocate_tensors()
+
+        # Get input and output tensors.
+        self.input_details = self.interpreter.get_input_details()
+        self.output_details = self.interpreter.get_output_details()
+        details = list(self.input_details) + list(self.output_details)
+        self.handle_from_name = {d["name"]: d["index"] for d in details}
+        self.shape_from_name = {d["name"]: d["shape"] for d in details}
+        self.batch_size = next(iter(self.shape_from_name.values()))[0]
+
+    def __call__(self, named_input: dict) -> dict:
+        """Execute the model on one or a batch of named inputs \
+            (a dict of name: numpy array)."""
+        input_len = next(iter(named_input.values())).shape[0]
+        full_steps = input_len // self.batch_size
+        remainder = input_len % self.batch_size
+
+        named_ys = defaultdict(list)
+        for i in range(full_steps):
+            for name, x_batch in named_input.items():
+                x_tensor = x_batch[i : i + self.batch_size]  # noqa: E203
+                self.interpreter.set_tensor(self.handle_from_name[name], x_tensor)
+            self.interpreter.invoke()
+            for output_detail in self.output_details:
+                named_ys[output_detail["name"]].append(
+                    self.interpreter.get_tensor(output_detail["index"])
+                )
+        if remainder:
+            for name, x_batch in named_input.items():
+                x_tensor = np.zeros(  # pylint: disable=invalid-name
+                    self.shape_from_name[name]
+                ).astype(x_batch.dtype)
+                x_tensor[:remainder] = x_batch[-remainder:]
+                self.interpreter.set_tensor(self.handle_from_name[name], x_tensor)
+            self.interpreter.invoke()
+            for output_detail in self.output_details:
+                named_ys[output_detail["name"]].append(
+                    self.interpreter.get_tensor(output_detail["index"])[:remainder]
+                )
+        return {k: np.concatenate(v) for k, v in named_ys.items()}
+
+    def input_tensors(self) -> list:
+        """Return name from input details."""
+        return [d["name"] for d in self.input_details]
+
+    def output_tensors(self) -> list:
+        """Return name from output details."""
+        return [d["name"] for d in self.output_details]
 
     def convert_to_tflite(
         self, tflite_model_path: str | Path, quantized: bool = False
@@ -118,10 +200,10 @@ def get_model(model: str | Path) -> ModelConfiguration:
 
 def get_tflite_model(model: str | Path, ctx: Context) -> TFLiteModel:
     """Convert input model to TensorFlow Lite and returns TFLiteModel object."""
-    tflite_model_path = ctx.get_model_path("converted_model.tflite")
-    converted_model = get_model(model)
+    dst_model_path = ctx.get_model_path("converted_model.tflite")
+    src_model = get_model(model)
 
-    return converted_model.convert_to_tflite(tflite_model_path, True)
+    return src_model.convert_to_tflite(dst_model_path, quantized=True)
 
 
 def get_keras_model(model: str | Path, ctx: Context) -> KerasModel:
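Editor's note: the new TFLiteModel wraps a tf.lite.Interpreter and is callable directly on named numpy inputs. A minimal usage sketch follows; it is not part of the commit, and the model file, batch size, and input shape are hypothetical (real input names should come from input_tensors()):

    import numpy as np

    from mlia.nn.tensorflow.config import TFLiteModel

    # Hypothetical single-input model; "model.tflite" is a placeholder path.
    # Passing batch_size triggers the flatbuffer rewrite in __init__, so the
    # interpreter's input shape matches the requested batch before allocation.
    model = TFLiteModel("model.tflite", batch_size=4)
    input_name = model.input_tensors()[0]

    # 10 samples with an assumed 32x32x3 input shape: __call__ is meant to
    # run the full batches of 4, zero-pad the final partial batch of 2, and
    # trim the padded rows from the returned outputs.
    x = np.random.rand(10, 32, 32, 3).astype(np.float32)
    outputs = model({input_name: x})  # dict of output name -> numpy array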
diff --git a/src/mlia/nn/tensorflow/tflite_graph.py b/src/mlia/nn/tensorflow/tflite_graph.py
index 4f5e85f..7ca9337 100644
--- a/src/mlia/nn/tensorflow/tflite_graph.py
+++ b/src/mlia/nn/tensorflow/tflite_graph.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-FileCopyrightText: Copyright 2022-2023, Arm Limited and/or its affiliates.
 # SPDX-License-Identifier: Apache-2.0
 """Utilities for TensorFlow Lite graphs."""
 from __future__ import annotations
@@ -10,7 +10,10 @@
 from pathlib import Path
 from typing import Any
 from typing import cast
 
+import flatbuffers
 from tensorflow.lite.python import schema_py_generated as schema_fb
+from tensorflow.lite.python.schema_py_generated import Model
+from tensorflow.lite.python.schema_py_generated import ModelT
 from tensorflow.lite.tools import visualize
 
 
@@ -137,3 +140,25 @@ def parse_subgraphs(tflite_file: Path) -> list[list[Op]]:
     ]
 
     return graphs
+
+
+def load_fb(input_tflite_file: str | Path) -> ModelT:
+    """Load a flatbuffer model from file."""
+    if not Path(input_tflite_file).exists():
+        raise FileNotFoundError(f"TFLite file not found at {input_tflite_file}\n")
+    with open(input_tflite_file, "rb") as file_handle:
+        file_data = bytearray(file_handle.read())
+    model_obj = Model.GetRootAsModel(file_data, 0)
+    model = ModelT.InitFromObj(model_obj)
+    return model
+
+
+def save_fb(model: ModelT, output_tflite_file: str | Path) -> None:
+    """Save a flatbuffer model to a given file."""
+    builder = flatbuffers.Builder(1024)  # Initial size of the buffer, which
+    # will grow automatically if needed
+    model_offset = model.Pack(builder)
+    builder.Finish(model_offset, file_identifier=b"TFL3")
+    model_data = builder.Output()
+    with open(output_tflite_file, "wb") as out_file:
+        out_file.write(model_data)
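Editor's note: the new load_fb/save_fb pair round-trips a .tflite file through the mutable ModelT object API of the generated flatbuffer schema, which is what lets TFLiteModel.__init__ patch the batch dimension above. A rough sketch of such a round trip, not part of the commit (file names are placeholders):

    from mlia.nn.tensorflow.tflite_graph import load_fb
    from mlia.nn.tensorflow.tflite_graph import save_fb

    model = load_fb("input.tflite")    # parse flatbuffer into a mutable ModelT
    for subgraph in model.subgraphs:   # e.g. inspect or edit tensor metadata
        print([tensor.shape for tensor in subgraph.tensors])
    save_fb(model, "output.tflite")    # repack with the "TFL3" file identifier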