aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/backend/manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/mlia/backend/manager.py')
-rw-r--r--src/mlia/backend/manager.py505
1 files changed, 202 insertions, 303 deletions
diff --git a/src/mlia/backend/manager.py b/src/mlia/backend/manager.py
index 6a61ab0..c02dc6e 100644
--- a/src/mlia/backend/manager.py
+++ b/src/mlia/backend/manager.py
@@ -1,372 +1,271 @@
# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
# SPDX-License-Identifier: Apache-2.0
-"""Module for backend integration."""
+"""Module for installation process."""
from __future__ import annotations
import logging
from abc import ABC
from abc import abstractmethod
-from dataclasses import dataclass
from pathlib import Path
-from typing import Literal
+from typing import Callable
-from mlia.backend.application import get_available_applications
-from mlia.backend.application import install_application
-from mlia.backend.execution import ExecutionContext
-from mlia.backend.execution import run_application
-from mlia.backend.output_consumer import Base64OutputConsumer
-from mlia.backend.output_consumer import OutputConsumer
-from mlia.backend.system import get_available_systems
-from mlia.backend.system import install_system
+from mlia.backend.install import DownloadAndInstall
+from mlia.backend.install import Installation
+from mlia.backend.install import InstallationType
+from mlia.backend.install import InstallFromPath
+from mlia.core.errors import ConfigurationError
+from mlia.core.errors import InternalError
+from mlia.utils.misc import yes
logger = logging.getLogger(__name__)
-# Mapping backend -> device_type -> system_name
-_SUPPORTED_SYSTEMS = {
- "Corstone-300": {
- "ethos-u55": "Corstone-300: Cortex-M55+Ethos-U55",
- "ethos-u65": "Corstone-300: Cortex-M55+Ethos-U65",
- },
- "Corstone-310": {
- "ethos-u55": "Corstone-310: Cortex-M85+Ethos-U55",
- "ethos-u65": "Corstone-310: Cortex-M85+Ethos-U65",
- },
-}
+InstallationFilter = Callable[[Installation], bool]
-# Mapping system_name -> application
-_SYSTEM_TO_APP_MAP = {
- "Corstone-300: Cortex-M55+Ethos-U55": "Generic Inference Runner: Ethos-U55",
- "Corstone-300: Cortex-M55+Ethos-U65": "Generic Inference Runner: Ethos-U65",
- "Corstone-310: Cortex-M85+Ethos-U55": "Generic Inference Runner: Ethos-U55",
- "Corstone-310: Cortex-M85+Ethos-U65": "Generic Inference Runner: Ethos-U65",
-}
+class AlreadyInstalledFilter:
+ """Filter for already installed backends."""
-def get_system_name(backend: str, device_type: str) -> str:
- """Get the system name for the given backend and device type."""
- return _SUPPORTED_SYSTEMS[backend][device_type]
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return installation.already_installed
-def is_supported(backend: str, device_type: str | None = None) -> bool:
- """Check if the backend (and optionally device type) is supported."""
- if device_type is None:
- return backend in _SUPPORTED_SYSTEMS
+class ReadyForInstallationFilter:
+ """Filter for ready to be installed backends."""
- try:
- get_system_name(backend, device_type)
- return True
- except KeyError:
- return False
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return installation.could_be_installed and not installation.already_installed
-def supported_backends() -> list[str]:
- """Get a list of all backends supported by the backend manager."""
- return list(_SUPPORTED_SYSTEMS.keys())
+class SupportsInstallTypeFilter:
+ """Filter backends that support certain type of the installation."""
+ def __init__(self, installation_type: InstallationType) -> None:
+ """Init filter."""
+ self.installation_type = installation_type
-def get_all_system_names(backend: str) -> list[str]:
- """Get all systems supported by the backend."""
- return list(_SUPPORTED_SYSTEMS.get(backend, {}).values())
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return installation.supports(self.installation_type)
-def get_all_application_names(backend: str) -> list[str]:
- """Get all applications supported by the backend."""
- app_set = {_SYSTEM_TO_APP_MAP[sys] for sys in get_all_system_names(backend)}
- return list(app_set)
+class SearchByNameFilter:
+ """Filter installation by name."""
+ def __init__(self, backend_name: str | None) -> None:
+ """Init filter."""
+ self.backend_name = backend_name
-@dataclass
-class DeviceInfo:
- """Device information."""
-
- device_type: Literal["ethos-u55", "ethos-u65"]
- mac: int
-
-
-@dataclass
-class ModelInfo:
- """Model info."""
-
- model_path: Path
-
-
-@dataclass
-class PerformanceMetrics:
- """Performance metrics parsed from generic inference output."""
-
- npu_active_cycles: int
- npu_idle_cycles: int
- npu_total_cycles: int
- npu_axi0_rd_data_beat_received: int
- npu_axi0_wr_data_beat_written: int
- npu_axi1_rd_data_beat_received: int
-
-
-@dataclass
-class ExecutionParams:
- """Application execution params."""
-
- application: str
- system: str
- application_params: list[str]
- system_params: list[str]
-
-
-class LogWriter(OutputConsumer):
- """Redirect output to the logger."""
-
- def feed(self, line: str) -> bool:
- """Process line from the output."""
- logger.debug(line.strip())
- return False
+ def __call__(self, installation: Installation) -> bool:
+ """Installation filter."""
+ return (
+ not self.backend_name
+ or installation.name.casefold() == self.backend_name.casefold()
+ )
-class GenericInferenceOutputParser(Base64OutputConsumer):
- """Generic inference app output parser."""
+class InstallationManager(ABC):
+ """Helper class for managing installations."""
- def __init__(self) -> None:
- """Init generic inference output parser instance."""
- super().__init__()
- self._map = {
- "NPU ACTIVE": "npu_active_cycles",
- "NPU IDLE": "npu_idle_cycles",
- "NPU TOTAL": "npu_total_cycles",
- "NPU AXI0_RD_DATA_BEAT_RECEIVED": "npu_axi0_rd_data_beat_received",
- "NPU AXI0_WR_DATA_BEAT_WRITTEN": "npu_axi0_wr_data_beat_written",
- "NPU AXI1_RD_DATA_BEAT_RECEIVED": "npu_axi1_rd_data_beat_received",
- }
+ @abstractmethod
+ def install_from(self, backend_path: Path, backend_name: str, force: bool) -> None:
+ """Install backend from the local directory."""
- @property
- def result(self) -> dict:
- """Merge the raw results and map the names to the right output names."""
- merged_result = {}
- for raw_result in self.parsed_output:
- for profiling_result in raw_result:
- for sample in profiling_result["samples"]:
- name, values = (sample["name"], sample["value"])
- if name in merged_result:
- raise KeyError(
- f"Duplicate key '{name}' in base64 output.",
- )
- new_name = self._map[name]
- merged_result[new_name] = values[0]
- return merged_result
+ @abstractmethod
+ def download_and_install(
+ self, backend_name: str, eula_agreement: bool, force: bool
+ ) -> None:
+ """Download and install backends."""
- def is_ready(self) -> bool:
- """Return true if all expected data has been parsed."""
- return set(self.result.keys()) == set(self._map.values())
+ @abstractmethod
+ def show_env_details(self) -> None:
+ """Show environment details."""
- def missed_keys(self) -> set[str]:
- """Return a set of the keys that have not been found in the output."""
- return set(self._map.values()) - set(self.result.keys())
+ @abstractmethod
+ def backend_installed(self, backend_name: str) -> bool:
+ """Return true if requested backend installed."""
+ @abstractmethod
+ def uninstall(self, backend_name: str) -> None:
+ """Delete the existing installation."""
-class BackendRunner:
- """Backend runner."""
- def __init__(self) -> None:
- """Init BackendRunner instance."""
+class InstallationFiltersMixin:
+ """Mixin for filtering installation based on different conditions."""
- @staticmethod
- def get_installed_systems() -> list[str]:
- """Get list of the installed systems."""
- return [system.name for system in get_available_systems()]
+ installations: list[Installation]
- @staticmethod
- def get_installed_applications(system: str | None = None) -> list[str]:
- """Get list of the installed application."""
+ def filter_by(self, *filters: InstallationFilter) -> list[Installation]:
+ """Filter installations."""
return [
- app.name
- for app in get_available_applications()
- if system is None or app.can_run_on(system)
+ installation
+ for installation in self.installations
+ if all(filter_(installation) for filter_ in filters)
]
- def is_application_installed(self, application: str, system: str) -> bool:
- """Return true if requested application installed."""
- return application in self.get_installed_applications(system)
-
- def is_system_installed(self, system: str) -> bool:
- """Return true if requested system installed."""
- return system in self.get_installed_systems()
-
- def systems_installed(self, systems: list[str]) -> bool:
- """Check if all provided systems are installed."""
- if not systems:
- return False
-
- installed_systems = self.get_installed_systems()
- return all(system in installed_systems for system in systems)
-
- def applications_installed(self, applications: list[str]) -> bool:
- """Check if all provided applications are installed."""
- if not applications:
- return False
-
- installed_apps = self.get_installed_applications()
- return all(app in installed_apps for app in applications)
+ def find_by_name(self, backend_name: str) -> list[Installation]:
+ """Return list of the backends filtered by name."""
+ return self.filter_by(SearchByNameFilter(backend_name))
- def all_installed(self, systems: list[str], apps: list[str]) -> bool:
- """Check if all provided artifacts are installed."""
- return self.systems_installed(systems) and self.applications_installed(apps)
-
- @staticmethod
- def install_system(system_path: Path) -> None:
- """Install system."""
- install_system(system_path)
-
- @staticmethod
- def install_application(app_path: Path) -> None:
- """Install application."""
- install_application(app_path)
-
- @staticmethod
- def run_application(execution_params: ExecutionParams) -> ExecutionContext:
- """Run requested application."""
- ctx = run_application(
- execution_params.application,
- execution_params.application_params,
- execution_params.system,
- execution_params.system_params,
+ def already_installed(self, backend_name: str = None) -> list[Installation]:
+ """Return list of backends that are already installed."""
+ return self.filter_by(
+ AlreadyInstalledFilter(),
+ SearchByNameFilter(backend_name),
)
- return ctx
- @staticmethod
- def _params(name: str, params: list[str]) -> list[str]:
- return [p for item in [(name, param) for param in params] for p in item]
+ def ready_for_installation(self) -> list[Installation]:
+ """Return list of the backends that could be installed."""
+ return self.filter_by(ReadyForInstallationFilter())
-class GenericInferenceRunner(ABC):
- """Abstract class for generic inference runner."""
+class DefaultInstallationManager(InstallationManager, InstallationFiltersMixin):
+ """Interactive installation manager."""
- def __init__(self, backend_runner: BackendRunner):
- """Init generic inference runner instance."""
- self.backend_runner = backend_runner
-
- def run(
- self, model_info: ModelInfo, output_consumers: list[OutputConsumer]
+ def __init__(
+ self, installations: list[Installation], noninteractive: bool = False
) -> None:
- """Run generic inference for the provided device/model."""
- execution_params = self.get_execution_params(model_info)
-
- ctx = self.backend_runner.run_application(execution_params)
- if ctx.stdout is not None:
- ctx.stdout = self.consume_output(ctx.stdout, output_consumers)
-
- @abstractmethod
- def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
- """Get execution params for the provided model."""
-
- def check_system_and_application(self, system_name: str, app_name: str) -> None:
- """Check if requested system and application installed."""
- if not self.backend_runner.is_system_installed(system_name):
- raise Exception(f"System {system_name} is not installed")
-
- if not self.backend_runner.is_application_installed(app_name, system_name):
- raise Exception(
- f"Application {app_name} for the system {system_name} "
- "is not installed"
+ """Init the manager."""
+ self.installations = installations
+ self.noninteractive = noninteractive
+
+ def _install(
+ self,
+ backend_name: str,
+ install_type: InstallationType,
+ prompt: Callable[[Installation], str],
+ force: bool,
+ ) -> None:
+ """Check metadata and install backend."""
+ installs = self.find_by_name(backend_name)
+
+ if not installs:
+ logger.info("Unknown backend '%s'.", backend_name)
+ logger.info(
+ "Please run command 'mlia-backend list' to get list of "
+ "supported backend names."
)
- @staticmethod
- def consume_output(output: bytearray, consumers: list[OutputConsumer]) -> bytearray:
- """
- Pass program's output to the consumers and filter it.
+ return
+
+ if len(installs) > 1:
+ raise InternalError(f"More than one backend with name {backend_name} found")
+
+ installation = installs[0]
+ if not installation.supports(install_type):
+ if isinstance(install_type, InstallFromPath):
+ logger.info(
+ "Backend '%s' could not be installed using path '%s'.",
+ installation.name,
+ install_type.backend_path,
+ )
+ logger.info(
+ "Please check that '%s' is a valid path to the installed backend.",
+ install_type.backend_path,
+ )
+ else:
+ logger.info(
+ "Backend '%s' could not be downloaded and installed",
+ installation.name,
+ )
+ logger.info(
+ "Please refer to the project's documentation for more details."
+ )
+
+ return
+
+ if installation.already_installed and not force:
+ logger.info("Backend '%s' is already installed.", installation.name)
+ logger.info("Please, consider using --force option.")
+ return
+
+ proceed = self.noninteractive or yes(prompt(installation))
+ if not proceed:
+ logger.info("%s installation canceled.", installation.name)
+ return
+
+ if installation.already_installed and force:
+ logger.info(
+ "Force installing %s, so delete the existing "
+ "installed backend first.",
+ installation.name,
+ )
+ installation.uninstall()
- Returns the filtered output.
- """
- filtered_output = bytearray()
- for line_bytes in output.splitlines():
- line = line_bytes.decode("utf-8")
- remove_line = False
- for consumer in consumers:
- if consumer.feed(line):
- remove_line = True
- if not remove_line:
- filtered_output.extend(line_bytes)
+ installation.install(install_type)
+ logger.info("%s successfully installed.", installation.name)
- return filtered_output
+ def install_from(
+ self, backend_path: Path, backend_name: str, force: bool = False
+ ) -> None:
+ """Install from the provided directory."""
+ def prompt(install: Installation) -> str:
+ return (
+ f"{install.name} was found in {backend_path}. "
+ "Would you like to install it?"
+ )
-class GenericInferenceRunnerEthosU(GenericInferenceRunner):
- """Generic inference runner on U55/65."""
+ install_type = InstallFromPath(backend_path)
+ self._install(backend_name, install_type, prompt, force)
- def __init__(
- self, backend_runner: BackendRunner, device_info: DeviceInfo, backend: str
+ def download_and_install(
+ self, backend_name: str, eula_agreement: bool = True, force: bool = False
) -> None:
- """Init generic inference runner instance."""
- super().__init__(backend_runner)
+ """Download and install available backends."""
- system_name, app_name = self.resolve_system_and_app(device_info, backend)
- self.system_name = system_name
- self.app_name = app_name
- self.device_info = device_info
+ def prompt(install: Installation) -> str:
+ return f"Would you like to download and install {install.name}?"
- @staticmethod
- def resolve_system_and_app(
- device_info: DeviceInfo, backend: str
- ) -> tuple[str, str]:
- """Find appropriate system and application for the provided device/backend."""
- try:
- system_name = get_system_name(backend, device_info.device_type)
- except KeyError as ex:
- raise RuntimeError(
- f"Unsupported device {device_info.device_type} "
- f"for backend {backend}"
- ) from ex
-
- try:
- app_name = _SYSTEM_TO_APP_MAP[system_name]
- except KeyError as err:
- raise RuntimeError(f"System {system_name} is not installed") from err
-
- return system_name, app_name
-
- def get_execution_params(self, model_info: ModelInfo) -> ExecutionParams:
- """Get execution params for Ethos-U55/65."""
- self.check_system_and_application(self.system_name, self.app_name)
-
- system_params = [
- f"mac={self.device_info.mac}",
- f"input_file={model_info.model_path.absolute()}",
- ]
+ install_type = DownloadAndInstall(eula_agreement=eula_agreement)
+ self._install(backend_name, install_type, prompt, force)
- return ExecutionParams(
- self.app_name,
- self.system_name,
- [],
- system_params,
- )
+ def show_env_details(self) -> None:
+ """Print current state of the execution environment."""
+ if installed := self.already_installed():
+ self._print_installation_list("Installed backends:", installed)
+
+ if could_be_installed := self.ready_for_installation():
+ self._print_installation_list(
+ "Following backends could be installed:",
+ could_be_installed,
+ new_section=bool(installed),
+ )
+ if not installed and not could_be_installed:
+ logger.info("No backends installed")
-def get_generic_runner(device_info: DeviceInfo, backend: str) -> GenericInferenceRunner:
- """Get generic runner for provided device and backend."""
- backend_runner = get_backend_runner()
- return GenericInferenceRunnerEthosU(backend_runner, device_info, backend)
+ @staticmethod
+ def _print_installation_list(
+ header: str, installations: list[Installation], new_section: bool = False
+ ) -> None:
+ """Print list of the installations."""
+ logger.info("%s%s\n", "\n" if new_section else "", header)
+ for installation in installations:
+ logger.info(" - %s", installation.name)
-def estimate_performance(
- model_info: ModelInfo, device_info: DeviceInfo, backend: str
-) -> PerformanceMetrics:
- """Get performance estimations."""
- output_parser = GenericInferenceOutputParser()
- output_consumers = [output_parser, LogWriter()]
+ def uninstall(self, backend_name: str) -> None:
+ """Uninstall the backend with name backend_name."""
+ installations = self.already_installed(backend_name)
- generic_runner = get_generic_runner(device_info, backend)
- generic_runner.run(model_info, output_consumers)
+ if not installations:
+ raise ConfigurationError(f"Backend '{backend_name}' is not installed")
- if not output_parser.is_ready():
- missed_data = ",".join(output_parser.missed_keys())
- logger.debug("Unable to get performance metrics, missed data %s", missed_data)
- raise Exception("Unable to get performance metrics, insufficient data")
+ if len(installations) != 1:
+ raise InternalError(
+ f"More than one installed backend with name {backend_name} found"
+ )
- return PerformanceMetrics(**output_parser.result)
+ installation = installations[0]
+ installation.uninstall()
+ logger.info("%s successfully uninstalled.", installation.name)
-def get_backend_runner() -> BackendRunner:
- """
- Return BackendRunner instance.
+ def backend_installed(self, backend_name: str) -> bool:
+ """Return true if requested backend installed."""
+ installations = self.already_installed(backend_name)
- Note: This is needed for the unit tests.
- """
- return BackendRunner()
+ return len(installations) == 1