diff options
author | Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-11-18 16:34:03 +0000 |
---|---|---|
committer | Dmitrii Agibov <dmitrii.agibov@arm.com> | 2022-11-29 14:44:13 +0000 |
commit | 37959522a805a5e23c930ed79aac84920c3cb208 (patch) | |
tree | 484af1240a93c955a72ce2e452432383b6704b56 /src/mlia/backend/executor | |
parent | 5568f9f000d673ac53e710dcc8991fec6e8a5488 (diff) | |
download | mlia-37959522a805a5e23c930ed79aac84920c3cb208.tar.gz |
Move backends functionality into separate modules
- Move backend management/executor code into module backend_core
- Create separate module for each backend in "backend" module
- Move each backend into corresponding module
- Split Vela wrapper into several submodules
Change-Id: If01b6774aab6501951212541cc5d7f5aa7c97e95
Diffstat (limited to 'src/mlia/backend/executor')
-rw-r--r-- | src/mlia/backend/executor/__init__.py | 3 | ||||
-rw-r--r-- | src/mlia/backend/executor/application.py | 170 | ||||
-rw-r--r-- | src/mlia/backend/executor/common.py | 517 | ||||
-rw-r--r-- | src/mlia/backend/executor/config.py | 68 | ||||
-rw-r--r-- | src/mlia/backend/executor/execution.py | 342 | ||||
-rw-r--r-- | src/mlia/backend/executor/fs.py | 88 | ||||
-rw-r--r-- | src/mlia/backend/executor/output_consumer.py | 67 | ||||
-rw-r--r-- | src/mlia/backend/executor/proc.py | 191 | ||||
-rw-r--r-- | src/mlia/backend/executor/runner.py | 98 | ||||
-rw-r--r-- | src/mlia/backend/executor/source.py | 207 | ||||
-rw-r--r-- | src/mlia/backend/executor/system.py | 178 |
11 files changed, 1929 insertions, 0 deletions
diff --git a/src/mlia/backend/executor/__init__.py b/src/mlia/backend/executor/__init__.py new file mode 100644 index 0000000..3d60372 --- /dev/null +++ b/src/mlia/backend/executor/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Backend module.""" diff --git a/src/mlia/backend/executor/application.py b/src/mlia/backend/executor/application.py new file mode 100644 index 0000000..738ac4e --- /dev/null +++ b/src/mlia/backend/executor/application.py @@ -0,0 +1,170 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Application backend module.""" +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any +from typing import cast +from typing import List + +from mlia.backend.executor.common import Backend +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import get_backend_configs +from mlia.backend.executor.common import get_backend_directories +from mlia.backend.executor.common import load_application_configs +from mlia.backend.executor.common import load_config +from mlia.backend.executor.common import remove_backend +from mlia.backend.executor.config import ApplicationConfig +from mlia.backend.executor.config import ExtendedApplicationConfig +from mlia.backend.executor.fs import get_backends_path +from mlia.backend.executor.source import create_destination_and_install +from mlia.backend.executor.source import get_source + + +def get_available_application_directory_names() -> list[str]: + """Return a list of directory names for all available applications.""" + return [entry.name for entry in get_backend_directories("applications")] + + +def get_available_applications() -> list[Application]: + """Return a list with all available applications.""" + available_applications = [] + for config_json in get_backend_configs("applications"): + config_entries = cast(List[ExtendedApplicationConfig], load_config(config_json)) + for config_entry in config_entries: + config_entry["config_location"] = config_json.parent.absolute() + applications = load_applications(config_entry) + available_applications += applications + + return sorted(available_applications, key=lambda application: application.name) + + +def get_application( + application_name: str, system_name: str | None = None +) -> list[Application]: + """Return a list of application instances with provided name.""" + return [ + application + for application in get_available_applications() + if application.name == application_name + and (not system_name or application.can_run_on(system_name)) + ] + + +def install_application(source_path: Path) -> None: + """Install application.""" + try: + source = get_source(source_path) + config = cast(List[ExtendedApplicationConfig], source.config()) + applications_to_install = [ + s for entry in config for s in load_applications(entry) + ] + except Exception as error: + raise ConfigurationException("Unable to read application definition") from error + + if not applications_to_install: + raise ConfigurationException("No application definition found") + + available_applications = get_available_applications() + already_installed = [ + s for s in applications_to_install if s in available_applications + ] + if already_installed: + names = {application.name for application in already_installed} + raise ConfigurationException( + f"Applications [{','.join(names)}] are already installed." + ) + + create_destination_and_install(source, get_backends_path("applications")) + + +def remove_application(directory_name: str) -> None: + """Remove application directory.""" + remove_backend(directory_name, "applications") + + +def get_unique_application_names(system_name: str | None = None) -> list[str]: + """Extract a list of unique application names of all application available.""" + return list( + { + application.name + for application in get_available_applications() + if not system_name or application.can_run_on(system_name) + } + ) + + +class Application(Backend): + """Class for representing a single application component.""" + + def __init__(self, config: ApplicationConfig) -> None: + """Construct a Application instance from a dict.""" + super().__init__(config) + + self.supported_systems = config.get("supported_systems", []) + + def __eq__(self, other: object) -> bool: + """Overload operator ==.""" + if not isinstance(other, Application): + return False + + return ( + super().__eq__(other) + and self.name == other.name + and set(self.supported_systems) == set(other.supported_systems) + ) + + def can_run_on(self, system_name: str) -> bool: + """Check if the application can run on the system passed as argument.""" + return system_name in self.supported_systems + + def get_details(self) -> dict[str, Any]: + """Return dictionary with information about the Application instance.""" + output = { + "type": "application", + "name": self.name, + "description": self.description, + "supported_systems": self.supported_systems, + "commands": self._get_command_details(), + } + + return output + + def remove_unused_params(self) -> None: + """Remove unused params in commands. + + After merging default and system related configuration application + could have parameters that are not being used in commands. They + should be removed. + """ + for command in self.commands.values(): + indexes_or_aliases = [ + m + for cmd_str in command.command_strings + for m in re.findall(r"{user_params:(?P<index_or_alias>\w+)}", cmd_str) + ] + + only_aliases = all(not item.isnumeric() for item in indexes_or_aliases) + if only_aliases: + used_params = [ + param + for param in command.params + if param.alias in indexes_or_aliases + ] + command.params = used_params + + +def load_applications(config: ExtendedApplicationConfig) -> list[Application]: + """Load application. + + Application configuration could contain different parameters/commands for different + supported systems. For each supported system this function will return separate + Application instance with appropriate configuration. + """ + configs = load_application_configs(config, ApplicationConfig) + applications = [Application(cfg) for cfg in configs] + for application in applications: + application.remove_unused_params() + return applications diff --git a/src/mlia/backend/executor/common.py b/src/mlia/backend/executor/common.py new file mode 100644 index 0000000..48dbd4a --- /dev/null +++ b/src/mlia/backend/executor/common.py @@ -0,0 +1,517 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Contain all common functions for the backends.""" +from __future__ import annotations + +import json +import logging +import re +from abc import ABC +from collections import Counter +from pathlib import Path +from typing import Any +from typing import Callable +from typing import cast +from typing import Final +from typing import IO +from typing import Iterable +from typing import Match +from typing import NamedTuple +from typing import Pattern + +from mlia.backend.executor.config import BackendConfig +from mlia.backend.executor.config import BaseBackendConfig +from mlia.backend.executor.config import NamedExecutionConfig +from mlia.backend.executor.config import UserParamConfig +from mlia.backend.executor.config import UserParamsConfig +from mlia.backend.executor.fs import get_backends_path +from mlia.backend.executor.fs import remove_resource +from mlia.backend.executor.fs import ResourceType + + +BACKEND_CONFIG_FILE: Final[str] = "backend-config.json" + + +class ConfigurationException(Exception): + """Configuration exception.""" + + +def get_backend_config(dir_path: Path) -> Path: + """Get path to backendir configuration file.""" + return dir_path / BACKEND_CONFIG_FILE + + +def get_backend_configs(resource_type: ResourceType) -> Iterable[Path]: + """Get path to the backend configs for provided resource_type.""" + return ( + get_backend_config(entry) for entry in get_backend_directories(resource_type) + ) + + +def get_backend_directories(resource_type: ResourceType) -> Iterable[Path]: + """Get path to the backend directories for provided resource_type.""" + return ( + entry + for entry in get_backends_path(resource_type).iterdir() + if is_backend_directory(entry) + ) + + +def is_backend_directory(dir_path: Path) -> bool: + """Check if path is backend's configuration directory.""" + return dir_path.is_dir() and get_backend_config(dir_path).is_file() + + +def remove_backend(directory_name: str, resource_type: ResourceType) -> None: + """Remove backend with provided type and directory_name.""" + if not directory_name: + raise Exception("No directory name provided") + + remove_resource(directory_name, resource_type) + + +def load_config(config: Path | IO[bytes] | None) -> BackendConfig: + """Return a loaded json file.""" + if config is None: + raise Exception("Unable to read config") + + if isinstance(config, Path): + with config.open() as json_file: + return cast(BackendConfig, json.load(json_file)) + + return cast(BackendConfig, json.load(config)) + + +def parse_raw_parameter(parameter: str) -> tuple[str, str | None]: + """Split the parameter string in name and optional value. + + It manages the following cases: + --param=1 -> --param, 1 + --param 1 -> --param, 1 + --flag -> --flag, None + """ + data = re.split(" |=", parameter) + if len(data) == 1: + param_name = data[0] + param_value = None + else: + param_name = " ".join(data[0:-1]) + param_value = data[-1] + return param_name, param_value + + +class DataPaths(NamedTuple): + """DataPaths class.""" + + src: Path + dst: str + + +class Backend(ABC): + """Backend class.""" + + # pylint: disable=too-many-instance-attributes + + def __init__(self, config: BaseBackendConfig): + """Initialize backend.""" + name = config.get("name") + if not name: + raise ConfigurationException("Name is empty") + + self.name = name + self.description = config.get("description", "") + self.config_location = config.get("config_location") + self.variables = config.get("variables", {}) + self.annotations = config.get("annotations", {}) + + self._parse_commands_and_params(config) + + def validate_parameter(self, command_name: str, parameter: str) -> bool: + """Validate the parameter string against the application configuration. + + We take the parameter string, extract the parameter name/value and + check them against the current configuration. + """ + param_name, param_value = parse_raw_parameter(parameter) + valid_param_name = valid_param_value = False + + command = self.commands.get(command_name) + if not command: + raise AttributeError(f"Unknown command: '{command_name}'") + + # Iterate over all available parameters until we have a match. + for param in command.params: + if self._same_parameter(param_name, param): + valid_param_name = True + # This is a non-empty list + if param.values: + # We check if the value is allowed in the configuration + valid_param_value = param_value in param.values + else: + # In this case we don't validate the value and accept + # whatever we have set. + valid_param_value = True + break + + return valid_param_name and valid_param_value + + def __eq__(self, other: object) -> bool: + """Overload operator ==.""" + if not isinstance(other, Backend): + return False + + return ( + self.name == other.name + and self.description == other.description + and self.commands == other.commands + ) + + def __repr__(self) -> str: + """Represent the Backend instance by its name.""" + return self.name + + def _parse_commands_and_params(self, config: BaseBackendConfig) -> None: + """Parse commands and user parameters.""" + self.commands: dict[str, Command] = {} + + commands = config.get("commands") + if commands: + params = config.get("user_params") + + for command_name in commands.keys(): + command_params = self._parse_params(params, command_name) + command_strings = [ + self._substitute_variables(cmd) + for cmd in commands.get(command_name, []) + ] + self.commands[command_name] = Command(command_strings, command_params) + + def _substitute_variables(self, str_val: str) -> str: + """Substitute variables in string. + + Variables is being substituted at backend's creation stage because + they could contain references to other params which will be + resolved later. + """ + if not str_val: + return str_val + + var_pattern: Final[Pattern] = re.compile(r"{variables:(?P<var_name>\w+)}") + + def var_value(match: Match) -> str: + var_name = match["var_name"] + if var_name not in self.variables: + raise ConfigurationException(f"Unknown variable {var_name}") + + return self.variables[var_name] + + return var_pattern.sub(var_value, str_val) + + @classmethod + def _parse_params( + cls, params: UserParamsConfig | None, command: str + ) -> list[Param]: + if not params: + return [] + + return [cls._parse_param(p) for p in params.get(command, [])] + + @classmethod + def _parse_param(cls, param: UserParamConfig) -> Param: + """Parse a single parameter.""" + name = param.get("name") + if name is not None and not name: + raise ConfigurationException("Parameter has an empty 'name' attribute.") + values = param.get("values", None) + default_value = param.get("default_value", None) + description = param.get("description", "") + alias = param.get("alias") + + return Param( + name=name, + description=description, + values=values, + default_value=default_value, + alias=alias, + ) + + def _get_command_details(self) -> dict: + command_details = { + command_name: command.get_details() + for command_name, command in self.commands.items() + } + return command_details + + def _get_user_param_value(self, user_params: list[str], param: Param) -> str | None: + """Get the user-specified value of a parameter.""" + for user_param in user_params: + user_param_name, user_param_value = parse_raw_parameter(user_param) + if user_param_name == param.name: + warn_message = ( + "The direct use of parameter name is deprecated" + " and might be removed in the future.\n" + f"Please use alias '{param.alias}' instead of " + "'{user_param_name}' to provide the parameter." + ) + logging.warning(warn_message) + + if self._same_parameter(user_param_name, param): + return user_param_value + + return None + + @staticmethod + def _same_parameter(user_param_name_or_alias: str, param: Param) -> bool: + """Compare user parameter name with param name or alias.""" + # Strip the "=" sign in the param_name. This is needed just for + # comparison with the parameters passed by the user. + # The equal sign needs to be honoured when re-building the + # parameter back. + param_name = None if not param.name else param.name.rstrip("=") + return user_param_name_or_alias in [param_name, param.alias] + + def resolved_parameters( + self, command_name: str, user_params: list[str] + ) -> list[tuple[str | None, Param]]: + """Return list of parameters with values.""" + result: list[tuple[str | None, Param]] = [] + command = self.commands.get(command_name) + if not command: + return result + + for param in command.params: + value = self._get_user_param_value(user_params, param) + if not value: + value = param.default_value + result.append((value, param)) + + return result + + def build_command( + self, + command_name: str, + user_params: list[str], + param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str], + ) -> list[str]: + """ + Return a list of executable command strings. + + Given a command and associated parameters, returns a list of executable command + strings. + """ + command = self.commands.get(command_name) + if not command: + raise ConfigurationException( + f"Command '{command_name}' could not be found." + ) + + commands_to_run = [] + + params_values = self.resolved_parameters(command_name, user_params) + for cmd_str in command.command_strings: + cmd_str = resolve_all_parameters( + cmd_str, param_resolver, command_name, params_values + ) + commands_to_run.append(cmd_str) + + return commands_to_run + + +class Param: + """Class for representing a generic application parameter.""" + + def __init__( # pylint: disable=too-many-arguments + self, + name: str | None, + description: str, + values: list[str] | None = None, + default_value: str | None = None, + alias: str | None = None, + ) -> None: + """Construct a Param instance.""" + if not name and not alias: + raise ConfigurationException( + "Either name, alias or both must be set to identify a parameter." + ) + self.name = name + self.values = values + self.description = description + self.default_value = default_value + self.alias = alias + + def get_details(self) -> dict: + """Return a dictionary with all relevant information of a Param.""" + return {key: value for key, value in self.__dict__.items() if value} + + def __eq__(self, other: object) -> bool: + """Overload operator ==.""" + if not isinstance(other, Param): + return False + + return ( + self.name == other.name + and self.values == other.values + and self.default_value == other.default_value + and self.description == other.description + ) + + +class Command: + """Class for representing a command.""" + + def __init__( + self, command_strings: list[str], params: list[Param] | None = None + ) -> None: + """Construct a Command instance.""" + self.command_strings = command_strings + + if params: + self.params = params + else: + self.params = [] + + self._validate() + + def _validate(self) -> None: + """Validate command.""" + if not self.params: + return + + aliases = [param.alias for param in self.params if param.alias is not None] + repeated_aliases = [ + alias for alias, count in Counter(aliases).items() if count > 1 + ] + + if repeated_aliases: + raise ConfigurationException( + f"Non-unique aliases {', '.join(repeated_aliases)}" + ) + + both_name_and_alias = [ + param.name + for param in self.params + if param.name in aliases and param.name != param.alias + ] + if both_name_and_alias: + raise ConfigurationException( + f"Aliases {', '.join(both_name_and_alias)} could not be used " + "as parameter name." + ) + + def get_details(self) -> dict: + """Return a dictionary with all relevant information of a Command.""" + output = { + "command_strings": self.command_strings, + "user_params": [param.get_details() for param in self.params], + } + return output + + def __eq__(self, other: object) -> bool: + """Overload operator ==.""" + if not isinstance(other, Command): + return False + + return ( + self.command_strings == other.command_strings + and self.params == other.params + ) + + +def resolve_all_parameters( + str_val: str, + param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str], + command_name: str | None = None, + params_values: list[tuple[str | None, Param]] | None = None, +) -> str: + """Resolve all parameters in the string.""" + if not str_val: + return str_val + + param_pattern: Final[Pattern] = re.compile(r"{(?P<param_name>[\w.:]+)}") + while param_pattern.findall(str_val): + str_val = param_pattern.sub( + lambda m: param_resolver( + m["param_name"], command_name or "", params_values or [] + ), + str_val, + ) + return str_val + + +def load_application_configs( + config: Any, + config_type: type[Any], + is_system_required: bool = True, +) -> Any: + """Get one config for each system supported by the application. + + The configuration could contain different parameters/commands for different + supported systems. For each supported system this function will return separate + config with appropriate configuration. + """ + merged_configs = [] + supported_systems: list[NamedExecutionConfig] | None = config.get( + "supported_systems" + ) + if not supported_systems: + if is_system_required: + raise ConfigurationException("No supported systems definition provided") + # Create an empty system to be used in the parsing below + supported_systems = [cast(NamedExecutionConfig, {})] + + default_user_params = config.get("user_params", {}) + + def merge_config(system: NamedExecutionConfig) -> Any: + system_name = system.get("name") + if not system_name and is_system_required: + raise ConfigurationException( + "Unable to read supported system definition, name is missed" + ) + + merged_config = config_type(**config) + merged_config["supported_systems"] = [system_name] if system_name else [] + # merge default configuration and specific to the system + merged_config["commands"] = { + **config.get("commands", {}), + **system.get("commands", {}), + } + + params = {} + tool_user_params = system.get("user_params", {}) + command_names = tool_user_params.keys() | default_user_params.keys() + for command_name in command_names: + if command_name not in merged_config["commands"]: + continue + + params_default = default_user_params.get(command_name, []) + params_tool = tool_user_params.get(command_name, []) + if not params_default or not params_tool: + params[command_name] = params_tool or params_default + if params_default and params_tool: + if any(not p.get("alias") for p in params_default): + raise ConfigurationException( + f"Default parameters for command {command_name} " + "should have aliases" + ) + if any(not p.get("alias") for p in params_tool): + raise ConfigurationException( + f"{system_name} parameters for command {command_name} " + "should have aliases." + ) + + merged_by_alias = { + **{p.get("alias"): p for p in params_default}, + **{p.get("alias"): p for p in params_tool}, + } + params[command_name] = list(merged_by_alias.values()) + + merged_config["user_params"] = params + merged_config["variables"] = { + **config.get("variables", {}), + **system.get("variables", {}), + } + return merged_config + + merged_configs = [merge_config(system) for system in supported_systems] + + return merged_configs diff --git a/src/mlia/backend/executor/config.py b/src/mlia/backend/executor/config.py new file mode 100644 index 0000000..dca53da --- /dev/null +++ b/src/mlia/backend/executor/config.py @@ -0,0 +1,68 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Contain definition of backend configuration.""" +from __future__ import annotations + +from pathlib import Path +from typing import Dict +from typing import List +from typing import TypedDict +from typing import Union + + +class UserParamConfig(TypedDict, total=False): + """User parameter configuration.""" + + name: str | None + default_value: str + values: list[str] + description: str + alias: str + + +UserParamsConfig = Dict[str, List[UserParamConfig]] + + +class ExecutionConfig(TypedDict, total=False): + """Execution configuration.""" + + commands: dict[str, list[str]] + user_params: UserParamsConfig + variables: dict[str, str] + + +class NamedExecutionConfig(ExecutionConfig): + """Execution configuration with name.""" + + name: str + + +class BaseBackendConfig(ExecutionConfig, total=False): + """Base backend configuration.""" + + name: str + description: str + config_location: Path + annotations: dict[str, str | list[str]] + + +class ApplicationConfig(BaseBackendConfig, total=False): + """Application configuration.""" + + supported_systems: list[str] + + +class ExtendedApplicationConfig(BaseBackendConfig, total=False): + """Extended application configuration.""" + + supported_systems: list[NamedExecutionConfig] + + +class SystemConfig(BaseBackendConfig, total=False): + """System configuration.""" + + reporting: dict[str, dict] + + +BackendItemConfig = Union[ApplicationConfig, SystemConfig] +BackendConfig = Union[List[ExtendedApplicationConfig], List[SystemConfig]] diff --git a/src/mlia/backend/executor/execution.py b/src/mlia/backend/executor/execution.py new file mode 100644 index 0000000..e253b16 --- /dev/null +++ b/src/mlia/backend/executor/execution.py @@ -0,0 +1,342 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Application execution module.""" +from __future__ import annotations + +import logging +import re +from typing import cast + +from mlia.backend.executor.application import Application +from mlia.backend.executor.application import get_application +from mlia.backend.executor.common import Backend +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import Param +from mlia.backend.executor.system import get_system +from mlia.backend.executor.system import System + +logger = logging.getLogger(__name__) + + +class AnotherInstanceIsRunningException(Exception): + """Concurrent execution error.""" + + +class ExecutionContext: # pylint: disable=too-few-public-methods + """Command execution context.""" + + def __init__( + self, + app: Application, + app_params: list[str], + system: System, + system_params: list[str], + ): + """Init execution context.""" + self.app = app + self.app_params = app_params + self.system = system + self.system_params = system_params + + self.param_resolver = ParamResolver(self) + + self.stdout: bytearray | None = None + self.stderr: bytearray | None = None + + +class ParamResolver: + """Parameter resolver.""" + + def __init__(self, context: ExecutionContext): + """Init parameter resolver.""" + self.ctx = context + + @staticmethod + def resolve_user_params( + cmd_name: str | None, + index_or_alias: str, + resolved_params: list[tuple[str | None, Param]] | None, + ) -> str: + """Resolve user params.""" + if not cmd_name or resolved_params is None: + raise ConfigurationException("Unable to resolve user params") + + param_value: str | None = None + param: Param | None = None + + if index_or_alias.isnumeric(): + i = int(index_or_alias) + if i not in range(len(resolved_params)): + raise ConfigurationException( + f"Invalid index {i} for user params of command {cmd_name}" + ) + param_value, param = resolved_params[i] + else: + for val, par in resolved_params: + if par.alias == index_or_alias: + param_value, param = val, par + break + + if param is None: + raise ConfigurationException( + f"No user parameter for command '{cmd_name}' with " + f"alias '{index_or_alias}'." + ) + + if param_value: + # We need to handle to cases of parameters here: + # 1) Optional parameters (non-positional with a name and value) + # 2) Positional parameters (value only, no name needed) + # Default to empty strings for positional arguments + param_name = "" + separator = "" + if param.name is not None: + # A valid param name means we have an optional/non-positional argument: + # The separator is an empty string in case the param_name + # has an equal sign as we have to honour it. + # If the parameter doesn't end with an equal sign then a + # space character is injected to split the parameter name + # and its value + param_name = param.name + separator = "" if param.name.endswith("=") else " " + + return f"{param_name}{separator}{param_value}" + + if param.name is None: + raise ConfigurationException( + f"Missing user parameter with alias '{index_or_alias}' for " + f"command '{cmd_name}'." + ) + + return param.name # flag: just return the parameter name + + def resolve_commands_and_params( + self, backend_type: str, cmd_name: str, return_params: bool, index_or_alias: str + ) -> str: + """Resolve command or command's param value.""" + if backend_type == "system": + backend = cast(Backend, self.ctx.system) + backend_params = self.ctx.system_params + else: # Application backend + backend = cast(Backend, self.ctx.app) + backend_params = self.ctx.app_params + + if cmd_name not in backend.commands: + raise ConfigurationException(f"Command {cmd_name} not found") + + if return_params: + params = backend.resolved_parameters(cmd_name, backend_params) + if index_or_alias.isnumeric(): + i = int(index_or_alias) + if i not in range(len(params)): + raise ConfigurationException( + f"Invalid parameter index {i} for command {cmd_name}" + ) + + param_value = params[i][0] + else: + param_value = None + for value, param in params: + if param.alias == index_or_alias: + param_value = value + break + + if not param_value: + raise ConfigurationException( + "No value for parameter with index or " + f"alias {index_or_alias} of command {cmd_name}." + ) + return param_value + + if not index_or_alias.isnumeric(): + raise ConfigurationException(f"Bad command index {index_or_alias}") + + i = int(index_or_alias) + commands = backend.build_command(cmd_name, backend_params, self.param_resolver) + if i not in range(len(commands)): + raise ConfigurationException(f"Invalid index {i} for command {cmd_name}") + + return commands[i] + + def resolve_variables(self, backend_type: str, var_name: str) -> str: + """Resolve variable value.""" + if backend_type == "system": + backend = cast(Backend, self.ctx.system) + else: # Application backend + backend = cast(Backend, self.ctx.app) + + if var_name not in backend.variables: + raise ConfigurationException(f"Unknown variable {var_name}") + + return backend.variables[var_name] + + def param_matcher( + self, + param_name: str, + cmd_name: str | None, + resolved_params: list[tuple[str | None, Param]] | None, + ) -> str: + """Regexp to resolve a param from the param_name.""" + # this pattern supports parameter names like "application.commands.run:0" and + # "system.commands.run.params:0" + # Note: 'software' is included for backward compatibility. + commands_and_params_match = re.match( + r"(?P<type>application|software|system)[.]commands[.]" + r"(?P<name>\w+)" + r"(?P<params>[.]params|)[:]" + r"(?P<index_or_alias>\w+)", + param_name, + ) + + if commands_and_params_match: + backend_type, cmd_name, return_params, index_or_alias = ( + commands_and_params_match["type"], + commands_and_params_match["name"], + commands_and_params_match["params"], + commands_and_params_match["index_or_alias"], + ) + return self.resolve_commands_and_params( + backend_type, cmd_name, bool(return_params), index_or_alias + ) + + # Note: 'software' is included for backward compatibility. + variables_match = re.match( + r"(?P<type>application|software|system)[.]variables:(?P<var_name>\w+)", + param_name, + ) + if variables_match: + backend_type, var_name = ( + variables_match["type"], + variables_match["var_name"], + ) + return self.resolve_variables(backend_type, var_name) + + user_params_match = re.match(r"user_params:(?P<index_or_alias>\w+)", param_name) + if user_params_match: + index_or_alias = user_params_match["index_or_alias"] + return self.resolve_user_params(cmd_name, index_or_alias, resolved_params) + + raise ConfigurationException(f"Unable to resolve parameter {param_name}") + + def param_resolver( + self, + param_name: str, + cmd_name: str | None = None, + resolved_params: list[tuple[str | None, Param]] | None = None, + ) -> str: + """Resolve parameter value based on current execution context.""" + # Note: 'software.*' is included for backward compatibility. + resolved_param = None + if param_name in ["application.name", "software.name"]: + resolved_param = self.ctx.app.name + elif param_name in ["application.description", "software.description"]: + resolved_param = self.ctx.app.description + elif self.ctx.app.config_location and ( + param_name in ["application.config_dir", "software.config_dir"] + ): + resolved_param = str(self.ctx.app.config_location.absolute()) + elif self.ctx.system is not None: + if param_name == "system.name": + resolved_param = self.ctx.system.name + elif param_name == "system.description": + resolved_param = self.ctx.system.description + elif param_name == "system.config_dir" and self.ctx.system.config_location: + resolved_param = str(self.ctx.system.config_location.absolute()) + + if not resolved_param: + resolved_param = self.param_matcher(param_name, cmd_name, resolved_params) + return resolved_param + + def __call__( + self, + param_name: str, + cmd_name: str | None = None, + resolved_params: list[tuple[str | None, Param]] | None = None, + ) -> str: + """Resolve provided parameter.""" + return self.param_resolver(param_name, cmd_name, resolved_params) + + +def validate_parameters( + backend: Backend, command_names: list[str], params: list[str] +) -> None: + """Check parameters passed to backend.""" + for param in params: + acceptable = any( + backend.validate_parameter(command_name, param) + for command_name in command_names + if command_name in backend.commands + ) + + if not acceptable: + backend_type = "System" if isinstance(backend, System) else "Application" + raise ValueError( + f"{backend_type} parameter '{param}' not valid for " + f"command '{' or '.join(command_names)}'." + ) + + +def get_application_by_name_and_system( + application_name: str, system_name: str +) -> Application: + """Get application.""" + applications = get_application(application_name, system_name) + if not applications: + raise ValueError( + f"Application '{application_name}' doesn't support the " + f"system '{system_name}'." + ) + + if len(applications) != 1: + raise ValueError( + f"Error during getting application {application_name} for the " + f"system {system_name}." + ) + + return applications[0] + + +def get_application_and_system( + application_name: str, system_name: str +) -> tuple[Application, System]: + """Return application and system by provided names.""" + system = get_system(system_name) + if not system: + raise ValueError(f"System {system_name} is not found.") + + application = get_application_by_name_and_system(application_name, system_name) + + return application, system + + +def run_application( + application_name: str, + application_params: list[str], + system_name: str, + system_params: list[str], +) -> ExecutionContext: + """Run application on the provided system.""" + application, system = get_application_and_system(application_name, system_name) + validate_parameters(application, ["run"], application_params) + validate_parameters(system, ["run"], system_params) + + ctx = ExecutionContext( + app=application, + app_params=application_params, + system=system, + system_params=system_params, + ) + + logger.debug("Generating commands to execute") + commands_to_run = ctx.system.build_command( + "run", ctx.system_params, ctx.param_resolver + ) + + for command in commands_to_run: + logger.debug("Running: %s", command) + exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command) + + if exit_code != 0: + logger.warning("Application exited with exit code %i", exit_code) + + return ctx diff --git a/src/mlia/backend/executor/fs.py b/src/mlia/backend/executor/fs.py new file mode 100644 index 0000000..3fce19c --- /dev/null +++ b/src/mlia/backend/executor/fs.py @@ -0,0 +1,88 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Module to host all file system related functions.""" +from __future__ import annotations + +import re +import shutil +from pathlib import Path +from typing import Literal + +from mlia.utils.filesystem import get_mlia_resources + +ResourceType = Literal["applications", "systems"] + + +def get_backend_resources() -> Path: + """Get backend resources folder path.""" + return get_mlia_resources() / "backends" + + +def get_backends_path(name: ResourceType) -> Path: + """Return the absolute path of the specified resource. + + It uses importlib to return resources packaged with MANIFEST.in. + """ + if not name: + raise ResourceWarning("Resource name is not provided") + + resource_path = get_backend_resources() / name + if resource_path.is_dir(): + return resource_path + + raise ResourceWarning(f"Resource '{name}' not found.") + + +def copy_directory_content(source: Path, destination: Path) -> None: + """Copy content of the source directory into destination directory.""" + for item in source.iterdir(): + src = source / item.name + dest = destination / item.name + + if src.is_dir(): + shutil.copytree(src, dest) + else: + shutil.copy2(src, dest) + + +def remove_resource(resource_directory: str, resource_type: ResourceType) -> None: + """Remove resource data.""" + resources = get_backends_path(resource_type) + + resource_location = resources / resource_directory + if not resource_location.exists(): + raise Exception(f"Resource {resource_directory} does not exist") + + if not resource_location.is_dir(): + raise Exception(f"Wrong resource {resource_directory}") + + shutil.rmtree(resource_location) + + +def remove_directory(directory_path: Path | None) -> None: + """Remove directory.""" + if not directory_path or not directory_path.is_dir(): + raise Exception("No directory path provided") + + shutil.rmtree(directory_path) + + +def recreate_directory(directory_path: Path | None) -> None: + """Recreate directory.""" + if not directory_path: + raise Exception("No directory path provided") + + if directory_path.exists() and not directory_path.is_dir(): + raise Exception( + f"Path {str(directory_path)} does exist and it is not a directory." + ) + + if directory_path.is_dir(): + remove_directory(directory_path) + + directory_path.mkdir() + + +def valid_for_filename(value: str, replacement: str = "") -> str: + """Replace non alpha numeric characters.""" + return re.sub(r"[^\w.]", replacement, value, flags=re.ASCII) diff --git a/src/mlia/backend/executor/output_consumer.py b/src/mlia/backend/executor/output_consumer.py new file mode 100644 index 0000000..3c3b132 --- /dev/null +++ b/src/mlia/backend/executor/output_consumer.py @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Output consumers module.""" +from __future__ import annotations + +import base64 +import json +import re +from typing import Protocol +from typing import runtime_checkable + + +@runtime_checkable +class OutputConsumer(Protocol): + """Protocol to consume output.""" + + def feed(self, line: str) -> bool: + """ + Feed a new line to be parsed. + + Return True if the line should be removed from the output. + """ + + +class Base64OutputConsumer(OutputConsumer): + """ + Parser to extract base64-encoded JSON from tagged standard output. + + Example of the tagged output: + ``` + # Encoded JSON: {"test": 1234} + <metrics>eyJ0ZXN0IjogMTIzNH0</metrics> + ``` + """ + + TAG_NAME = "metrics" + + def __init__(self) -> None: + """Set up the regular expression to extract tagged strings.""" + self._regex = re.compile(rf"<{self.TAG_NAME}>(.*)</{self.TAG_NAME}>") + self.parsed_output: list = [] + + def feed(self, line: str) -> bool: + """ + Parse the output line and save the decoded output. + + Returns True if the line contains tagged output. + + Example: + Using the tagged output from the class docs the parser should collect + the following: + ``` + [ + {"test": 1234} + ] + ``` + """ + res_b64 = self._regex.search(line) + if res_b64: + res_json = base64.b64decode(res_b64.group(1), validate=True) + res = json.loads(res_json) + self.parsed_output.append(res) + # Remove this line from the output, i.e. consume it, as it + # does not contain any human readable content. + return True + + return False diff --git a/src/mlia/backend/executor/proc.py b/src/mlia/backend/executor/proc.py new file mode 100644 index 0000000..39a0689 --- /dev/null +++ b/src/mlia/backend/executor/proc.py @@ -0,0 +1,191 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Processes module. + +This module contains all classes and functions for dealing with Linux +processes. +""" +from __future__ import annotations + +import datetime +import logging +import shlex +import signal +import tempfile +import time +from pathlib import Path +from typing import Any + +from sh import Command +from sh import CommandNotFound +from sh import ErrorReturnCode +from sh import RunningCommand + +from mlia.backend.executor.fs import valid_for_filename + +logger = logging.getLogger(__name__) + + +class CommandFailedException(Exception): + """Exception for failed command execution.""" + + +class ShellCommand: + """Wrapper class for shell commands.""" + + def run( + self, + cmd: str, + *args: str, + _cwd: Path | None = None, + _tee: bool = True, + _bg: bool = True, + _out: Any = None, + _err: Any = None, + _search_paths: list[Path] | None = None, + ) -> RunningCommand: + """Run the shell command with the given arguments. + + There are special arguments that modify the behaviour of the process. + _cwd: current working directory + _tee: it redirects the stdout both to console and file + _bg: if True, it runs the process in background and the command is not + blocking. + _out: use this object for stdout redirect, + _err: use this object for stderr redirect, + _search_paths: If presented used for searching executable + """ + try: + kwargs = {} + if _cwd: + kwargs["_cwd"] = str(_cwd) + command = Command(cmd, _search_paths).bake(args, **kwargs) + except CommandNotFound as error: + logging.error("Command '%s' not found", error.args[0]) + raise error + + out, err = _out, _err + if not _out and not _err: + out, err = (str(item) for item in self.get_stdout_stderr_paths(cmd)) + + return command(_out=out, _err=err, _tee=_tee, _bg=_bg, _bg_exc=False) + + @classmethod + def get_stdout_stderr_paths(cls, cmd: str) -> tuple[Path, Path]: + """Construct and returns the paths of stdout/stderr files.""" + timestamp = datetime.datetime.now().timestamp() + base_path = Path(tempfile.mkdtemp(prefix="mlia-", suffix=f"{timestamp}")) + base = base_path / f"{valid_for_filename(cmd, '_')}_{timestamp}" + stdout = base.with_suffix(".out") + stderr = base.with_suffix(".err") + try: + stdout.touch() + stderr.touch() + except FileNotFoundError as error: + logging.error("File not found: %s", error.filename) + raise error + return stdout, stderr + + +def parse_command(command: str, shell: str = "bash") -> list[str]: + """Parse command.""" + cmd, *args = shlex.split(command, posix=True) + + if is_shell_script(cmd): + args = [cmd] + args + cmd = shell + + return [cmd] + args + + +def execute_command( # pylint: disable=invalid-name + command: str, + cwd: Path, + bg: bool = False, + shell: str = "bash", + out: Any = None, + err: Any = None, +) -> RunningCommand: + """Execute shell command.""" + cmd, *args = parse_command(command, shell) + + search_paths = None + if cmd != shell and (cwd / cmd).is_file(): + search_paths = [cwd] + + return ShellCommand().run( + cmd, *args, _cwd=cwd, _bg=bg, _search_paths=search_paths, _out=out, _err=err + ) + + +def is_shell_script(cmd: str) -> bool: + """Check if command is shell script.""" + return cmd.endswith(".sh") + + +def run_and_wait( + command: str, + cwd: Path, + terminate_on_error: bool = False, + out: Any = None, + err: Any = None, +) -> tuple[int, bytearray, bytearray]: + """ + Run command and wait while it is executing. + + Returns a tuple: (exit_code, stdout, stderr) + """ + running_cmd: RunningCommand | None = None + try: + running_cmd = execute_command(command, cwd, bg=True, out=out, err=err) + return running_cmd.exit_code, running_cmd.stdout, running_cmd.stderr + except ErrorReturnCode as cmd_failed: + raise CommandFailedException() from cmd_failed + except Exception as error: + is_running = running_cmd is not None and running_cmd.is_alive() + if terminate_on_error and is_running: + logger.debug("Terminating ...") + terminate_command(running_cmd) + + raise error + + +def terminate_command( + running_cmd: RunningCommand, + wait: bool = True, + wait_period: float = 0.5, + number_of_attempts: int = 20, +) -> None: + """Terminate running command.""" + try: + running_cmd.process.signal_group(signal.SIGINT) + if wait: + for _ in range(number_of_attempts): + time.sleep(wait_period) + if not running_cmd.is_alive(): + return + logger.error( + "Unable to terminate process %i. Sending SIGTERM...", + running_cmd.process.pid, + ) + running_cmd.process.signal_group(signal.SIGTERM) + except ProcessLookupError: + pass + + +def print_command_stdout(command: RunningCommand) -> None: + """Print the stdout of a command. + + The command has 2 states: running and done. + If the command is running, the output is taken by the running process. + If the command has ended its execution, the stdout is taken from stdout + property + """ + if command.is_alive(): + while True: + try: + print(command.next(), end="") + except StopIteration: + break + else: + print(command.stdout) diff --git a/src/mlia/backend/executor/runner.py b/src/mlia/backend/executor/runner.py new file mode 100644 index 0000000..2330fd9 --- /dev/null +++ b/src/mlia/backend/executor/runner.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Module for backend runner.""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + +from mlia.backend.executor.application import get_available_applications +from mlia.backend.executor.application import install_application +from mlia.backend.executor.execution import ExecutionContext +from mlia.backend.executor.execution import run_application +from mlia.backend.executor.system import get_available_systems +from mlia.backend.executor.system import install_system + + +@dataclass +class ExecutionParams: + """Application execution params.""" + + application: str + system: str + application_params: list[str] + system_params: list[str] + + +class BackendRunner: + """Backend runner.""" + + def __init__(self) -> None: + """Init BackendRunner instance.""" + + @staticmethod + def get_installed_systems() -> list[str]: + """Get list of the installed systems.""" + return [system.name for system in get_available_systems()] + + @staticmethod + def get_installed_applications(system: str | None = None) -> list[str]: + """Get list of the installed application.""" + return [ + app.name + for app in get_available_applications() + if system is None or app.can_run_on(system) + ] + + def is_application_installed(self, application: str, system: str) -> bool: + """Return true if requested application installed.""" + return application in self.get_installed_applications(system) + + def is_system_installed(self, system: str) -> bool: + """Return true if requested system installed.""" + return system in self.get_installed_systems() + + def systems_installed(self, systems: list[str]) -> bool: + """Check if all provided systems are installed.""" + if not systems: + return False + + installed_systems = self.get_installed_systems() + return all(system in installed_systems for system in systems) + + def applications_installed(self, applications: list[str]) -> bool: + """Check if all provided applications are installed.""" + if not applications: + return False + + installed_apps = self.get_installed_applications() + return all(app in installed_apps for app in applications) + + def all_installed(self, systems: list[str], apps: list[str]) -> bool: + """Check if all provided artifacts are installed.""" + return self.systems_installed(systems) and self.applications_installed(apps) + + @staticmethod + def install_system(system_path: Path) -> None: + """Install system.""" + install_system(system_path) + + @staticmethod + def install_application(app_path: Path) -> None: + """Install application.""" + install_application(app_path) + + @staticmethod + def run_application(execution_params: ExecutionParams) -> ExecutionContext: + """Run requested application.""" + ctx = run_application( + execution_params.application, + execution_params.application_params, + execution_params.system, + execution_params.system_params, + ) + return ctx + + @staticmethod + def _params(name: str, params: list[str]) -> list[str]: + return [p for item in [(name, param) for param in params] for p in item] diff --git a/src/mlia/backend/executor/source.py b/src/mlia/backend/executor/source.py new file mode 100644 index 0000000..6abc49f --- /dev/null +++ b/src/mlia/backend/executor/source.py @@ -0,0 +1,207 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Contain source related classes and functions.""" +from __future__ import annotations + +import os +import shutil +import tarfile +from abc import ABC +from abc import abstractmethod +from pathlib import Path +from tarfile import TarFile + +from mlia.backend.executor.common import BACKEND_CONFIG_FILE +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import get_backend_config +from mlia.backend.executor.common import is_backend_directory +from mlia.backend.executor.common import load_config +from mlia.backend.executor.config import BackendConfig +from mlia.backend.executor.fs import copy_directory_content + + +class Source(ABC): + """Source class.""" + + @abstractmethod + def name(self) -> str | None: + """Get source name.""" + + @abstractmethod + def config(self) -> BackendConfig | None: + """Get configuration file content.""" + + @abstractmethod + def install_into(self, destination: Path) -> None: + """Install source into destination directory.""" + + @abstractmethod + def create_destination(self) -> bool: + """Return True if destination folder should be created before installation.""" + + +class DirectorySource(Source): + """DirectorySource class.""" + + def __init__(self, directory_path: Path) -> None: + """Create the DirectorySource instance.""" + assert isinstance(directory_path, Path) + self.directory_path = directory_path + + def name(self) -> str: + """Return name of source.""" + return self.directory_path.name + + def config(self) -> BackendConfig | None: + """Return configuration file content.""" + if not is_backend_directory(self.directory_path): + raise ConfigurationException("No configuration file found") + + config_file = get_backend_config(self.directory_path) + return load_config(config_file) + + def install_into(self, destination: Path) -> None: + """Install source into destination directory.""" + if not destination.is_dir(): + raise ConfigurationException(f"Wrong destination {destination}.") + + if not self.directory_path.is_dir(): + raise ConfigurationException( + f"Directory {self.directory_path} does not exist." + ) + + copy_directory_content(self.directory_path, destination) + + def create_destination(self) -> bool: + """Return True if destination folder should be created before installation.""" + return True + + +class TarArchiveSource(Source): + """TarArchiveSource class.""" + + def __init__(self, archive_path: Path) -> None: + """Create the TarArchiveSource class.""" + assert isinstance(archive_path, Path) + self.archive_path = archive_path + self._config: BackendConfig | None = None + self._has_top_level_folder: bool | None = None + self._name: str | None = None + + def _read_archive_content(self) -> None: + """Read various information about archive.""" + # get source name from archive name (everything without extensions) + extensions = "".join(self.archive_path.suffixes) + self._name = self.archive_path.name.rstrip(extensions) + + if not self.archive_path.exists(): + return + + with self._open(self.archive_path) as archive: + try: + config_entry = archive.getmember(BACKEND_CONFIG_FILE) + self._has_top_level_folder = False + except KeyError as error_no_config: + try: + archive_entries = archive.getnames() + entries_common_prefix = os.path.commonprefix(archive_entries) + top_level_dir = entries_common_prefix.rstrip("/") + + if not top_level_dir: + raise RuntimeError( + "Archive has no top level directory" + ) from error_no_config + + config_path = f"{top_level_dir}/{BACKEND_CONFIG_FILE}" + + config_entry = archive.getmember(config_path) + self._has_top_level_folder = True + self._name = top_level_dir + except (KeyError, RuntimeError) as error_no_root_dir_or_config: + raise ConfigurationException( + "No configuration file found" + ) from error_no_root_dir_or_config + + content = archive.extractfile(config_entry) + self._config = load_config(content) + + def config(self) -> BackendConfig | None: + """Return configuration file content.""" + if self._config is None: + self._read_archive_content() + + return self._config + + def name(self) -> str | None: + """Return name of the source.""" + if self._name is None: + self._read_archive_content() + + return self._name + + def create_destination(self) -> bool: + """Return True if destination folder must be created before installation.""" + if self._has_top_level_folder is None: + self._read_archive_content() + + return not self._has_top_level_folder + + def install_into(self, destination: Path) -> None: + """Install source into destination directory.""" + if not destination.is_dir(): + raise ConfigurationException(f"Wrong destination {destination}.") + + with self._open(self.archive_path) as archive: + archive.extractall(destination) + + def _open(self, archive_path: Path) -> TarFile: + """Open archive file.""" + if not archive_path.is_file(): + raise ConfigurationException(f"File {archive_path} does not exist.") + + if archive_path.name.endswith("tar.gz") or archive_path.name.endswith("tgz"): + mode = "r:gz" + else: + raise ConfigurationException(f"Unsupported archive type {archive_path}.") + + # The returned TarFile object can be used as a context manager (using + # 'with') by the calling instance. + return tarfile.open( # pylint: disable=consider-using-with + self.archive_path, mode=mode + ) + + +def get_source(source_path: Path) -> TarArchiveSource | DirectorySource: + """Return appropriate source instance based on provided source path.""" + if source_path.is_file(): + return TarArchiveSource(source_path) + + if source_path.is_dir(): + return DirectorySource(source_path) + + raise ConfigurationException(f"Unable to read {source_path}.") + + +def create_destination_and_install(source: Source, resource_path: Path) -> None: + """Create destination directory and install source. + + This function is used for actual installation of system/backend New + directory will be created inside :resource_path: if needed If for example + archive contains top level folder then no need to create new directory + """ + destination = resource_path + create_destination = source.create_destination() + + if create_destination: + name = source.name() + if not name: + raise ConfigurationException("Unable to get source name.") + + destination = resource_path / name + destination.mkdir() + try: + source.install_into(destination) + except Exception as error: + if create_destination: + shutil.rmtree(destination) + raise error diff --git a/src/mlia/backend/executor/system.py b/src/mlia/backend/executor/system.py new file mode 100644 index 0000000..a5ecf19 --- /dev/null +++ b/src/mlia/backend/executor/system.py @@ -0,0 +1,178 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""System backend module.""" +from __future__ import annotations + +from pathlib import Path +from typing import Any +from typing import cast +from typing import List + +from mlia.backend.executor.common import Backend +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import get_backend_configs +from mlia.backend.executor.common import get_backend_directories +from mlia.backend.executor.common import load_config +from mlia.backend.executor.common import remove_backend +from mlia.backend.executor.config import SystemConfig +from mlia.backend.executor.fs import get_backends_path +from mlia.backend.executor.proc import run_and_wait +from mlia.backend.executor.source import create_destination_and_install +from mlia.backend.executor.source import get_source + + +class System(Backend): + """System class.""" + + def __init__(self, config: SystemConfig) -> None: + """Construct the System class using the dictionary passed.""" + super().__init__(config) + + self._setup_reporting(config) + + def _setup_reporting(self, config: SystemConfig) -> None: + self.reporting = config.get("reporting") + + def run(self, command: str) -> tuple[int, bytearray, bytearray]: + """ + Run command on the system. + + Returns a tuple: (exit_code, stdout, stderr) + """ + cwd = self.config_location + if not isinstance(cwd, Path) or not cwd.is_dir(): + raise ConfigurationException( + f"System has invalid config location: {cwd}", + ) + + stdout = bytearray() + stderr = bytearray() + + return run_and_wait( + command, + cwd=cwd, + terminate_on_error=True, + out=stdout, + err=stderr, + ) + + def __eq__(self, other: object) -> bool: + """Overload operator ==.""" + if not isinstance(other, System): + return False + + return super().__eq__(other) and self.name == other.name + + def get_details(self) -> dict[str, Any]: + """Return a dictionary with all relevant information of a System.""" + output = { + "type": "system", + "name": self.name, + "description": self.description, + "commands": self._get_command_details(), + "annotations": self.annotations, + } + + return output + + +def get_available_systems_directory_names() -> list[str]: + """Return a list of directory names for all avialable systems.""" + return [entry.name for entry in get_backend_directories("systems")] + + +def get_available_systems() -> list[System]: + """Return a list with all available systems.""" + available_systems = [] + for config_json in get_backend_configs("systems"): + config_entries = cast(List[SystemConfig], (load_config(config_json))) + for config_entry in config_entries: + config_entry["config_location"] = config_json.parent.absolute() + system = load_system(config_entry) + available_systems.append(system) + + return sorted(available_systems, key=lambda system: system.name) + + +def get_system(system_name: str) -> System: + """Return a system instance with the same name passed as argument.""" + available_systems = get_available_systems() + for system in available_systems: + if system_name == system.name: + return system + raise ConfigurationException(f"System '{system_name}' not found.") + + +def install_system(source_path: Path) -> None: + """Install new system.""" + try: + source = get_source(source_path) + config = cast(List[SystemConfig], source.config()) + systems_to_install = [load_system(entry) for entry in config] + except Exception as error: + raise ConfigurationException("Unable to read system definition") from error + + if not systems_to_install: + raise ConfigurationException("No system definition found") + + available_systems = get_available_systems() + already_installed = [s for s in systems_to_install if s in available_systems] + if already_installed: + names = [system.name for system in already_installed] + raise ConfigurationException( + f"Systems [{','.join(names)}] are already installed." + ) + + create_destination_and_install(source, get_backends_path("systems")) + + +def remove_system(directory_name: str) -> None: + """Remove system.""" + remove_backend(directory_name, "systems") + + +def load_system(config: SystemConfig) -> System: + """Load system based on it's execution type.""" + populate_shared_params(config) + + return System(config) + + +def populate_shared_params(config: SystemConfig) -> None: + """Populate command parameters with shared parameters.""" + user_params = config.get("user_params") + if not user_params or "shared" not in user_params: + return + + shared_user_params = user_params["shared"] + if not shared_user_params: + return + + only_aliases = all(p.get("alias") for p in shared_user_params) + if not only_aliases: + raise ConfigurationException("All shared parameters should have aliases") + + commands = config.get("commands", {}) + for cmd_name in ["run"]: + command = commands.get(cmd_name) + if command is None: + commands[cmd_name] = [] + cmd_user_params = user_params.get(cmd_name) + if not cmd_user_params: + cmd_user_params = shared_user_params + else: + only_aliases = all(p.get("alias") for p in cmd_user_params) + if not only_aliases: + raise ConfigurationException( + f"All parameters for command {cmd_name} should have aliases." + ) + merged_by_alias = { + **{p.get("alias"): p for p in shared_user_params}, + **{p.get("alias"): p for p in cmd_user_params}, + } + cmd_user_params = list(merged_by_alias.values()) + + user_params[cmd_name] = cmd_user_params + + config["commands"] = commands + del user_params["shared"] |