aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/tools
diff options
context:
space:
mode:
Diffstat (limited to 'src/mlia/tools')
-rw-r--r--src/mlia/tools/__init__.py3
-rw-r--r--src/mlia/tools/aiet_wrapper.py435
-rw-r--r--src/mlia/tools/metadata/__init__.py3
-rw-r--r--src/mlia/tools/metadata/common.py290
-rw-r--r--src/mlia/tools/metadata/corstone.py402
-rw-r--r--src/mlia/tools/vela_wrapper.py500
6 files changed, 1633 insertions, 0 deletions
diff --git a/src/mlia/tools/__init__.py b/src/mlia/tools/__init__.py
new file mode 100644
index 0000000..184e966
--- /dev/null
+++ b/src/mlia/tools/__init__.py
@@ -0,0 +1,3 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Tools module."""
diff --git a/src/mlia/tools/aiet_wrapper.py b/src/mlia/tools/aiet_wrapper.py
new file mode 100644
index 0000000..73e82ee
--- /dev/null
+++ b/src/mlia/tools/aiet_wrapper.py
@@ -0,0 +1,435 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for AIET integration."""
+import logging
+import re
+from abc import ABC
+from abc import abstractmethod
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Literal
+from typing import Optional
+from typing import Tuple
+
+from aiet.backend.application import get_available_applications
+from aiet.backend.application import install_application
+from aiet.backend.system import get_available_systems
+from aiet.backend.system import install_system
+from mlia.utils.proc import CommandExecutor
+from mlia.utils.proc import OutputConsumer
+from mlia.utils.proc import RunningCommand
+
+
+logger = logging.getLogger(__name__)
+
+# Mapping backend -> device_type -> system_name
+_SUPPORTED_SYSTEMS = {
+ "Corstone-300": {
+ "ethos-u55": "Corstone-300: Cortex-M55+Ethos-U55",
+ "ethos-u65": "Corstone-300: Cortex-M55+Ethos-U65",
+ },
+ "Corstone-310": {
+ "ethos-u55": "Corstone-310: Cortex-M85+Ethos-U55",
+ },
+}
+
+# Mapping system_name -> memory_mode -> application
+_SYSTEM_TO_APP_MAP = {
+ "Corstone-300: Cortex-M55+Ethos-U55": {
+ "Sram": "Generic Inference Runner: Ethos-U55 SRAM",
+ "Shared_Sram": "Generic Inference Runner: Ethos-U55/65 Shared SRAM",
+ },
+ "Corstone-300: Cortex-M55+Ethos-U65": {
+ "Shared_Sram": "Generic Inference Runner: Ethos-U55/65 Shared SRAM",
+ "Dedicated_Sram": "Generic Inference Runner: Ethos-U65 Dedicated SRAM",
+ },
+ "Corstone-310: Cortex-M85+Ethos-U55": {
+ "Sram": "Generic Inference Runner: Ethos-U55 SRAM",
+ "Shared_Sram": "Generic Inference Runner: Ethos-U55/65 Shared SRAM",
+ },
+}
+
+
+def get_system_name(backend: str, device_type: str) -> str:
+ """Get the AIET system name for the given backend and device type."""
+ return _SUPPORTED_SYSTEMS[backend][device_type]
+
+
+def is_supported(backend: str, device_type: Optional[str] = None) -> bool:
+ """Check if the backend (and optionally device type) is supported."""
+ if device_type is None:
+ return backend in _SUPPORTED_SYSTEMS
+
+ try:
+ get_system_name(backend, device_type)
+ return True
+ except KeyError:
+ return False
+
+
+def supported_backends() -> List[str]:
+ """Get a list of all backends supported by the AIET wrapper."""
+ return list(_SUPPORTED_SYSTEMS.keys())
+
+
+def get_all_system_names(backend: str) -> List[str]:
+ """Get all systems supported by the backend."""
+ return list(_SUPPORTED_SYSTEMS.get(backend, {}).values())
+
+
+def get_all_application_names(backend: str) -> List[str]:
+ """Get all applications supported by the backend."""
+ app_set = {
+ app
+ for sys in get_all_system_names(backend)
+ for app in _SYSTEM_TO_APP_MAP[sys].values()
+ }
+ return list(app_set)
+
+
+@dataclass
+class DeviceInfo:
+ """Device information."""
+
+ device_type: Literal["ethos-u55", "ethos-u65"]
+ mac: int
+ memory_mode: Literal["Sram", "Shared_Sram", "Dedicated_Sram"]
+
+
+@dataclass
+class ModelInfo:
+ """Model info."""
+
+ model_path: Path
+
+
+@dataclass
+class PerformanceMetrics:
+ """Performance metrics parsed from generic inference output."""
+
+ npu_active_cycles: int
+ npu_idle_cycles: int
+ npu_total_cycles: int
+ npu_axi0_rd_data_beat_received: int
+ npu_axi0_wr_data_beat_written: int
+ npu_axi1_rd_data_beat_received: int
+
+
+@dataclass
+class ExecutionParams:
+ """Application execution params."""
+
+ application: str
+ system: str
+ application_params: List[str]
+ system_params: List[str]
+ deploy_params: List[str]
+
+
+class AIETLogWriter(OutputConsumer):
+ """Redirect AIET command output to the logger."""
+
+ def feed(self, line: str) -> None:
+ """Process line from the output."""
+ logger.debug(line.strip())
+
+
+class GenericInferenceOutputParser(OutputConsumer):
+ """Generic inference app output parser."""
+
+ PATTERNS = {
+ name: tuple(re.compile(pattern, re.IGNORECASE) for pattern in patterns)
+ for name, patterns in (
+ (
+ "npu_active_cycles",
+ (
+ r"NPU ACTIVE cycles: (?P<value>\d+)",
+ r"NPU ACTIVE: (?P<value>\d+) cycles",
+ ),
+ ),
+ (
+ "npu_idle_cycles",
+ (
+ r"NPU IDLE cycles: (?P<value>\d+)",
+ r"NPU IDLE: (?P<value>\d+) cycles",
+ ),
+ ),
+ (
+ "npu_total_cycles",
+ (
+ r"NPU TOTAL cycles: (?P<value>\d+)",
+ r"NPU TOTAL: (?P<value>\d+) cycles",
+ ),
+ ),
+ (
+ "npu_axi0_rd_data_beat_received",
+ (
+ r"NPU AXI0_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
+ r"NPU AXI0_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
+ ),
+ ),
+ (
+ "npu_axi0_wr_data_beat_written",
+ (
+ r"NPU AXI0_WR_DATA_BEAT_WRITTEN beats: (?P<value>\d+)",
+ r"NPU AXI0_WR_DATA_BEAT_WRITTEN: (?P<value>\d+) beats",
+ ),
+ ),
+ (
+ "npu_axi1_rd_data_beat_received",
+ (
+ r"NPU AXI1_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
+ r"NPU AXI1_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
+ ),
+ ),
+ )
+ }
+
+ def __init__(self) -> None:
+ """Init generic inference output parser instance."""
+ self.result: Dict = {}
+
+ def feed(self, line: str) -> None:
+ """Feed new line to the parser."""
+ for name, patterns in self.PATTERNS.items():
+ for pattern in patterns:
+ match = pattern.search(line)
+
+ if match:
+ self.result[name] = int(match["value"])
+ return
+
+ def is_ready(self) -> bool:
+ """Return true if all expected data has been parsed."""
+ return self.result.keys() == self.PATTERNS.keys()
+
+ def missed_keys(self) -> List[str]:
+ """Return list of the keys that have not been found in the output."""
+ return sorted(self.PATTERNS.keys() - self.result.keys())
+
+
+class AIETRunner:
+ """AIET runner."""
+
+ def __init__(self, executor: CommandExecutor) -> None:
+ """Init AIET runner instance."""
+ self.executor = executor
+
+ @staticmethod
+ def get_installed_systems() -> List[str]:
+ """Get list of the installed systems."""
+ return [system.name for system in get_available_systems()]
+
+ @staticmethod
+ def get_installed_applications(system: Optional[str] = None) -> List[str]:
+ """Get list of the installed application."""
+ return [
+ app.name
+ for app in get_available_applications()
+ if system is None or app.can_run_on(system)
+ ]
+
+ def is_application_installed(self, application: str, system: str) -> bool:
+ """Return true if requested application installed."""
+ return application in self.get_installed_applications(system)
+
+ def is_system_installed(self, system: str) -> bool:
+ """Return true if requested system installed."""
+ return system in self.get_installed_systems()
+
+ def systems_installed(self, systems: List[str]) -> bool:
+ """Check if all provided systems are installed."""
+ if not systems:
+ return False
+
+ installed_systems = self.get_installed_systems()
+ return all(system in installed_systems for system in systems)
+
+ def applications_installed(self, applications: List[str]) -> bool:
+ """Check if all provided applications are installed."""
+ if not applications:
+ return False
+
+ installed_apps = self.get_installed_applications()
+ return all(app in installed_apps for app in applications)
+
+ def all_installed(self, systems: List[str], apps: List[str]) -> bool:
+ """Check if all provided artifacts are installed."""
+ return self.systems_installed(systems) and self.applications_installed(apps)
+
+ @staticmethod
+ def install_system(system_path: Path) -> None:
+ """Install system."""
+ install_system(system_path)
+
+ @staticmethod
+ def install_application(app_path: Path) -> None:
+ """Install application."""
+ install_application(app_path)
+
+ def run_application(self, execution_params: ExecutionParams) -> RunningCommand:
+ """Run requested application."""
+ command = [
+ "aiet",
+ "application",
+ "run",
+ "-n",
+ execution_params.application,
+ "-s",
+ execution_params.system,
+ *self._params("-p", execution_params.application_params),
+ *self._params("--system-param", execution_params.system_params),
+ *self._params("--deploy", execution_params.deploy_params),
+ ]
+
+ return self._submit(command)
+
+ @staticmethod
+ def _params(name: str, params: List[str]) -> List[str]:
+ return [p for item in [(name, param) for param in params] for p in item]
+
+ def _submit(self, command: List[str]) -> RunningCommand:
+ """Submit command for the execution."""
+ logger.debug("Submit command %s", " ".join(command))
+ return self.executor.submit(command)
+
+
+class GenericInferenceRunner(ABC):
+ """Abstract class for generic inference runner."""
+
+ def __init__(self, aiet_runner: AIETRunner):
+ """Init generic inference runner instance."""
+ self.aiet_runner = aiet_runner
+ self.running_inference: Optional[RunningCommand] = None
+
+ def run(
+ self, model_info: ModelInfo, output_consumers: List[OutputConsumer]
+ ) -> None:
+ """Run generic inference for the provided device/model."""
+ execution_params = self.get_execution_params(model_info)
+
+ self.running_inference = self.aiet_runner.run_application(execution_params)
+ self.running_inference.output_consumers = output_consumers
+ self.running_inference.consume_output()
+
+ def stop(self) -> None:
+ """Stop running inference."""
+ if self.running_inference is None:
+ return
+
+ self.running_inference.stop()
+
+ @abstractmethod
+ def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
+ """Get execution params for the provided model."""
+
+ def __enter__(self) -> "GenericInferenceRunner":
+ """Enter context."""
+ return self
+
+ def __exit__(self, *_args: Any) -> None:
+ """Exit context."""
+ self.stop()
+
+ def check_system_and_application(self, system_name: str, app_name: str) -> None:
+ """Check if requested system and application installed."""
+ if not self.aiet_runner.is_system_installed(system_name):
+ raise Exception(f"System {system_name} is not installed")
+
+ if not self.aiet_runner.is_application_installed(app_name, system_name):
+ raise Exception(
+ f"Application {app_name} for the system {system_name} "
+ "is not installed"
+ )
+
+
+class GenericInferenceRunnerEthosU(GenericInferenceRunner):
+ """Generic inference runner on U55/65."""
+
+ def __init__(
+ self, aiet_runner: AIETRunner, device_info: DeviceInfo, backend: str
+ ) -> None:
+ """Init generic inference runner instance."""
+ super().__init__(aiet_runner)
+
+ system_name, app_name = self.resolve_system_and_app(device_info, backend)
+ self.system_name = system_name
+ self.app_name = app_name
+ self.device_info = device_info
+
+ @staticmethod
+ def resolve_system_and_app(
+ device_info: DeviceInfo, backend: str
+ ) -> Tuple[str, str]:
+ """Find appropriate system and application for the provided device/backend."""
+ try:
+ system_name = get_system_name(backend, device_info.device_type)
+ except KeyError as ex:
+ raise RuntimeError(
+ f"Unsupported device {device_info.device_type} "
+ f"for backend {backend}"
+ ) from ex
+
+ if system_name not in _SYSTEM_TO_APP_MAP:
+ raise RuntimeError(f"System {system_name} is not installed")
+
+ try:
+ app_name = _SYSTEM_TO_APP_MAP[system_name][device_info.memory_mode]
+ except KeyError as err:
+ raise RuntimeError(
+ f"Unsupported memory mode {device_info.memory_mode}"
+ ) from err
+
+ return system_name, app_name
+
+ def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
+ """Get execution params for Ethos-U55/65."""
+ self.check_system_and_application(self.system_name, self.app_name)
+
+ system_params = [
+ f"mac={self.device_info.mac}",
+ f"input_file={model_info.model_path.absolute()}",
+ ]
+
+ return ExecutionParams(
+ self.app_name,
+ self.system_name,
+ [],
+ system_params,
+ [],
+ )
+
+
+def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner:
+ """Get generic runner for provided device and backend."""
+ aiet_runner = get_aiet_runner()
+ return GenericInferenceRunnerEthosU(aiet_runner, device_info, backend)
+
+
+def estimate_performance(
+ model_info: ModelInfo, device_info: DeviceInfo, backend: str
+) -> PerformanceMetrics:
+ """Get performance estimations."""
+ with get_generic_runner(device_info, backend) as generic_runner:
+ output_parser = GenericInferenceOutputParser()
+ output_consumers = [output_parser, AIETLogWriter()]
+
+ generic_runner.run(model_info, output_consumers)
+
+ if not output_parser.is_ready():
+ missed_data = ",".join(output_parser.missed_keys())
+ logger.debug(
+ "Unable to get performance metrics, missed data %s", missed_data
+ )
+ raise Exception("Unable to get performance metrics, insufficient data")
+
+ return PerformanceMetrics(**output_parser.result)
+
+
+def get_aiet_runner() -> AIETRunner:
+ """Return AIET runner."""
+ executor = CommandExecutor()
+ return AIETRunner(executor)
diff --git a/src/mlia/tools/metadata/__init__.py b/src/mlia/tools/metadata/__init__.py
new file mode 100644
index 0000000..f877e4f
--- /dev/null
+++ b/src/mlia/tools/metadata/__init__.py
@@ -0,0 +1,3 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for the tools metadata."""
diff --git a/src/mlia/tools/metadata/common.py b/src/mlia/tools/metadata/common.py
new file mode 100644
index 0000000..c17a738
--- /dev/null
+++ b/src/mlia/tools/metadata/common.py
@@ -0,0 +1,290 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for installation process."""
+import logging
+from abc import ABC
+from abc import abstractmethod
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable
+from typing import List
+from typing import Optional
+from typing import Union
+
+from mlia.utils.misc import yes
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class InstallFromPath:
+ """Installation from the local path."""
+
+ backend_path: Path
+
+
+@dataclass
+class DownloadAndInstall:
+ """Download and install."""
+
+ eula_agreement: bool = True
+
+
+InstallationType = Union[InstallFromPath, DownloadAndInstall]
+
+
+class Installation(ABC):
+ """Base class for the installation process of the backends."""
+
+ @property
+ @abstractmethod
+ def name(self) -> str:
+ """Return name of the backend."""
+
+ @property
+ @abstractmethod
+ def description(self) -> str:
+ """Return description of the backend."""
+
+ @property
+ @abstractmethod
+ def could_be_installed(self) -> bool:
+ """Return true if backend could be installed in current environment."""
+
+ @property
+ @abstractmethod
+ def already_installed(self) -> bool:
+ """Return true if backend is already installed."""
+
+ @abstractmethod
+ def supports(self, install_type: InstallationType) -> bool:
+ """Return true if installation supports requested installation type."""
+
+ @abstractmethod
+ def install(self, install_type: InstallationType) -> None:
+ """Install the backend."""
+
+
+InstallationFilter = Callable[[Installation], bool]
+
+
+class AlreadyInstalledFilter:
+ """Filter for already installed backends."""
+
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return installation.already_installed
+
+
+class ReadyForInstallationFilter:
+ """Filter for ready to be installed backends."""
+
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return installation.could_be_installed and not installation.already_installed
+
+
+class SupportsInstallTypeFilter:
+ """Filter backends that support certain type of the installation."""
+
+ def __init__(self, installation_type: InstallationType) -> None:
+ """Init filter."""
+ self.installation_type = installation_type
+
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return installation.supports(self.installation_type)
+
+
+class SearchByNameFilter:
+ """Filter installation by name."""
+
+ def __init__(self, backend_name: Optional[str]) -> None:
+ """Init filter."""
+ self.backend_name = backend_name
+
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return not self.backend_name or installation.name == self.backend_name
+
+
+class InstallationManager(ABC):
+ """Helper class for managing installations."""
+
+ @abstractmethod
+ def install_from(self, backend_path: Path, backend_name: Optional[str]) -> None:
+ """Install backend from the local directory."""
+
+ @abstractmethod
+ def download_and_install(
+ self, backend_name: Optional[str], eula_agreement: bool
+ ) -> None:
+ """Download and install backends."""
+
+ @abstractmethod
+ def show_env_details(self) -> None:
+ """Show environment details."""
+
+ @abstractmethod
+ def backend_installed(self, backend_name: str) -> bool:
+ """Return true if requested backend installed."""
+
+
+class InstallationFiltersMixin:
+ """Mixin for filtering installation based on different conditions."""
+
+ installations: List[Installation]
+
+ def filter_by(self, *filters: InstallationFilter) -> List[Installation]:
+ """Filter installations."""
+ return [
+ installation
+ for installation in self.installations
+ if all(filter_(installation) for filter_ in filters)
+ ]
+
+ def could_be_installed_from(
+ self, backend_path: Path, backend_name: Optional[str]
+ ) -> List[Installation]:
+ """Return installations that could be installed from provided directory."""
+ return self.filter_by(
+ SupportsInstallTypeFilter(InstallFromPath(backend_path)),
+ SearchByNameFilter(backend_name),
+ )
+
+ def could_be_downloaded_and_installed(
+ self, backend_name: Optional[str] = None
+ ) -> List[Installation]:
+ """Return installations that could be downloaded and installed."""
+ return self.filter_by(
+ SupportsInstallTypeFilter(DownloadAndInstall()),
+ SearchByNameFilter(backend_name),
+ ReadyForInstallationFilter(),
+ )
+
+ def already_installed(
+ self, backend_name: Optional[str] = None
+ ) -> List[Installation]:
+ """Return list of backends that are already installed."""
+ return self.filter_by(
+ AlreadyInstalledFilter(), SearchByNameFilter(backend_name)
+ )
+
+ def ready_for_installation(self) -> List[Installation]:
+ """Return list of the backends that could be installed."""
+ return self.filter_by(ReadyForInstallationFilter())
+
+
+class DefaultInstallationManager(InstallationManager, InstallationFiltersMixin):
+ """Interactive installation manager."""
+
+ def __init__(
+ self, installations: List[Installation], noninteractive: bool = False
+ ) -> None:
+ """Init the manager."""
+ self.installations = installations
+ self.noninteractive = noninteractive
+
+ def choose_installation_for_path(
+ self, backend_path: Path, backend_name: Optional[str]
+ ) -> Optional[Installation]:
+ """Check available installation and select one if possible."""
+ installs = self.could_be_installed_from(backend_path, backend_name)
+
+ if not installs:
+ logger.info(
+ "Unfortunatelly, it was not possible to automatically "
+ "detect type of the installed FVP. "
+ "Please, check provided path to the installed FVP."
+ )
+ return None
+
+ if len(installs) != 1:
+ names = ",".join((install.name for install in installs))
+ logger.info(
+ "Unable to correctly detect type of the installed FVP."
+ "The following FVPs are detected %s. Installation skipped.",
+ names,
+ )
+ return None
+
+ installation = installs[0]
+ if installation.already_installed:
+ logger.info(
+ "%s was found in %s, but it has been already installed.",
+ installation.name,
+ backend_path,
+ )
+ return None
+
+ return installation
+
+ def install_from(self, backend_path: Path, backend_name: Optional[str]) -> None:
+ """Install from the provided directory."""
+ installation = self.choose_installation_for_path(backend_path, backend_name)
+
+ if not installation:
+ return
+
+ prompt = (
+ f"{installation.name} was found in {backend_path}. "
+ "Would you like to install it?"
+ )
+ self._install(installation, InstallFromPath(backend_path), prompt)
+
+ def download_and_install(
+ self, backend_name: Optional[str] = None, eula_agreement: bool = True
+ ) -> None:
+ """Download and install available backends."""
+ installations = self.could_be_downloaded_and_installed(backend_name)
+
+ if not installations:
+ logger.info("No backends available for the installation.")
+ return
+
+ names = ",".join((installation.name for installation in installations))
+ logger.info("Following backends are available for downloading: %s", names)
+
+ for installation in installations:
+ prompt = f"Would you like to download and install {installation.name}?"
+ self._install(
+ installation, DownloadAndInstall(eula_agreement=eula_agreement), prompt
+ )
+
+ def show_env_details(self) -> None:
+ """Print current state of the execution environment."""
+ if installed := self.already_installed():
+ logger.info("Installed backends:\n")
+
+ for installation in installed:
+ logger.info(" - %s", installation.name)
+
+ if could_be_installed := self.ready_for_installation():
+ logger.info("Following backends could be installed:")
+
+ for installation in could_be_installed:
+ logger.info(" - %s", installation.name)
+
+ if not installed and not could_be_installed:
+ logger.info("No backends installed")
+
+ def _install(
+ self,
+ installation: Installation,
+ installation_type: InstallationType,
+ prompt: str,
+ ) -> None:
+ proceed = self.noninteractive or yes(prompt)
+
+ if proceed:
+ installation.install(installation_type)
+ logger.info("%s successfully installed.", installation.name)
+ else:
+ logger.info("%s installation canceled.", installation.name)
+
+ def backend_installed(self, backend_name: str) -> bool:
+ """Return true if requested backend installed."""
+ installations = self.already_installed(backend_name)
+
+ return len(installations) == 1
diff --git a/src/mlia/tools/metadata/corstone.py b/src/mlia/tools/metadata/corstone.py
new file mode 100644
index 0000000..7a9d113
--- /dev/null
+++ b/src/mlia/tools/metadata/corstone.py
@@ -0,0 +1,402 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for Corstone based FVPs."""
+import logging
+import platform
+import subprocess
+import tarfile
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable
+from typing import Iterable
+from typing import List
+from typing import Optional
+
+import mlia.tools.aiet_wrapper as aiet
+from mlia.tools.metadata.common import DownloadAndInstall
+from mlia.tools.metadata.common import Installation
+from mlia.tools.metadata.common import InstallationType
+from mlia.tools.metadata.common import InstallFromPath
+from mlia.utils.download import DownloadArtifact
+from mlia.utils.filesystem import all_files_exist
+from mlia.utils.filesystem import all_paths_valid
+from mlia.utils.filesystem import copy_all
+from mlia.utils.filesystem import get_mlia_resources
+from mlia.utils.filesystem import temp_directory
+from mlia.utils.proc import working_directory
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class BackendInfo:
+ """Backend information."""
+
+ backend_path: Path
+ copy_source: bool = True
+ system_config: Optional[str] = None
+
+
+PathChecker = Callable[[Path], Optional[BackendInfo]]
+BackendInstaller = Callable[[bool, Path], Path]
+
+
+class AIETMetadata:
+ """AIET installation metadata."""
+
+ def __init__(
+ self,
+ name: str,
+ description: str,
+ system_config: str,
+ apps_resources: List[str],
+ fvp_dir_name: str,
+ download_artifact: Optional[DownloadArtifact],
+ supported_platforms: Optional[List[str]] = None,
+ ) -> None:
+ """
+ Initialize AIETMetaData.
+
+ Members expected_systems and expected_apps are filled automatically.
+ """
+ self.name = name
+ self.description = description
+ self.system_config = system_config
+ self.apps_resources = apps_resources
+ self.fvp_dir_name = fvp_dir_name
+ self.download_artifact = download_artifact
+ self.supported_platforms = supported_platforms
+
+ self.expected_systems = aiet.get_all_system_names(name)
+ self.expected_apps = aiet.get_all_application_names(name)
+
+ @property
+ def expected_resources(self) -> Iterable[Path]:
+ """Return list of expected resources."""
+ resources = [self.system_config, *self.apps_resources]
+
+ return (get_mlia_resources() / resource for resource in resources)
+
+ @property
+ def supported_platform(self) -> bool:
+ """Return true if current platform supported."""
+ if not self.supported_platforms:
+ return True
+
+ return platform.system() in self.supported_platforms
+
+
+class AIETBasedInstallation(Installation):
+ """Backend installation based on AIET functionality."""
+
+ def __init__(
+ self,
+ aiet_runner: aiet.AIETRunner,
+ metadata: AIETMetadata,
+ path_checker: PathChecker,
+ backend_installer: Optional[BackendInstaller],
+ ) -> None:
+ """Init the tool installation."""
+ self.aiet_runner = aiet_runner
+ self.metadata = metadata
+ self.path_checker = path_checker
+ self.backend_installer = backend_installer
+
+ @property
+ def name(self) -> str:
+ """Return name of the tool."""
+ return self.metadata.name
+
+ @property
+ def description(self) -> str:
+ """Return description of the tool."""
+ return self.metadata.description
+
+ @property
+ def already_installed(self) -> bool:
+ """Return true if tool already installed."""
+ return self.aiet_runner.all_installed(
+ self.metadata.expected_systems, self.metadata.expected_apps
+ )
+
+ @property
+ def could_be_installed(self) -> bool:
+ """Return true if tool could be installed."""
+ if not self.metadata.supported_platform:
+ return False
+
+ return all_paths_valid(self.metadata.expected_resources)
+
+ def supports(self, install_type: InstallationType) -> bool:
+ """Return true if tools supported type of the installation."""
+ if isinstance(install_type, DownloadAndInstall):
+ return self.metadata.download_artifact is not None
+
+ if isinstance(install_type, InstallFromPath):
+ return self.path_checker(install_type.backend_path) is not None
+
+ return False # type: ignore
+
+ def install(self, install_type: InstallationType) -> None:
+ """Install the tool."""
+ if isinstance(install_type, DownloadAndInstall):
+ download_artifact = self.metadata.download_artifact
+ assert download_artifact is not None, "No artifact provided"
+
+ self.download_and_install(download_artifact, install_type.eula_agreement)
+ elif isinstance(install_type, InstallFromPath):
+ backend_path = self.path_checker(install_type.backend_path)
+ assert backend_path is not None, "Unable to resolve backend path"
+
+ self.install_from(backend_path)
+ else:
+ raise Exception(f"Unable to install {install_type}")
+
+ def install_from(self, backend_info: BackendInfo) -> None:
+ """Install tool from the directory."""
+ mlia_resources = get_mlia_resources()
+
+ with temp_directory() as tmpdir:
+ fvp_dist_dir = tmpdir / self.metadata.fvp_dir_name
+
+ system_config = self.metadata.system_config
+ if backend_info.system_config:
+ system_config = backend_info.system_config
+
+ resources_to_copy = [mlia_resources / system_config]
+ if backend_info.copy_source:
+ resources_to_copy.append(backend_info.backend_path)
+
+ copy_all(*resources_to_copy, dest=fvp_dist_dir)
+
+ self.aiet_runner.install_system(fvp_dist_dir)
+
+ for app in self.metadata.apps_resources:
+ self.aiet_runner.install_application(mlia_resources / app)
+
+ def download_and_install(
+ self, download_artifact: DownloadArtifact, eula_agrement: bool
+ ) -> None:
+ """Download and install the tool."""
+ with temp_directory() as tmpdir:
+ try:
+ downloaded_to = download_artifact.download_to(tmpdir)
+ except Exception as err:
+ raise Exception("Unable to download backend artifact") from err
+
+ with working_directory(tmpdir / "dist", create_dir=True) as dist_dir:
+ with tarfile.open(downloaded_to) as archive:
+ archive.extractall(dist_dir)
+
+ assert self.backend_installer, (
+ f"Backend '{self.metadata.name}' does not support "
+ "download and installation."
+ )
+ backend_path = self.backend_installer(eula_agrement, dist_dir)
+ if self.path_checker(backend_path) is None:
+ raise Exception("Downloaded artifact has invalid structure")
+
+ self.install(InstallFromPath(backend_path))
+
+
+class PackagePathChecker:
+ """Package path checker."""
+
+ def __init__(
+ self, expected_files: List[str], backend_subfolder: Optional[str] = None
+ ) -> None:
+ """Init the path checker."""
+ self.expected_files = expected_files
+ self.backend_subfolder = backend_subfolder
+
+ def __call__(self, backend_path: Path) -> Optional[BackendInfo]:
+ """Check if directory contains all expected files."""
+ resolved_paths = (backend_path / file for file in self.expected_files)
+ if not all_files_exist(resolved_paths):
+ return None
+
+ if self.backend_subfolder:
+ subfolder = backend_path / self.backend_subfolder
+
+ if not subfolder.is_dir():
+ return None
+
+ return BackendInfo(subfolder)
+
+ return BackendInfo(backend_path)
+
+
+class StaticPathChecker:
+ """Static path checker."""
+
+ def __init__(
+ self,
+ static_backend_path: Path,
+ expected_files: List[str],
+ copy_source: bool = False,
+ system_config: Optional[str] = None,
+ ) -> None:
+ """Init static path checker."""
+ self.static_backend_path = static_backend_path
+ self.expected_files = expected_files
+ self.copy_source = copy_source
+ self.system_config = system_config
+
+ def __call__(self, backend_path: Path) -> Optional[BackendInfo]:
+ """Check if directory equals static backend path with all expected files."""
+ if backend_path != self.static_backend_path:
+ return None
+
+ resolved_paths = (backend_path / file for file in self.expected_files)
+ if not all_files_exist(resolved_paths):
+ return None
+
+ return BackendInfo(
+ backend_path,
+ copy_source=self.copy_source,
+ system_config=self.system_config,
+ )
+
+
+class CompoundPathChecker:
+ """Compound path checker."""
+
+ def __init__(self, *path_checkers: PathChecker) -> None:
+ """Init compound path checker."""
+ self.path_checkers = path_checkers
+
+ def __call__(self, backend_path: Path) -> Optional[BackendInfo]:
+ """Iterate over checkers and return first non empty backend info."""
+ first_resolved_backend_info = (
+ backend_info
+ for path_checker in self.path_checkers
+ if (backend_info := path_checker(backend_path)) is not None
+ )
+
+ return next(first_resolved_backend_info, None)
+
+
+class Corstone300Installer:
+ """Helper class that wraps Corstone 300 installation logic."""
+
+ def __call__(self, eula_agreement: bool, dist_dir: Path) -> Path:
+ """Install Corstone-300 and return path to the models."""
+ with working_directory(dist_dir):
+ install_dir = "corstone-300"
+ try:
+ fvp_install_cmd = [
+ "./FVP_Corstone_SSE-300.sh",
+ "-q",
+ "-d",
+ install_dir,
+ ]
+ if not eula_agreement:
+ fvp_install_cmd += [
+ "--nointeractive",
+ "--i-agree-to-the-contained-eula",
+ ]
+
+ subprocess.check_call(fvp_install_cmd)
+ except subprocess.CalledProcessError as err:
+ raise Exception(
+ "Error occurred during Corstone-300 installation"
+ ) from err
+
+ return dist_dir / install_dir
+
+
+def get_corstone_300_installation() -> Installation:
+ """Get Corstone-300 installation."""
+ corstone_300 = AIETBasedInstallation(
+ aiet_runner=aiet.get_aiet_runner(),
+ # pylint: disable=line-too-long
+ metadata=AIETMetadata(
+ name="Corstone-300",
+ description="Corstone-300 FVP",
+ system_config="aiet/systems/corstone-300/aiet-config.json",
+ apps_resources=[
+ "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA",
+ "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA",
+ "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA",
+ ],
+ fvp_dir_name="corstone_300",
+ download_artifact=DownloadArtifact(
+ name="Corstone-300 FVP",
+ url="https://developer.arm.com/-/media/Arm%20Developer%20Community/Downloads/OSS/FVP/Corstone-300/FVP_Corstone_SSE-300_11.16_26.tgz",
+ filename="FVP_Corstone_SSE-300_11.16_26.tgz",
+ version="11.16_26",
+ sha256_hash="e26139be756b5003a30d978c629de638aed1934d597dc24a17043d4708e934d7",
+ ),
+ supported_platforms=["Linux"],
+ ),
+ # pylint: enable=line-too-long
+ path_checker=CompoundPathChecker(
+ PackagePathChecker(
+ expected_files=[
+ "models/Linux64_GCC-6.4/FVP_Corstone_SSE-300_Ethos-U55",
+ "models/Linux64_GCC-6.4/FVP_Corstone_SSE-300_Ethos-U65",
+ ],
+ backend_subfolder="models/Linux64_GCC-6.4",
+ ),
+ StaticPathChecker(
+ static_backend_path=Path("/opt/VHT"),
+ expected_files=[
+ "VHT_Corstone_SSE-300_Ethos-U55",
+ "VHT_Corstone_SSE-300_Ethos-U65",
+ ],
+ copy_source=False,
+ system_config="aiet/systems/corstone-300-vht/aiet-config.json",
+ ),
+ ),
+ backend_installer=Corstone300Installer(),
+ )
+
+ return corstone_300
+
+
+def get_corstone_310_installation() -> Installation:
+ """Get Corstone-310 installation."""
+ corstone_310 = AIETBasedInstallation(
+ aiet_runner=aiet.get_aiet_runner(),
+ # pylint: disable=line-too-long
+ metadata=AIETMetadata(
+ name="Corstone-310",
+ description="Corstone-310 FVP",
+ system_config="aiet/systems/corstone-310/aiet-config.json",
+ apps_resources=[
+ "aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA",
+ "aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA",
+ ],
+ fvp_dir_name="corstone_310",
+ download_artifact=None,
+ supported_platforms=["Linux"],
+ ),
+ # pylint: enable=line-too-long
+ path_checker=CompoundPathChecker(
+ PackagePathChecker(
+ expected_files=[
+ "models/Linux64_GCC-9.3/FVP_Corstone_SSE-310",
+ ],
+ backend_subfolder="models/Linux64_GCC-9.3",
+ ),
+ StaticPathChecker(
+ static_backend_path=Path("/opt/VHT"),
+ expected_files=[
+ "VHT_Corstone_SSE-310",
+ ],
+ copy_source=False,
+ system_config="aiet/systems/corstone-310-vht/aiet-config.json",
+ ),
+ ),
+ backend_installer=None,
+ )
+
+ return corstone_310
+
+
+def get_corstone_installations() -> List[Installation]:
+ """Get Corstone installations."""
+ return [
+ get_corstone_300_installation(),
+ get_corstone_310_installation(),
+ ]
diff --git a/src/mlia/tools/vela_wrapper.py b/src/mlia/tools/vela_wrapper.py
new file mode 100644
index 0000000..7225797
--- /dev/null
+++ b/src/mlia/tools/vela_wrapper.py
@@ -0,0 +1,500 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Vela wrapper module."""
+import itertools
+import logging
+import sys
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Literal
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+from ethosu.vela.architecture_features import ArchitectureFeatures
+from ethosu.vela.compiler_driver import compiler_driver
+from ethosu.vela.compiler_driver import CompilerOptions
+from ethosu.vela.compiler_driver import TensorAllocator
+from ethosu.vela.model_reader import ModelReaderOptions
+from ethosu.vela.model_reader import read_model
+from ethosu.vela.nn_graph import Graph
+from ethosu.vela.nn_graph import NetworkType
+from ethosu.vela.npu_performance import PassCycles
+from ethosu.vela.operation import CustomType
+from ethosu.vela.operation import Op
+from ethosu.vela.scheduler import OptimizationStrategy
+from ethosu.vela.scheduler import SchedulerOptions
+from ethosu.vela.tensor import BandwidthDirection
+from ethosu.vela.tensor import MemArea
+from ethosu.vela.tensor import Tensor
+from ethosu.vela.tflite_mapping import optype_to_builtintype
+from ethosu.vela.tflite_model_semantic import TFLiteSemantic
+from ethosu.vela.tflite_supported_operators import TFLiteSupportedOperators
+from ethosu.vela.tflite_writer import write_tflite
+from ethosu.vela.vela import generate_supported_ops
+
+from mlia.utils.logging import redirect_output
+
+
+logger = logging.getLogger(__name__)
+
+VELA_INTERNAL_OPS = (Op.Placeholder, Op.SubgraphInput, Op.Const)
+
+
+@dataclass
+class PerformanceMetrics: # pylint: disable=too-many-instance-attributes
+ """Contains all the performance metrics Vela generates in a run."""
+
+ npu_cycles: int
+ sram_access_cycles: int
+ dram_access_cycles: int
+ on_chip_flash_access_cycles: int
+ off_chip_flash_access_cycles: int
+ total_cycles: int
+ batch_inference_time: float
+ inferences_per_second: float
+ batch_size: int
+ unknown_memory_area_size: int
+ sram_memory_area_size: int
+ dram_memory_area_size: int
+ on_chip_flash_memory_area_size: int
+ off_chip_flash_memory_area_size: int
+
+
+@dataclass
+class NpuSupported:
+ """Operator's npu supported attribute."""
+
+ supported: bool
+ reasons: List[Tuple[str, str]]
+
+
+@dataclass
+class Operator:
+ """Model operator."""
+
+ name: str
+ op_type: str
+ run_on_npu: NpuSupported
+
+ @property
+ def cpu_only(self) -> bool:
+ """Return true if operator is CPU only."""
+ cpu_only_reasons = [("CPU only operator", "")]
+ return (
+ not self.run_on_npu.supported
+ and self.run_on_npu.reasons == cpu_only_reasons
+ )
+
+
+@dataclass
+class Operators:
+ """Model's operators."""
+
+ ops: List[Operator]
+
+ @property
+ def npu_supported_ratio(self) -> float:
+ """Return NPU supported ratio."""
+ total = self.total_number
+ npu_supported = self.npu_supported_number
+
+ if total == 0 or npu_supported == 0:
+ return 0
+
+ return npu_supported / total
+
+ @property
+ def npu_unsupported_ratio(self) -> float:
+ """Return NPU unsupported ratio."""
+ return 1 - self.npu_supported_ratio
+
+ @property
+ def total_number(self) -> int:
+ """Return total number of operators."""
+ return len(self.ops)
+
+ @property
+ def npu_supported_number(self) -> int:
+ """Return number of npu supported operators."""
+ return sum(op.run_on_npu.supported for op in self.ops)
+
+
+@dataclass
+class Model:
+ """Model metadata."""
+
+ nng: Graph
+ network_type: NetworkType
+
+ @property
+ def optimized(self) -> bool:
+ """Return true if model is already optimized."""
+ return any(
+ op.attrs.get("custom_type") == CustomType.ExistingNpuOp
+ for sg in self.nng.subgraphs
+ for op in sg.get_all_ops()
+ )
+
+
+@dataclass
+class OptimizedModel:
+ """Instance of the Vela optimized model."""
+
+ nng: Graph
+ arch: ArchitectureFeatures
+ compiler_options: CompilerOptions
+ scheduler_options: SchedulerOptions
+
+ def save(self, output_filename: Union[str, Path]) -> None:
+ """Save instance of the optimized model to the file."""
+ write_tflite(self.nng, output_filename)
+
+
+AcceleratorConfigType = Literal[
+ "ethos-u55-32",
+ "ethos-u55-64",
+ "ethos-u55-128",
+ "ethos-u55-256",
+ "ethos-u65-256",
+ "ethos-u65-512",
+]
+
+TensorAllocatorType = Literal["LinearAlloc", "Greedy", "HillClimb"]
+
+OptimizationStrategyType = Literal["Performance", "Size"]
+
+
+@dataclass
+class VelaCompilerOptions: # pylint: disable=too-many-instance-attributes
+ """Vela compiler options."""
+
+ config_files: Optional[Union[str, List[str]]] = None
+ system_config: str = ArchitectureFeatures.DEFAULT_CONFIG
+ memory_mode: str = ArchitectureFeatures.DEFAULT_CONFIG
+ accelerator_config: Optional[AcceleratorConfigType] = None
+ max_block_dependency: int = ArchitectureFeatures.MAX_BLOCKDEP
+ arena_cache_size: Optional[int] = None
+ tensor_allocator: TensorAllocatorType = "HillClimb"
+ cpu_tensor_alignment: int = Tensor.AllocationQuantum
+ optimization_strategy: OptimizationStrategyType = "Performance"
+ output_dir: Optional[str] = None
+ recursion_limit: int = 1000
+
+
+class VelaCompiler: # pylint: disable=too-many-instance-attributes
+ """Vela compiler wrapper."""
+
+ def __init__(self, compiler_options: VelaCompilerOptions):
+ """Init Vela wrapper instance."""
+ self.config_files = compiler_options.config_files
+ self.system_config = compiler_options.system_config
+ self.memory_mode = compiler_options.memory_mode
+ self.accelerator_config = compiler_options.accelerator_config
+ self.max_block_dependency = compiler_options.max_block_dependency
+ self.arena_cache_size = compiler_options.arena_cache_size
+ self.tensor_allocator = TensorAllocator[compiler_options.tensor_allocator]
+ self.cpu_tensor_alignment = compiler_options.cpu_tensor_alignment
+ self.optimization_strategy = OptimizationStrategy[
+ compiler_options.optimization_strategy
+ ]
+ self.output_dir = compiler_options.output_dir
+ self.recursion_limit = compiler_options.recursion_limit
+
+ sys.setrecursionlimit(self.recursion_limit)
+
+ def read_model(self, model: Union[str, Path]) -> Model:
+ """Read model."""
+ logger.debug("Read model %s", model)
+
+ nng, network_type = self._read_model(model)
+ return Model(nng, network_type)
+
+ def compile_model(self, model: Union[str, Path, Model]) -> OptimizedModel:
+ """Compile the model."""
+ if isinstance(model, (str, Path)):
+ nng, network_type = self._read_model(model)
+ else:
+ nng, network_type = model.nng, NetworkType.TFLite
+
+ if not nng:
+ raise Exception("Unable to read model")
+
+ try:
+ arch = self._architecture_features()
+ compiler_options = self._compiler_options()
+ scheduler_options = self._scheduler_options()
+
+ with redirect_output(
+ logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
+ ):
+ compiler_driver(
+ nng, arch, compiler_options, scheduler_options, network_type
+ )
+
+ return OptimizedModel(nng, arch, compiler_options, scheduler_options)
+ except (SystemExit, Exception) as err:
+ raise Exception("Model could not be optimized with Vela compiler") from err
+
+ def get_config(self) -> Dict[str, Any]:
+ """Get compiler configuration."""
+ arch = self._architecture_features()
+
+ memory_area = {
+ mem.name: {
+ "clock_scales": arch.memory_clock_scales[mem],
+ "burst_length": arch.memory_burst_length[mem],
+ "read_latency": arch.memory_latency[mem][BandwidthDirection.Read],
+ "write_latency": arch.memory_latency[mem][BandwidthDirection.Write],
+ }
+ for mem in (
+ MemArea.Sram,
+ MemArea.Dram,
+ MemArea.OnChipFlash,
+ MemArea.OffChipFlash,
+ )
+ }
+
+ return {
+ "accelerator_config": arch.accelerator_config.value,
+ "system_config": arch.system_config,
+ "core_clock": arch.core_clock,
+ "axi0_port": arch.axi0_port.name,
+ "axi1_port": arch.axi1_port.name,
+ "memory_mode": arch.memory_mode,
+ "const_mem_area": arch.const_mem_area.name,
+ "arena_mem_area": arch.arena_mem_area.name,
+ "cache_mem_area": arch.cache_mem_area.name,
+ "arena_cache_size": arch.arena_cache_size,
+ "permanent_storage_mem_area": arch.permanent_storage_mem_area.name,
+ "feature_map_storage_mem_area": arch.feature_map_storage_mem_area.name,
+ "fast_storage_mem_area": arch.fast_storage_mem_area.name,
+ "memory_area": memory_area,
+ }
+
+ @staticmethod
+ def _read_model(model: Union[str, Path]) -> Tuple[Graph, NetworkType]:
+ """Read TFLite model."""
+ try:
+ model_path = str(model) if isinstance(model, Path) else model
+
+ with redirect_output(
+ logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
+ ):
+ return read_model(model_path, ModelReaderOptions()) # type: ignore
+ except (SystemExit, Exception) as err:
+ raise Exception(f"Unable to read model {model_path}") from err
+
+ def _architecture_features(self) -> ArchitectureFeatures:
+ """Return ArchitectureFeatures instance."""
+ return ArchitectureFeatures(
+ vela_config_files=self.config_files,
+ accelerator_config=self.accelerator_config,
+ system_config=self.system_config,
+ memory_mode=self.memory_mode,
+ max_blockdep=self.max_block_dependency,
+ verbose_config=False,
+ arena_cache_size=self.arena_cache_size,
+ )
+
+ def _scheduler_options(self) -> SchedulerOptions:
+ """Return SchedulerOptions instance."""
+ arch = self._architecture_features()
+
+ return SchedulerOptions(
+ optimization_strategy=self.optimization_strategy,
+ sram_target=arch.arena_cache_size,
+ verbose_schedule=False,
+ )
+
+ def _compiler_options(self) -> CompilerOptions:
+ """Return CompilerOptions instance."""
+ return CompilerOptions(
+ verbose_graph=False,
+ verbose_quantization=False,
+ verbose_packing=False,
+ verbose_tensor_purpose=False,
+ verbose_tensor_format=False,
+ verbose_allocation=False,
+ verbose_high_level_command_stream=False,
+ verbose_register_command_stream=False,
+ verbose_operators=False,
+ verbose_weights=False,
+ show_cpu_operations=False,
+ tensor_allocator=self.tensor_allocator,
+ timing=False,
+ output_dir=self.output_dir,
+ cpu_tensor_alignment=self.cpu_tensor_alignment,
+ )
+
+
+def resolve_compiler_config(
+ vela_compiler_options: VelaCompilerOptions,
+) -> Dict[str, Any]:
+ """Resolve passed compiler options.
+
+ Vela has number of configuration parameters that being
+ resolved during passing compiler options. E.g. Vela
+ reads configuration parameters from vela.ini and fills
+ it's internal structures with resolved values (memory mode,
+ system mode, etc.).
+
+ In order to get this information we need to create
+ instance of the Vela compiler first.
+ """
+ vela_compiler = VelaCompiler(vela_compiler_options)
+ return vela_compiler.get_config()
+
+
+def estimate_performance(
+ model_path: Path, compiler_options: VelaCompilerOptions
+) -> PerformanceMetrics:
+ """Return performance estimations for the model/device.
+
+ Logic for this function comes from Vela module stats_writer.py
+ """
+ logger.debug(
+ "Estimate performance for the model %s on %s",
+ model_path,
+ compiler_options.accelerator_config,
+ )
+
+ vela_compiler = VelaCompiler(compiler_options)
+
+ initial_model = vela_compiler.read_model(model_path)
+ if initial_model.optimized:
+ raise Exception("Unable to estimate performance for the given optimized model")
+
+ optimized_model = vela_compiler.compile_model(initial_model)
+
+ return _performance_metrics(optimized_model)
+
+
+def optimize_model(
+ model_path: Path, compiler_options: VelaCompilerOptions, output_model_path: Path
+) -> None:
+ """Optimize model and return it's path after optimization."""
+ logger.debug(
+ "Optimize model %s for device %s",
+ model_path,
+ compiler_options.accelerator_config,
+ )
+
+ vela_compiler = VelaCompiler(compiler_options)
+ optimized_model = vela_compiler.compile_model(model_path)
+
+ logger.debug("Save optimized model into %s", output_model_path)
+ optimized_model.save(output_model_path)
+
+
+def _performance_metrics(optimized_model: OptimizedModel) -> PerformanceMetrics:
+ """Return performance metrics for optimized model."""
+ cycles = optimized_model.nng.cycles
+
+ def memory_usage(mem_area: MemArea) -> int:
+ """Get memory usage for the proviced memory area type."""
+ memory_used: Dict[MemArea, int] = optimized_model.nng.memory_used
+ bandwidths = optimized_model.nng.bandwidths
+
+ return memory_used.get(mem_area, 0) if np.sum(bandwidths[mem_area]) > 0 else 0
+
+ midpoint_fps = np.nan
+ midpoint_inference_time = cycles[PassCycles.Total] / optimized_model.arch.core_clock
+ if midpoint_inference_time > 0:
+ midpoint_fps = 1 / midpoint_inference_time
+
+ return PerformanceMetrics(
+ npu_cycles=int(cycles[PassCycles.Npu]),
+ sram_access_cycles=int(cycles[PassCycles.SramAccess]),
+ dram_access_cycles=int(cycles[PassCycles.DramAccess]),
+ on_chip_flash_access_cycles=int(cycles[PassCycles.OnChipFlashAccess]),
+ off_chip_flash_access_cycles=int(cycles[PassCycles.OffChipFlashAccess]),
+ total_cycles=int(cycles[PassCycles.Total]),
+ batch_inference_time=midpoint_inference_time * 1000,
+ inferences_per_second=midpoint_fps,
+ batch_size=optimized_model.nng.batch_size,
+ unknown_memory_area_size=memory_usage(MemArea.Unknown),
+ sram_memory_area_size=memory_usage(MemArea.Sram),
+ dram_memory_area_size=memory_usage(MemArea.Dram),
+ on_chip_flash_memory_area_size=memory_usage(MemArea.OnChipFlash),
+ off_chip_flash_memory_area_size=memory_usage(MemArea.OffChipFlash),
+ )
+
+
+def supported_operators(
+ model_path: Path, compiler_options: VelaCompilerOptions
+) -> Operators:
+ """Return list of model's operators."""
+ logger.debug("Check supported operators for the model %s", model_path)
+
+ vela_compiler = VelaCompiler(compiler_options)
+ initial_model = vela_compiler.read_model(model_path)
+
+ return Operators(
+ [
+ Operator(op.name, optype_to_builtintype(op.type), run_on_npu(op))
+ for sg in initial_model.nng.subgraphs
+ for op in sg.get_all_ops()
+ if op.type not in VELA_INTERNAL_OPS
+ ]
+ )
+
+
+def run_on_npu(operator: Op) -> NpuSupported:
+ """Return information if operator can run on NPU.
+
+ Vela does a number of checks that can help establish whether
+ a particular operator is supported to run on NPU.
+
+ There are two groups of checks:
+ - general TFLite constraints
+ - operator specific constraints
+
+ If an operator is not supported on NPU then this function
+ will return the reason of that.
+
+ The reason is split in two parts:
+ - general description of why the operator cannot be placed on NPU
+ - details on the particular operator
+ """
+ semantic_checker = TFLiteSemantic()
+ semantic_constraints = itertools.chain(
+ semantic_checker.generic_constraints,
+ semantic_checker.specific_constraints[operator.type],
+ )
+
+ for constraint in semantic_constraints:
+ op_valid, op_reason = constraint(operator)
+ if not op_valid:
+ return NpuSupported(False, [(constraint.__doc__, op_reason)])
+
+ if operator.type not in TFLiteSupportedOperators.supported_operators:
+ reasons = (
+ [("CPU only operator", "")]
+ if operator.type not in VELA_INTERNAL_OPS
+ else []
+ )
+
+ return NpuSupported(False, reasons)
+
+ tflite_supported_operators = TFLiteSupportedOperators()
+ operation_constraints = itertools.chain(
+ tflite_supported_operators.generic_constraints,
+ tflite_supported_operators.specific_constraints[operator.type],
+ )
+ for constraint in operation_constraints:
+ op_valid, op_reason = constraint(operator)
+ if not op_valid:
+ return NpuSupported(False, [(constraint.__doc__, op_reason)])
+
+ return NpuSupported(True, [])
+
+
+def generate_supported_operators_report() -> None:
+ """Generate supported operators report in current working directory."""
+ with redirect_output(logger):
+ generate_supported_ops()