From 3e3dcb9bd5abb88adcd85b4f89e8a81e7f6fa293 Mon Sep 17 00:00:00 2001 From: Dmitrii Agibov Date: Fri, 27 Jan 2023 09:12:50 +0000 Subject: MLIA-595 Remove old backend configuration mechanism - Remove old backend configuration code - Install backends into directory ~/.mlia - Rename targets/backends in registry to make it consistent across codebase. Change-Id: I9c8b012fe863280f1c692940c0dcad3ef638aaae --- src/mlia/backend/executor/__init__.py | 3 - src/mlia/backend/executor/application.py | 170 --------- src/mlia/backend/executor/common.py | 517 --------------------------- src/mlia/backend/executor/config.py | 68 ---- src/mlia/backend/executor/execution.py | 342 ------------------ src/mlia/backend/executor/fs.py | 88 ----- src/mlia/backend/executor/output_consumer.py | 67 ---- src/mlia/backend/executor/proc.py | 191 ---------- src/mlia/backend/executor/runner.py | 98 ----- src/mlia/backend/executor/source.py | 207 ----------- src/mlia/backend/executor/system.py | 178 --------- 11 files changed, 1929 deletions(-) delete mode 100644 src/mlia/backend/executor/__init__.py delete mode 100644 src/mlia/backend/executor/application.py delete mode 100644 src/mlia/backend/executor/common.py delete mode 100644 src/mlia/backend/executor/config.py delete mode 100644 src/mlia/backend/executor/execution.py delete mode 100644 src/mlia/backend/executor/fs.py delete mode 100644 src/mlia/backend/executor/output_consumer.py delete mode 100644 src/mlia/backend/executor/proc.py delete mode 100644 src/mlia/backend/executor/runner.py delete mode 100644 src/mlia/backend/executor/source.py delete mode 100644 src/mlia/backend/executor/system.py (limited to 'src/mlia/backend/executor') diff --git a/src/mlia/backend/executor/__init__.py b/src/mlia/backend/executor/__init__.py deleted file mode 100644 index 3d60372..0000000 --- a/src/mlia/backend/executor/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Backend module.""" diff --git a/src/mlia/backend/executor/application.py b/src/mlia/backend/executor/application.py deleted file mode 100644 index 738ac4e..0000000 --- a/src/mlia/backend/executor/application.py +++ /dev/null @@ -1,170 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Application backend module.""" -from __future__ import annotations - -import re -from pathlib import Path -from typing import Any -from typing import cast -from typing import List - -from mlia.backend.executor.common import Backend -from mlia.backend.executor.common import ConfigurationException -from mlia.backend.executor.common import get_backend_configs -from mlia.backend.executor.common import get_backend_directories -from mlia.backend.executor.common import load_application_configs -from mlia.backend.executor.common import load_config -from mlia.backend.executor.common import remove_backend -from mlia.backend.executor.config import ApplicationConfig -from mlia.backend.executor.config import ExtendedApplicationConfig -from mlia.backend.executor.fs import get_backends_path -from mlia.backend.executor.source import create_destination_and_install -from mlia.backend.executor.source import get_source - - -def get_available_application_directory_names() -> list[str]: - """Return a list of directory names for all available applications.""" - return [entry.name for entry in get_backend_directories("applications")] - - -def get_available_applications() -> list[Application]: - """Return a list with all available applications.""" - available_applications = [] - for config_json in get_backend_configs("applications"): - config_entries = cast(List[ExtendedApplicationConfig], load_config(config_json)) - for config_entry in config_entries: - config_entry["config_location"] = config_json.parent.absolute() - applications = load_applications(config_entry) - available_applications += applications - - return sorted(available_applications, key=lambda application: application.name) - - -def get_application( - application_name: str, system_name: str | None = None -) -> list[Application]: - """Return a list of application instances with provided name.""" - return [ - application - for application in get_available_applications() - if application.name == application_name - and (not system_name or application.can_run_on(system_name)) - ] - - -def install_application(source_path: Path) -> None: - """Install application.""" - try: - source = get_source(source_path) - config = cast(List[ExtendedApplicationConfig], source.config()) - applications_to_install = [ - s for entry in config for s in load_applications(entry) - ] - except Exception as error: - raise ConfigurationException("Unable to read application definition") from error - - if not applications_to_install: - raise ConfigurationException("No application definition found") - - available_applications = get_available_applications() - already_installed = [ - s for s in applications_to_install if s in available_applications - ] - if already_installed: - names = {application.name for application in already_installed} - raise ConfigurationException( - f"Applications [{','.join(names)}] are already installed." - ) - - create_destination_and_install(source, get_backends_path("applications")) - - -def remove_application(directory_name: str) -> None: - """Remove application directory.""" - remove_backend(directory_name, "applications") - - -def get_unique_application_names(system_name: str | None = None) -> list[str]: - """Extract a list of unique application names of all application available.""" - return list( - { - application.name - for application in get_available_applications() - if not system_name or application.can_run_on(system_name) - } - ) - - -class Application(Backend): - """Class for representing a single application component.""" - - def __init__(self, config: ApplicationConfig) -> None: - """Construct a Application instance from a dict.""" - super().__init__(config) - - self.supported_systems = config.get("supported_systems", []) - - def __eq__(self, other: object) -> bool: - """Overload operator ==.""" - if not isinstance(other, Application): - return False - - return ( - super().__eq__(other) - and self.name == other.name - and set(self.supported_systems) == set(other.supported_systems) - ) - - def can_run_on(self, system_name: str) -> bool: - """Check if the application can run on the system passed as argument.""" - return system_name in self.supported_systems - - def get_details(self) -> dict[str, Any]: - """Return dictionary with information about the Application instance.""" - output = { - "type": "application", - "name": self.name, - "description": self.description, - "supported_systems": self.supported_systems, - "commands": self._get_command_details(), - } - - return output - - def remove_unused_params(self) -> None: - """Remove unused params in commands. - - After merging default and system related configuration application - could have parameters that are not being used in commands. They - should be removed. - """ - for command in self.commands.values(): - indexes_or_aliases = [ - m - for cmd_str in command.command_strings - for m in re.findall(r"{user_params:(?P\w+)}", cmd_str) - ] - - only_aliases = all(not item.isnumeric() for item in indexes_or_aliases) - if only_aliases: - used_params = [ - param - for param in command.params - if param.alias in indexes_or_aliases - ] - command.params = used_params - - -def load_applications(config: ExtendedApplicationConfig) -> list[Application]: - """Load application. - - Application configuration could contain different parameters/commands for different - supported systems. For each supported system this function will return separate - Application instance with appropriate configuration. - """ - configs = load_application_configs(config, ApplicationConfig) - applications = [Application(cfg) for cfg in configs] - for application in applications: - application.remove_unused_params() - return applications diff --git a/src/mlia/backend/executor/common.py b/src/mlia/backend/executor/common.py deleted file mode 100644 index 48dbd4a..0000000 --- a/src/mlia/backend/executor/common.py +++ /dev/null @@ -1,517 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Contain all common functions for the backends.""" -from __future__ import annotations - -import json -import logging -import re -from abc import ABC -from collections import Counter -from pathlib import Path -from typing import Any -from typing import Callable -from typing import cast -from typing import Final -from typing import IO -from typing import Iterable -from typing import Match -from typing import NamedTuple -from typing import Pattern - -from mlia.backend.executor.config import BackendConfig -from mlia.backend.executor.config import BaseBackendConfig -from mlia.backend.executor.config import NamedExecutionConfig -from mlia.backend.executor.config import UserParamConfig -from mlia.backend.executor.config import UserParamsConfig -from mlia.backend.executor.fs import get_backends_path -from mlia.backend.executor.fs import remove_resource -from mlia.backend.executor.fs import ResourceType - - -BACKEND_CONFIG_FILE: Final[str] = "backend-config.json" - - -class ConfigurationException(Exception): - """Configuration exception.""" - - -def get_backend_config(dir_path: Path) -> Path: - """Get path to backendir configuration file.""" - return dir_path / BACKEND_CONFIG_FILE - - -def get_backend_configs(resource_type: ResourceType) -> Iterable[Path]: - """Get path to the backend configs for provided resource_type.""" - return ( - get_backend_config(entry) for entry in get_backend_directories(resource_type) - ) - - -def get_backend_directories(resource_type: ResourceType) -> Iterable[Path]: - """Get path to the backend directories for provided resource_type.""" - return ( - entry - for entry in get_backends_path(resource_type).iterdir() - if is_backend_directory(entry) - ) - - -def is_backend_directory(dir_path: Path) -> bool: - """Check if path is backend's configuration directory.""" - return dir_path.is_dir() and get_backend_config(dir_path).is_file() - - -def remove_backend(directory_name: str, resource_type: ResourceType) -> None: - """Remove backend with provided type and directory_name.""" - if not directory_name: - raise Exception("No directory name provided") - - remove_resource(directory_name, resource_type) - - -def load_config(config: Path | IO[bytes] | None) -> BackendConfig: - """Return a loaded json file.""" - if config is None: - raise Exception("Unable to read config") - - if isinstance(config, Path): - with config.open() as json_file: - return cast(BackendConfig, json.load(json_file)) - - return cast(BackendConfig, json.load(config)) - - -def parse_raw_parameter(parameter: str) -> tuple[str, str | None]: - """Split the parameter string in name and optional value. - - It manages the following cases: - --param=1 -> --param, 1 - --param 1 -> --param, 1 - --flag -> --flag, None - """ - data = re.split(" |=", parameter) - if len(data) == 1: - param_name = data[0] - param_value = None - else: - param_name = " ".join(data[0:-1]) - param_value = data[-1] - return param_name, param_value - - -class DataPaths(NamedTuple): - """DataPaths class.""" - - src: Path - dst: str - - -class Backend(ABC): - """Backend class.""" - - # pylint: disable=too-many-instance-attributes - - def __init__(self, config: BaseBackendConfig): - """Initialize backend.""" - name = config.get("name") - if not name: - raise ConfigurationException("Name is empty") - - self.name = name - self.description = config.get("description", "") - self.config_location = config.get("config_location") - self.variables = config.get("variables", {}) - self.annotations = config.get("annotations", {}) - - self._parse_commands_and_params(config) - - def validate_parameter(self, command_name: str, parameter: str) -> bool: - """Validate the parameter string against the application configuration. - - We take the parameter string, extract the parameter name/value and - check them against the current configuration. - """ - param_name, param_value = parse_raw_parameter(parameter) - valid_param_name = valid_param_value = False - - command = self.commands.get(command_name) - if not command: - raise AttributeError(f"Unknown command: '{command_name}'") - - # Iterate over all available parameters until we have a match. - for param in command.params: - if self._same_parameter(param_name, param): - valid_param_name = True - # This is a non-empty list - if param.values: - # We check if the value is allowed in the configuration - valid_param_value = param_value in param.values - else: - # In this case we don't validate the value and accept - # whatever we have set. - valid_param_value = True - break - - return valid_param_name and valid_param_value - - def __eq__(self, other: object) -> bool: - """Overload operator ==.""" - if not isinstance(other, Backend): - return False - - return ( - self.name == other.name - and self.description == other.description - and self.commands == other.commands - ) - - def __repr__(self) -> str: - """Represent the Backend instance by its name.""" - return self.name - - def _parse_commands_and_params(self, config: BaseBackendConfig) -> None: - """Parse commands and user parameters.""" - self.commands: dict[str, Command] = {} - - commands = config.get("commands") - if commands: - params = config.get("user_params") - - for command_name in commands.keys(): - command_params = self._parse_params(params, command_name) - command_strings = [ - self._substitute_variables(cmd) - for cmd in commands.get(command_name, []) - ] - self.commands[command_name] = Command(command_strings, command_params) - - def _substitute_variables(self, str_val: str) -> str: - """Substitute variables in string. - - Variables is being substituted at backend's creation stage because - they could contain references to other params which will be - resolved later. - """ - if not str_val: - return str_val - - var_pattern: Final[Pattern] = re.compile(r"{variables:(?P\w+)}") - - def var_value(match: Match) -> str: - var_name = match["var_name"] - if var_name not in self.variables: - raise ConfigurationException(f"Unknown variable {var_name}") - - return self.variables[var_name] - - return var_pattern.sub(var_value, str_val) - - @classmethod - def _parse_params( - cls, params: UserParamsConfig | None, command: str - ) -> list[Param]: - if not params: - return [] - - return [cls._parse_param(p) for p in params.get(command, [])] - - @classmethod - def _parse_param(cls, param: UserParamConfig) -> Param: - """Parse a single parameter.""" - name = param.get("name") - if name is not None and not name: - raise ConfigurationException("Parameter has an empty 'name' attribute.") - values = param.get("values", None) - default_value = param.get("default_value", None) - description = param.get("description", "") - alias = param.get("alias") - - return Param( - name=name, - description=description, - values=values, - default_value=default_value, - alias=alias, - ) - - def _get_command_details(self) -> dict: - command_details = { - command_name: command.get_details() - for command_name, command in self.commands.items() - } - return command_details - - def _get_user_param_value(self, user_params: list[str], param: Param) -> str | None: - """Get the user-specified value of a parameter.""" - for user_param in user_params: - user_param_name, user_param_value = parse_raw_parameter(user_param) - if user_param_name == param.name: - warn_message = ( - "The direct use of parameter name is deprecated" - " and might be removed in the future.\n" - f"Please use alias '{param.alias}' instead of " - "'{user_param_name}' to provide the parameter." - ) - logging.warning(warn_message) - - if self._same_parameter(user_param_name, param): - return user_param_value - - return None - - @staticmethod - def _same_parameter(user_param_name_or_alias: str, param: Param) -> bool: - """Compare user parameter name with param name or alias.""" - # Strip the "=" sign in the param_name. This is needed just for - # comparison with the parameters passed by the user. - # The equal sign needs to be honoured when re-building the - # parameter back. - param_name = None if not param.name else param.name.rstrip("=") - return user_param_name_or_alias in [param_name, param.alias] - - def resolved_parameters( - self, command_name: str, user_params: list[str] - ) -> list[tuple[str | None, Param]]: - """Return list of parameters with values.""" - result: list[tuple[str | None, Param]] = [] - command = self.commands.get(command_name) - if not command: - return result - - for param in command.params: - value = self._get_user_param_value(user_params, param) - if not value: - value = param.default_value - result.append((value, param)) - - return result - - def build_command( - self, - command_name: str, - user_params: list[str], - param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str], - ) -> list[str]: - """ - Return a list of executable command strings. - - Given a command and associated parameters, returns a list of executable command - strings. - """ - command = self.commands.get(command_name) - if not command: - raise ConfigurationException( - f"Command '{command_name}' could not be found." - ) - - commands_to_run = [] - - params_values = self.resolved_parameters(command_name, user_params) - for cmd_str in command.command_strings: - cmd_str = resolve_all_parameters( - cmd_str, param_resolver, command_name, params_values - ) - commands_to_run.append(cmd_str) - - return commands_to_run - - -class Param: - """Class for representing a generic application parameter.""" - - def __init__( # pylint: disable=too-many-arguments - self, - name: str | None, - description: str, - values: list[str] | None = None, - default_value: str | None = None, - alias: str | None = None, - ) -> None: - """Construct a Param instance.""" - if not name and not alias: - raise ConfigurationException( - "Either name, alias or both must be set to identify a parameter." - ) - self.name = name - self.values = values - self.description = description - self.default_value = default_value - self.alias = alias - - def get_details(self) -> dict: - """Return a dictionary with all relevant information of a Param.""" - return {key: value for key, value in self.__dict__.items() if value} - - def __eq__(self, other: object) -> bool: - """Overload operator ==.""" - if not isinstance(other, Param): - return False - - return ( - self.name == other.name - and self.values == other.values - and self.default_value == other.default_value - and self.description == other.description - ) - - -class Command: - """Class for representing a command.""" - - def __init__( - self, command_strings: list[str], params: list[Param] | None = None - ) -> None: - """Construct a Command instance.""" - self.command_strings = command_strings - - if params: - self.params = params - else: - self.params = [] - - self._validate() - - def _validate(self) -> None: - """Validate command.""" - if not self.params: - return - - aliases = [param.alias for param in self.params if param.alias is not None] - repeated_aliases = [ - alias for alias, count in Counter(aliases).items() if count > 1 - ] - - if repeated_aliases: - raise ConfigurationException( - f"Non-unique aliases {', '.join(repeated_aliases)}" - ) - - both_name_and_alias = [ - param.name - for param in self.params - if param.name in aliases and param.name != param.alias - ] - if both_name_and_alias: - raise ConfigurationException( - f"Aliases {', '.join(both_name_and_alias)} could not be used " - "as parameter name." - ) - - def get_details(self) -> dict: - """Return a dictionary with all relevant information of a Command.""" - output = { - "command_strings": self.command_strings, - "user_params": [param.get_details() for param in self.params], - } - return output - - def __eq__(self, other: object) -> bool: - """Overload operator ==.""" - if not isinstance(other, Command): - return False - - return ( - self.command_strings == other.command_strings - and self.params == other.params - ) - - -def resolve_all_parameters( - str_val: str, - param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str], - command_name: str | None = None, - params_values: list[tuple[str | None, Param]] | None = None, -) -> str: - """Resolve all parameters in the string.""" - if not str_val: - return str_val - - param_pattern: Final[Pattern] = re.compile(r"{(?P[\w.:]+)}") - while param_pattern.findall(str_val): - str_val = param_pattern.sub( - lambda m: param_resolver( - m["param_name"], command_name or "", params_values or [] - ), - str_val, - ) - return str_val - - -def load_application_configs( - config: Any, - config_type: type[Any], - is_system_required: bool = True, -) -> Any: - """Get one config for each system supported by the application. - - The configuration could contain different parameters/commands for different - supported systems. For each supported system this function will return separate - config with appropriate configuration. - """ - merged_configs = [] - supported_systems: list[NamedExecutionConfig] | None = config.get( - "supported_systems" - ) - if not supported_systems: - if is_system_required: - raise ConfigurationException("No supported systems definition provided") - # Create an empty system to be used in the parsing below - supported_systems = [cast(NamedExecutionConfig, {})] - - default_user_params = config.get("user_params", {}) - - def merge_config(system: NamedExecutionConfig) -> Any: - system_name = system.get("name") - if not system_name and is_system_required: - raise ConfigurationException( - "Unable to read supported system definition, name is missed" - ) - - merged_config = config_type(**config) - merged_config["supported_systems"] = [system_name] if system_name else [] - # merge default configuration and specific to the system - merged_config["commands"] = { - **config.get("commands", {}), - **system.get("commands", {}), - } - - params = {} - tool_user_params = system.get("user_params", {}) - command_names = tool_user_params.keys() | default_user_params.keys() - for command_name in command_names: - if command_name not in merged_config["commands"]: - continue - - params_default = default_user_params.get(command_name, []) - params_tool = tool_user_params.get(command_name, []) - if not params_default or not params_tool: - params[command_name] = params_tool or params_default - if params_default and params_tool: - if any(not p.get("alias") for p in params_default): - raise ConfigurationException( - f"Default parameters for command {command_name} " - "should have aliases" - ) - if any(not p.get("alias") for p in params_tool): - raise ConfigurationException( - f"{system_name} parameters for command {command_name} " - "should have aliases." - ) - - merged_by_alias = { - **{p.get("alias"): p for p in params_default}, - **{p.get("alias"): p for p in params_tool}, - } - params[command_name] = list(merged_by_alias.values()) - - merged_config["user_params"] = params - merged_config["variables"] = { - **config.get("variables", {}), - **system.get("variables", {}), - } - return merged_config - - merged_configs = [merge_config(system) for system in supported_systems] - - return merged_configs diff --git a/src/mlia/backend/executor/config.py b/src/mlia/backend/executor/config.py deleted file mode 100644 index dca53da..0000000 --- a/src/mlia/backend/executor/config.py +++ /dev/null @@ -1,68 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Contain definition of backend configuration.""" -from __future__ import annotations - -from pathlib import Path -from typing import Dict -from typing import List -from typing import TypedDict -from typing import Union - - -class UserParamConfig(TypedDict, total=False): - """User parameter configuration.""" - - name: str | None - default_value: str - values: list[str] - description: str - alias: str - - -UserParamsConfig = Dict[str, List[UserParamConfig]] - - -class ExecutionConfig(TypedDict, total=False): - """Execution configuration.""" - - commands: dict[str, list[str]] - user_params: UserParamsConfig - variables: dict[str, str] - - -class NamedExecutionConfig(ExecutionConfig): - """Execution configuration with name.""" - - name: str - - -class BaseBackendConfig(ExecutionConfig, total=False): - """Base backend configuration.""" - - name: str - description: str - config_location: Path - annotations: dict[str, str | list[str]] - - -class ApplicationConfig(BaseBackendConfig, total=False): - """Application configuration.""" - - supported_systems: list[str] - - -class ExtendedApplicationConfig(BaseBackendConfig, total=False): - """Extended application configuration.""" - - supported_systems: list[NamedExecutionConfig] - - -class SystemConfig(BaseBackendConfig, total=False): - """System configuration.""" - - reporting: dict[str, dict] - - -BackendItemConfig = Union[ApplicationConfig, SystemConfig] -BackendConfig = Union[List[ExtendedApplicationConfig], List[SystemConfig]] diff --git a/src/mlia/backend/executor/execution.py b/src/mlia/backend/executor/execution.py deleted file mode 100644 index e253b16..0000000 --- a/src/mlia/backend/executor/execution.py +++ /dev/null @@ -1,342 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Application execution module.""" -from __future__ import annotations - -import logging -import re -from typing import cast - -from mlia.backend.executor.application import Application -from mlia.backend.executor.application import get_application -from mlia.backend.executor.common import Backend -from mlia.backend.executor.common import ConfigurationException -from mlia.backend.executor.common import Param -from mlia.backend.executor.system import get_system -from mlia.backend.executor.system import System - -logger = logging.getLogger(__name__) - - -class AnotherInstanceIsRunningException(Exception): - """Concurrent execution error.""" - - -class ExecutionContext: # pylint: disable=too-few-public-methods - """Command execution context.""" - - def __init__( - self, - app: Application, - app_params: list[str], - system: System, - system_params: list[str], - ): - """Init execution context.""" - self.app = app - self.app_params = app_params - self.system = system - self.system_params = system_params - - self.param_resolver = ParamResolver(self) - - self.stdout: bytearray | None = None - self.stderr: bytearray | None = None - - -class ParamResolver: - """Parameter resolver.""" - - def __init__(self, context: ExecutionContext): - """Init parameter resolver.""" - self.ctx = context - - @staticmethod - def resolve_user_params( - cmd_name: str | None, - index_or_alias: str, - resolved_params: list[tuple[str | None, Param]] | None, - ) -> str: - """Resolve user params.""" - if not cmd_name or resolved_params is None: - raise ConfigurationException("Unable to resolve user params") - - param_value: str | None = None - param: Param | None = None - - if index_or_alias.isnumeric(): - i = int(index_or_alias) - if i not in range(len(resolved_params)): - raise ConfigurationException( - f"Invalid index {i} for user params of command {cmd_name}" - ) - param_value, param = resolved_params[i] - else: - for val, par in resolved_params: - if par.alias == index_or_alias: - param_value, param = val, par - break - - if param is None: - raise ConfigurationException( - f"No user parameter for command '{cmd_name}' with " - f"alias '{index_or_alias}'." - ) - - if param_value: - # We need to handle to cases of parameters here: - # 1) Optional parameters (non-positional with a name and value) - # 2) Positional parameters (value only, no name needed) - # Default to empty strings for positional arguments - param_name = "" - separator = "" - if param.name is not None: - # A valid param name means we have an optional/non-positional argument: - # The separator is an empty string in case the param_name - # has an equal sign as we have to honour it. - # If the parameter doesn't end with an equal sign then a - # space character is injected to split the parameter name - # and its value - param_name = param.name - separator = "" if param.name.endswith("=") else " " - - return f"{param_name}{separator}{param_value}" - - if param.name is None: - raise ConfigurationException( - f"Missing user parameter with alias '{index_or_alias}' for " - f"command '{cmd_name}'." - ) - - return param.name # flag: just return the parameter name - - def resolve_commands_and_params( - self, backend_type: str, cmd_name: str, return_params: bool, index_or_alias: str - ) -> str: - """Resolve command or command's param value.""" - if backend_type == "system": - backend = cast(Backend, self.ctx.system) - backend_params = self.ctx.system_params - else: # Application backend - backend = cast(Backend, self.ctx.app) - backend_params = self.ctx.app_params - - if cmd_name not in backend.commands: - raise ConfigurationException(f"Command {cmd_name} not found") - - if return_params: - params = backend.resolved_parameters(cmd_name, backend_params) - if index_or_alias.isnumeric(): - i = int(index_or_alias) - if i not in range(len(params)): - raise ConfigurationException( - f"Invalid parameter index {i} for command {cmd_name}" - ) - - param_value = params[i][0] - else: - param_value = None - for value, param in params: - if param.alias == index_or_alias: - param_value = value - break - - if not param_value: - raise ConfigurationException( - "No value for parameter with index or " - f"alias {index_or_alias} of command {cmd_name}." - ) - return param_value - - if not index_or_alias.isnumeric(): - raise ConfigurationException(f"Bad command index {index_or_alias}") - - i = int(index_or_alias) - commands = backend.build_command(cmd_name, backend_params, self.param_resolver) - if i not in range(len(commands)): - raise ConfigurationException(f"Invalid index {i} for command {cmd_name}") - - return commands[i] - - def resolve_variables(self, backend_type: str, var_name: str) -> str: - """Resolve variable value.""" - if backend_type == "system": - backend = cast(Backend, self.ctx.system) - else: # Application backend - backend = cast(Backend, self.ctx.app) - - if var_name not in backend.variables: - raise ConfigurationException(f"Unknown variable {var_name}") - - return backend.variables[var_name] - - def param_matcher( - self, - param_name: str, - cmd_name: str | None, - resolved_params: list[tuple[str | None, Param]] | None, - ) -> str: - """Regexp to resolve a param from the param_name.""" - # this pattern supports parameter names like "application.commands.run:0" and - # "system.commands.run.params:0" - # Note: 'software' is included for backward compatibility. - commands_and_params_match = re.match( - r"(?Papplication|software|system)[.]commands[.]" - r"(?P\w+)" - r"(?P[.]params|)[:]" - r"(?P\w+)", - param_name, - ) - - if commands_and_params_match: - backend_type, cmd_name, return_params, index_or_alias = ( - commands_and_params_match["type"], - commands_and_params_match["name"], - commands_and_params_match["params"], - commands_and_params_match["index_or_alias"], - ) - return self.resolve_commands_and_params( - backend_type, cmd_name, bool(return_params), index_or_alias - ) - - # Note: 'software' is included for backward compatibility. - variables_match = re.match( - r"(?Papplication|software|system)[.]variables:(?P\w+)", - param_name, - ) - if variables_match: - backend_type, var_name = ( - variables_match["type"], - variables_match["var_name"], - ) - return self.resolve_variables(backend_type, var_name) - - user_params_match = re.match(r"user_params:(?P\w+)", param_name) - if user_params_match: - index_or_alias = user_params_match["index_or_alias"] - return self.resolve_user_params(cmd_name, index_or_alias, resolved_params) - - raise ConfigurationException(f"Unable to resolve parameter {param_name}") - - def param_resolver( - self, - param_name: str, - cmd_name: str | None = None, - resolved_params: list[tuple[str | None, Param]] | None = None, - ) -> str: - """Resolve parameter value based on current execution context.""" - # Note: 'software.*' is included for backward compatibility. - resolved_param = None - if param_name in ["application.name", "software.name"]: - resolved_param = self.ctx.app.name - elif param_name in ["application.description", "software.description"]: - resolved_param = self.ctx.app.description - elif self.ctx.app.config_location and ( - param_name in ["application.config_dir", "software.config_dir"] - ): - resolved_param = str(self.ctx.app.config_location.absolute()) - elif self.ctx.system is not None: - if param_name == "system.name": - resolved_param = self.ctx.system.name - elif param_name == "system.description": - resolved_param = self.ctx.system.description - elif param_name == "system.config_dir" and self.ctx.system.config_location: - resolved_param = str(self.ctx.system.config_location.absolute()) - - if not resolved_param: - resolved_param = self.param_matcher(param_name, cmd_name, resolved_params) - return resolved_param - - def __call__( - self, - param_name: str, - cmd_name: str | None = None, - resolved_params: list[tuple[str | None, Param]] | None = None, - ) -> str: - """Resolve provided parameter.""" - return self.param_resolver(param_name, cmd_name, resolved_params) - - -def validate_parameters( - backend: Backend, command_names: list[str], params: list[str] -) -> None: - """Check parameters passed to backend.""" - for param in params: - acceptable = any( - backend.validate_parameter(command_name, param) - for command_name in command_names - if command_name in backend.commands - ) - - if not acceptable: - backend_type = "System" if isinstance(backend, System) else "Application" - raise ValueError( - f"{backend_type} parameter '{param}' not valid for " - f"command '{' or '.join(command_names)}'." - ) - - -def get_application_by_name_and_system( - application_name: str, system_name: str -) -> Application: - """Get application.""" - applications = get_application(application_name, system_name) - if not applications: - raise ValueError( - f"Application '{application_name}' doesn't support the " - f"system '{system_name}'." - ) - - if len(applications) != 1: - raise ValueError( - f"Error during getting application {application_name} for the " - f"system {system_name}." - ) - - return applications[0] - - -def get_application_and_system( - application_name: str, system_name: str -) -> tuple[Application, System]: - """Return application and system by provided names.""" - system = get_system(system_name) - if not system: - raise ValueError(f"System {system_name} is not found.") - - application = get_application_by_name_and_system(application_name, system_name) - - return application, system - - -def run_application( - application_name: str, - application_params: list[str], - system_name: str, - system_params: list[str], -) -> ExecutionContext: - """Run application on the provided system.""" - application, system = get_application_and_system(application_name, system_name) - validate_parameters(application, ["run"], application_params) - validate_parameters(system, ["run"], system_params) - - ctx = ExecutionContext( - app=application, - app_params=application_params, - system=system, - system_params=system_params, - ) - - logger.debug("Generating commands to execute") - commands_to_run = ctx.system.build_command( - "run", ctx.system_params, ctx.param_resolver - ) - - for command in commands_to_run: - logger.debug("Running: %s", command) - exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command) - - if exit_code != 0: - logger.warning("Application exited with exit code %i", exit_code) - - return ctx diff --git a/src/mlia/backend/executor/fs.py b/src/mlia/backend/executor/fs.py deleted file mode 100644 index 3fce19c..0000000 --- a/src/mlia/backend/executor/fs.py +++ /dev/null @@ -1,88 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Module to host all file system related functions.""" -from __future__ import annotations - -import re -import shutil -from pathlib import Path -from typing import Literal - -from mlia.utils.filesystem import get_mlia_resources - -ResourceType = Literal["applications", "systems"] - - -def get_backend_resources() -> Path: - """Get backend resources folder path.""" - return get_mlia_resources() / "backends" - - -def get_backends_path(name: ResourceType) -> Path: - """Return the absolute path of the specified resource. - - It uses importlib to return resources packaged with MANIFEST.in. - """ - if not name: - raise ResourceWarning("Resource name is not provided") - - resource_path = get_backend_resources() / name - if resource_path.is_dir(): - return resource_path - - raise ResourceWarning(f"Resource '{name}' not found.") - - -def copy_directory_content(source: Path, destination: Path) -> None: - """Copy content of the source directory into destination directory.""" - for item in source.iterdir(): - src = source / item.name - dest = destination / item.name - - if src.is_dir(): - shutil.copytree(src, dest) - else: - shutil.copy2(src, dest) - - -def remove_resource(resource_directory: str, resource_type: ResourceType) -> None: - """Remove resource data.""" - resources = get_backends_path(resource_type) - - resource_location = resources / resource_directory - if not resource_location.exists(): - raise Exception(f"Resource {resource_directory} does not exist") - - if not resource_location.is_dir(): - raise Exception(f"Wrong resource {resource_directory}") - - shutil.rmtree(resource_location) - - -def remove_directory(directory_path: Path | None) -> None: - """Remove directory.""" - if not directory_path or not directory_path.is_dir(): - raise Exception("No directory path provided") - - shutil.rmtree(directory_path) - - -def recreate_directory(directory_path: Path | None) -> None: - """Recreate directory.""" - if not directory_path: - raise Exception("No directory path provided") - - if directory_path.exists() and not directory_path.is_dir(): - raise Exception( - f"Path {str(directory_path)} does exist and it is not a directory." - ) - - if directory_path.is_dir(): - remove_directory(directory_path) - - directory_path.mkdir() - - -def valid_for_filename(value: str, replacement: str = "") -> str: - """Replace non alpha numeric characters.""" - return re.sub(r"[^\w.]", replacement, value, flags=re.ASCII) diff --git a/src/mlia/backend/executor/output_consumer.py b/src/mlia/backend/executor/output_consumer.py deleted file mode 100644 index 3c3b132..0000000 --- a/src/mlia/backend/executor/output_consumer.py +++ /dev/null @@ -1,67 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Output consumers module.""" -from __future__ import annotations - -import base64 -import json -import re -from typing import Protocol -from typing import runtime_checkable - - -@runtime_checkable -class OutputConsumer(Protocol): - """Protocol to consume output.""" - - def feed(self, line: str) -> bool: - """ - Feed a new line to be parsed. - - Return True if the line should be removed from the output. - """ - - -class Base64OutputConsumer(OutputConsumer): - """ - Parser to extract base64-encoded JSON from tagged standard output. - - Example of the tagged output: - ``` - # Encoded JSON: {"test": 1234} - eyJ0ZXN0IjogMTIzNH0 - ``` - """ - - TAG_NAME = "metrics" - - def __init__(self) -> None: - """Set up the regular expression to extract tagged strings.""" - self._regex = re.compile(rf"<{self.TAG_NAME}>(.*)") - self.parsed_output: list = [] - - def feed(self, line: str) -> bool: - """ - Parse the output line and save the decoded output. - - Returns True if the line contains tagged output. - - Example: - Using the tagged output from the class docs the parser should collect - the following: - ``` - [ - {"test": 1234} - ] - ``` - """ - res_b64 = self._regex.search(line) - if res_b64: - res_json = base64.b64decode(res_b64.group(1), validate=True) - res = json.loads(res_json) - self.parsed_output.append(res) - # Remove this line from the output, i.e. consume it, as it - # does not contain any human readable content. - return True - - return False diff --git a/src/mlia/backend/executor/proc.py b/src/mlia/backend/executor/proc.py deleted file mode 100644 index 39a0689..0000000 --- a/src/mlia/backend/executor/proc.py +++ /dev/null @@ -1,191 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Processes module. - -This module contains all classes and functions for dealing with Linux -processes. -""" -from __future__ import annotations - -import datetime -import logging -import shlex -import signal -import tempfile -import time -from pathlib import Path -from typing import Any - -from sh import Command -from sh import CommandNotFound -from sh import ErrorReturnCode -from sh import RunningCommand - -from mlia.backend.executor.fs import valid_for_filename - -logger = logging.getLogger(__name__) - - -class CommandFailedException(Exception): - """Exception for failed command execution.""" - - -class ShellCommand: - """Wrapper class for shell commands.""" - - def run( - self, - cmd: str, - *args: str, - _cwd: Path | None = None, - _tee: bool = True, - _bg: bool = True, - _out: Any = None, - _err: Any = None, - _search_paths: list[Path] | None = None, - ) -> RunningCommand: - """Run the shell command with the given arguments. - - There are special arguments that modify the behaviour of the process. - _cwd: current working directory - _tee: it redirects the stdout both to console and file - _bg: if True, it runs the process in background and the command is not - blocking. - _out: use this object for stdout redirect, - _err: use this object for stderr redirect, - _search_paths: If presented used for searching executable - """ - try: - kwargs = {} - if _cwd: - kwargs["_cwd"] = str(_cwd) - command = Command(cmd, _search_paths).bake(args, **kwargs) - except CommandNotFound as error: - logging.error("Command '%s' not found", error.args[0]) - raise error - - out, err = _out, _err - if not _out and not _err: - out, err = (str(item) for item in self.get_stdout_stderr_paths(cmd)) - - return command(_out=out, _err=err, _tee=_tee, _bg=_bg, _bg_exc=False) - - @classmethod - def get_stdout_stderr_paths(cls, cmd: str) -> tuple[Path, Path]: - """Construct and returns the paths of stdout/stderr files.""" - timestamp = datetime.datetime.now().timestamp() - base_path = Path(tempfile.mkdtemp(prefix="mlia-", suffix=f"{timestamp}")) - base = base_path / f"{valid_for_filename(cmd, '_')}_{timestamp}" - stdout = base.with_suffix(".out") - stderr = base.with_suffix(".err") - try: - stdout.touch() - stderr.touch() - except FileNotFoundError as error: - logging.error("File not found: %s", error.filename) - raise error - return stdout, stderr - - -def parse_command(command: str, shell: str = "bash") -> list[str]: - """Parse command.""" - cmd, *args = shlex.split(command, posix=True) - - if is_shell_script(cmd): - args = [cmd] + args - cmd = shell - - return [cmd] + args - - -def execute_command( # pylint: disable=invalid-name - command: str, - cwd: Path, - bg: bool = False, - shell: str = "bash", - out: Any = None, - err: Any = None, -) -> RunningCommand: - """Execute shell command.""" - cmd, *args = parse_command(command, shell) - - search_paths = None - if cmd != shell and (cwd / cmd).is_file(): - search_paths = [cwd] - - return ShellCommand().run( - cmd, *args, _cwd=cwd, _bg=bg, _search_paths=search_paths, _out=out, _err=err - ) - - -def is_shell_script(cmd: str) -> bool: - """Check if command is shell script.""" - return cmd.endswith(".sh") - - -def run_and_wait( - command: str, - cwd: Path, - terminate_on_error: bool = False, - out: Any = None, - err: Any = None, -) -> tuple[int, bytearray, bytearray]: - """ - Run command and wait while it is executing. - - Returns a tuple: (exit_code, stdout, stderr) - """ - running_cmd: RunningCommand | None = None - try: - running_cmd = execute_command(command, cwd, bg=True, out=out, err=err) - return running_cmd.exit_code, running_cmd.stdout, running_cmd.stderr - except ErrorReturnCode as cmd_failed: - raise CommandFailedException() from cmd_failed - except Exception as error: - is_running = running_cmd is not None and running_cmd.is_alive() - if terminate_on_error and is_running: - logger.debug("Terminating ...") - terminate_command(running_cmd) - - raise error - - -def terminate_command( - running_cmd: RunningCommand, - wait: bool = True, - wait_period: float = 0.5, - number_of_attempts: int = 20, -) -> None: - """Terminate running command.""" - try: - running_cmd.process.signal_group(signal.SIGINT) - if wait: - for _ in range(number_of_attempts): - time.sleep(wait_period) - if not running_cmd.is_alive(): - return - logger.error( - "Unable to terminate process %i. Sending SIGTERM...", - running_cmd.process.pid, - ) - running_cmd.process.signal_group(signal.SIGTERM) - except ProcessLookupError: - pass - - -def print_command_stdout(command: RunningCommand) -> None: - """Print the stdout of a command. - - The command has 2 states: running and done. - If the command is running, the output is taken by the running process. - If the command has ended its execution, the stdout is taken from stdout - property - """ - if command.is_alive(): - while True: - try: - print(command.next(), end="") - except StopIteration: - break - else: - print(command.stdout) diff --git a/src/mlia/backend/executor/runner.py b/src/mlia/backend/executor/runner.py deleted file mode 100644 index 2330fd9..0000000 --- a/src/mlia/backend/executor/runner.py +++ /dev/null @@ -1,98 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Module for backend runner.""" -from __future__ import annotations - -from dataclasses import dataclass -from pathlib import Path - -from mlia.backend.executor.application import get_available_applications -from mlia.backend.executor.application import install_application -from mlia.backend.executor.execution import ExecutionContext -from mlia.backend.executor.execution import run_application -from mlia.backend.executor.system import get_available_systems -from mlia.backend.executor.system import install_system - - -@dataclass -class ExecutionParams: - """Application execution params.""" - - application: str - system: str - application_params: list[str] - system_params: list[str] - - -class BackendRunner: - """Backend runner.""" - - def __init__(self) -> None: - """Init BackendRunner instance.""" - - @staticmethod - def get_installed_systems() -> list[str]: - """Get list of the installed systems.""" - return [system.name for system in get_available_systems()] - - @staticmethod - def get_installed_applications(system: str | None = None) -> list[str]: - """Get list of the installed application.""" - return [ - app.name - for app in get_available_applications() - if system is None or app.can_run_on(system) - ] - - def is_application_installed(self, application: str, system: str) -> bool: - """Return true if requested application installed.""" - return application in self.get_installed_applications(system) - - def is_system_installed(self, system: str) -> bool: - """Return true if requested system installed.""" - return system in self.get_installed_systems() - - def systems_installed(self, systems: list[str]) -> bool: - """Check if all provided systems are installed.""" - if not systems: - return False - - installed_systems = self.get_installed_systems() - return all(system in installed_systems for system in systems) - - def applications_installed(self, applications: list[str]) -> bool: - """Check if all provided applications are installed.""" - if not applications: - return False - - installed_apps = self.get_installed_applications() - return all(app in installed_apps for app in applications) - - def all_installed(self, systems: list[str], apps: list[str]) -> bool: - """Check if all provided artifacts are installed.""" - return self.systems_installed(systems) and self.applications_installed(apps) - - @staticmethod - def install_system(system_path: Path) -> None: - """Install system.""" - install_system(system_path) - - @staticmethod - def install_application(app_path: Path) -> None: - """Install application.""" - install_application(app_path) - - @staticmethod - def run_application(execution_params: ExecutionParams) -> ExecutionContext: - """Run requested application.""" - ctx = run_application( - execution_params.application, - execution_params.application_params, - execution_params.system, - execution_params.system_params, - ) - return ctx - - @staticmethod - def _params(name: str, params: list[str]) -> list[str]: - return [p for item in [(name, param) for param in params] for p in item] diff --git a/src/mlia/backend/executor/source.py b/src/mlia/backend/executor/source.py deleted file mode 100644 index 6abc49f..0000000 --- a/src/mlia/backend/executor/source.py +++ /dev/null @@ -1,207 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""Contain source related classes and functions.""" -from __future__ import annotations - -import os -import shutil -import tarfile -from abc import ABC -from abc import abstractmethod -from pathlib import Path -from tarfile import TarFile - -from mlia.backend.executor.common import BACKEND_CONFIG_FILE -from mlia.backend.executor.common import ConfigurationException -from mlia.backend.executor.common import get_backend_config -from mlia.backend.executor.common import is_backend_directory -from mlia.backend.executor.common import load_config -from mlia.backend.executor.config import BackendConfig -from mlia.backend.executor.fs import copy_directory_content - - -class Source(ABC): - """Source class.""" - - @abstractmethod - def name(self) -> str | None: - """Get source name.""" - - @abstractmethod - def config(self) -> BackendConfig | None: - """Get configuration file content.""" - - @abstractmethod - def install_into(self, destination: Path) -> None: - """Install source into destination directory.""" - - @abstractmethod - def create_destination(self) -> bool: - """Return True if destination folder should be created before installation.""" - - -class DirectorySource(Source): - """DirectorySource class.""" - - def __init__(self, directory_path: Path) -> None: - """Create the DirectorySource instance.""" - assert isinstance(directory_path, Path) - self.directory_path = directory_path - - def name(self) -> str: - """Return name of source.""" - return self.directory_path.name - - def config(self) -> BackendConfig | None: - """Return configuration file content.""" - if not is_backend_directory(self.directory_path): - raise ConfigurationException("No configuration file found") - - config_file = get_backend_config(self.directory_path) - return load_config(config_file) - - def install_into(self, destination: Path) -> None: - """Install source into destination directory.""" - if not destination.is_dir(): - raise ConfigurationException(f"Wrong destination {destination}.") - - if not self.directory_path.is_dir(): - raise ConfigurationException( - f"Directory {self.directory_path} does not exist." - ) - - copy_directory_content(self.directory_path, destination) - - def create_destination(self) -> bool: - """Return True if destination folder should be created before installation.""" - return True - - -class TarArchiveSource(Source): - """TarArchiveSource class.""" - - def __init__(self, archive_path: Path) -> None: - """Create the TarArchiveSource class.""" - assert isinstance(archive_path, Path) - self.archive_path = archive_path - self._config: BackendConfig | None = None - self._has_top_level_folder: bool | None = None - self._name: str | None = None - - def _read_archive_content(self) -> None: - """Read various information about archive.""" - # get source name from archive name (everything without extensions) - extensions = "".join(self.archive_path.suffixes) - self._name = self.archive_path.name.rstrip(extensions) - - if not self.archive_path.exists(): - return - - with self._open(self.archive_path) as archive: - try: - config_entry = archive.getmember(BACKEND_CONFIG_FILE) - self._has_top_level_folder = False - except KeyError as error_no_config: - try: - archive_entries = archive.getnames() - entries_common_prefix = os.path.commonprefix(archive_entries) - top_level_dir = entries_common_prefix.rstrip("/") - - if not top_level_dir: - raise RuntimeError( - "Archive has no top level directory" - ) from error_no_config - - config_path = f"{top_level_dir}/{BACKEND_CONFIG_FILE}" - - config_entry = archive.getmember(config_path) - self._has_top_level_folder = True - self._name = top_level_dir - except (KeyError, RuntimeError) as error_no_root_dir_or_config: - raise ConfigurationException( - "No configuration file found" - ) from error_no_root_dir_or_config - - content = archive.extractfile(config_entry) - self._config = load_config(content) - - def config(self) -> BackendConfig | None: - """Return configuration file content.""" - if self._config is None: - self._read_archive_content() - - return self._config - - def name(self) -> str | None: - """Return name of the source.""" - if self._name is None: - self._read_archive_content() - - return self._name - - def create_destination(self) -> bool: - """Return True if destination folder must be created before installation.""" - if self._has_top_level_folder is None: - self._read_archive_content() - - return not self._has_top_level_folder - - def install_into(self, destination: Path) -> None: - """Install source into destination directory.""" - if not destination.is_dir(): - raise ConfigurationException(f"Wrong destination {destination}.") - - with self._open(self.archive_path) as archive: - archive.extractall(destination) - - def _open(self, archive_path: Path) -> TarFile: - """Open archive file.""" - if not archive_path.is_file(): - raise ConfigurationException(f"File {archive_path} does not exist.") - - if archive_path.name.endswith("tar.gz") or archive_path.name.endswith("tgz"): - mode = "r:gz" - else: - raise ConfigurationException(f"Unsupported archive type {archive_path}.") - - # The returned TarFile object can be used as a context manager (using - # 'with') by the calling instance. - return tarfile.open( # pylint: disable=consider-using-with - self.archive_path, mode=mode - ) - - -def get_source(source_path: Path) -> TarArchiveSource | DirectorySource: - """Return appropriate source instance based on provided source path.""" - if source_path.is_file(): - return TarArchiveSource(source_path) - - if source_path.is_dir(): - return DirectorySource(source_path) - - raise ConfigurationException(f"Unable to read {source_path}.") - - -def create_destination_and_install(source: Source, resource_path: Path) -> None: - """Create destination directory and install source. - - This function is used for actual installation of system/backend New - directory will be created inside :resource_path: if needed If for example - archive contains top level folder then no need to create new directory - """ - destination = resource_path - create_destination = source.create_destination() - - if create_destination: - name = source.name() - if not name: - raise ConfigurationException("Unable to get source name.") - - destination = resource_path / name - destination.mkdir() - try: - source.install_into(destination) - except Exception as error: - if create_destination: - shutil.rmtree(destination) - raise error diff --git a/src/mlia/backend/executor/system.py b/src/mlia/backend/executor/system.py deleted file mode 100644 index a5ecf19..0000000 --- a/src/mlia/backend/executor/system.py +++ /dev/null @@ -1,178 +0,0 @@ -# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. -# SPDX-License-Identifier: Apache-2.0 -"""System backend module.""" -from __future__ import annotations - -from pathlib import Path -from typing import Any -from typing import cast -from typing import List - -from mlia.backend.executor.common import Backend -from mlia.backend.executor.common import ConfigurationException -from mlia.backend.executor.common import get_backend_configs -from mlia.backend.executor.common import get_backend_directories -from mlia.backend.executor.common import load_config -from mlia.backend.executor.common import remove_backend -from mlia.backend.executor.config import SystemConfig -from mlia.backend.executor.fs import get_backends_path -from mlia.backend.executor.proc import run_and_wait -from mlia.backend.executor.source import create_destination_and_install -from mlia.backend.executor.source import get_source - - -class System(Backend): - """System class.""" - - def __init__(self, config: SystemConfig) -> None: - """Construct the System class using the dictionary passed.""" - super().__init__(config) - - self._setup_reporting(config) - - def _setup_reporting(self, config: SystemConfig) -> None: - self.reporting = config.get("reporting") - - def run(self, command: str) -> tuple[int, bytearray, bytearray]: - """ - Run command on the system. - - Returns a tuple: (exit_code, stdout, stderr) - """ - cwd = self.config_location - if not isinstance(cwd, Path) or not cwd.is_dir(): - raise ConfigurationException( - f"System has invalid config location: {cwd}", - ) - - stdout = bytearray() - stderr = bytearray() - - return run_and_wait( - command, - cwd=cwd, - terminate_on_error=True, - out=stdout, - err=stderr, - ) - - def __eq__(self, other: object) -> bool: - """Overload operator ==.""" - if not isinstance(other, System): - return False - - return super().__eq__(other) and self.name == other.name - - def get_details(self) -> dict[str, Any]: - """Return a dictionary with all relevant information of a System.""" - output = { - "type": "system", - "name": self.name, - "description": self.description, - "commands": self._get_command_details(), - "annotations": self.annotations, - } - - return output - - -def get_available_systems_directory_names() -> list[str]: - """Return a list of directory names for all avialable systems.""" - return [entry.name for entry in get_backend_directories("systems")] - - -def get_available_systems() -> list[System]: - """Return a list with all available systems.""" - available_systems = [] - for config_json in get_backend_configs("systems"): - config_entries = cast(List[SystemConfig], (load_config(config_json))) - for config_entry in config_entries: - config_entry["config_location"] = config_json.parent.absolute() - system = load_system(config_entry) - available_systems.append(system) - - return sorted(available_systems, key=lambda system: system.name) - - -def get_system(system_name: str) -> System: - """Return a system instance with the same name passed as argument.""" - available_systems = get_available_systems() - for system in available_systems: - if system_name == system.name: - return system - raise ConfigurationException(f"System '{system_name}' not found.") - - -def install_system(source_path: Path) -> None: - """Install new system.""" - try: - source = get_source(source_path) - config = cast(List[SystemConfig], source.config()) - systems_to_install = [load_system(entry) for entry in config] - except Exception as error: - raise ConfigurationException("Unable to read system definition") from error - - if not systems_to_install: - raise ConfigurationException("No system definition found") - - available_systems = get_available_systems() - already_installed = [s for s in systems_to_install if s in available_systems] - if already_installed: - names = [system.name for system in already_installed] - raise ConfigurationException( - f"Systems [{','.join(names)}] are already installed." - ) - - create_destination_and_install(source, get_backends_path("systems")) - - -def remove_system(directory_name: str) -> None: - """Remove system.""" - remove_backend(directory_name, "systems") - - -def load_system(config: SystemConfig) -> System: - """Load system based on it's execution type.""" - populate_shared_params(config) - - return System(config) - - -def populate_shared_params(config: SystemConfig) -> None: - """Populate command parameters with shared parameters.""" - user_params = config.get("user_params") - if not user_params or "shared" not in user_params: - return - - shared_user_params = user_params["shared"] - if not shared_user_params: - return - - only_aliases = all(p.get("alias") for p in shared_user_params) - if not only_aliases: - raise ConfigurationException("All shared parameters should have aliases") - - commands = config.get("commands", {}) - for cmd_name in ["run"]: - command = commands.get(cmd_name) - if command is None: - commands[cmd_name] = [] - cmd_user_params = user_params.get(cmd_name) - if not cmd_user_params: - cmd_user_params = shared_user_params - else: - only_aliases = all(p.get("alias") for p in cmd_user_params) - if not only_aliases: - raise ConfigurationException( - f"All parameters for command {cmd_name} should have aliases." - ) - merged_by_alias = { - **{p.get("alias"): p for p in shared_user_params}, - **{p.get("alias"): p for p in cmd_user_params}, - } - cmd_user_params = list(merged_by_alias.values()) - - user_params[cmd_name] = cmd_user_params - - config["commands"] = commands - del user_params["shared"] -- cgit v1.2.1