Diffstat (limited to 'src/mlia/tools')
-rw-r--r--  src/mlia/tools/__init__.py           |   3
-rw-r--r--  src/mlia/tools/aiet_wrapper.py       | 435
-rw-r--r--  src/mlia/tools/metadata/__init__.py  |   3
-rw-r--r--  src/mlia/tools/metadata/common.py    | 290
-rw-r--r--  src/mlia/tools/metadata/corstone.py  | 402
-rw-r--r--  src/mlia/tools/vela_wrapper.py       | 500
6 files changed, 1633 insertions, 0 deletions
diff --git a/src/mlia/tools/__init__.py b/src/mlia/tools/__init__.py
new file mode 100644
index 0000000..184e966
--- /dev/null
+++ b/src/mlia/tools/__init__.py
@@ -0,0 +1,3 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Tools module."""
diff --git a/src/mlia/tools/aiet_wrapper.py b/src/mlia/tools/aiet_wrapper.py
new file mode 100644
index 0000000..73e82ee
--- /dev/null
+++ b/src/mlia/tools/aiet_wrapper.py
@@ -0,0 +1,435 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for AIET integration."""
+import logging
+import re
+from abc import ABC
+from abc import abstractmethod
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Literal
+from typing import Optional
+from typing import Tuple
+
+from aiet.backend.application import get_available_applications
+from aiet.backend.application import install_application
+from aiet.backend.system import get_available_systems
+from aiet.backend.system import install_system
+from mlia.utils.proc import CommandExecutor
+from mlia.utils.proc import OutputConsumer
+from mlia.utils.proc import RunningCommand
+
+
+logger = logging.getLogger(__name__)
+
+# Mapping backend -> device_type -> system_name
+_SUPPORTED_SYSTEMS = {
+    "Corstone-300": {
+        "ethos-u55": "Corstone-300: Cortex-M55+Ethos-U55",
+        "ethos-u65": "Corstone-300: Cortex-M55+Ethos-U65",
+    },
+    "Corstone-310": {
+        "ethos-u55": "Corstone-310: Cortex-M85+Ethos-U55",
+    },
+}
+
+# Mapping system_name -> memory_mode -> application
+_SYSTEM_TO_APP_MAP = {
+    "Corstone-300: Cortex-M55+Ethos-U55": {
+        "Sram": "Generic Inference Runner: Ethos-U55 SRAM",
+        "Shared_Sram": "Generic Inference Runner: Ethos-U55/65 Shared SRAM",
+    },
+    "Corstone-300: Cortex-M55+Ethos-U65": {
+        "Shared_Sram": "Generic Inference Runner: Ethos-U55/65 Shared SRAM",
+        "Dedicated_Sram": "Generic Inference Runner: Ethos-U65 Dedicated SRAM",
+    },
+    "Corstone-310: Cortex-M85+Ethos-U55": {
+        "Sram": "Generic Inference Runner: Ethos-U55 SRAM",
+        "Shared_Sram": "Generic Inference Runner: Ethos-U55/65 Shared SRAM",
+    },
+}
+
+
+def get_system_name(backend: str, device_type: str) -> str:
+    """Get the AIET system name for the given backend and device type."""
+    return _SUPPORTED_SYSTEMS[backend][device_type]
+
+
+def is_supported(backend: str, device_type: Optional[str] = None) -> bool:
+    """Check if the backend (and optionally device type) is supported."""
+    if device_type is None:
+        return backend in _SUPPORTED_SYSTEMS
+
+    try:
+        get_system_name(backend, device_type)
+        return True
+    except KeyError:
+        return False
+
+
+def supported_backends() -> List[str]:
+    """Get a list of all backends supported by the AIET wrapper."""
+    return list(_SUPPORTED_SYSTEMS.keys())
+
+
+def get_all_system_names(backend: str) -> List[str]:
+    """Get all systems supported by the backend."""
+    return list(_SUPPORTED_SYSTEMS.get(backend, {}).values())
+
+
+def get_all_application_names(backend: str) -> List[str]:
+    """Get all applications supported by the backend."""
+    app_set = {
+        app
+        for sys in get_all_system_names(backend)
+        for app in _SYSTEM_TO_APP_MAP[sys].values()
+    }
+    return list(app_set)
+
+
+@dataclass
+class DeviceInfo:
+    """Device information."""
+
+    device_type: Literal["ethos-u55", "ethos-u65"]
+    mac: int
+    memory_mode: Literal["Sram", "Shared_Sram", "Dedicated_Sram"]
+
+
+@dataclass
+class ModelInfo:
+    """Model info."""
+
+    model_path: Path
+
+
+@dataclass
+class PerformanceMetrics:
+    """Performance metrics parsed from generic inference output."""
+
+    npu_active_cycles: int
+    npu_idle_cycles: int
+    npu_total_cycles: int
+    npu_axi0_rd_data_beat_received: int
+    npu_axi0_wr_data_beat_written: int
+    npu_axi1_rd_data_beat_received: int
+
+
+@dataclass
+class ExecutionParams:
+    """Application execution params."""
+
+    application: str
+    system: str
+    application_params: List[str]
+    system_params: List[str]
+    deploy_params: List[str]
+
+
+class AIETLogWriter(OutputConsumer):
+    """Redirect AIET command output to the logger."""
+
+    def feed(self, line: str) -> None:
+        """Process line from the output."""
+        logger.debug(line.strip())
+
+
+class GenericInferenceOutputParser(OutputConsumer):
+    """Generic inference app output parser."""
+
+    PATTERNS = {
+        name: tuple(re.compile(pattern, re.IGNORECASE) for pattern in patterns)
+        for name, patterns in (
+            (
+                "npu_active_cycles",
+                (
+                    r"NPU ACTIVE cycles: (?P<value>\d+)",
+                    r"NPU ACTIVE: (?P<value>\d+) cycles",
+                ),
+            ),
+            (
+                "npu_idle_cycles",
+                (
+                    r"NPU IDLE cycles: (?P<value>\d+)",
+                    r"NPU IDLE: (?P<value>\d+) cycles",
+                ),
+            ),
+            (
+                "npu_total_cycles",
+                (
+                    r"NPU TOTAL cycles: (?P<value>\d+)",
+                    r"NPU TOTAL: (?P<value>\d+) cycles",
+                ),
+            ),
+            (
+                "npu_axi0_rd_data_beat_received",
+                (
+                    r"NPU AXI0_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
+                    r"NPU AXI0_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
+                ),
+            ),
+            (
+                "npu_axi0_wr_data_beat_written",
+                (
+                    r"NPU AXI0_WR_DATA_BEAT_WRITTEN beats: (?P<value>\d+)",
+                    r"NPU AXI0_WR_DATA_BEAT_WRITTEN: (?P<value>\d+) beats",
+                ),
+            ),
+            (
+                "npu_axi1_rd_data_beat_received",
+                (
+                    r"NPU AXI1_RD_DATA_BEAT_RECEIVED beats: (?P<value>\d+)",
+                    r"NPU AXI1_RD_DATA_BEAT_RECEIVED: (?P<value>\d+) beats",
+                ),
+            ),
+        )
+    }
+
+    def __init__(self) -> None:
+        """Init generic inference output parser instance."""
+        self.result: Dict = {}
+
+    def feed(self, line: str) -> None:
+        """Feed new line to the parser."""
+        for name, patterns in self.PATTERNS.items():
+            for pattern in patterns:
+                match = pattern.search(line)
+
+                if match:
+                    self.result[name] = int(match["value"])
+                    return
+
+    def is_ready(self) -> bool:
+        """Return true if all expected data has been parsed."""
+        return self.result.keys() == self.PATTERNS.keys()
+
+    def missed_keys(self) -> List[str]:
+        """Return list of the keys that have not been found in the output."""
+        return sorted(self.PATTERNS.keys() - self.result.keys())
+
+
+class AIETRunner:
+    """AIET runner."""
+
+    def __init__(self, executor: CommandExecutor) -> None:
+        """Init AIET runner instance."""
+        self.executor = executor
+
+    @staticmethod
+    def get_installed_systems() -> List[str]:
+        """Get list of the installed systems."""
+        return [system.name for system in get_available_systems()]
+
+    @staticmethod
+    def get_installed_applications(system: Optional[str] = None) -> List[str]:
+        """Get list of the installed applications."""
+        return [
+            app.name
+            for app in get_available_applications()
+            if system is None or app.can_run_on(system)
+        ]
+
+    def is_application_installed(self, application: str, system: str) -> bool:
+        """Return true if the requested application is installed."""
+        return application in self.get_installed_applications(system)
+
+    def is_system_installed(self, system: str) -> bool:
+        """Return true if the requested system is installed."""
+        return system in self.get_installed_systems()
+
+    def systems_installed(self, systems: List[str]) -> bool:
+        """Check if all provided systems are installed."""
+        if not systems:
+            return False
+
+        installed_systems = self.get_installed_systems()
+        return all(system in installed_systems for system in systems)
+
+    def applications_installed(self, applications: List[str]) -> bool:
+        """Check if all provided applications are installed."""
+        if not applications:
+            return False
+
+        installed_apps = self.get_installed_applications()
+        return all(app in installed_apps for app in applications)
+
+    def all_installed(self, systems: List[str], apps: List[str]) -> bool:
+        """Check if all provided artifacts are installed."""
+        return self.systems_installed(systems) and self.applications_installed(apps)
+
+    @staticmethod
+    def install_system(system_path: Path) -> None:
+        """Install system."""
+        install_system(system_path)
+
+    @staticmethod
+    def install_application(app_path: Path) -> None:
+        """Install application."""
+        install_application(app_path)
+
+    def run_application(self, execution_params: ExecutionParams) -> RunningCommand:
+        """Run requested application."""
+        command = [
+            "aiet",
+            "application",
+            "run",
+            "-n",
+            execution_params.application,
+            "-s",
+            execution_params.system,
+            *self._params("-p", execution_params.application_params),
+            *self._params("--system-param", execution_params.system_params),
+            *self._params("--deploy", execution_params.deploy_params),
+        ]
+
+        return self._submit(command)
+
+    @staticmethod
+    def _params(name: str, params: List[str]) -> List[str]:
+        return [p for item in [(name, param) for param in params] for p in item]
+
+    def _submit(self, command: List[str]) -> RunningCommand:
+        """Submit command for the execution."""
+        logger.debug("Submit command %s", " ".join(command))
+        return self.executor.submit(command)
+
+
+class GenericInferenceRunner(ABC):
+    """Abstract class for generic inference runner."""
+
+    def __init__(self, aiet_runner: AIETRunner):
+        """Init generic inference runner instance."""
+        self.aiet_runner = aiet_runner
+        self.running_inference: Optional[RunningCommand] = None
+
+    def run(
+        self, model_info: ModelInfo, output_consumers: List[OutputConsumer]
+    ) -> None:
+        """Run generic inference for the provided device/model."""
+        execution_params = self.get_execution_params(model_info)
+
+        self.running_inference = self.aiet_runner.run_application(execution_params)
+        self.running_inference.output_consumers = output_consumers
+        self.running_inference.consume_output()
+
+    def stop(self) -> None:
+        """Stop running inference."""
+        if self.running_inference is None:
+            return
+
+        self.running_inference.stop()
+
+    @abstractmethod
+    def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
+        """Get execution params for the provided model."""
+
+    def __enter__(self) -> "GenericInferenceRunner":
+        """Enter context."""
+        return self
+
+    def __exit__(self, *_args: Any) -> None:
+        """Exit context."""
+        self.stop()
+
+    def check_system_and_application(self, system_name: str, app_name: str) -> None:
+        """Check if the requested system and application are installed."""
+        if not self.aiet_runner.is_system_installed(system_name):
+            raise Exception(f"System {system_name} is not installed")
+
+        if not self.aiet_runner.is_application_installed(app_name, system_name):
+            raise Exception(
+                f"Application {app_name} for the system {system_name} "
+                "is not installed"
+            )
+
+
+class GenericInferenceRunnerEthosU(GenericInferenceRunner):
+    """Generic inference runner on U55/65."""
+
+    def __init__(
+        self, aiet_runner: AIETRunner, device_info: DeviceInfo, backend: str
+    ) -> None:
+        """Init generic inference runner instance."""
+        super().__init__(aiet_runner)
+
+        system_name, app_name = self.resolve_system_and_app(device_info, backend)
+        self.system_name = system_name
+        self.app_name = app_name
+        self.device_info = device_info
+
+    @staticmethod
+    def resolve_system_and_app(
+        device_info: DeviceInfo, backend: str
+    ) -> Tuple[str, str]:
+        """Find appropriate system and application for the provided device/backend."""
+        try:
+            system_name = get_system_name(backend, device_info.device_type)
+        except KeyError as ex:
+            raise RuntimeError(
+                f"Unsupported device {device_info.device_type} "
+                f"for backend {backend}"
+            ) from ex
+
+        if system_name not in _SYSTEM_TO_APP_MAP:
+            raise RuntimeError(f"System {system_name} is not installed")
+
+        try:
+            app_name = _SYSTEM_TO_APP_MAP[system_name][device_info.memory_mode]
+        except KeyError as err:
+            raise RuntimeError(
+                f"Unsupported memory mode {device_info.memory_mode}"
+            ) from err
+
+        return system_name, app_name
+
+    def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
+        """Get execution params for Ethos-U55/65."""
+        self.check_system_and_application(self.system_name, self.app_name)
+
+        system_params = [
+            f"mac={self.device_info.mac}",
+            f"input_file={model_info.model_path.absolute()}",
+        ]
+
+        return ExecutionParams(
+            self.app_name,
+            self.system_name,
+            [],
+            system_params,
+            [],
+        )
+
+
+def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner:
+    """Get generic runner for provided device and backend."""
+    aiet_runner = get_aiet_runner()
+    return GenericInferenceRunnerEthosU(aiet_runner, device_info, backend)
+
+
+def estimate_performance(
+    model_info: ModelInfo, device_info: DeviceInfo, backend: str
+) -> PerformanceMetrics:
+    """Get performance estimations."""
+    with get_generic_runner(device_info, backend) as generic_runner:
+        output_parser = GenericInferenceOutputParser()
+        output_consumers = [output_parser, AIETLogWriter()]
+
+        generic_runner.run(model_info, output_consumers)
+
+        if not output_parser.is_ready():
+            missed_data = ",".join(output_parser.missed_keys())
+            logger.debug(
+                "Unable to get performance metrics, missed data %s", missed_data
+            )
+            raise Exception("Unable to get performance metrics, insufficient data")
+
+        return PerformanceMetrics(**output_parser.result)
+
+
+def get_aiet_runner() -> AIETRunner:
+    """Return AIET runner."""
+    executor = CommandExecutor()
+    return AIETRunner(executor)
diff --git a/src/mlia/tools/metadata/__init__.py b/src/mlia/tools/metadata/__init__.py
new file mode 100644
index 0000000..f877e4f
--- /dev/null
+++ b/src/mlia/tools/metadata/__init__.py
@@ -0,0 +1,3 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for the tools metadata."""
diff --git a/src/mlia/tools/metadata/common.py b/src/mlia/tools/metadata/common.py
new file mode 100644
index 0000000..c17a738
--- /dev/null
+++ b/src/mlia/tools/metadata/common.py
@@ -0,0 +1,290 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for installation process."""
+import logging
+from abc import ABC
+from abc import abstractmethod
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable
+from typing import List
+from typing import Optional
+from typing import Union
+
+from mlia.utils.misc import yes
+
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class InstallFromPath:
+    """Installation from the local path."""
+
+    backend_path: Path
+
+
+@dataclass
+class DownloadAndInstall:
+    """Download and install."""
+
+    eula_agreement: bool = True
+
+
+InstallationType = Union[InstallFromPath, DownloadAndInstall]
+
+
+class Installation(ABC):
+    """Base class for the installation process of the backends."""
+
+    @property
+    @abstractmethod
+    def name(self) -> str:
+        """Return name of the backend."""
+
+    @property
+    @abstractmethod
+    def description(self) -> str:
+        """Return description of the backend."""
+
+    @property
+    @abstractmethod
+    def could_be_installed(self) -> bool:
+        """Return true if backend could be installed in current environment."""
+
+    @property
+    @abstractmethod
+    def already_installed(self) -> bool:
+        """Return true if backend is already installed."""
+
+    @abstractmethod
+    def supports(self, install_type: InstallationType) -> bool:
+        """Return true if installation supports requested installation type."""
+
+    @abstractmethod
+    def install(self, install_type: InstallationType) -> None:
+        """Install the backend."""
+
+
+InstallationFilter = Callable[[Installation], bool]
+
+
+class AlreadyInstalledFilter:
+    """Filter for already installed backends."""
+
+    def __call__(self, installation: Installation) -> bool:
+        """Installation filter."""
+        return installation.already_installed
+
+
+class ReadyForInstallationFilter:
+    """Filter for ready to be installed backends."""
+
+    def __call__(self, installation: Installation) -> bool:
+        """Installation filter."""
+        return installation.could_be_installed and not installation.already_installed
+
+
+class SupportsInstallTypeFilter:
+    """Filter backends that support certain type of the installation."""
+
+    def __init__(self, installation_type: InstallationType) -> None:
+        """Init filter."""
+        self.installation_type = installation_type
+
+    def __call__(self, installation: Installation) -> bool:
+        """Installation filter."""
+        return installation.supports(self.installation_type)
+
+
+class SearchByNameFilter:
+    """Filter installation by name."""
+
+    def __init__(self, backend_name: Optional[str]) -> None:
+        """Init filter."""
+        self.backend_name = backend_name
+
+    def __call__(self, installation: Installation) -> bool:
+        """Installation filter."""
+        return not self.backend_name or installation.name == self.backend_name
+
+
+class InstallationManager(ABC):
+    """Helper class for managing installations."""
+
+    @abstractmethod
+    def install_from(self, backend_path: Path, backend_name: Optional[str]) -> None:
+        """Install backend from the local directory."""
+
+    @abstractmethod
+    def download_and_install(
+        self, backend_name: Optional[str], eula_agreement: bool
+    ) -> None:
+        """Download and install backends."""
+
+    @abstractmethod
+    def show_env_details(self) -> None:
+        """Show environment details."""
+
+    @abstractmethod
+    def backend_installed(self, backend_name: str) -> bool:
+        """Return true if the requested backend is installed."""
+
+
+class InstallationFiltersMixin:
+    """Mixin for filtering installation based on different conditions."""
+
+    installations: List[Installation]
+
+    def filter_by(self, *filters: InstallationFilter) -> List[Installation]:
+        """Filter installations."""
+        return [
+            installation
+            for installation in self.installations
+            if all(filter_(installation) for filter_ in filters)
+        ]
+
+    def could_be_installed_from(
+        self, backend_path: Path, backend_name: Optional[str]
+    ) -> List[Installation]:
+        """Return installations that could be installed from provided directory."""
+        return self.filter_by(
+            SupportsInstallTypeFilter(InstallFromPath(backend_path)),
+            SearchByNameFilter(backend_name),
+        )
+
+    def could_be_downloaded_and_installed(
+        self, backend_name: Optional[str] = None
+    ) -> List[Installation]:
+        """Return installations that could be downloaded and installed."""
+        return self.filter_by(
+            SupportsInstallTypeFilter(DownloadAndInstall()),
+            SearchByNameFilter(backend_name),
+            ReadyForInstallationFilter(),
+        )
+
+    def already_installed(
+        self, backend_name: Optional[str] = None
+    ) -> List[Installation]:
+        """Return list of backends that are already installed."""
+        return self.filter_by(
+            AlreadyInstalledFilter(), SearchByNameFilter(backend_name)
+        )
+
+    def ready_for_installation(self) -> List[Installation]:
+        """Return list of the backends that could be installed."""
+        return self.filter_by(ReadyForInstallationFilter())
+
+
+class DefaultInstallationManager(InstallationManager, InstallationFiltersMixin):
+    """Interactive installation manager."""
+
+    def __init__(
+        self, installations: List[Installation], noninteractive: bool = False
+    ) -> None:
+        """Init the manager."""
+        self.installations = installations
+        self.noninteractive = noninteractive
+
+    def choose_installation_for_path(
+        self, backend_path: Path, backend_name: Optional[str]
+    ) -> Optional[Installation]:
+        """Check available installations and select one if possible."""
+        installs = self.could_be_installed_from(backend_path, backend_name)
+
+        if not installs:
+            logger.info(
+                "Unfortunately, it was not possible to automatically "
+                "detect the type of the installed FVP. "
+                "Please check the provided path to the installed FVP."
+            )
+            return None
+
+        if len(installs) != 1:
+            names = ",".join(install.name for install in installs)
+            logger.info(
+                "Unable to correctly detect the type of the installed FVP. "
+                "The following FVPs were detected: %s. Installation skipped.",
+                names,
+            )
+            return None
+
+        installation = installs[0]
+        if installation.already_installed:
+            logger.info(
+                "%s was found in %s, but it has already been installed.",
+                installation.name,
+                backend_path,
+            )
+            return None
+
+        return installation
+
+    def install_from(self, backend_path: Path, backend_name: Optional[str]) -> None:
+        """Install from the provided directory."""
+        installation = self.choose_installation_for_path(backend_path, backend_name)
+
+        if not installation:
+            return
+
+        prompt = (
+            f"{installation.name} was found in {backend_path}. "
+            "Would you like to install it?"
+        )
+        self._install(installation, InstallFromPath(backend_path), prompt)
+
+    def download_and_install(
+        self, backend_name: Optional[str] = None, eula_agreement: bool = True
+    ) -> None:
+        """Download and install available backends."""
+        installations = self.could_be_downloaded_and_installed(backend_name)
+
+        if not installations:
+            logger.info("No backends available for installation.")
+            return
+
+        names = ",".join(installation.name for installation in installations)
+        logger.info("The following backends are available for download: %s", names)
+
+        for installation in installations:
+            prompt = f"Would you like to download and install {installation.name}?"
+            self._install(
+                installation, DownloadAndInstall(eula_agreement=eula_agreement), prompt
+            )
+
+    def show_env_details(self) -> None:
+        """Print current state of the execution environment."""
+        if installed := self.already_installed():
+            logger.info("Installed backends:\n")
+
+            for installation in installed:
+                logger.info(" - %s", installation.name)
+
+        if could_be_installed := self.ready_for_installation():
+            logger.info("The following backends could be installed:")
+
+            for installation in could_be_installed:
+                logger.info(" - %s", installation.name)
+
+        if not installed and not could_be_installed:
+            logger.info("No backends installed")
+
+    def _install(
+        self,
+        installation: Installation,
+        installation_type: InstallationType,
+        prompt: str,
+    ) -> None:
+        proceed = self.noninteractive or yes(prompt)
+
+        if proceed:
+            installation.install(installation_type)
+            logger.info("%s successfully installed.", installation.name)
+        else:
+            logger.info("%s installation canceled.", installation.name)
+
+    def backend_installed(self, backend_name: str) -> bool:
+        """Return true if the requested backend is installed."""
+        installations = self.already_installed(backend_name)
+
+        return len(installations) == 1
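
A sketch of how this manager is intended to be driven (not part of this change), assuming a list of `Installation` objects such as the ones returned by `get_corstone_installations` from corstone.py below; `noninteractive=True` skips the confirmation prompts:

    from mlia.tools.metadata.common import DefaultInstallationManager
    from mlia.tools.metadata.corstone import get_corstone_installations

    manager = DefaultInstallationManager(
        get_corstone_installations(), noninteractive=True
    )
    manager.show_env_details()
    manager.download_and_install("Corstone-300", eula_agreement=True)
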
diff --git a/src/mlia/tools/metadata/corstone.py b/src/mlia/tools/metadata/corstone.py
new file mode 100644
index 0000000..7a9d113
--- /dev/null
+++ b/src/mlia/tools/metadata/corstone.py
@@ -0,0 +1,402 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for Corstone based FVPs."""
+import logging
+import platform
+import subprocess
+import tarfile
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable
+from typing import Iterable
+from typing import List
+from typing import Optional
+
+import mlia.tools.aiet_wrapper as aiet
+from mlia.tools.metadata.common import DownloadAndInstall
+from mlia.tools.metadata.common import Installation
+from mlia.tools.metadata.common import InstallationType
+from mlia.tools.metadata.common import InstallFromPath
+from mlia.utils.download import DownloadArtifact
+from mlia.utils.filesystem import all_files_exist
+from mlia.utils.filesystem import all_paths_valid
+from mlia.utils.filesystem import copy_all
+from mlia.utils.filesystem import get_mlia_resources
+from mlia.utils.filesystem import temp_directory
+from mlia.utils.proc import working_directory
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class BackendInfo:
+    """Backend information."""
+
+    backend_path: Path
+    copy_source: bool = True
+    system_config: Optional[str] = None
+
+
+PathChecker = Callable[[Path], Optional[BackendInfo]]
+BackendInstaller = Callable[[bool, Path], Path]
+
+
+class AIETMetadata:
+    """AIET installation metadata."""
+
+    def __init__(
+        self,
+        name: str,
+        description: str,
+        system_config: str,
+        apps_resources: List[str],
+        fvp_dir_name: str,
+        download_artifact: Optional[DownloadArtifact],
+        supported_platforms: Optional[List[str]] = None,
+    ) -> None:
+        """
+        Initialize AIETMetadata.
+
+        Members expected_systems and expected_apps are filled automatically.
+        """
+        self.name = name
+        self.description = description
+        self.system_config = system_config
+        self.apps_resources = apps_resources
+        self.fvp_dir_name = fvp_dir_name
+        self.download_artifact = download_artifact
+        self.supported_platforms = supported_platforms
+
+        self.expected_systems = aiet.get_all_system_names(name)
+        self.expected_apps = aiet.get_all_application_names(name)
+
+    @property
+    def expected_resources(self) -> Iterable[Path]:
+        """Return list of expected resources."""
+        resources = [self.system_config, *self.apps_resources]
+
+        return (get_mlia_resources() / resource for resource in resources)
+
+    @property
+    def supported_platform(self) -> bool:
+        """Return true if the current platform is supported."""
+        if not self.supported_platforms:
+            return True
+
+        return platform.system() in self.supported_platforms
+
+
+class AIETBasedInstallation(Installation):
+    """Backend installation based on AIET functionality."""
+
+    def __init__(
+        self,
+        aiet_runner: aiet.AIETRunner,
+        metadata: AIETMetadata,
+        path_checker: PathChecker,
+        backend_installer: Optional[BackendInstaller],
+    ) -> None:
+        """Init the tool installation."""
+        self.aiet_runner = aiet_runner
+        self.metadata = metadata
+        self.path_checker = path_checker
+        self.backend_installer = backend_installer
+
+    @property
+    def name(self) -> str:
+        """Return name of the tool."""
+        return self.metadata.name
+
+    @property
+    def description(self) -> str:
+        """Return description of the tool."""
+        return self.metadata.description
+
+    @property
+    def already_installed(self) -> bool:
+        """Return true if the tool is already installed."""
+        return self.aiet_runner.all_installed(
+            self.metadata.expected_systems, self.metadata.expected_apps
+        )
+
+    @property
+    def could_be_installed(self) -> bool:
+        """Return true if the tool could be installed."""
+        if not self.metadata.supported_platform:
+            return False
+
+        return all_paths_valid(self.metadata.expected_resources)
+
+    def supports(self, install_type: InstallationType) -> bool:
+        """Return true if the tool supports the requested installation type."""
+        if isinstance(install_type, DownloadAndInstall):
+            return self.metadata.download_artifact is not None
+
+        if isinstance(install_type, InstallFromPath):
+            return self.path_checker(install_type.backend_path) is not None
+
+        return False  # type: ignore
+
+    def install(self, install_type: InstallationType) -> None:
+        """Install the tool."""
+        if isinstance(install_type, DownloadAndInstall):
+            download_artifact = self.metadata.download_artifact
+            assert download_artifact is not None, "No artifact provided"
+
+            self.download_and_install(download_artifact, install_type.eula_agreement)
+        elif isinstance(install_type, InstallFromPath):
+            backend_path = self.path_checker(install_type.backend_path)
+            assert backend_path is not None, "Unable to resolve backend path"
+
+            self.install_from(backend_path)
+        else:
+            raise Exception(f"Unable to install {install_type}")
+
+    def install_from(self, backend_info: BackendInfo) -> None:
+        """Install tool from the directory."""
+        mlia_resources = get_mlia_resources()
+
+        with temp_directory() as tmpdir:
+            fvp_dist_dir = tmpdir / self.metadata.fvp_dir_name
+
+            system_config = self.metadata.system_config
+            if backend_info.system_config:
+                system_config = backend_info.system_config
+
+            resources_to_copy = [mlia_resources / system_config]
+            if backend_info.copy_source:
+                resources_to_copy.append(backend_info.backend_path)
+
+            copy_all(*resources_to_copy, dest=fvp_dist_dir)
+
+            self.aiet_runner.install_system(fvp_dist_dir)
+
+        for app in self.metadata.apps_resources:
+            self.aiet_runner.install_application(mlia_resources / app)
+
+    def download_and_install(
+        self, download_artifact: DownloadArtifact, eula_agreement: bool
+    ) -> None:
+        """Download and install the tool."""
+        with temp_directory() as tmpdir:
+            try:
+                downloaded_to = download_artifact.download_to(tmpdir)
+            except Exception as err:
+                raise Exception("Unable to download backend artifact") from err
+
+            with working_directory(tmpdir / "dist", create_dir=True) as dist_dir:
+                with tarfile.open(downloaded_to) as archive:
+                    archive.extractall(dist_dir)
+
+                assert self.backend_installer, (
+                    f"Backend '{self.metadata.name}' does not support "
+                    "download and installation."
+                )
+                backend_path = self.backend_installer(eula_agreement, dist_dir)
+                if self.path_checker(backend_path) is None:
+                    raise Exception("Downloaded artifact has invalid structure")
+
+                self.install(InstallFromPath(backend_path))
+
+
+class PackagePathChecker:
+    """Package path checker."""
+
+    def __init__(
+        self, expected_files: List[str], backend_subfolder: Optional[str] = None
+    ) -> None:
+        """Init the path checker."""
+        self.expected_files = expected_files
+        self.backend_subfolder = backend_subfolder
+
+    def __call__(self, backend_path: Path) -> Optional[BackendInfo]:
+        """Check if directory contains all expected files."""
+        resolved_paths = (backend_path / file for file in self.expected_files)
+        if not all_files_exist(resolved_paths):
+            return None
+
+        if self.backend_subfolder:
+            subfolder = backend_path / self.backend_subfolder
+
+            if not subfolder.is_dir():
+                return None
+
+            return BackendInfo(subfolder)
+
+        return BackendInfo(backend_path)
+
+
+class StaticPathChecker:
+    """Static path checker."""
+
+    def __init__(
+        self,
+        static_backend_path: Path,
+        expected_files: List[str],
+        copy_source: bool = False,
+        system_config: Optional[str] = None,
+    ) -> None:
+        """Init static path checker."""
+        self.static_backend_path = static_backend_path
+        self.expected_files = expected_files
+        self.copy_source = copy_source
+        self.system_config = system_config
+
+    def __call__(self, backend_path: Path) -> Optional[BackendInfo]:
+        """Check if directory equals static backend path with all expected files."""
+        if backend_path != self.static_backend_path:
+            return None
+
+        resolved_paths = (backend_path / file for file in self.expected_files)
+        if not all_files_exist(resolved_paths):
+            return None
+
+        return BackendInfo(
+            backend_path,
+            copy_source=self.copy_source,
+            system_config=self.system_config,
+        )
+
+
+class CompoundPathChecker:
+    """Compound path checker."""
+
+    def __init__(self, *path_checkers: PathChecker) -> None:
+        """Init compound path checker."""
+        self.path_checkers = path_checkers
+
+    def __call__(self, backend_path: Path) -> Optional[BackendInfo]:
+        """Iterate over checkers and return first non-empty backend info."""
+        first_resolved_backend_info = (
+            backend_info
+            for path_checker in self.path_checkers
+            if (backend_info := path_checker(backend_path)) is not None
+        )
+
+        return next(first_resolved_backend_info, None)
+
+
+class Corstone300Installer:
+    """Helper class that wraps Corstone 300 installation logic."""
+
+    def __call__(self, eula_agreement: bool, dist_dir: Path) -> Path:
+        """Install Corstone-300 and return path to the models."""
+        with working_directory(dist_dir):
+            install_dir = "corstone-300"
+            try:
+                fvp_install_cmd = [
+                    "./FVP_Corstone_SSE-300.sh",
+                    "-q",
+                    "-d",
+                    install_dir,
+                ]
+                if not eula_agreement:
+                    fvp_install_cmd += [
+                        "--nointeractive",
+                        "--i-agree-to-the-contained-eula",
+                    ]
+
+                subprocess.check_call(fvp_install_cmd)
+            except subprocess.CalledProcessError as err:
+                raise Exception(
+                    "Error occurred during Corstone-300 installation"
+                ) from err
+
+            return dist_dir / install_dir
+
+
+def get_corstone_300_installation() -> Installation:
+    """Get Corstone-300 installation."""
+    corstone_300 = AIETBasedInstallation(
+        aiet_runner=aiet.get_aiet_runner(),
+        # pylint: disable=line-too-long
+        metadata=AIETMetadata(
+            name="Corstone-300",
+            description="Corstone-300 FVP",
+            system_config="aiet/systems/corstone-300/aiet-config.json",
+            apps_resources=[
+                "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Shared_Sram-TA",
+                "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U55-Sram_Only-TA",
+                "aiet/applications/inference_runner-sse-300-22.05.01-ethos-U65-Dedicated_Sram-TA",
+            ],
+            fvp_dir_name="corstone_300",
+            download_artifact=DownloadArtifact(
+                name="Corstone-300 FVP",
+                url="https://developer.arm.com/-/media/Arm%20Developer%20Community/Downloads/OSS/FVP/Corstone-300/FVP_Corstone_SSE-300_11.16_26.tgz",
+                filename="FVP_Corstone_SSE-300_11.16_26.tgz",
+                version="11.16_26",
+                sha256_hash="e26139be756b5003a30d978c629de638aed1934d597dc24a17043d4708e934d7",
+            ),
+            supported_platforms=["Linux"],
+        ),
+        # pylint: enable=line-too-long
+        path_checker=CompoundPathChecker(
+            PackagePathChecker(
+                expected_files=[
+                    "models/Linux64_GCC-6.4/FVP_Corstone_SSE-300_Ethos-U55",
+                    "models/Linux64_GCC-6.4/FVP_Corstone_SSE-300_Ethos-U65",
+                ],
+                backend_subfolder="models/Linux64_GCC-6.4",
+            ),
+            StaticPathChecker(
+                static_backend_path=Path("/opt/VHT"),
+                expected_files=[
+                    "VHT_Corstone_SSE-300_Ethos-U55",
+                    "VHT_Corstone_SSE-300_Ethos-U65",
+                ],
+                copy_source=False,
+                system_config="aiet/systems/corstone-300-vht/aiet-config.json",
+            ),
+        ),
+        backend_installer=Corstone300Installer(),
+    )
+
+    return corstone_300
+
+
+def get_corstone_310_installation() -> Installation:
+    """Get Corstone-310 installation."""
+    corstone_310 = AIETBasedInstallation(
+        aiet_runner=aiet.get_aiet_runner(),
+        # pylint: disable=line-too-long
+        metadata=AIETMetadata(
+            name="Corstone-310",
+            description="Corstone-310 FVP",
+            system_config="aiet/systems/corstone-310/aiet-config.json",
+            apps_resources=[
+                "aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Shared_Sram-TA",
+                "aiet/applications/inference_runner-sse-310-22.05.01-ethos-U55-Sram_Only-TA",
+            ],
+            fvp_dir_name="corstone_310",
+            download_artifact=None,
+            supported_platforms=["Linux"],
+        ),
+        # pylint: enable=line-too-long
+        path_checker=CompoundPathChecker(
+            PackagePathChecker(
+                expected_files=[
+                    "models/Linux64_GCC-9.3/FVP_Corstone_SSE-310",
+                ],
+                backend_subfolder="models/Linux64_GCC-9.3",
+            ),
+            StaticPathChecker(
+                static_backend_path=Path("/opt/VHT"),
+                expected_files=[
+                    "VHT_Corstone_SSE-310",
+                ],
+                copy_source=False,
+                system_config="aiet/systems/corstone-310-vht/aiet-config.json",
+            ),
+        ),
+        backend_installer=None,
+    )
+
+    return corstone_310
+
+
+def get_corstone_installations() -> List[Installation]:
+    """Get Corstone installations."""
+    return [
+        get_corstone_300_installation(),
+        get_corstone_310_installation(),
+    ]
diff --git a/src/mlia/tools/vela_wrapper.py b/src/mlia/tools/vela_wrapper.py
new file mode 100644
index 0000000..7225797
--- /dev/null
+++ b/src/mlia/tools/vela_wrapper.py
@@ -0,0 +1,500 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Vela wrapper module."""
+import itertools
+import logging
+import sys
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Literal
+from typing import Optional
+from typing import Tuple
+from typing import Union
+
+import numpy as np
+from ethosu.vela.architecture_features import ArchitectureFeatures
+from ethosu.vela.compiler_driver import compiler_driver
+from ethosu.vela.compiler_driver import CompilerOptions
+from ethosu.vela.compiler_driver import TensorAllocator
+from ethosu.vela.model_reader import ModelReaderOptions
+from ethosu.vela.model_reader import read_model
+from ethosu.vela.nn_graph import Graph
+from ethosu.vela.nn_graph import NetworkType
+from ethosu.vela.npu_performance import PassCycles
+from ethosu.vela.operation import CustomType
+from ethosu.vela.operation import Op
+from ethosu.vela.scheduler import OptimizationStrategy
+from ethosu.vela.scheduler import SchedulerOptions
+from ethosu.vela.tensor import BandwidthDirection
+from ethosu.vela.tensor import MemArea
+from ethosu.vela.tensor import Tensor
+from ethosu.vela.tflite_mapping import optype_to_builtintype
+from ethosu.vela.tflite_model_semantic import TFLiteSemantic
+from ethosu.vela.tflite_supported_operators import TFLiteSupportedOperators
+from ethosu.vela.tflite_writer import write_tflite
+from ethosu.vela.vela import generate_supported_ops
+
+from mlia.utils.logging import redirect_output
+
+
+logger = logging.getLogger(__name__)
+
+VELA_INTERNAL_OPS = (Op.Placeholder, Op.SubgraphInput, Op.Const)
+
+
+@dataclass
+class PerformanceMetrics:  # pylint: disable=too-many-instance-attributes
+    """Contains all the performance metrics Vela generates in a run."""
+
+    npu_cycles: int
+    sram_access_cycles: int
+    dram_access_cycles: int
+    on_chip_flash_access_cycles: int
+    off_chip_flash_access_cycles: int
+    total_cycles: int
+    batch_inference_time: float
+    inferences_per_second: float
+    batch_size: int
+    unknown_memory_area_size: int
+    sram_memory_area_size: int
+    dram_memory_area_size: int
+    on_chip_flash_memory_area_size: int
+    off_chip_flash_memory_area_size: int
+
+
+@dataclass
+class NpuSupported:
+    """Operator's npu supported attribute."""
+
+    supported: bool
+    reasons: List[Tuple[str, str]]
+
+
+@dataclass
+class Operator:
+    """Model operator."""
+
+    name: str
+    op_type: str
+    run_on_npu: NpuSupported
+
+    @property
+    def cpu_only(self) -> bool:
+        """Return true if operator is CPU only."""
+        cpu_only_reasons = [("CPU only operator", "")]
+        return (
+            not self.run_on_npu.supported
+            and self.run_on_npu.reasons == cpu_only_reasons
+        )
+
+
+@dataclass
+class Operators:
+    """Model's operators."""
+
+    ops: List[Operator]
+
+    @property
+    def npu_supported_ratio(self) -> float:
+        """Return NPU supported ratio."""
+        total = self.total_number
+        npu_supported = self.npu_supported_number
+
+        if total == 0 or npu_supported == 0:
+            return 0
+
+        return npu_supported / total
+
+    @property
+    def npu_unsupported_ratio(self) -> float:
+        """Return NPU unsupported ratio."""
+        return 1 - self.npu_supported_ratio
+
+    @property
+    def total_number(self) -> int:
+        """Return total number of operators."""
+        return len(self.ops)
+
+    @property
+    def npu_supported_number(self) -> int:
+        """Return number of npu supported operators."""
+        return sum(op.run_on_npu.supported for op in self.ops)
+
+
+@dataclass
+class Model:
+    """Model metadata."""
+
+    nng: Graph
+    network_type: NetworkType
+
+    @property
+    def optimized(self) -> bool:
+        """Return true if model is already optimized."""
+        return any(
+            op.attrs.get("custom_type") == CustomType.ExistingNpuOp
+            for sg in self.nng.subgraphs
+            for op in sg.get_all_ops()
+        )
+
+
+@dataclass
+class OptimizedModel:
+    """Instance of the Vela optimized model."""
+
+    nng: Graph
+    arch: ArchitectureFeatures
+    compiler_options: CompilerOptions
+    scheduler_options: SchedulerOptions
+
+    def save(self, output_filename: Union[str, Path]) -> None:
+        """Save instance of the optimized model to the file."""
+        write_tflite(self.nng, output_filename)
+
+
+AcceleratorConfigType = Literal[
+    "ethos-u55-32",
+    "ethos-u55-64",
+    "ethos-u55-128",
+    "ethos-u55-256",
+    "ethos-u65-256",
+    "ethos-u65-512",
+]
+
+TensorAllocatorType = Literal["LinearAlloc", "Greedy", "HillClimb"]
+
+OptimizationStrategyType = Literal["Performance", "Size"]
+
+
+@dataclass
+class VelaCompilerOptions:  # pylint: disable=too-many-instance-attributes
+    """Vela compiler options."""
+
+    config_files: Optional[Union[str, List[str]]] = None
+    system_config: str = ArchitectureFeatures.DEFAULT_CONFIG
+    memory_mode: str = ArchitectureFeatures.DEFAULT_CONFIG
+    accelerator_config: Optional[AcceleratorConfigType] = None
+    max_block_dependency: int = ArchitectureFeatures.MAX_BLOCKDEP
+    arena_cache_size: Optional[int] = None
+    tensor_allocator: TensorAllocatorType = "HillClimb"
+    cpu_tensor_alignment: int = Tensor.AllocationQuantum
+    optimization_strategy: OptimizationStrategyType = "Performance"
+    output_dir: Optional[str] = None
+    recursion_limit: int = 1000
+
+
+class VelaCompiler:  # pylint: disable=too-many-instance-attributes
+    """Vela compiler wrapper."""
+
+    def __init__(self, compiler_options: VelaCompilerOptions):
+        """Init Vela wrapper instance."""
+        self.config_files = compiler_options.config_files
+        self.system_config = compiler_options.system_config
+        self.memory_mode = compiler_options.memory_mode
+        self.accelerator_config = compiler_options.accelerator_config
+        self.max_block_dependency = compiler_options.max_block_dependency
+        self.arena_cache_size = compiler_options.arena_cache_size
+        self.tensor_allocator = TensorAllocator[compiler_options.tensor_allocator]
+        self.cpu_tensor_alignment = compiler_options.cpu_tensor_alignment
+        self.optimization_strategy = OptimizationStrategy[
+            compiler_options.optimization_strategy
+        ]
+        self.output_dir = compiler_options.output_dir
+        self.recursion_limit = compiler_options.recursion_limit
+
+        sys.setrecursionlimit(self.recursion_limit)
+
+    def read_model(self, model: Union[str, Path]) -> Model:
+        """Read model."""
+        logger.debug("Read model %s", model)
+
+        nng, network_type = self._read_model(model)
+        return Model(nng, network_type)
+
+    def compile_model(self, model: Union[str, Path, Model]) -> OptimizedModel:
+        """Compile the model."""
+        if isinstance(model, (str, Path)):
+            nng, network_type = self._read_model(model)
+        else:
+            nng, network_type = model.nng, NetworkType.TFLite
+
+        if not nng:
+            raise Exception("Unable to read model")
+
+        try:
+            arch = self._architecture_features()
+            compiler_options = self._compiler_options()
+            scheduler_options = self._scheduler_options()
+
+            with redirect_output(
+                logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
+            ):
+                compiler_driver(
+                    nng, arch, compiler_options, scheduler_options, network_type
+                )
+
+            return OptimizedModel(nng, arch, compiler_options, scheduler_options)
+        except (SystemExit, Exception) as err:
+            raise Exception("Model could not be optimized with Vela compiler") from err
+
+    def get_config(self) -> Dict[str, Any]:
+        """Get compiler configuration."""
+        arch = self._architecture_features()
+
+        memory_area = {
+            mem.name: {
+                "clock_scales": arch.memory_clock_scales[mem],
+                "burst_length": arch.memory_burst_length[mem],
+                "read_latency": arch.memory_latency[mem][BandwidthDirection.Read],
+                "write_latency": arch.memory_latency[mem][BandwidthDirection.Write],
+            }
+            for mem in (
+                MemArea.Sram,
+                MemArea.Dram,
+                MemArea.OnChipFlash,
+                MemArea.OffChipFlash,
+            )
+        }
+
+        return {
+            "accelerator_config": arch.accelerator_config.value,
+            "system_config": arch.system_config,
+            "core_clock": arch.core_clock,
+            "axi0_port": arch.axi0_port.name,
+            "axi1_port": arch.axi1_port.name,
+            "memory_mode": arch.memory_mode,
+            "const_mem_area": arch.const_mem_area.name,
+            "arena_mem_area": arch.arena_mem_area.name,
+            "cache_mem_area": arch.cache_mem_area.name,
+            "arena_cache_size": arch.arena_cache_size,
+            "permanent_storage_mem_area": arch.permanent_storage_mem_area.name,
+            "feature_map_storage_mem_area": arch.feature_map_storage_mem_area.name,
+            "fast_storage_mem_area": arch.fast_storage_mem_area.name,
+            "memory_area": memory_area,
+        }
+
+    @staticmethod
+    def _read_model(model: Union[str, Path]) -> Tuple[Graph, NetworkType]:
+        """Read TFLite model."""
+        try:
+            model_path = str(model) if isinstance(model, Path) else model
+
+            with redirect_output(
+                logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG
+            ):
+                return read_model(model_path, ModelReaderOptions())  # type: ignore
+        except (SystemExit, Exception) as err:
+            raise Exception(f"Unable to read model {model_path}") from err
+
+    def _architecture_features(self) -> ArchitectureFeatures:
+        """Return ArchitectureFeatures instance."""
+        return ArchitectureFeatures(
+            vela_config_files=self.config_files,
+            accelerator_config=self.accelerator_config,
+            system_config=self.system_config,
+            memory_mode=self.memory_mode,
+            max_blockdep=self.max_block_dependency,
+            verbose_config=False,
+            arena_cache_size=self.arena_cache_size,
+        )
+
+    def _scheduler_options(self) -> SchedulerOptions:
+        """Return SchedulerOptions instance."""
+        arch = self._architecture_features()
+
+        return SchedulerOptions(
+            optimization_strategy=self.optimization_strategy,
+            sram_target=arch.arena_cache_size,
+            verbose_schedule=False,
+        )
+
+    def _compiler_options(self) -> CompilerOptions:
+        """Return CompilerOptions instance."""
+        return CompilerOptions(
+            verbose_graph=False,
+            verbose_quantization=False,
+            verbose_packing=False,
+            verbose_tensor_purpose=False,
+            verbose_tensor_format=False,
+            verbose_allocation=False,
+            verbose_high_level_command_stream=False,
+            verbose_register_command_stream=False,
+            verbose_operators=False,
+            verbose_weights=False,
+            show_cpu_operations=False,
+            tensor_allocator=self.tensor_allocator,
+            timing=False,
+            output_dir=self.output_dir,
+            cpu_tensor_alignment=self.cpu_tensor_alignment,
+        )
+
+
+def resolve_compiler_config(
+    vela_compiler_options: VelaCompilerOptions,
+) -> Dict[str, Any]:
+    """Resolve passed compiler options.
+
+    Vela has a number of configuration parameters that are resolved
+    while the compiler options are being processed. E.g. Vela reads
+    configuration parameters from vela.ini and fills its internal
+    structures with the resolved values (memory mode, system config,
+    etc.).
+
+    In order to get this information we need to create an
+    instance of the Vela compiler first.
+    """
+    vela_compiler = VelaCompiler(vela_compiler_options)
+    return vela_compiler.get_config()
+
+
+def estimate_performance(
+    model_path: Path, compiler_options: VelaCompilerOptions
+) -> PerformanceMetrics:
+    """Return performance estimations for the model/device.
+
+    Logic for this function comes from Vela module stats_writer.py
+    """
+    logger.debug(
+        "Estimate performance for the model %s on %s",
+        model_path,
+        compiler_options.accelerator_config,
+    )
+
+    vela_compiler = VelaCompiler(compiler_options)
+
+    initial_model = vela_compiler.read_model(model_path)
+    if initial_model.optimized:
+        raise Exception("Unable to estimate performance for the given optimized model")
+
+    optimized_model = vela_compiler.compile_model(initial_model)
+
+    return _performance_metrics(optimized_model)
+
+
+def optimize_model(
+    model_path: Path, compiler_options: VelaCompilerOptions, output_model_path: Path
+) -> None:
+    """Optimize the model and save the result to the given path."""
+    logger.debug(
+        "Optimize model %s for device %s",
+        model_path,
+        compiler_options.accelerator_config,
+    )
+
+    vela_compiler = VelaCompiler(compiler_options)
+    optimized_model = vela_compiler.compile_model(model_path)
+
+    logger.debug("Save optimized model into %s", output_model_path)
+    optimized_model.save(output_model_path)
+
+
+def _performance_metrics(optimized_model: OptimizedModel) -> PerformanceMetrics:
+    """Return performance metrics for optimized model."""
+    cycles = optimized_model.nng.cycles
+
+    def memory_usage(mem_area: MemArea) -> int:
+        """Get memory usage for the provided memory area type."""
+        memory_used: Dict[MemArea, int] = optimized_model.nng.memory_used
+        bandwidths = optimized_model.nng.bandwidths
+
+        return memory_used.get(mem_area, 0) if np.sum(bandwidths[mem_area]) > 0 else 0
+
+    midpoint_fps = np.nan
+    midpoint_inference_time = cycles[PassCycles.Total] / optimized_model.arch.core_clock
+    if midpoint_inference_time > 0:
+        midpoint_fps = 1 / midpoint_inference_time
+
+    return PerformanceMetrics(
+        npu_cycles=int(cycles[PassCycles.Npu]),
+        sram_access_cycles=int(cycles[PassCycles.SramAccess]),
+        dram_access_cycles=int(cycles[PassCycles.DramAccess]),
+        on_chip_flash_access_cycles=int(cycles[PassCycles.OnChipFlashAccess]),
+        off_chip_flash_access_cycles=int(cycles[PassCycles.OffChipFlashAccess]),
+        total_cycles=int(cycles[PassCycles.Total]),
+        batch_inference_time=midpoint_inference_time * 1000,
+        inferences_per_second=midpoint_fps,
+        batch_size=optimized_model.nng.batch_size,
+        unknown_memory_area_size=memory_usage(MemArea.Unknown),
+        sram_memory_area_size=memory_usage(MemArea.Sram),
+        dram_memory_area_size=memory_usage(MemArea.Dram),
+        on_chip_flash_memory_area_size=memory_usage(MemArea.OnChipFlash),
+        off_chip_flash_memory_area_size=memory_usage(MemArea.OffChipFlash),
+    )
+
+
+def supported_operators(
+    model_path: Path, compiler_options: VelaCompilerOptions
+) -> Operators:
+    """Return list of model's operators."""
+    logger.debug("Check supported operators for the model %s", model_path)
+
+    vela_compiler = VelaCompiler(compiler_options)
+    initial_model = vela_compiler.read_model(model_path)
+
+    return Operators(
+        [
+            Operator(op.name, optype_to_builtintype(op.type), run_on_npu(op))
+            for sg in initial_model.nng.subgraphs
+            for op in sg.get_all_ops()
+            if op.type not in VELA_INTERNAL_OPS
+        ]
+    )
+
+
+def run_on_npu(operator: Op) -> NpuSupported:
+    """Return information if operator can run on NPU.
+
+    Vela does a number of checks that can help establish whether
+    a particular operator is supported to run on NPU.
+
+    There are two groups of checks:
+    - general TFLite constraints
+    - operator specific constraints
+
+    If an operator is not supported on NPU then this function
+    will return the reason why.
+
+    The reason is split into two parts:
+    - general description of why the operator cannot be placed on NPU
+    - details on the particular operator
+    """
+    semantic_checker = TFLiteSemantic()
+    semantic_constraints = itertools.chain(
+        semantic_checker.generic_constraints,
+        semantic_checker.specific_constraints[operator.type],
+    )
+
+    for constraint in semantic_constraints:
+        op_valid, op_reason = constraint(operator)
+        if not op_valid:
+            return NpuSupported(False, [(constraint.__doc__, op_reason)])
+
+    if operator.type not in TFLiteSupportedOperators.supported_operators:
+        reasons = (
+            [("CPU only operator", "")]
+            if operator.type not in VELA_INTERNAL_OPS
+            else []
+        )
+
+        return NpuSupported(False, reasons)
+
+    tflite_supported_operators = TFLiteSupportedOperators()
+    operation_constraints = itertools.chain(
+        tflite_supported_operators.generic_constraints,
+        tflite_supported_operators.specific_constraints[operator.type],
+    )
+    for constraint in operation_constraints:
+        op_valid, op_reason = constraint(operator)
+        if not op_valid:
+            return NpuSupported(False, [(constraint.__doc__, op_reason)])
+
+    return NpuSupported(True, [])
+
+
+def generate_supported_operators_report() -> None:
+    """Generate supported operators report in current working directory."""
+    with redirect_output(logger):
+        generate_supported_ops()
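
A usage sketch for the Vela wrapper (not part of this change), with an illustrative model path; `accelerator_config` values come from `AcceleratorConfigType` above, and the remaining options keep their defaults:

    from pathlib import Path

    from mlia.tools.vela_wrapper import (
        VelaCompilerOptions,
        estimate_performance,
        supported_operators,
    )

    options = VelaCompilerOptions(accelerator_config="ethos-u55-256")
    ops = supported_operators(Path("model.tflite"), options)
    print(f"{ops.npu_supported_ratio:.0%} of operators can run on the NPU")
    # Raises if the model has already been optimized by Vela.
    metrics = estimate_performance(Path("model.tflite"), options)
    print(metrics.total_cycles)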