author    | Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-11-18 16:34:03 +0000
committer | Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-11-29 14:44:13 +0000
commit    | 37959522a805a5e23c930ed79aac84920c3cb208 (patch)
tree      | 484af1240a93c955a72ce2e452432383b6704b56 /src/mlia/backend
parent    | 5568f9f000d673ac53e710dcc8991fec6e8a5488 (diff)
download  | mlia-37959522a805a5e23c930ed79aac84920c3cb208.tar.gz
Move backends functionality into separate modules
- Move backend management/executor code into the module backend.executor
- Create a separate module for each backend inside the "backend" module
- Move each backend into its corresponding module
- Split the Vela wrapper into several submodules
Change-Id: If01b6774aab6501951212541cc5d7f5aa7c97e95
Diffstat (limited to 'src/mlia/backend')
23 files changed, 1745 insertions(+), 350 deletions(-)
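For code that consumes these modules, the restructuring summarized in the commit message means the old flat mlia.backend.* import paths (application, common, config, execution, fs, output_consumer, proc, source, system) move under mlia.backend.executor.*, while each backend (Corstone, TOSA checker, Vela) gains its own sub-package with its installation entry points. The sketch below is a minimal, hypothetical consumer of the new layout, based only on the modules introduced in this diff; the caller-side code (the runner variable and the print statements) is illustrative and assumes the mlia package and its backend resources are available in the environment.

    # Old layout (before this commit): executor-level helpers lived directly under
    # mlia.backend, e.g. "from mlia.backend.system import get_available_systems".
    # New layout (after this commit): the generic executor code sits under
    # mlia.backend.executor, and each backend has its own sub-package.
    from mlia.backend.executor.runner import BackendRunner
    from mlia.backend.corstone.install import get_corstone_installations

    # List the systems currently registered with the executor.
    runner = BackendRunner()
    print("Installed systems:", runner.get_installed_systems())

    # Each Corstone installation object exposes its metadata and install state.
    for installation in get_corstone_installations():
        print(f"{installation.name}: already installed = {installation.already_installed}")

The diff itself follows; the file moves from src/mlia/backend/*.py to src/mlia/backend/executor/*.py account for most of the churn, with the new install/manager split and the per-backend sub-packages making up the added lines.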
diff --git a/src/mlia/backend/__init__.py b/src/mlia/backend/__init__.py index 3d60372..745aa1b 100644 --- a/src/mlia/backend/__init__.py +++ b/src/mlia/backend/__init__.py @@ -1,3 +1,3 @@ # SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 -"""Backend module.""" +"""Backends module.""" diff --git a/src/mlia/backend/corstone/__init__.py b/src/mlia/backend/corstone/__init__.py new file mode 100644 index 0000000..a1eac14 --- /dev/null +++ b/src/mlia/backend/corstone/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Corstone backend module.""" diff --git a/src/mlia/backend/corstone/install.py b/src/mlia/backend/corstone/install.py new file mode 100644 index 0000000..2a0e5c9 --- /dev/null +++ b/src/mlia/backend/corstone/install.py @@ -0,0 +1,155 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Module for Corstone based FVPs. + +The import of subprocess module raises a B404 bandit error. MLIA usage of +subprocess is needed and can be considered safe hence disabling the security +check. +""" +from __future__ import annotations + +import logging +import subprocess # nosec +from pathlib import Path + +from mlia.backend.executor.runner import BackendRunner +from mlia.backend.install import BackendInstallation +from mlia.backend.install import BackendMetadata +from mlia.backend.install import CompoundPathChecker +from mlia.backend.install import Installation +from mlia.backend.install import PackagePathChecker +from mlia.backend.install import StaticPathChecker +from mlia.utils.download import DownloadArtifact +from mlia.utils.filesystem import working_directory + + +logger = logging.getLogger(__name__) + + +class Corstone300Installer: + """Helper class that wraps Corstone 300 installation logic.""" + + def __call__(self, eula_agreement: bool, dist_dir: Path) -> Path: + """Install Corstone-300 and return path to the models.""" + with working_directory(dist_dir): + install_dir = "corstone-300" + try: + fvp_install_cmd = [ + "./FVP_Corstone_SSE-300.sh", + "-q", + "-d", + install_dir, + ] + if not eula_agreement: + fvp_install_cmd += [ + "--nointeractive", + "--i-agree-to-the-contained-eula", + ] + + # The following line raises a B603 error for bandit. 
In this + # specific case, the input is pretty much static and cannot be + # changed byt the user hence disabling the security check for + # this instance + subprocess.check_call(fvp_install_cmd) # nosec + except subprocess.CalledProcessError as err: + raise Exception( + "Error occurred during Corstone-300 installation" + ) from err + + return dist_dir / install_dir + + +def get_corstone_300_installation() -> Installation: + """Get Corstone-300 installation.""" + corstone_300 = BackendInstallation( + backend_runner=BackendRunner(), + # pylint: disable=line-too-long + metadata=BackendMetadata( + name="Corstone-300", + description="Corstone-300 FVP", + system_config="backend_configs/systems/corstone-300/backend-config.json", + apps_resources=[], + fvp_dir_name="corstone_300", + download_artifact=DownloadArtifact( + name="Corstone-300 FVP", + url="https://developer.arm.com/-/media/Arm%20Developer%20Community/Downloads/OSS/FVP/Corstone-300/FVP_Corstone_SSE-300_11.16_26.tgz", + filename="FVP_Corstone_SSE-300_11.16_26.tgz", + version="11.16_26", + sha256_hash="e26139be756b5003a30d978c629de638aed1934d597dc24a17043d4708e934d7", + ), + supported_platforms=["Linux"], + ), + # pylint: enable=line-too-long + path_checker=CompoundPathChecker( + PackagePathChecker( + expected_files=[ + "models/Linux64_GCC-6.4/FVP_Corstone_SSE-300_Ethos-U55", + "models/Linux64_GCC-6.4/FVP_Corstone_SSE-300_Ethos-U65", + ], + backend_subfolder="models/Linux64_GCC-6.4", + ), + StaticPathChecker( + static_backend_path=Path("/opt/VHT"), + expected_files=[ + "VHT_Corstone_SSE-300_Ethos-U55", + "VHT_Corstone_SSE-300_Ethos-U65", + ], + copy_source=False, + system_config=( + "backend_configs/systems/corstone-300-vht/backend-config.json" + ), + ), + ), + backend_installer=Corstone300Installer(), + ) + + return corstone_300 + + +def get_corstone_310_installation() -> Installation: + """Get Corstone-310 installation.""" + corstone_310 = BackendInstallation( + backend_runner=BackendRunner(), + # pylint: disable=line-too-long + metadata=BackendMetadata( + name="Corstone-310", + description="Corstone-310 FVP", + system_config="backend_configs/systems/corstone-310/backend-config.json", + apps_resources=[], + fvp_dir_name="corstone_310", + download_artifact=None, + supported_platforms=["Linux"], + ), + # pylint: enable=line-too-long + path_checker=CompoundPathChecker( + PackagePathChecker( + expected_files=[ + "models/Linux64_GCC-9.3/FVP_Corstone_SSE-310", + "models/Linux64_GCC-9.3/FVP_Corstone_SSE-310_Ethos-U65", + ], + backend_subfolder="models/Linux64_GCC-9.3", + ), + StaticPathChecker( + static_backend_path=Path("/opt/VHT"), + expected_files=[ + "VHT_Corstone_SSE-310", + "VHT_Corstone_SSE-310_Ethos-U65", + ], + copy_source=False, + system_config=( + "backend_configs/systems/corstone-310-vht/backend-config.json" + ), + ), + ), + backend_installer=None, + ) + + return corstone_310 + + +def get_corstone_installations() -> list[Installation]: + """Get Corstone installations.""" + return [ + get_corstone_300_installation(), + get_corstone_310_installation(), + ] diff --git a/src/mlia/backend/corstone/performance.py b/src/mlia/backend/corstone/performance.py new file mode 100644 index 0000000..5aabfa5 --- /dev/null +++ b/src/mlia/backend/corstone/performance.py @@ -0,0 +1,233 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Module for backend integration.""" +from __future__ import annotations + +import logging +from abc import ABC +from abc import abstractmethod +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + +from mlia.backend.executor.output_consumer import Base64OutputConsumer +from mlia.backend.executor.output_consumer import OutputConsumer +from mlia.backend.executor.runner import BackendRunner +from mlia.backend.executor.runner import ExecutionParams +from mlia.backend.install import get_application_name +from mlia.backend.install import get_system_name + + +logger = logging.getLogger(__name__) + + +@dataclass +class DeviceInfo: + """Device information.""" + + device_type: Literal["ethos-u55", "ethos-u65"] + mac: int + + +@dataclass +class ModelInfo: + """Model info.""" + + model_path: Path + + +@dataclass +class PerformanceMetrics: + """Performance metrics parsed from generic inference output.""" + + npu_active_cycles: int + npu_idle_cycles: int + npu_total_cycles: int + npu_axi0_rd_data_beat_received: int + npu_axi0_wr_data_beat_written: int + npu_axi1_rd_data_beat_received: int + + +class LogWriter(OutputConsumer): + """Redirect output to the logger.""" + + def feed(self, line: str) -> bool: + """Process line from the output.""" + logger.debug(line.strip()) + return False + + +class GenericInferenceOutputParser(Base64OutputConsumer): + """Generic inference app output parser.""" + + def __init__(self) -> None: + """Init generic inference output parser instance.""" + super().__init__() + self._map = { + "NPU ACTIVE": "npu_active_cycles", + "NPU IDLE": "npu_idle_cycles", + "NPU TOTAL": "npu_total_cycles", + "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received", + "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written", + "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received", + } + + @property + def result(self) -> dict: + """Merge the raw results and map the names to the right output names.""" + merged_result = {} + for raw_result in self.parsed_output: + for profiling_result in raw_result: + for sample in profiling_result["samples"]: + name, values = (sample["name"], sample["value"]) + if name in merged_result: + raise KeyError( + f"Duplicate key '{name}' in base64 output.", + ) + new_name = self._map[name] + merged_result[new_name] = values[0] + return merged_result + + def is_ready(self) -> bool: + """Return true if all expected data has been parsed.""" + return set(self.result.keys()) == set(self._map.values()) + + def missed_keys(self) -> set[str]: + """Return a set of the keys that have not been found in the output.""" + return set(self._map.values()) - set(self.result.keys()) + + +class GenericInferenceRunner(ABC): + """Abstract class for generic inference runner.""" + + def __init__(self, backend_runner: BackendRunner): + """Init generic inference runner instance.""" + self.backend_runner = backend_runner + + def run( + self, model_info: ModelInfo, output_consumers: list[OutputConsumer] + ) -> None: + """Run generic inference for the provided device/model.""" + execution_params = self.get_execution_params(model_info) + + ctx = self.backend_runner.run_application(execution_params) + if ctx.stdout is not None: + ctx.stdout = self.consume_output(ctx.stdout, output_consumers) + + @abstractmethod + def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: + """Get execution params for the provided model.""" + + def check_system_and_application(self, system_name: 
str, app_name: str) -> None: + """Check if requested system and application installed.""" + if not self.backend_runner.is_system_installed(system_name): + raise Exception(f"System {system_name} is not installed") + + if not self.backend_runner.is_application_installed(app_name, system_name): + raise Exception( + f"Application {app_name} for the system {system_name} " + "is not installed" + ) + + @staticmethod + def consume_output(output: bytearray, consumers: list[OutputConsumer]) -> bytearray: + """ + Pass program's output to the consumers and filter it. + + Returns the filtered output. + """ + filtered_output = bytearray() + for line_bytes in output.splitlines(): + line = line_bytes.decode("utf-8") + remove_line = False + for consumer in consumers: + if consumer.feed(line): + remove_line = True + if not remove_line: + filtered_output.extend(line_bytes) + + return filtered_output + + +class GenericInferenceRunnerEthosU(GenericInferenceRunner): + """Generic inference runner on U55/65.""" + + def __init__( + self, backend_runner: BackendRunner, device_info: DeviceInfo, backend: str + ) -> None: + """Init generic inference runner instance.""" + super().__init__(backend_runner) + + system_name, app_name = self.resolve_system_and_app(device_info, backend) + self.system_name = system_name + self.app_name = app_name + self.device_info = device_info + + @staticmethod + def resolve_system_and_app( + device_info: DeviceInfo, backend: str + ) -> tuple[str, str]: + """Find appropriate system and application for the provided device/backend.""" + try: + system_name = get_system_name(backend, device_info.device_type) + except KeyError as ex: + raise RuntimeError( + f"Unsupported device {device_info.device_type} " + f"for backend {backend}" + ) from ex + + try: + app_name = get_application_name(system_name) + except KeyError as err: + raise RuntimeError(f"System {system_name} is not installed") from err + + return system_name, app_name + + def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: + """Get execution params for Ethos-U55/65.""" + self.check_system_and_application(self.system_name, self.app_name) + + system_params = [ + f"mac={self.device_info.mac}", + f"input_file={model_info.model_path.absolute()}", + ] + + return ExecutionParams( + self.app_name, + self.system_name, + [], + system_params, + ) + + +def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner: + """Get generic runner for provided device and backend.""" + backend_runner = get_backend_runner() + return GenericInferenceRunnerEthosU(backend_runner, device_info, backend) + + +def estimate_performance( + model_info: ModelInfo, device_info: DeviceInfo, backend: str +) -> PerformanceMetrics: + """Get performance estimations.""" + output_parser = GenericInferenceOutputParser() + output_consumers = [output_parser, LogWriter()] + + generic_runner = get_generic_runner(device_info, backend) + generic_runner.run(model_info, output_consumers) + + if not output_parser.is_ready(): + missed_data = ",".join(output_parser.missed_keys()) + logger.debug("Unable to get performance metrics, missed data %s", missed_data) + raise Exception("Unable to get performance metrics, insufficient data") + + return PerformanceMetrics(**output_parser.result) + + +def get_backend_runner() -> BackendRunner: + """ + Return BackendRunner instance. + + Note: This is needed for the unit tests. 
+ """ + return BackendRunner() diff --git a/src/mlia/backend/executor/__init__.py b/src/mlia/backend/executor/__init__.py new file mode 100644 index 0000000..3d60372 --- /dev/null +++ b/src/mlia/backend/executor/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Backend module.""" diff --git a/src/mlia/backend/application.py b/src/mlia/backend/executor/application.py index a5d99f7..738ac4e 100644 --- a/src/mlia/backend/application.py +++ b/src/mlia/backend/executor/application.py @@ -9,18 +9,18 @@ from typing import Any from typing import cast from typing import List -from mlia.backend.common import Backend -from mlia.backend.common import ConfigurationException -from mlia.backend.common import get_backend_configs -from mlia.backend.common import get_backend_directories -from mlia.backend.common import load_application_configs -from mlia.backend.common import load_config -from mlia.backend.common import remove_backend -from mlia.backend.config import ApplicationConfig -from mlia.backend.config import ExtendedApplicationConfig -from mlia.backend.fs import get_backends_path -from mlia.backend.source import create_destination_and_install -from mlia.backend.source import get_source +from mlia.backend.executor.common import Backend +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import get_backend_configs +from mlia.backend.executor.common import get_backend_directories +from mlia.backend.executor.common import load_application_configs +from mlia.backend.executor.common import load_config +from mlia.backend.executor.common import remove_backend +from mlia.backend.executor.config import ApplicationConfig +from mlia.backend.executor.config import ExtendedApplicationConfig +from mlia.backend.executor.fs import get_backends_path +from mlia.backend.executor.source import create_destination_and_install +from mlia.backend.executor.source import get_source def get_available_application_directory_names() -> list[str]: diff --git a/src/mlia/backend/common.py b/src/mlia/backend/executor/common.py index 0f04553..48dbd4a 100644 --- a/src/mlia/backend/common.py +++ b/src/mlia/backend/executor/common.py @@ -19,14 +19,14 @@ from typing import Match from typing import NamedTuple from typing import Pattern -from mlia.backend.config import BackendConfig -from mlia.backend.config import BaseBackendConfig -from mlia.backend.config import NamedExecutionConfig -from mlia.backend.config import UserParamConfig -from mlia.backend.config import UserParamsConfig -from mlia.backend.fs import get_backends_path -from mlia.backend.fs import remove_resource -from mlia.backend.fs import ResourceType +from mlia.backend.executor.config import BackendConfig +from mlia.backend.executor.config import BaseBackendConfig +from mlia.backend.executor.config import NamedExecutionConfig +from mlia.backend.executor.config import UserParamConfig +from mlia.backend.executor.config import UserParamsConfig +from mlia.backend.executor.fs import get_backends_path +from mlia.backend.executor.fs import remove_resource +from mlia.backend.executor.fs import ResourceType BACKEND_CONFIG_FILE: Final[str] = "backend-config.json" diff --git a/src/mlia/backend/config.py b/src/mlia/backend/executor/config.py index dca53da..dca53da 100644 --- a/src/mlia/backend/config.py +++ b/src/mlia/backend/executor/config.py diff --git a/src/mlia/backend/execution.py b/src/mlia/backend/executor/execution.py index 
5c8e53f..e253b16 100644 --- a/src/mlia/backend/execution.py +++ b/src/mlia/backend/executor/execution.py @@ -7,13 +7,13 @@ import logging import re from typing import cast -from mlia.backend.application import Application -from mlia.backend.application import get_application -from mlia.backend.common import Backend -from mlia.backend.common import ConfigurationException -from mlia.backend.common import Param -from mlia.backend.system import get_system -from mlia.backend.system import System +from mlia.backend.executor.application import Application +from mlia.backend.executor.application import get_application +from mlia.backend.executor.common import Backend +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import Param +from mlia.backend.executor.system import get_system +from mlia.backend.executor.system import System logger = logging.getLogger(__name__) diff --git a/src/mlia/backend/fs.py b/src/mlia/backend/executor/fs.py index 3fce19c..3fce19c 100644 --- a/src/mlia/backend/fs.py +++ b/src/mlia/backend/executor/fs.py diff --git a/src/mlia/backend/output_consumer.py b/src/mlia/backend/executor/output_consumer.py index 3c3b132..3c3b132 100644 --- a/src/mlia/backend/output_consumer.py +++ b/src/mlia/backend/executor/output_consumer.py diff --git a/src/mlia/backend/proc.py b/src/mlia/backend/executor/proc.py index 4838e47..39a0689 100644 --- a/src/mlia/backend/proc.py +++ b/src/mlia/backend/executor/proc.py @@ -21,7 +21,7 @@ from sh import CommandNotFound from sh import ErrorReturnCode from sh import RunningCommand -from mlia.backend.fs import valid_for_filename +from mlia.backend.executor.fs import valid_for_filename logger = logging.getLogger(__name__) diff --git a/src/mlia/backend/executor/runner.py b/src/mlia/backend/executor/runner.py new file mode 100644 index 0000000..2330fd9 --- /dev/null +++ b/src/mlia/backend/executor/runner.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Module for backend runner.""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from mlia.backend.executor.application import get_available_applications +from mlia.backend.executor.application import install_application +from mlia.backend.executor.execution import ExecutionContext +from mlia.backend.executor.execution import run_application +from mlia.backend.executor.system import get_available_systems +from mlia.backend.executor.system import install_system + + +@dataclass +class ExecutionParams: + """Application execution params.""" + + application: str + system: str + application_params: list[str] + system_params: list[str] + + +class BackendRunner: + """Backend runner.""" + + def __init__(self) -> None: + """Init BackendRunner instance.""" + + @staticmethod + def get_installed_systems() -> list[str]: + """Get list of the installed systems.""" + return [system.name for system in get_available_systems()] + + @staticmethod + def get_installed_applications(system: str | None = None) -> list[str]: + """Get list of the installed application.""" + return [ + app.name + for app in get_available_applications() + if system is None or app.can_run_on(system) + ] + + def is_application_installed(self, application: str, system: str) -> bool: + """Return true if requested application installed.""" + return application in self.get_installed_applications(system) + + def is_system_installed(self, system: str) -> bool: + """Return true if requested system installed.""" + return system in self.get_installed_systems() + + def systems_installed(self, systems: list[str]) -> bool: + """Check if all provided systems are installed.""" + if not systems: + return False + + installed_systems = self.get_installed_systems() + return all(system in installed_systems for system in systems) + + def applications_installed(self, applications: list[str]) -> bool: + """Check if all provided applications are installed.""" + if not applications: + return False + + installed_apps = self.get_installed_applications() + return all(app in installed_apps for app in applications) + + def all_installed(self, systems: list[str], apps: list[str]) -> bool: + """Check if all provided artifacts are installed.""" + return self.systems_installed(systems) and self.applications_installed(apps) + + @staticmethod + def install_system(system_path: Path) -> None: + """Install system.""" + install_system(system_path) + + @staticmethod + def install_application(app_path: Path) -> None: + """Install application.""" + install_application(app_path) + + @staticmethod + def run_application(execution_params: ExecutionParams) -> ExecutionContext: + """Run requested application.""" + ctx = run_application( + execution_params.application, + execution_params.application_params, + execution_params.system, + execution_params.system_params, + ) + return ctx + + @staticmethod + def _params(name: str, params: list[str]) -> list[str]: + return [p for item in [(name, param) for param in params] for p in item] diff --git a/src/mlia/backend/source.py b/src/mlia/backend/executor/source.py index c951eae..6abc49f 100644 --- a/src/mlia/backend/source.py +++ b/src/mlia/backend/executor/source.py @@ -11,13 +11,13 @@ from abc import abstractmethod from pathlib import Path from tarfile import TarFile -from mlia.backend.common import BACKEND_CONFIG_FILE -from mlia.backend.common import ConfigurationException -from mlia.backend.common import get_backend_config -from 
mlia.backend.common import is_backend_directory -from mlia.backend.common import load_config -from mlia.backend.config import BackendConfig -from mlia.backend.fs import copy_directory_content +from mlia.backend.executor.common import BACKEND_CONFIG_FILE +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import get_backend_config +from mlia.backend.executor.common import is_backend_directory +from mlia.backend.executor.common import load_config +from mlia.backend.executor.config import BackendConfig +from mlia.backend.executor.fs import copy_directory_content class Source(ABC): diff --git a/src/mlia/backend/system.py b/src/mlia/backend/executor/system.py index 0e51ab2..a5ecf19 100644 --- a/src/mlia/backend/system.py +++ b/src/mlia/backend/executor/system.py @@ -8,17 +8,17 @@ from typing import Any from typing import cast from typing import List -from mlia.backend.common import Backend -from mlia.backend.common import ConfigurationException -from mlia.backend.common import get_backend_configs -from mlia.backend.common import get_backend_directories -from mlia.backend.common import load_config -from mlia.backend.common import remove_backend -from mlia.backend.config import SystemConfig -from mlia.backend.fs import get_backends_path -from mlia.backend.proc import run_and_wait -from mlia.backend.source import create_destination_and_install -from mlia.backend.source import get_source +from mlia.backend.executor.common import Backend +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import get_backend_configs +from mlia.backend.executor.common import get_backend_directories +from mlia.backend.executor.common import load_config +from mlia.backend.executor.common import remove_backend +from mlia.backend.executor.config import SystemConfig +from mlia.backend.executor.fs import get_backends_path +from mlia.backend.executor.proc import run_and_wait +from mlia.backend.executor.source import create_destination_and_install +from mlia.backend.executor.source import get_source class System(Backend): diff --git a/src/mlia/backend/install.py b/src/mlia/backend/install.py new file mode 100644 index 0000000..eea3403 --- /dev/null +++ b/src/mlia/backend/install.py @@ -0,0 +1,450 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Module for installation process.""" +from __future__ import annotations + +import logging +import platform +import tarfile +from abc import ABC +from abc import abstractmethod +from dataclasses import dataclass +from pathlib import Path +from typing import Callable +from typing import Iterable +from typing import Optional +from typing import Union + +from mlia.backend.executor.runner import BackendRunner +from mlia.backend.executor.system import remove_system +from mlia.utils.download import DownloadArtifact +from mlia.utils.filesystem import all_files_exist +from mlia.utils.filesystem import all_paths_valid +from mlia.utils.filesystem import copy_all +from mlia.utils.filesystem import get_mlia_resources +from mlia.utils.filesystem import temp_directory +from mlia.utils.filesystem import working_directory +from mlia.utils.py_manager import get_package_manager + + +logger = logging.getLogger(__name__) + + +# Mapping backend -> device_type -> system_name +_SUPPORTED_SYSTEMS = { + "Corstone-300": { + "ethos-u55": "Corstone-300: Cortex-M55+Ethos-U55", + "ethos-u65": "Corstone-300: Cortex-M55+Ethos-U65", + }, + "Corstone-310": { + "ethos-u55": "Corstone-310: Cortex-M85+Ethos-U55", + "ethos-u65": "Corstone-310: Cortex-M85+Ethos-U65", + }, +} + +# Mapping system_name -> application +_SYSTEM_TO_APP_MAP = { + "Corstone-300: Cortex-M55+Ethos-U55": "Generic Inference Runner: Ethos-U55", + "Corstone-300: Cortex-M55+Ethos-U65": "Generic Inference Runner: Ethos-U65", + "Corstone-310: Cortex-M85+Ethos-U55": "Generic Inference Runner: Ethos-U55", + "Corstone-310: Cortex-M85+Ethos-U65": "Generic Inference Runner: Ethos-U65", +} + + +def get_system_name(backend: str, device_type: str) -> str: + """Get the system name for the given backend and device type.""" + return _SUPPORTED_SYSTEMS[backend][device_type] + + +def get_application_name(system_name: str) -> str: + """Get application name for the provided system name.""" + return _SYSTEM_TO_APP_MAP[system_name] + + +def is_supported(backend: str, device_type: str | None = None) -> bool: + """Check if the backend (and optionally device type) is supported.""" + if device_type is None: + return backend in _SUPPORTED_SYSTEMS + + try: + get_system_name(backend, device_type) + return True + except KeyError: + return False + + +def supported_backends() -> list[str]: + """Get a list of all backends supported by the backend manager.""" + return list(_SUPPORTED_SYSTEMS.keys()) + + +def get_all_system_names(backend: str) -> list[str]: + """Get all systems supported by the backend.""" + return list(_SUPPORTED_SYSTEMS.get(backend, {}).values()) + + +def get_all_application_names(backend: str) -> list[str]: + """Get all applications supported by the backend.""" + app_set = {_SYSTEM_TO_APP_MAP[sys] for sys in get_all_system_names(backend)} + return list(app_set) + + +@dataclass +class InstallFromPath: + """Installation from the local path.""" + + backend_path: Path + + +@dataclass +class DownloadAndInstall: + """Download and install.""" + + eula_agreement: bool = True + + +InstallationType = Union[InstallFromPath, DownloadAndInstall] + + +class Installation(ABC): + """Base class for the installation process of the backends.""" + + @property + @abstractmethod + def name(self) -> str: + """Return name of the backend.""" + + @property + @abstractmethod + def description(self) -> str: + """Return description of the backend.""" + + @property + @abstractmethod + def could_be_installed(self) -> bool: + """Return true if backend could be 
installed in current environment.""" + + @property + @abstractmethod + def already_installed(self) -> bool: + """Return true if backend is already installed.""" + + @abstractmethod + def supports(self, install_type: InstallationType) -> bool: + """Return true if installation supports requested installation type.""" + + @abstractmethod + def install(self, install_type: InstallationType) -> None: + """Install the backend.""" + + @abstractmethod + def uninstall(self) -> None: + """Uninstall the backend.""" + + +@dataclass +class BackendInfo: + """Backend information.""" + + backend_path: Path + copy_source: bool = True + system_config: str | None = None + + +PathChecker = Callable[[Path], Optional[BackendInfo]] +BackendInstaller = Callable[[bool, Path], Path] + + +class BackendMetadata: + """Backend installation metadata.""" + + def __init__( + self, + name: str, + description: str, + system_config: str, + apps_resources: list[str], + fvp_dir_name: str, + download_artifact: DownloadArtifact | None, + supported_platforms: list[str] | None = None, + ) -> None: + """ + Initialize BackendMetadata. + + Members expected_systems and expected_apps are filled automatically. + """ + self.name = name + self.description = description + self.system_config = system_config + self.apps_resources = apps_resources + self.fvp_dir_name = fvp_dir_name + self.download_artifact = download_artifact + self.supported_platforms = supported_platforms + + self.expected_systems = get_all_system_names(name) + self.expected_apps = get_all_application_names(name) + + @property + def expected_resources(self) -> Iterable[Path]: + """Return list of expected resources.""" + resources = [self.system_config, *self.apps_resources] + + return (get_mlia_resources() / resource for resource in resources) + + @property + def supported_platform(self) -> bool: + """Return true if current platform supported.""" + if not self.supported_platforms: + return True + + return platform.system() in self.supported_platforms + + +class BackendInstallation(Installation): + """Backend installation.""" + + def __init__( + self, + backend_runner: BackendRunner, + metadata: BackendMetadata, + path_checker: PathChecker, + backend_installer: BackendInstaller | None, + ) -> None: + """Init the backend installation.""" + self.backend_runner = backend_runner + self.metadata = metadata + self.path_checker = path_checker + self.backend_installer = backend_installer + + @property + def name(self) -> str: + """Return name of the backend.""" + return self.metadata.name + + @property + def description(self) -> str: + """Return description of the backend.""" + return self.metadata.description + + @property + def already_installed(self) -> bool: + """Return true if backend already installed.""" + return self.backend_runner.all_installed( + self.metadata.expected_systems, self.metadata.expected_apps + ) + + @property + def could_be_installed(self) -> bool: + """Return true if backend could be installed.""" + if not self.metadata.supported_platform: + return False + + return all_paths_valid(self.metadata.expected_resources) + + def supports(self, install_type: InstallationType) -> bool: + """Return true if backends supported type of the installation.""" + if isinstance(install_type, DownloadAndInstall): + return self.metadata.download_artifact is not None + + if isinstance(install_type, InstallFromPath): + return self.path_checker(install_type.backend_path) is not None + + return False # type: ignore + + def install(self, install_type: InstallationType) -> None: + 
"""Install the backend.""" + if isinstance(install_type, DownloadAndInstall): + download_artifact = self.metadata.download_artifact + assert download_artifact is not None, "No artifact provided" + + self.download_and_install(download_artifact, install_type.eula_agreement) + elif isinstance(install_type, InstallFromPath): + backend_path = self.path_checker(install_type.backend_path) + assert backend_path is not None, "Unable to resolve backend path" + + self.install_from(backend_path) + else: + raise Exception(f"Unable to install {install_type}") + + def install_from(self, backend_info: BackendInfo) -> None: + """Install backend from the directory.""" + mlia_resources = get_mlia_resources() + + with temp_directory() as tmpdir: + fvp_dist_dir = tmpdir / self.metadata.fvp_dir_name + + system_config = self.metadata.system_config + if backend_info.system_config: + system_config = backend_info.system_config + + resources_to_copy = [mlia_resources / system_config] + if backend_info.copy_source: + resources_to_copy.append(backend_info.backend_path) + + copy_all(*resources_to_copy, dest=fvp_dist_dir) + + self.backend_runner.install_system(fvp_dist_dir) + + for app in self.metadata.apps_resources: + self.backend_runner.install_application(mlia_resources / app) + + def download_and_install( + self, download_artifact: DownloadArtifact, eula_agrement: bool + ) -> None: + """Download and install the backend.""" + with temp_directory() as tmpdir: + try: + downloaded_to = download_artifact.download_to(tmpdir) + except Exception as err: + raise Exception("Unable to download backend artifact") from err + + with working_directory(tmpdir / "dist", create_dir=True) as dist_dir: + with tarfile.open(downloaded_to) as archive: + archive.extractall(dist_dir) + + assert self.backend_installer, ( + f"Backend '{self.metadata.name}' does not support " + "download and installation." 
+ ) + backend_path = self.backend_installer(eula_agrement, dist_dir) + if self.path_checker(backend_path) is None: + raise Exception("Downloaded artifact has invalid structure") + + self.install(InstallFromPath(backend_path)) + + def uninstall(self) -> None: + """Uninstall the backend.""" + remove_system(self.metadata.fvp_dir_name) + + +class PackagePathChecker: + """Package path checker.""" + + def __init__( + self, expected_files: list[str], backend_subfolder: str | None = None + ) -> None: + """Init the path checker.""" + self.expected_files = expected_files + self.backend_subfolder = backend_subfolder + + def __call__(self, backend_path: Path) -> BackendInfo | None: + """Check if directory contains all expected files.""" + resolved_paths = (backend_path / file for file in self.expected_files) + if not all_files_exist(resolved_paths): + return None + + if self.backend_subfolder: + subfolder = backend_path / self.backend_subfolder + + if not subfolder.is_dir(): + return None + + return BackendInfo(subfolder) + + return BackendInfo(backend_path) + + +class StaticPathChecker: + """Static path checker.""" + + def __init__( + self, + static_backend_path: Path, + expected_files: list[str], + copy_source: bool = False, + system_config: str | None = None, + ) -> None: + """Init static path checker.""" + self.static_backend_path = static_backend_path + self.expected_files = expected_files + self.copy_source = copy_source + self.system_config = system_config + + def __call__(self, backend_path: Path) -> BackendInfo | None: + """Check if directory equals static backend path with all expected files.""" + if backend_path != self.static_backend_path: + return None + + resolved_paths = (backend_path / file for file in self.expected_files) + if not all_files_exist(resolved_paths): + return None + + return BackendInfo( + backend_path, + copy_source=self.copy_source, + system_config=self.system_config, + ) + + +class CompoundPathChecker: + """Compound path checker.""" + + def __init__(self, *path_checkers: PathChecker) -> None: + """Init compound path checker.""" + self.path_checkers = path_checkers + + def __call__(self, backend_path: Path) -> BackendInfo | None: + """Iterate over checkers and return first non empty backend info.""" + first_resolved_backend_info = ( + backend_info + for path_checker in self.path_checkers + if (backend_info := path_checker(backend_path)) is not None + ) + + return next(first_resolved_backend_info, None) + + +class PyPackageBackendInstallation(Installation): + """Backend based on the python package.""" + + def __init__( + self, + name: str, + description: str, + packages_to_install: list[str], + packages_to_uninstall: list[str], + expected_packages: list[str], + ) -> None: + """Init the backend installation.""" + self._name = name + self._description = description + self._packages_to_install = packages_to_install + self._packages_to_uninstall = packages_to_uninstall + self._expected_packages = expected_packages + + self.package_manager = get_package_manager() + + @property + def name(self) -> str: + """Return name of the backend.""" + return self._name + + @property + def description(self) -> str: + """Return description of the backend.""" + return self._description + + @property + def could_be_installed(self) -> bool: + """Check if backend could be installed.""" + return True + + @property + def already_installed(self) -> bool: + """Check if backend already installed.""" + return self.package_manager.packages_installed(self._expected_packages) + + def supports(self, 
install_type: InstallationType) -> bool: + """Return true if installation supports requested installation type.""" + return isinstance(install_type, DownloadAndInstall) + + def install(self, install_type: InstallationType) -> None: + """Install the backend.""" + if not self.supports(install_type): + raise Exception(f"Unsupported installation type {install_type}") + + self.package_manager.install(self._packages_to_install) + + def uninstall(self) -> None: + """Uninstall the backend.""" + self.package_manager.uninstall(self._packages_to_uninstall) diff --git a/src/mlia/backend/manager.py b/src/mlia/backend/manager.py index 6a61ab0..c02dc6e 100644 --- a/src/mlia/backend/manager.py +++ b/src/mlia/backend/manager.py @@ -1,372 +1,271 @@ # SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 -"""Module for backend integration.""" +"""Module for installation process.""" from __future__ import annotations import logging from abc import ABC from abc import abstractmethod -from dataclasses import dataclass from pathlib import Path -from typing import Literal +from typing import Callable -from mlia.backend.application import get_available_applications -from mlia.backend.application import install_application -from mlia.backend.execution import ExecutionContext -from mlia.backend.execution import run_application -from mlia.backend.output_consumer import Base64OutputConsumer -from mlia.backend.output_consumer import OutputConsumer -from mlia.backend.system import get_available_systems -from mlia.backend.system import install_system +from mlia.backend.install import DownloadAndInstall +from mlia.backend.install import Installation +from mlia.backend.install import InstallationType +from mlia.backend.install import InstallFromPath +from mlia.core.errors import ConfigurationError +from mlia.core.errors import InternalError +from mlia.utils.misc import yes logger = logging.getLogger(__name__) -# Mapping backend -> device_type -> system_name -_SUPPORTED_SYSTEMS = { - "Corstone-300": { - "ethos-u55": "Corstone-300: Cortex-M55+Ethos-U55", - "ethos-u65": "Corstone-300: Cortex-M55+Ethos-U65", - }, - "Corstone-310": { - "ethos-u55": "Corstone-310: Cortex-M85+Ethos-U55", - "ethos-u65": "Corstone-310: Cortex-M85+Ethos-U65", - }, -} +InstallationFilter = Callable[[Installation], bool] -# Mapping system_name -> application -_SYSTEM_TO_APP_MAP = { - "Corstone-300: Cortex-M55+Ethos-U55": "Generic Inference Runner: Ethos-U55", - "Corstone-300: Cortex-M55+Ethos-U65": "Generic Inference Runner: Ethos-U65", - "Corstone-310: Cortex-M85+Ethos-U55": "Generic Inference Runner: Ethos-U55", - "Corstone-310: Cortex-M85+Ethos-U65": "Generic Inference Runner: Ethos-U65", -} +class AlreadyInstalledFilter: + """Filter for already installed backends.""" -def get_system_name(backend: str, device_type: str) -> str: - """Get the system name for the given backend and device type.""" - return _SUPPORTED_SYSTEMS[backend][device_type] + def __call__(self, installation: Installation) -> bool: + """Installation filter.""" + return installation.already_installed -def is_supported(backend: str, device_type: str | None = None) -> bool: - """Check if the backend (and optionally device type) is supported.""" - if device_type is None: - return backend in _SUPPORTED_SYSTEMS +class ReadyForInstallationFilter: + """Filter for ready to be installed backends.""" - try: - get_system_name(backend, device_type) - return True - except KeyError: - return False + def __call__(self, installation: 
Installation) -> bool: + """Installation filter.""" + return installation.could_be_installed and not installation.already_installed -def supported_backends() -> list[str]: - """Get a list of all backends supported by the backend manager.""" - return list(_SUPPORTED_SYSTEMS.keys()) +class SupportsInstallTypeFilter: + """Filter backends that support certain type of the installation.""" + def __init__(self, installation_type: InstallationType) -> None: + """Init filter.""" + self.installation_type = installation_type -def get_all_system_names(backend: str) -> list[str]: - """Get all systems supported by the backend.""" - return list(_SUPPORTED_SYSTEMS.get(backend, {}).values()) + def __call__(self, installation: Installation) -> bool: + """Installation filter.""" + return installation.supports(self.installation_type) -def get_all_application_names(backend: str) -> list[str]: - """Get all applications supported by the backend.""" - app_set = {_SYSTEM_TO_APP_MAP[sys] for sys in get_all_system_names(backend)} - return list(app_set) +class SearchByNameFilter: + """Filter installation by name.""" + def __init__(self, backend_name: str | None) -> None: + """Init filter.""" + self.backend_name = backend_name -@dataclass -class DeviceInfo: - """Device information.""" - - device_type: Literal["ethos-u55", "ethos-u65"] - mac: int - - -@dataclass -class ModelInfo: - """Model info.""" - - model_path: Path - - -@dataclass -class PerformanceMetrics: - """Performance metrics parsed from generic inference output.""" - - npu_active_cycles: int - npu_idle_cycles: int - npu_total_cycles: int - npu_axi0_rd_data_beat_received: int - npu_axi0_wr_data_beat_written: int - npu_axi1_rd_data_beat_received: int - - -@dataclass -class ExecutionParams: - """Application execution params.""" - - application: str - system: str - application_params: list[str] - system_params: list[str] - - -class LogWriter(OutputConsumer): - """Redirect output to the logger.""" - - def feed(self, line: str) -> bool: - """Process line from the output.""" - logger.debug(line.strip()) - return False + def __call__(self, installation: Installation) -> bool: + """Installation filter.""" + return ( + not self.backend_name + or installation.name.casefold() == self.backend_name.casefold() + ) -class GenericInferenceOutputParser(Base64OutputConsumer): - """Generic inference app output parser.""" +class InstallationManager(ABC): + """Helper class for managing installations.""" - def __init__(self) -> None: - """Init generic inference output parser instance.""" - super().__init__() - self._map = { - "NPU ACTIVE": "npu_active_cycles", - "NPU IDLE": "npu_idle_cycles", - "NPU TOTAL": "npu_total_cycles", - "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received", - "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written", - "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received", - } + @abstractmethod + def install_from(self, backend_path: Path, backend_name: str, force: bool) -> None: + """Install backend from the local directory.""" - @property - def result(self) -> dict: - """Merge the raw results and map the names to the right output names.""" - merged_result = {} - for raw_result in self.parsed_output: - for profiling_result in raw_result: - for sample in profiling_result["samples"]: - name, values = (sample["name"], sample["value"]) - if name in merged_result: - raise KeyError( - f"Duplicate key '{name}' in base64 output.", - ) - new_name = self._map[name] - merged_result[new_name] = values[0] - return merged_result + 
@abstractmethod + def download_and_install( + self, backend_name: str, eula_agreement: bool, force: bool + ) -> None: + """Download and install backends.""" - def is_ready(self) -> bool: - """Return true if all expected data has been parsed.""" - return set(self.result.keys()) == set(self._map.values()) + @abstractmethod + def show_env_details(self) -> None: + """Show environment details.""" - def missed_keys(self) -> set[str]: - """Return a set of the keys that have not been found in the output.""" - return set(self._map.values()) - set(self.result.keys()) + @abstractmethod + def backend_installed(self, backend_name: str) -> bool: + """Return true if requested backend installed.""" + @abstractmethod + def uninstall(self, backend_name: str) -> None: + """Delete the existing installation.""" -class BackendRunner: - """Backend runner.""" - def __init__(self) -> None: - """Init BackendRunner instance.""" +class InstallationFiltersMixin: + """Mixin for filtering installation based on different conditions.""" - @staticmethod - def get_installed_systems() -> list[str]: - """Get list of the installed systems.""" - return [system.name for system in get_available_systems()] + installations: list[Installation] - @staticmethod - def get_installed_applications(system: str | None = None) -> list[str]: - """Get list of the installed application.""" + def filter_by(self, *filters: InstallationFilter) -> list[Installation]: + """Filter installations.""" return [ - app.name - for app in get_available_applications() - if system is None or app.can_run_on(system) + installation + for installation in self.installations + if all(filter_(installation) for filter_ in filters) ] - def is_application_installed(self, application: str, system: str) -> bool: - """Return true if requested application installed.""" - return application in self.get_installed_applications(system) - - def is_system_installed(self, system: str) -> bool: - """Return true if requested system installed.""" - return system in self.get_installed_systems() - - def systems_installed(self, systems: list[str]) -> bool: - """Check if all provided systems are installed.""" - if not systems: - return False - - installed_systems = self.get_installed_systems() - return all(system in installed_systems for system in systems) - - def applications_installed(self, applications: list[str]) -> bool: - """Check if all provided applications are installed.""" - if not applications: - return False - - installed_apps = self.get_installed_applications() - return all(app in installed_apps for app in applications) + def find_by_name(self, backend_name: str) -> list[Installation]: + """Return list of the backends filtered by name.""" + return self.filter_by(SearchByNameFilter(backend_name)) - def all_installed(self, systems: list[str], apps: list[str]) -> bool: - """Check if all provided artifacts are installed.""" - return self.systems_installed(systems) and self.applications_installed(apps) - - @staticmethod - def install_system(system_path: Path) -> None: - """Install system.""" - install_system(system_path) - - @staticmethod - def install_application(app_path: Path) -> None: - """Install application.""" - install_application(app_path) - - @staticmethod - def run_application(execution_params: ExecutionParams) -> ExecutionContext: - """Run requested application.""" - ctx = run_application( - execution_params.application, - execution_params.application_params, - execution_params.system, - execution_params.system_params, + def already_installed(self, backend_name: str = 
None) -> list[Installation]: + """Return list of backends that are already installed.""" + return self.filter_by( + AlreadyInstalledFilter(), + SearchByNameFilter(backend_name), ) - return ctx - @staticmethod - def _params(name: str, params: list[str]) -> list[str]: - return [p for item in [(name, param) for param in params] for p in item] + def ready_for_installation(self) -> list[Installation]: + """Return list of the backends that could be installed.""" + return self.filter_by(ReadyForInstallationFilter()) -class GenericInferenceRunner(ABC): - """Abstract class for generic inference runner.""" +class DefaultInstallationManager(InstallationManager, InstallationFiltersMixin): + """Interactive installation manager.""" - def __init__(self, backend_runner: BackendRunner): - """Init generic inference runner instance.""" - self.backend_runner = backend_runner - - def run( - self, model_info: ModelInfo, output_consumers: list[OutputConsumer] + def __init__( + self, installations: list[Installation], noninteractive: bool = False ) -> None: - """Run generic inference for the provided device/model.""" - execution_params = self.get_execution_params(model_info) - - ctx = self.backend_runner.run_application(execution_params) - if ctx.stdout is not None: - ctx.stdout = self.consume_output(ctx.stdout, output_consumers) - - @abstractmethod - def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: - """Get execution params for the provided model.""" - - def check_system_and_application(self, system_name: str, app_name: str) -> None: - """Check if requested system and application installed.""" - if not self.backend_runner.is_system_installed(system_name): - raise Exception(f"System {system_name} is not installed") - - if not self.backend_runner.is_application_installed(app_name, system_name): - raise Exception( - f"Application {app_name} for the system {system_name} " - "is not installed" + """Init the manager.""" + self.installations = installations + self.noninteractive = noninteractive + + def _install( + self, + backend_name: str, + install_type: InstallationType, + prompt: Callable[[Installation], str], + force: bool, + ) -> None: + """Check metadata and install backend.""" + installs = self.find_by_name(backend_name) + + if not installs: + logger.info("Unknown backend '%s'.", backend_name) + logger.info( + "Please run command 'mlia-backend list' to get list of " + "supported backend names." ) - @staticmethod - def consume_output(output: bytearray, consumers: list[OutputConsumer]) -> bytearray: - """ - Pass program's output to the consumers and filter it. + return + + if len(installs) > 1: + raise InternalError(f"More than one backend with name {backend_name} found") + + installation = installs[0] + if not installation.supports(install_type): + if isinstance(install_type, InstallFromPath): + logger.info( + "Backend '%s' could not be installed using path '%s'.", + installation.name, + install_type.backend_path, + ) + logger.info( + "Please check that '%s' is a valid path to the installed backend.", + install_type.backend_path, + ) + else: + logger.info( + "Backend '%s' could not be downloaded and installed", + installation.name, + ) + logger.info( + "Please refer to the project's documentation for more details." 
+ ) + + return + + if installation.already_installed and not force: + logger.info("Backend '%s' is already installed.", installation.name) + logger.info("Please, consider using --force option.") + return + + proceed = self.noninteractive or yes(prompt(installation)) + if not proceed: + logger.info("%s installation canceled.", installation.name) + return + + if installation.already_installed and force: + logger.info( + "Force installing %s, so delete the existing " + "installed backend first.", + installation.name, + ) + installation.uninstall() - Returns the filtered output. - """ - filtered_output = bytearray() - for line_bytes in output.splitlines(): - line = line_bytes.decode("utf-8") - remove_line = False - for consumer in consumers: - if consumer.feed(line): - remove_line = True - if not remove_line: - filtered_output.extend(line_bytes) + installation.install(install_type) + logger.info("%s successfully installed.", installation.name) - return filtered_output + def install_from( + self, backend_path: Path, backend_name: str, force: bool = False + ) -> None: + """Install from the provided directory.""" + def prompt(install: Installation) -> str: + return ( + f"{install.name} was found in {backend_path}. " + "Would you like to install it?" + ) -class GenericInferenceRunnerEthosU(GenericInferenceRunner): - """Generic inference runner on U55/65.""" + install_type = InstallFromPath(backend_path) + self._install(backend_name, install_type, prompt, force) - def __init__( - self, backend_runner: BackendRunner, device_info: DeviceInfo, backend: str + def download_and_install( + self, backend_name: str, eula_agreement: bool = True, force: bool = False ) -> None: - """Init generic inference runner instance.""" - super().__init__(backend_runner) + """Download and install available backends.""" - system_name, app_name = self.resolve_system_and_app(device_info, backend) - self.system_name = system_name - self.app_name = app_name - self.device_info = device_info + def prompt(install: Installation) -> str: + return f"Would you like to download and install {install.name}?" 
- @staticmethod - def resolve_system_and_app( - device_info: DeviceInfo, backend: str - ) -> tuple[str, str]: - """Find appropriate system and application for the provided device/backend.""" - try: - system_name = get_system_name(backend, device_info.device_type) - except KeyError as ex: - raise RuntimeError( - f"Unsupported device {device_info.device_type} " - f"for backend {backend}" - ) from ex - - try: - app_name = _SYSTEM_TO_APP_MAP[system_name] - except KeyError as err: - raise RuntimeError(f"System {system_name} is not installed") from err - - return system_name, app_name - - def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: - """Get execution params for Ethos-U55/65.""" - self.check_system_and_application(self.system_name, self.app_name) - - system_params = [ - f"mac={self.device_info.mac}", - f"input_file={model_info.model_path.absolute()}", - ] + install_type = DownloadAndInstall(eula_agreement=eula_agreement) + self._install(backend_name, install_type, prompt, force) - return ExecutionParams( - self.app_name, - self.system_name, - [], - system_params, - ) + def show_env_details(self) -> None: + """Print current state of the execution environment.""" + if installed := self.already_installed(): + self._print_installation_list("Installed backends:", installed) + + if could_be_installed := self.ready_for_installation(): + self._print_installation_list( + "Following backends could be installed:", + could_be_installed, + new_section=bool(installed), + ) + if not installed and not could_be_installed: + logger.info("No backends installed") -def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner: - """Get generic runner for provided device and backend.""" - backend_runner = get_backend_runner() - return GenericInferenceRunnerEthosU(backend_runner, device_info, backend) + @staticmethod + def _print_installation_list( + header: str, installations: list[Installation], new_section: bool = False + ) -> None: + """Print list of the installations.""" + logger.info("%s%s\n", "\n" if new_section else "", header) + for installation in installations: + logger.info(" - %s", installation.name) -def estimate_performance( - model_info: ModelInfo, device_info: DeviceInfo, backend: str -) -> PerformanceMetrics: - """Get performance estimations.""" - output_parser = GenericInferenceOutputParser() - output_consumers = [output_parser, LogWriter()] + def uninstall(self, backend_name: str) -> None: + """Uninstall the backend with name backend_name.""" + installations = self.already_installed(backend_name) - generic_runner = get_generic_runner(device_info, backend) - generic_runner.run(model_info, output_consumers) + if not installations: + raise ConfigurationError(f"Backend '{backend_name}' is not installed") - if not output_parser.is_ready(): - missed_data = ",".join(output_parser.missed_keys()) - logger.debug("Unable to get performance metrics, missed data %s", missed_data) - raise Exception("Unable to get performance metrics, insufficient data") + if len(installations) != 1: + raise InternalError( + f"More than one installed backend with name {backend_name} found" + ) - return PerformanceMetrics(**output_parser.result) + installation = installations[0] + installation.uninstall() + logger.info("%s successfully uninstalled.", installation.name) -def get_backend_runner() -> BackendRunner: - """ - Return BackendRunner instance. 
+ def backend_installed(self, backend_name: str) -> bool: + """Return true if requested backend installed.""" + installations = self.already_installed(backend_name) - Note: This is needed for the unit tests. - """ - return BackendRunner() + return len(installations) == 1 diff --git a/src/mlia/backend/tosa_checker/__init__.py b/src/mlia/backend/tosa_checker/__init__.py new file mode 100644 index 0000000..cec210d --- /dev/null +++ b/src/mlia/backend/tosa_checker/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""TOSA checker backend module.""" diff --git a/src/mlia/backend/tosa_checker/install.py b/src/mlia/backend/tosa_checker/install.py new file mode 100644 index 0000000..72454bc --- /dev/null +++ b/src/mlia/backend/tosa_checker/install.py @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Module for python package based installations.""" +from __future__ import annotations + +from mlia.backend.install import Installation +from mlia.backend.install import PyPackageBackendInstallation + + +def get_tosa_backend_installation() -> Installation: + """Get TOSA backend installation.""" + return PyPackageBackendInstallation( + name="tosa-checker", + description="Tool to check if a ML model is compatible " + "with the TOSA specification", + packages_to_install=["mlia[tosa]"], + packages_to_uninstall=["tosa-checker"], + expected_packages=["tosa-checker"], + ) diff --git a/src/mlia/backend/vela/__init__.py b/src/mlia/backend/vela/__init__.py new file mode 100644 index 0000000..6ea0c21 --- /dev/null +++ b/src/mlia/backend/vela/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Vela backend module.""" diff --git a/src/mlia/backend/vela/compat.py b/src/mlia/backend/vela/compat.py new file mode 100644 index 0000000..3ec42d1 --- /dev/null +++ b/src/mlia/backend/vela/compat.py @@ -0,0 +1,158 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
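get_tosa_backend_installation() above only declares which packages to install, uninstall and verify; the PyPackageBackendInstallation class that acts on those lists lives in mlia.backend.install and is not shown in this diff. As a rough, assumed approximation only, installing a Python-package backend comes down to pip operations along these lines.

# Assumed approximation, not MLIA's actual implementation: drive pip in the
# current interpreter's environment for a Python-package backend.
import subprocess
import sys


def pip_install(package: str) -> None:
    # Use sys.executable so the package lands in the same environment as MLIA.
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])


def pip_uninstall(package: str) -> None:
    subprocess.check_call([sys.executable, "-m", "pip", "uninstall", "--yes", package])


# For the TOSA checker backend declared above:
#   pip_install("mlia[tosa]")        # packages_to_install
#   pip_uninstall("tosa-checker")    # packages_to_uninstall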
+# SPDX-License-Identifier: Apache-2.0 +"""Vela operator compatibility module.""" +from __future__ import annotations + +import itertools +import logging +from dataclasses import dataclass +from pathlib import Path + +from ethosu.vela.operation import Op +from ethosu.vela.tflite_mapping import optype_to_builtintype +from ethosu.vela.tflite_model_semantic import TFLiteSemantic +from ethosu.vela.tflite_supported_operators import TFLiteSupportedOperators +from ethosu.vela.vela import generate_supported_ops + +from mlia.backend.vela.compiler import VelaCompiler +from mlia.backend.vela.compiler import VelaCompilerOptions +from mlia.utils.logging import redirect_output + + +logger = logging.getLogger(__name__) + +VELA_INTERNAL_OPS = (Op.Placeholder, Op.SubgraphInput, Op.Const) + + +@dataclass +class NpuSupported: + """Operator's npu supported attribute.""" + + supported: bool + reasons: list[tuple[str, str]] + + +@dataclass +class Operator: + """Model operator.""" + + name: str + op_type: str + run_on_npu: NpuSupported + + @property + def cpu_only(self) -> bool: + """Return true if operator is CPU only.""" + cpu_only_reasons = [("CPU only operator", "")] + return ( + not self.run_on_npu.supported + and self.run_on_npu.reasons == cpu_only_reasons + ) + + +@dataclass +class Operators: + """Model's operators.""" + + ops: list[Operator] + + @property + def npu_supported_ratio(self) -> float: + """Return NPU supported ratio.""" + total = self.total_number + npu_supported = self.npu_supported_number + + if total == 0 or npu_supported == 0: + return 0 + + return npu_supported / total + + @property + def npu_unsupported_ratio(self) -> float: + """Return NPU unsupported ratio.""" + return 1 - self.npu_supported_ratio + + @property + def total_number(self) -> int: + """Return total number of operators.""" + return len(self.ops) + + @property + def npu_supported_number(self) -> int: + """Return number of npu supported operators.""" + return sum(op.run_on_npu.supported for op in self.ops) + + +def supported_operators( + model_path: Path, compiler_options: VelaCompilerOptions +) -> Operators: + """Return list of model's operators.""" + logger.debug("Check supported operators for the model %s", model_path) + + vela_compiler = VelaCompiler(compiler_options) + initial_model = vela_compiler.read_model(model_path) + + return Operators( + [ + Operator(op.name, optype_to_builtintype(op.type), run_on_npu(op)) + for sg in initial_model.nng.subgraphs + for op in sg.get_all_ops() + if op.type not in VELA_INTERNAL_OPS + ] + ) + + +def run_on_npu(operator: Op) -> NpuSupported: + """Return information if operator can run on NPU. + + Vela does a number of checks that can help establish whether + a particular operator is supported to run on NPU. + + There are two groups of checks: + - general TensorFlow Lite constraints + - operator specific constraints + + If an operator is not supported on NPU then this function + will return the reason of that. 
+ + The reason is split in two parts: + - general description of why the operator cannot be placed on NPU + - details on the particular operator + """ + semantic_checker = TFLiteSemantic() + semantic_constraints = itertools.chain( + semantic_checker.generic_constraints, + semantic_checker.specific_constraints[operator.type], + ) + + for constraint in semantic_constraints: + op_valid, op_reason = constraint(operator) + if not op_valid: + return NpuSupported(False, [(constraint.__doc__, op_reason)]) + + if operator.type not in TFLiteSupportedOperators.supported_operators: + reasons = ( + [("CPU only operator", "")] + if operator.type not in VELA_INTERNAL_OPS + else [] + ) + + return NpuSupported(False, reasons) + + tflite_supported_operators = TFLiteSupportedOperators() + operation_constraints = itertools.chain( + tflite_supported_operators.generic_constraints, + tflite_supported_operators.specific_constraints[operator.type], + ) + for constraint in operation_constraints: + op_valid, op_reason = constraint(operator) + if not op_valid: + return NpuSupported(False, [(constraint.__doc__, op_reason)]) + + return NpuSupported(True, []) + + +def generate_supported_operators_report() -> None: + """Generate supported operators report in current working directory.""" + with redirect_output(logger): + generate_supported_ops() diff --git a/src/mlia/backend/vela/compiler.py b/src/mlia/backend/vela/compiler.py new file mode 100644 index 0000000..3d3847a --- /dev/null +++ b/src/mlia/backend/vela/compiler.py @@ -0,0 +1,274 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Vela compiler wrapper module.""" +from __future__ import annotations + +import logging +import sys +from dataclasses import dataclass +from pathlib import Path +from typing import Any +from typing import Literal + +from ethosu.vela.architecture_features import ArchitectureFeatures +from ethosu.vela.compiler_driver import compiler_driver +from ethosu.vela.compiler_driver import CompilerOptions +from ethosu.vela.compiler_driver import TensorAllocator +from ethosu.vela.model_reader import ModelReaderOptions +from ethosu.vela.model_reader import read_model +from ethosu.vela.nn_graph import Graph +from ethosu.vela.nn_graph import NetworkType +from ethosu.vela.operation import CustomType +from ethosu.vela.scheduler import OptimizationStrategy +from ethosu.vela.scheduler import SchedulerOptions +from ethosu.vela.tensor import BandwidthDirection +from ethosu.vela.tensor import MemArea +from ethosu.vela.tensor import Tensor +from ethosu.vela.tflite_writer import write_tflite + +from mlia.utils.logging import redirect_output + +logger = logging.getLogger(__name__) + + +@dataclass +class Model: + """Model metadata.""" + + nng: Graph + network_type: NetworkType + + @property + def optimized(self) -> bool: + """Return true if model is already optimized.""" + return any( + op.attrs.get("custom_type") == CustomType.ExistingNpuOp + for sg in self.nng.subgraphs + for op in sg.get_all_ops() + ) + + +@dataclass +class OptimizedModel: + """Instance of the Vela optimized model.""" + + nng: Graph + arch: ArchitectureFeatures + compiler_options: CompilerOptions + scheduler_options: SchedulerOptions + + def save(self, output_filename: str | Path) -> None: + """Save instance of the optimized model to the file.""" + write_tflite(self.nng, output_filename) + + +AcceleratorConfigType = Literal[ + "ethos-u55-32", + "ethos-u55-64", + "ethos-u55-128", + "ethos-u55-256", + "ethos-u65-256", + 
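Taken together, supported_operators() and run_on_npu() above yield a per-operator NPU placement verdict plus human-readable reasons. A short usage sketch: it assumes MLIA with its ethos-u-vela dependency is installed, "model.tflite" is a placeholder path, and the accelerator_config value is one of the literals accepted by VelaCompilerOptions.

# Usage sketch: report operators that cannot be placed on the NPU.
from pathlib import Path

from mlia.backend.vela.compat import supported_operators
from mlia.backend.vela.compiler import VelaCompilerOptions

options = VelaCompilerOptions(accelerator_config="ethos-u55-256")
operators = supported_operators(Path("model.tflite"), options)

print(f"NPU supported ratio: {operators.npu_supported_ratio:.2f}")
for op in operators.ops:
    if not op.run_on_npu.supported:
        reasons = "; ".join(f"{what}: {why}" for what, why in op.run_on_npu.reasons)
        print(f"{op.name} ({op.op_type}) stays on CPU: {reasons}")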
"ethos-u65-512", +] + +TensorAllocatorType = Literal["LinearAlloc", "Greedy", "HillClimb"] + +OptimizationStrategyType = Literal["Performance", "Size"] + + +@dataclass +class VelaCompilerOptions: # pylint: disable=too-many-instance-attributes + """Vela compiler options.""" + + config_files: str | list[str] | None = None + system_config: str = ArchitectureFeatures.DEFAULT_CONFIG + memory_mode: str = ArchitectureFeatures.DEFAULT_CONFIG + accelerator_config: AcceleratorConfigType | None = None + max_block_dependency: int = ArchitectureFeatures.MAX_BLOCKDEP + arena_cache_size: int | None = None + tensor_allocator: TensorAllocatorType = "HillClimb" + cpu_tensor_alignment: int = Tensor.AllocationQuantum + optimization_strategy: OptimizationStrategyType = "Performance" + output_dir: str | None = None + recursion_limit: int = 1000 + + +class VelaCompiler: # pylint: disable=too-many-instance-attributes + """Vela compiler wrapper.""" + + def __init__(self, compiler_options: VelaCompilerOptions): + """Init Vela wrapper instance.""" + self.config_files = compiler_options.config_files + self.system_config = compiler_options.system_config + self.memory_mode = compiler_options.memory_mode + self.accelerator_config = compiler_options.accelerator_config + self.max_block_dependency = compiler_options.max_block_dependency + self.arena_cache_size = compiler_options.arena_cache_size + self.tensor_allocator = TensorAllocator[compiler_options.tensor_allocator] + self.cpu_tensor_alignment = compiler_options.cpu_tensor_alignment + self.optimization_strategy = OptimizationStrategy[ + compiler_options.optimization_strategy + ] + self.output_dir = compiler_options.output_dir + self.recursion_limit = compiler_options.recursion_limit + + sys.setrecursionlimit(self.recursion_limit) + + def read_model(self, model: str | Path) -> Model: + """Read model.""" + logger.debug("Read model %s", model) + + nng, network_type = self._read_model(model) + return Model(nng, network_type) + + def compile_model(self, model: str | Path | Model) -> OptimizedModel: + """Compile the model.""" + if isinstance(model, (str, Path)): + nng, network_type = self._read_model(model) + else: + nng, network_type = model.nng, NetworkType.TFLite + + if not nng: + raise Exception("Unable to read model") + + try: + arch = self._architecture_features() + compiler_options = self._compiler_options() + scheduler_options = self._scheduler_options() + + with redirect_output( + logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG + ): + compiler_driver( + nng, arch, compiler_options, scheduler_options, network_type + ) + + return OptimizedModel(nng, arch, compiler_options, scheduler_options) + except (SystemExit, Exception) as err: + raise Exception("Model could not be optimized with Vela compiler") from err + + def get_config(self) -> dict[str, Any]: + """Get compiler configuration.""" + arch = self._architecture_features() + + memory_area = { + mem.name: { + "clock_scales": arch.memory_clock_scales[mem], + "burst_length": arch.memory_burst_length[mem], + "read_latency": arch.memory_latency[mem][BandwidthDirection.Read], + "write_latency": arch.memory_latency[mem][BandwidthDirection.Write], + } + for mem in ( + MemArea.Sram, + MemArea.Dram, + MemArea.OnChipFlash, + MemArea.OffChipFlash, + ) + } + + return { + "accelerator_config": arch.accelerator_config.value, + "system_config": arch.system_config, + "core_clock": arch.core_clock, + "axi0_port": arch.axi0_port.name, + "axi1_port": arch.axi1_port.name, + "memory_mode": arch.memory_mode, + 
"const_mem_area": arch.const_mem_area.name, + "arena_mem_area": arch.arena_mem_area.name, + "cache_mem_area": arch.cache_mem_area.name, + "arena_cache_size": arch.arena_cache_size, + "permanent_storage_mem_area": arch.permanent_storage_mem_area.name, + "feature_map_storage_mem_area": arch.feature_map_storage_mem_area.name, + "fast_storage_mem_area": arch.fast_storage_mem_area.name, + "memory_area": memory_area, + } + + @staticmethod + def _read_model(model: str | Path) -> tuple[Graph, NetworkType]: + """Read TensorFlow Lite model.""" + try: + model_path = str(model) if isinstance(model, Path) else model + + with redirect_output( + logger, stdout_level=logging.DEBUG, stderr_level=logging.DEBUG + ): + return read_model(model_path, ModelReaderOptions()) # type: ignore + except (SystemExit, Exception) as err: + raise Exception(f"Unable to read model {model_path}") from err + + def _architecture_features(self) -> ArchitectureFeatures: + """Return ArchitectureFeatures instance.""" + return ArchitectureFeatures( + vela_config_files=self.config_files, + accelerator_config=self.accelerator_config, + system_config=self.system_config, + memory_mode=self.memory_mode, + max_blockdep=self.max_block_dependency, + verbose_config=False, + arena_cache_size=self.arena_cache_size, + ) + + def _scheduler_options(self) -> SchedulerOptions: + """Return SchedulerOptions instance.""" + arch = self._architecture_features() + + return SchedulerOptions( + optimization_strategy=self.optimization_strategy, + sram_target=arch.arena_cache_size, + verbose_schedule=False, + ) + + def _compiler_options(self) -> CompilerOptions: + """Return CompilerOptions instance.""" + return CompilerOptions( + verbose_graph=False, + verbose_quantization=False, + verbose_packing=False, + verbose_tensor_purpose=False, + verbose_tensor_format=False, + verbose_allocation=False, + verbose_high_level_command_stream=False, + verbose_register_command_stream=False, + verbose_operators=False, + verbose_weights=False, + show_cpu_operations=False, + tensor_allocator=self.tensor_allocator, + timing=False, + output_dir=self.output_dir, + cpu_tensor_alignment=self.cpu_tensor_alignment, + ) + + +def resolve_compiler_config( + vela_compiler_options: VelaCompilerOptions, +) -> dict[str, Any]: + """Resolve passed compiler options. + + Vela has number of configuration parameters that being + resolved during passing compiler options. E.g. Vela + reads configuration parameters from vela.ini and fills + it's internal structures with resolved values (memory mode, + system mode, etc.). + + In order to get this information we need to create + instance of the Vela compiler first. + """ + vela_compiler = VelaCompiler(vela_compiler_options) + return vela_compiler.get_config() + + +def optimize_model( + model_path: Path, compiler_options: VelaCompilerOptions, output_model_path: Path +) -> None: + """Optimize model and return it's path after optimization.""" + logger.debug( + "Optimize model %s for device %s", + model_path, + compiler_options.accelerator_config, + ) + + vela_compiler = VelaCompiler(compiler_options) + optimized_model = vela_compiler.compile_model(model_path) + + logger.debug("Save optimized model into %s", output_model_path) + optimized_model.save(output_model_path) diff --git a/src/mlia/backend/vela/performance.py b/src/mlia/backend/vela/performance.py new file mode 100644 index 0000000..ccd2f6f --- /dev/null +++ b/src/mlia/backend/vela/performance.py @@ -0,0 +1,97 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. 
+# SPDX-License-Identifier: Apache-2.0 +"""Vela performance module.""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from pathlib import Path + +import numpy as np +from ethosu.vela.npu_performance import PassCycles +from ethosu.vela.tensor import MemArea + +from mlia.backend.vela.compiler import OptimizedModel +from mlia.backend.vela.compiler import VelaCompiler +from mlia.backend.vela.compiler import VelaCompilerOptions + + +logger = logging.getLogger(__name__) + + +@dataclass +class PerformanceMetrics: # pylint: disable=too-many-instance-attributes + """Contains all the performance metrics Vela generates in a run.""" + + npu_cycles: int + sram_access_cycles: int + dram_access_cycles: int + on_chip_flash_access_cycles: int + off_chip_flash_access_cycles: int + total_cycles: int + batch_inference_time: float + inferences_per_second: float + batch_size: int + unknown_memory_area_size: int + sram_memory_area_size: int + dram_memory_area_size: int + on_chip_flash_memory_area_size: int + off_chip_flash_memory_area_size: int + + +def estimate_performance( + model_path: Path, compiler_options: VelaCompilerOptions +) -> PerformanceMetrics: + """Return performance estimations for the model/device. + + Logic for this function comes from Vela module stats_writer.py + """ + logger.debug( + "Estimate performance for the model %s on %s", + model_path, + compiler_options.accelerator_config, + ) + + vela_compiler = VelaCompiler(compiler_options) + + initial_model = vela_compiler.read_model(model_path) + if initial_model.optimized: + raise Exception("Unable to estimate performance for the given optimized model") + + optimized_model = vela_compiler.compile_model(initial_model) + + return _performance_metrics(optimized_model) + + +def _performance_metrics(optimized_model: OptimizedModel) -> PerformanceMetrics: + """Return performance metrics for optimized model.""" + cycles = optimized_model.nng.cycles + + def memory_usage(mem_area: MemArea) -> int: + """Get memory usage for the proviced memory area type.""" + memory_used: dict[MemArea, int] = optimized_model.nng.memory_used + bandwidths = optimized_model.nng.bandwidths + + return memory_used.get(mem_area, 0) if np.sum(bandwidths[mem_area]) > 0 else 0 + + midpoint_fps = np.nan + midpoint_inference_time = cycles[PassCycles.Total] / optimized_model.arch.core_clock + if midpoint_inference_time > 0: + midpoint_fps = 1 / midpoint_inference_time + + return PerformanceMetrics( + npu_cycles=int(cycles[PassCycles.Npu]), + sram_access_cycles=int(cycles[PassCycles.SramAccess]), + dram_access_cycles=int(cycles[PassCycles.DramAccess]), + on_chip_flash_access_cycles=int(cycles[PassCycles.OnChipFlashAccess]), + off_chip_flash_access_cycles=int(cycles[PassCycles.OffChipFlashAccess]), + total_cycles=int(cycles[PassCycles.Total]), + batch_inference_time=midpoint_inference_time * 1000, + inferences_per_second=midpoint_fps, + batch_size=optimized_model.nng.batch_size, + unknown_memory_area_size=memory_usage(MemArea.Unknown), + sram_memory_area_size=memory_usage(MemArea.Sram), + dram_memory_area_size=memory_usage(MemArea.Dram), + on_chip_flash_memory_area_size=memory_usage(MemArea.OnChipFlash), + off_chip_flash_memory_area_size=memory_usage(MemArea.OffChipFlash), + ) |
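As the code above shows, batch_inference_time is derived from the total cycle count and the core clock (milliseconds = total_cycles / core_clock * 1000), and inferences_per_second is its reciprocal in seconds. A closing usage sketch of estimate_performance(), again assuming MLIA with Vela installed and a placeholder model path, followed by the same conversion done by hand.

# Usage sketch plus the cycle-to-latency arithmetic used above.
from pathlib import Path

from mlia.backend.vela.compiler import VelaCompilerOptions
from mlia.backend.vela.performance import estimate_performance

options = VelaCompilerOptions(accelerator_config="ethos-u55-128")
metrics = estimate_performance(Path("model.tflite"), options)
print(f"{metrics.total_cycles} cycles -> {metrics.batch_inference_time:.2f} ms")

# The same conversion by hand: 500,000 total cycles at a 500 MHz core clock
# is 1 ms per batch, i.e. 1000 inferences per second for batch size 1.
total_cycles, core_clock = 500_000, 500e6
inference_time_ms = total_cycles / core_clock * 1000
print(inference_time_ms)  # 1.0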