From 37959522a805a5e23c930ed79aac84920c3cb208 Mon Sep 17 00:00:00 2001 From: Dmitrii Agibov Date: Fri, 18 Nov 2022 16:34:03 +0000 Subject: Move backends functionality into separate modules - Move backend management/executor code into module backend_core - Create separate module for each backend in "backend" module - Move each backend into corresponding module - Split Vela wrapper into several submodules Change-Id: If01b6774aab6501951212541cc5d7f5aa7c97e95 --- src/mlia/backend/manager.py | 505 ++++++++++++++++++-------------------------- 1 file changed, 202 insertions(+), 303 deletions(-) (limited to 'src/mlia/backend/manager.py') diff --git a/src/mlia/backend/manager.py b/src/mlia/backend/manager.py index 6a61ab0..c02dc6e 100644 --- a/src/mlia/backend/manager.py +++ b/src/mlia/backend/manager.py @@ -1,372 +1,271 @@ # SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 -"""Module for backend integration.""" +"""Module for installation process.""" from __future__ import annotations import logging from abc import ABC from abc import abstractmethod -from dataclasses import dataclass from pathlib import Path -from typing import Literal +from typing import Callable -from mlia.backend.application import get_available_applications -from mlia.backend.application import install_application -from mlia.backend.execution import ExecutionContext -from mlia.backend.execution import run_application -from mlia.backend.output_consumer import Base64OutputConsumer -from mlia.backend.output_consumer import OutputConsumer -from mlia.backend.system import get_available_systems -from mlia.backend.system import install_system +from mlia.backend.install import DownloadAndInstall +from mlia.backend.install import Installation +from mlia.backend.install import InstallationType +from mlia.backend.install import InstallFromPath +from mlia.core.errors import ConfigurationError +from mlia.core.errors import InternalError +from mlia.utils.misc import yes logger = logging.getLogger(__name__) -# Mapping backend -> device_type -> system_name -_SUPPORTED_SYSTEMS = { - "Corstone-300": { - "ethos-u55": "Corstone-300: Cortex-M55+Ethos-U55", - "ethos-u65": "Corstone-300: Cortex-M55+Ethos-U65", - }, - "Corstone-310": { - "ethos-u55": "Corstone-310: Cortex-M85+Ethos-U55", - "ethos-u65": "Corstone-310: Cortex-M85+Ethos-U65", - }, -} +InstallationFilter = Callable[[Installation], bool] -# Mapping system_name -> application -_SYSTEM_TO_APP_MAP = { - "Corstone-300: Cortex-M55+Ethos-U55": "Generic Inference Runner: Ethos-U55", - "Corstone-300: Cortex-M55+Ethos-U65": "Generic Inference Runner: Ethos-U65", - "Corstone-310: Cortex-M85+Ethos-U55": "Generic Inference Runner: Ethos-U55", - "Corstone-310: Cortex-M85+Ethos-U65": "Generic Inference Runner: Ethos-U65", -} +class AlreadyInstalledFilter: + """Filter for already installed backends.""" -def get_system_name(backend: str, device_type: str) -> str: - """Get the system name for the given backend and device type.""" - return _SUPPORTED_SYSTEMS[backend][device_type] + def __call__(self, installation: Installation) -> bool: + """Installation filter.""" + return installation.already_installed -def is_supported(backend: str, device_type: str | None = None) -> bool: - """Check if the backend (and optionally device type) is supported.""" - if device_type is None: - return backend in _SUPPORTED_SYSTEMS +class ReadyForInstallationFilter: + """Filter for ready to be installed backends.""" - try: - get_system_name(backend, device_type) - return True - except KeyError: - return False + def __call__(self, installation: Installation) -> bool: + """Installation filter.""" + return installation.could_be_installed and not installation.already_installed -def supported_backends() -> list[str]: - """Get a list of all backends supported by the backend manager.""" - return list(_SUPPORTED_SYSTEMS.keys()) +class SupportsInstallTypeFilter: + """Filter backends that support certain type of the installation.""" + def __init__(self, installation_type: InstallationType) -> None: + """Init filter.""" + self.installation_type = installation_type -def get_all_system_names(backend: str) -> list[str]: - """Get all systems supported by the backend.""" - return list(_SUPPORTED_SYSTEMS.get(backend, {}).values()) + def __call__(self, installation: Installation) -> bool: + """Installation filter.""" + return installation.supports(self.installation_type) -def get_all_application_names(backend: str) -> list[str]: - """Get all applications supported by the backend.""" - app_set = {_SYSTEM_TO_APP_MAP[sys] for sys in get_all_system_names(backend)} - return list(app_set) +class SearchByNameFilter: + """Filter installation by name.""" + def __init__(self, backend_name: str | None) -> None: + """Init filter.""" + self.backend_name = backend_name -@dataclass -class DeviceInfo: - """Device information.""" - - device_type: Literal["ethos-u55", "ethos-u65"] - mac: int - - -@dataclass -class ModelInfo: - """Model info.""" - - model_path: Path - - -@dataclass -class PerformanceMetrics: - """Performance metrics parsed from generic inference output.""" - - npu_active_cycles: int - npu_idle_cycles: int - npu_total_cycles: int - npu_axi0_rd_data_beat_received: int - npu_axi0_wr_data_beat_written: int - npu_axi1_rd_data_beat_received: int - - -@dataclass -class ExecutionParams: - """Application execution params.""" - - application: str - system: str - application_params: list[str] - system_params: list[str] - - -class LogWriter(OutputConsumer): - """Redirect output to the logger.""" - - def feed(self, line: str) -> bool: - """Process line from the output.""" - logger.debug(line.strip()) - return False + def __call__(self, installation: Installation) -> bool: + """Installation filter.""" + return ( + not self.backend_name + or installation.name.casefold() == self.backend_name.casefold() + ) -class GenericInferenceOutputParser(Base64OutputConsumer): - """Generic inference app output parser.""" +class InstallationManager(ABC): + """Helper class for managing installations.""" - def __init__(self) -> None: - """Init generic inference output parser instance.""" - super().__init__() - self._map = { - "NPU ACTIVE": "npu_active_cycles", - "NPU IDLE": "npu_idle_cycles", - "NPU TOTAL": "npu_total_cycles", - "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received", - "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written", - "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received", - } + @abstractmethod + def install_from(self, backend_path: Path, backend_name: str, force: bool) -> None: + """Install backend from the local directory.""" - @property - def result(self) -> dict: - """Merge the raw results and map the names to the right output names.""" - merged_result = {} - for raw_result in self.parsed_output: - for profiling_result in raw_result: - for sample in profiling_result["samples"]: - name, values = (sample["name"], sample["value"]) - if name in merged_result: - raise KeyError( - f"Duplicate key '{name}' in base64 output.", - ) - new_name = self._map[name] - merged_result[new_name] = values[0] - return merged_result + @abstractmethod + def download_and_install( + self, backend_name: str, eula_agreement: bool, force: bool + ) -> None: + """Download and install backends.""" - def is_ready(self) -> bool: - """Return true if all expected data has been parsed.""" - return set(self.result.keys()) == set(self._map.values()) + @abstractmethod + def show_env_details(self) -> None: + """Show environment details.""" - def missed_keys(self) -> set[str]: - """Return a set of the keys that have not been found in the output.""" - return set(self._map.values()) - set(self.result.keys()) + @abstractmethod + def backend_installed(self, backend_name: str) -> bool: + """Return true if requested backend installed.""" + @abstractmethod + def uninstall(self, backend_name: str) -> None: + """Delete the existing installation.""" -class BackendRunner: - """Backend runner.""" - def __init__(self) -> None: - """Init BackendRunner instance.""" +class InstallationFiltersMixin: + """Mixin for filtering installation based on different conditions.""" - @staticmethod - def get_installed_systems() -> list[str]: - """Get list of the installed systems.""" - return [system.name for system in get_available_systems()] + installations: list[Installation] - @staticmethod - def get_installed_applications(system: str | None = None) -> list[str]: - """Get list of the installed application.""" + def filter_by(self, *filters: InstallationFilter) -> list[Installation]: + """Filter installations.""" return [ - app.name - for app in get_available_applications() - if system is None or app.can_run_on(system) + installation + for installation in self.installations + if all(filter_(installation) for filter_ in filters) ] - def is_application_installed(self, application: str, system: str) -> bool: - """Return true if requested application installed.""" - return application in self.get_installed_applications(system) - - def is_system_installed(self, system: str) -> bool: - """Return true if requested system installed.""" - return system in self.get_installed_systems() - - def systems_installed(self, systems: list[str]) -> bool: - """Check if all provided systems are installed.""" - if not systems: - return False - - installed_systems = self.get_installed_systems() - return all(system in installed_systems for system in systems) - - def applications_installed(self, applications: list[str]) -> bool: - """Check if all provided applications are installed.""" - if not applications: - return False - - installed_apps = self.get_installed_applications() - return all(app in installed_apps for app in applications) + def find_by_name(self, backend_name: str) -> list[Installation]: + """Return list of the backends filtered by name.""" + return self.filter_by(SearchByNameFilter(backend_name)) - def all_installed(self, systems: list[str], apps: list[str]) -> bool: - """Check if all provided artifacts are installed.""" - return self.systems_installed(systems) and self.applications_installed(apps) - - @staticmethod - def install_system(system_path: Path) -> None: - """Install system.""" - install_system(system_path) - - @staticmethod - def install_application(app_path: Path) -> None: - """Install application.""" - install_application(app_path) - - @staticmethod - def run_application(execution_params: ExecutionParams) -> ExecutionContext: - """Run requested application.""" - ctx = run_application( - execution_params.application, - execution_params.application_params, - execution_params.system, - execution_params.system_params, + def already_installed(self, backend_name: str = None) -> list[Installation]: + """Return list of backends that are already installed.""" + return self.filter_by( + AlreadyInstalledFilter(), + SearchByNameFilter(backend_name), ) - return ctx - @staticmethod - def _params(name: str, params: list[str]) -> list[str]: - return [p for item in [(name, param) for param in params] for p in item] + def ready_for_installation(self) -> list[Installation]: + """Return list of the backends that could be installed.""" + return self.filter_by(ReadyForInstallationFilter()) -class GenericInferenceRunner(ABC): - """Abstract class for generic inference runner.""" +class DefaultInstallationManager(InstallationManager, InstallationFiltersMixin): + """Interactive installation manager.""" - def __init__(self, backend_runner: BackendRunner): - """Init generic inference runner instance.""" - self.backend_runner = backend_runner - - def run( - self, model_info: ModelInfo, output_consumers: list[OutputConsumer] + def __init__( + self, installations: list[Installation], noninteractive: bool = False ) -> None: - """Run generic inference for the provided device/model.""" - execution_params = self.get_execution_params(model_info) - - ctx = self.backend_runner.run_application(execution_params) - if ctx.stdout is not None: - ctx.stdout = self.consume_output(ctx.stdout, output_consumers) - - @abstractmethod - def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: - """Get execution params for the provided model.""" - - def check_system_and_application(self, system_name: str, app_name: str) -> None: - """Check if requested system and application installed.""" - if not self.backend_runner.is_system_installed(system_name): - raise Exception(f"System {system_name} is not installed") - - if not self.backend_runner.is_application_installed(app_name, system_name): - raise Exception( - f"Application {app_name} for the system {system_name} " - "is not installed" + """Init the manager.""" + self.installations = installations + self.noninteractive = noninteractive + + def _install( + self, + backend_name: str, + install_type: InstallationType, + prompt: Callable[[Installation], str], + force: bool, + ) -> None: + """Check metadata and install backend.""" + installs = self.find_by_name(backend_name) + + if not installs: + logger.info("Unknown backend '%s'.", backend_name) + logger.info( + "Please run command 'mlia-backend list' to get list of " + "supported backend names." ) - @staticmethod - def consume_output(output: bytearray, consumers: list[OutputConsumer]) -> bytearray: - """ - Pass program's output to the consumers and filter it. + return + + if len(installs) > 1: + raise InternalError(f"More than one backend with name {backend_name} found") + + installation = installs[0] + if not installation.supports(install_type): + if isinstance(install_type, InstallFromPath): + logger.info( + "Backend '%s' could not be installed using path '%s'.", + installation.name, + install_type.backend_path, + ) + logger.info( + "Please check that '%s' is a valid path to the installed backend.", + install_type.backend_path, + ) + else: + logger.info( + "Backend '%s' could not be downloaded and installed", + installation.name, + ) + logger.info( + "Please refer to the project's documentation for more details." + ) + + return + + if installation.already_installed and not force: + logger.info("Backend '%s' is already installed.", installation.name) + logger.info("Please, consider using --force option.") + return + + proceed = self.noninteractive or yes(prompt(installation)) + if not proceed: + logger.info("%s installation canceled.", installation.name) + return + + if installation.already_installed and force: + logger.info( + "Force installing %s, so delete the existing " + "installed backend first.", + installation.name, + ) + installation.uninstall() - Returns the filtered output. - """ - filtered_output = bytearray() - for line_bytes in output.splitlines(): - line = line_bytes.decode("utf-8") - remove_line = False - for consumer in consumers: - if consumer.feed(line): - remove_line = True - if not remove_line: - filtered_output.extend(line_bytes) + installation.install(install_type) + logger.info("%s successfully installed.", installation.name) - return filtered_output + def install_from( + self, backend_path: Path, backend_name: str, force: bool = False + ) -> None: + """Install from the provided directory.""" + def prompt(install: Installation) -> str: + return ( + f"{install.name} was found in {backend_path}. " + "Would you like to install it?" + ) -class GenericInferenceRunnerEthosU(GenericInferenceRunner): - """Generic inference runner on U55/65.""" + install_type = InstallFromPath(backend_path) + self._install(backend_name, install_type, prompt, force) - def __init__( - self, backend_runner: BackendRunner, device_info: DeviceInfo, backend: str + def download_and_install( + self, backend_name: str, eula_agreement: bool = True, force: bool = False ) -> None: - """Init generic inference runner instance.""" - super().__init__(backend_runner) + """Download and install available backends.""" - system_name, app_name = self.resolve_system_and_app(device_info, backend) - self.system_name = system_name - self.app_name = app_name - self.device_info = device_info + def prompt(install: Installation) -> str: + return f"Would you like to download and install {install.name}?" - @staticmethod - def resolve_system_and_app( - device_info: DeviceInfo, backend: str - ) -> tuple[str, str]: - """Find appropriate system and application for the provided device/backend.""" - try: - system_name = get_system_name(backend, device_info.device_type) - except KeyError as ex: - raise RuntimeError( - f"Unsupported device {device_info.device_type} " - f"for backend {backend}" - ) from ex - - try: - app_name = _SYSTEM_TO_APP_MAP[system_name] - except KeyError as err: - raise RuntimeError(f"System {system_name} is not installed") from err - - return system_name, app_name - - def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams: - """Get execution params for Ethos-U55/65.""" - self.check_system_and_application(self.system_name, self.app_name) - - system_params = [ - f"mac={self.device_info.mac}", - f"input_file={model_info.model_path.absolute()}", - ] + install_type = DownloadAndInstall(eula_agreement=eula_agreement) + self._install(backend_name, install_type, prompt, force) - return ExecutionParams( - self.app_name, - self.system_name, - [], - system_params, - ) + def show_env_details(self) -> None: + """Print current state of the execution environment.""" + if installed := self.already_installed(): + self._print_installation_list("Installed backends:", installed) + + if could_be_installed := self.ready_for_installation(): + self._print_installation_list( + "Following backends could be installed:", + could_be_installed, + new_section=bool(installed), + ) + if not installed and not could_be_installed: + logger.info("No backends installed") -def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner: - """Get generic runner for provided device and backend.""" - backend_runner = get_backend_runner() - return GenericInferenceRunnerEthosU(backend_runner, device_info, backend) + @staticmethod + def _print_installation_list( + header: str, installations: list[Installation], new_section: bool = False + ) -> None: + """Print list of the installations.""" + logger.info("%s%s\n", "\n" if new_section else "", header) + for installation in installations: + logger.info(" - %s", installation.name) -def estimate_performance( - model_info: ModelInfo, device_info: DeviceInfo, backend: str -) -> PerformanceMetrics: - """Get performance estimations.""" - output_parser = GenericInferenceOutputParser() - output_consumers = [output_parser, LogWriter()] + def uninstall(self, backend_name: str) -> None: + """Uninstall the backend with name backend_name.""" + installations = self.already_installed(backend_name) - generic_runner = get_generic_runner(device_info, backend) - generic_runner.run(model_info, output_consumers) + if not installations: + raise ConfigurationError(f"Backend '{backend_name}' is not installed") - if not output_parser.is_ready(): - missed_data = ",".join(output_parser.missed_keys()) - logger.debug("Unable to get performance metrics, missed data %s", missed_data) - raise Exception("Unable to get performance metrics, insufficient data") + if len(installations) != 1: + raise InternalError( + f"More than one installed backend with name {backend_name} found" + ) - return PerformanceMetrics(**output_parser.result) + installation = installations[0] + installation.uninstall() + logger.info("%s successfully uninstalled.", installation.name) -def get_backend_runner() -> BackendRunner: - """ - Return BackendRunner instance. + def backend_installed(self, backend_name: str) -> bool: + """Return true if requested backend installed.""" + installations = self.already_installed(backend_name) - Note: This is needed for the unit tests. - """ - return BackendRunner() + return len(installations) == 1 -- cgit v1.2.1