aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/backend/executor
diff options
context:
space:
mode:
authorDmitrii Agibov <dmitrii.agibov@arm.com>2022-11-18 16:34:03 +0000
committerDmitrii Agibov <dmitrii.agibov@arm.com>2022-11-29 14:44:13 +0000
commit37959522a805a5e23c930ed79aac84920c3cb208 (patch)
tree484af1240a93c955a72ce2e452432383b6704b56 /src/mlia/backend/executor
parent5568f9f000d673ac53e710dcc8991fec6e8a5488 (diff)
downloadmlia-37959522a805a5e23c930ed79aac84920c3cb208.tar.gz
Move backends functionality into separate modules
- Move backend management/executor code into module backend_core - Create separate module for each backend in "backend" module - Move each backend into corresponding module - Split Vela wrapper into several submodules Change-Id: If01b6774aab6501951212541cc5d7f5aa7c97e95
Diffstat (limited to 'src/mlia/backend/executor')
-rw-r--r--src/mlia/backend/executor/__init__.py3
-rw-r--r--src/mlia/backend/executor/application.py170
-rw-r--r--src/mlia/backend/executor/common.py517
-rw-r--r--src/mlia/backend/executor/config.py68
-rw-r--r--src/mlia/backend/executor/execution.py342
-rw-r--r--src/mlia/backend/executor/fs.py88
-rw-r--r--src/mlia/backend/executor/output_consumer.py67
-rw-r--r--src/mlia/backend/executor/proc.py191
-rw-r--r--src/mlia/backend/executor/runner.py98
-rw-r--r--src/mlia/backend/executor/source.py207
-rw-r--r--src/mlia/backend/executor/system.py178
11 files changed, 1929 insertions, 0 deletions
diff --git a/src/mlia/backend/executor/__init__.py b/src/mlia/backend/executor/__init__.py
new file mode 100644
index 0000000..3d60372
--- /dev/null
+++ b/src/mlia/backend/executor/__init__.py
@@ -0,0 +1,3 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Backend module."""
diff --git a/src/mlia/backend/executor/application.py b/src/mlia/backend/executor/application.py
new file mode 100644
index 0000000..738ac4e
--- /dev/null
+++ b/src/mlia/backend/executor/application.py
@@ -0,0 +1,170 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Application backend module."""
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import Any
+from typing import cast
+from typing import List
+
+from mlia.backend.executor.common import Backend
+from mlia.backend.executor.common import ConfigurationException
+from mlia.backend.executor.common import get_backend_configs
+from mlia.backend.executor.common import get_backend_directories
+from mlia.backend.executor.common import load_application_configs
+from mlia.backend.executor.common import load_config
+from mlia.backend.executor.common import remove_backend
+from mlia.backend.executor.config import ApplicationConfig
+from mlia.backend.executor.config import ExtendedApplicationConfig
+from mlia.backend.executor.fs import get_backends_path
+from mlia.backend.executor.source import create_destination_and_install
+from mlia.backend.executor.source import get_source
+
+
+def get_available_application_directory_names() -> list[str]:
+ """Return a list of directory names for all available applications."""
+ return [entry.name for entry in get_backend_directories("applications")]
+
+
+def get_available_applications() -> list[Application]:
+ """Return a list with all available applications."""
+ available_applications = []
+ for config_json in get_backend_configs("applications"):
+ config_entries = cast(List[ExtendedApplicationConfig], load_config(config_json))
+ for config_entry in config_entries:
+ config_entry["config_location"] = config_json.parent.absolute()
+ applications = load_applications(config_entry)
+ available_applications += applications
+
+ return sorted(available_applications, key=lambda application: application.name)
+
+
+def get_application(
+ application_name: str, system_name: str | None = None
+) -> list[Application]:
+ """Return a list of application instances with provided name."""
+ return [
+ application
+ for application in get_available_applications()
+ if application.name == application_name
+ and (not system_name or application.can_run_on(system_name))
+ ]
+
+
+def install_application(source_path: Path) -> None:
+ """Install application."""
+ try:
+ source = get_source(source_path)
+ config = cast(List[ExtendedApplicationConfig], source.config())
+ applications_to_install = [
+ s for entry in config for s in load_applications(entry)
+ ]
+ except Exception as error:
+ raise ConfigurationException("Unable to read application definition") from error
+
+ if not applications_to_install:
+ raise ConfigurationException("No application definition found")
+
+ available_applications = get_available_applications()
+ already_installed = [
+ s for s in applications_to_install if s in available_applications
+ ]
+ if already_installed:
+ names = {application.name for application in already_installed}
+ raise ConfigurationException(
+ f"Applications [{','.join(names)}] are already installed."
+ )
+
+ create_destination_and_install(source, get_backends_path("applications"))
+
+
+def remove_application(directory_name: str) -> None:
+ """Remove application directory."""
+ remove_backend(directory_name, "applications")
+
+
+def get_unique_application_names(system_name: str | None = None) -> list[str]:
+ """Extract a list of unique application names of all application available."""
+ return list(
+ {
+ application.name
+ for application in get_available_applications()
+ if not system_name or application.can_run_on(system_name)
+ }
+ )
+
+
+class Application(Backend):
+ """Class for representing a single application component."""
+
+ def __init__(self, config: ApplicationConfig) -> None:
+ """Construct a Application instance from a dict."""
+ super().__init__(config)
+
+ self.supported_systems = config.get("supported_systems", [])
+
+ def __eq__(self, other: object) -> bool:
+ """Overload operator ==."""
+ if not isinstance(other, Application):
+ return False
+
+ return (
+ super().__eq__(other)
+ and self.name == other.name
+ and set(self.supported_systems) == set(other.supported_systems)
+ )
+
+ def can_run_on(self, system_name: str) -> bool:
+ """Check if the application can run on the system passed as argument."""
+ return system_name in self.supported_systems
+
+ def get_details(self) -> dict[str, Any]:
+ """Return dictionary with information about the Application instance."""
+ output = {
+ "type": "application",
+ "name": self.name,
+ "description": self.description,
+ "supported_systems": self.supported_systems,
+ "commands": self._get_command_details(),
+ }
+
+ return output
+
+ def remove_unused_params(self) -> None:
+ """Remove unused params in commands.
+
+ After merging default and system related configuration application
+ could have parameters that are not being used in commands. They
+ should be removed.
+ """
+ for command in self.commands.values():
+ indexes_or_aliases = [
+ m
+ for cmd_str in command.command_strings
+ for m in re.findall(r"{user_params:(?P<index_or_alias>\w+)}", cmd_str)
+ ]
+
+ only_aliases = all(not item.isnumeric() for item in indexes_or_aliases)
+ if only_aliases:
+ used_params = [
+ param
+ for param in command.params
+ if param.alias in indexes_or_aliases
+ ]
+ command.params = used_params
+
+
+def load_applications(config: ExtendedApplicationConfig) -> list[Application]:
+ """Load application.
+
+ Application configuration could contain different parameters/commands for different
+ supported systems. For each supported system this function will return separate
+ Application instance with appropriate configuration.
+ """
+ configs = load_application_configs(config, ApplicationConfig)
+ applications = [Application(cfg) for cfg in configs]
+ for application in applications:
+ application.remove_unused_params()
+ return applications
diff --git a/src/mlia/backend/executor/common.py b/src/mlia/backend/executor/common.py
new file mode 100644
index 0000000..48dbd4a
--- /dev/null
+++ b/src/mlia/backend/executor/common.py
@@ -0,0 +1,517 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Contain all common functions for the backends."""
+from __future__ import annotations
+
+import json
+import logging
+import re
+from abc import ABC
+from collections import Counter
+from pathlib import Path
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Final
+from typing import IO
+from typing import Iterable
+from typing import Match
+from typing import NamedTuple
+from typing import Pattern
+
+from mlia.backend.executor.config import BackendConfig
+from mlia.backend.executor.config import BaseBackendConfig
+from mlia.backend.executor.config import NamedExecutionConfig
+from mlia.backend.executor.config import UserParamConfig
+from mlia.backend.executor.config import UserParamsConfig
+from mlia.backend.executor.fs import get_backends_path
+from mlia.backend.executor.fs import remove_resource
+from mlia.backend.executor.fs import ResourceType
+
+
+BACKEND_CONFIG_FILE: Final[str] = "backend-config.json"
+
+
+class ConfigurationException(Exception):
+ """Configuration exception."""
+
+
+def get_backend_config(dir_path: Path) -> Path:
+ """Get path to backendir configuration file."""
+ return dir_path / BACKEND_CONFIG_FILE
+
+
+def get_backend_configs(resource_type: ResourceType) -> Iterable[Path]:
+ """Get path to the backend configs for provided resource_type."""
+ return (
+ get_backend_config(entry) for entry in get_backend_directories(resource_type)
+ )
+
+
+def get_backend_directories(resource_type: ResourceType) -> Iterable[Path]:
+ """Get path to the backend directories for provided resource_type."""
+ return (
+ entry
+ for entry in get_backends_path(resource_type).iterdir()
+ if is_backend_directory(entry)
+ )
+
+
+def is_backend_directory(dir_path: Path) -> bool:
+ """Check if path is backend's configuration directory."""
+ return dir_path.is_dir() and get_backend_config(dir_path).is_file()
+
+
+def remove_backend(directory_name: str, resource_type: ResourceType) -> None:
+ """Remove backend with provided type and directory_name."""
+ if not directory_name:
+ raise Exception("No directory name provided")
+
+ remove_resource(directory_name, resource_type)
+
+
+def load_config(config: Path | IO[bytes] | None) -> BackendConfig:
+ """Return a loaded json file."""
+ if config is None:
+ raise Exception("Unable to read config")
+
+ if isinstance(config, Path):
+ with config.open() as json_file:
+ return cast(BackendConfig, json.load(json_file))
+
+ return cast(BackendConfig, json.load(config))
+
+
+def parse_raw_parameter(parameter: str) -> tuple[str, str | None]:
+ """Split the parameter string in name and optional value.
+
+ It manages the following cases:
+ --param=1 -> --param, 1
+ --param 1 -> --param, 1
+ --flag -> --flag, None
+ """
+ data = re.split(" |=", parameter)
+ if len(data) == 1:
+ param_name = data[0]
+ param_value = None
+ else:
+ param_name = " ".join(data[0:-1])
+ param_value = data[-1]
+ return param_name, param_value
+
+
+class DataPaths(NamedTuple):
+ """DataPaths class."""
+
+ src: Path
+ dst: str
+
+
+class Backend(ABC):
+ """Backend class."""
+
+ # pylint: disable=too-many-instance-attributes
+
+ def __init__(self, config: BaseBackendConfig):
+ """Initialize backend."""
+ name = config.get("name")
+ if not name:
+ raise ConfigurationException("Name is empty")
+
+ self.name = name
+ self.description = config.get("description", "")
+ self.config_location = config.get("config_location")
+ self.variables = config.get("variables", {})
+ self.annotations = config.get("annotations", {})
+
+ self._parse_commands_and_params(config)
+
+ def validate_parameter(self, command_name: str, parameter: str) -> bool:
+ """Validate the parameter string against the application configuration.
+
+ We take the parameter string, extract the parameter name/value and
+ check them against the current configuration.
+ """
+ param_name, param_value = parse_raw_parameter(parameter)
+ valid_param_name = valid_param_value = False
+
+ command = self.commands.get(command_name)
+ if not command:
+ raise AttributeError(f"Unknown command: '{command_name}'")
+
+ # Iterate over all available parameters until we have a match.
+ for param in command.params:
+ if self._same_parameter(param_name, param):
+ valid_param_name = True
+ # This is a non-empty list
+ if param.values:
+ # We check if the value is allowed in the configuration
+ valid_param_value = param_value in param.values
+ else:
+ # In this case we don't validate the value and accept
+ # whatever we have set.
+ valid_param_value = True
+ break
+
+ return valid_param_name and valid_param_value
+
+ def __eq__(self, other: object) -> bool:
+ """Overload operator ==."""
+ if not isinstance(other, Backend):
+ return False
+
+ return (
+ self.name == other.name
+ and self.description == other.description
+ and self.commands == other.commands
+ )
+
+ def __repr__(self) -> str:
+ """Represent the Backend instance by its name."""
+ return self.name
+
+ def _parse_commands_and_params(self, config: BaseBackendConfig) -> None:
+ """Parse commands and user parameters."""
+ self.commands: dict[str, Command] = {}
+
+ commands = config.get("commands")
+ if commands:
+ params = config.get("user_params")
+
+ for command_name in commands.keys():
+ command_params = self._parse_params(params, command_name)
+ command_strings = [
+ self._substitute_variables(cmd)
+ for cmd in commands.get(command_name, [])
+ ]
+ self.commands[command_name] = Command(command_strings, command_params)
+
+ def _substitute_variables(self, str_val: str) -> str:
+ """Substitute variables in string.
+
+ Variables is being substituted at backend's creation stage because
+ they could contain references to other params which will be
+ resolved later.
+ """
+ if not str_val:
+ return str_val
+
+ var_pattern: Final[Pattern] = re.compile(r"{variables:(?P<var_name>\w+)}")
+
+ def var_value(match: Match) -> str:
+ var_name = match["var_name"]
+ if var_name not in self.variables:
+ raise ConfigurationException(f"Unknown variable {var_name}")
+
+ return self.variables[var_name]
+
+ return var_pattern.sub(var_value, str_val)
+
+ @classmethod
+ def _parse_params(
+ cls, params: UserParamsConfig | None, command: str
+ ) -> list[Param]:
+ if not params:
+ return []
+
+ return [cls._parse_param(p) for p in params.get(command, [])]
+
+ @classmethod
+ def _parse_param(cls, param: UserParamConfig) -> Param:
+ """Parse a single parameter."""
+ name = param.get("name")
+ if name is not None and not name:
+ raise ConfigurationException("Parameter has an empty 'name' attribute.")
+ values = param.get("values", None)
+ default_value = param.get("default_value", None)
+ description = param.get("description", "")
+ alias = param.get("alias")
+
+ return Param(
+ name=name,
+ description=description,
+ values=values,
+ default_value=default_value,
+ alias=alias,
+ )
+
+ def _get_command_details(self) -> dict:
+ command_details = {
+ command_name: command.get_details()
+ for command_name, command in self.commands.items()
+ }
+ return command_details
+
+ def _get_user_param_value(self, user_params: list[str], param: Param) -> str | None:
+ """Get the user-specified value of a parameter."""
+ for user_param in user_params:
+ user_param_name, user_param_value = parse_raw_parameter(user_param)
+ if user_param_name == param.name:
+ warn_message = (
+ "The direct use of parameter name is deprecated"
+ " and might be removed in the future.\n"
+ f"Please use alias '{param.alias}' instead of "
+ "'{user_param_name}' to provide the parameter."
+ )
+ logging.warning(warn_message)
+
+ if self._same_parameter(user_param_name, param):
+ return user_param_value
+
+ return None
+
+ @staticmethod
+ def _same_parameter(user_param_name_or_alias: str, param: Param) -> bool:
+ """Compare user parameter name with param name or alias."""
+ # Strip the "=" sign in the param_name. This is needed just for
+ # comparison with the parameters passed by the user.
+ # The equal sign needs to be honoured when re-building the
+ # parameter back.
+ param_name = None if not param.name else param.name.rstrip("=")
+ return user_param_name_or_alias in [param_name, param.alias]
+
+ def resolved_parameters(
+ self, command_name: str, user_params: list[str]
+ ) -> list[tuple[str | None, Param]]:
+ """Return list of parameters with values."""
+ result: list[tuple[str | None, Param]] = []
+ command = self.commands.get(command_name)
+ if not command:
+ return result
+
+ for param in command.params:
+ value = self._get_user_param_value(user_params, param)
+ if not value:
+ value = param.default_value
+ result.append((value, param))
+
+ return result
+
+ def build_command(
+ self,
+ command_name: str,
+ user_params: list[str],
+ param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str],
+ ) -> list[str]:
+ """
+ Return a list of executable command strings.
+
+ Given a command and associated parameters, returns a list of executable command
+ strings.
+ """
+ command = self.commands.get(command_name)
+ if not command:
+ raise ConfigurationException(
+ f"Command '{command_name}' could not be found."
+ )
+
+ commands_to_run = []
+
+ params_values = self.resolved_parameters(command_name, user_params)
+ for cmd_str in command.command_strings:
+ cmd_str = resolve_all_parameters(
+ cmd_str, param_resolver, command_name, params_values
+ )
+ commands_to_run.append(cmd_str)
+
+ return commands_to_run
+
+
+class Param:
+ """Class for representing a generic application parameter."""
+
+ def __init__( # pylint: disable=too-many-arguments
+ self,
+ name: str | None,
+ description: str,
+ values: list[str] | None = None,
+ default_value: str | None = None,
+ alias: str | None = None,
+ ) -> None:
+ """Construct a Param instance."""
+ if not name and not alias:
+ raise ConfigurationException(
+ "Either name, alias or both must be set to identify a parameter."
+ )
+ self.name = name
+ self.values = values
+ self.description = description
+ self.default_value = default_value
+ self.alias = alias
+
+ def get_details(self) -> dict:
+ """Return a dictionary with all relevant information of a Param."""
+ return {key: value for key, value in self.__dict__.items() if value}
+
+ def __eq__(self, other: object) -> bool:
+ """Overload operator ==."""
+ if not isinstance(other, Param):
+ return False
+
+ return (
+ self.name == other.name
+ and self.values == other.values
+ and self.default_value == other.default_value
+ and self.description == other.description
+ )
+
+
+class Command:
+ """Class for representing a command."""
+
+ def __init__(
+ self, command_strings: list[str], params: list[Param] | None = None
+ ) -> None:
+ """Construct a Command instance."""
+ self.command_strings = command_strings
+
+ if params:
+ self.params = params
+ else:
+ self.params = []
+
+ self._validate()
+
+ def _validate(self) -> None:
+ """Validate command."""
+ if not self.params:
+ return
+
+ aliases = [param.alias for param in self.params if param.alias is not None]
+ repeated_aliases = [
+ alias for alias, count in Counter(aliases).items() if count > 1
+ ]
+
+ if repeated_aliases:
+ raise ConfigurationException(
+ f"Non-unique aliases {', '.join(repeated_aliases)}"
+ )
+
+ both_name_and_alias = [
+ param.name
+ for param in self.params
+ if param.name in aliases and param.name != param.alias
+ ]
+ if both_name_and_alias:
+ raise ConfigurationException(
+ f"Aliases {', '.join(both_name_and_alias)} could not be used "
+ "as parameter name."
+ )
+
+ def get_details(self) -> dict:
+ """Return a dictionary with all relevant information of a Command."""
+ output = {
+ "command_strings": self.command_strings,
+ "user_params": [param.get_details() for param in self.params],
+ }
+ return output
+
+ def __eq__(self, other: object) -> bool:
+ """Overload operator ==."""
+ if not isinstance(other, Command):
+ return False
+
+ return (
+ self.command_strings == other.command_strings
+ and self.params == other.params
+ )
+
+
+def resolve_all_parameters(
+ str_val: str,
+ param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str],
+ command_name: str | None = None,
+ params_values: list[tuple[str | None, Param]] | None = None,
+) -> str:
+ """Resolve all parameters in the string."""
+ if not str_val:
+ return str_val
+
+ param_pattern: Final[Pattern] = re.compile(r"{(?P<param_name>[\w.:]+)}")
+ while param_pattern.findall(str_val):
+ str_val = param_pattern.sub(
+ lambda m: param_resolver(
+ m["param_name"], command_name or "", params_values or []
+ ),
+ str_val,
+ )
+ return str_val
+
+
+def load_application_configs(
+ config: Any,
+ config_type: type[Any],
+ is_system_required: bool = True,
+) -> Any:
+ """Get one config for each system supported by the application.
+
+ The configuration could contain different parameters/commands for different
+ supported systems. For each supported system this function will return separate
+ config with appropriate configuration.
+ """
+ merged_configs = []
+ supported_systems: list[NamedExecutionConfig] | None = config.get(
+ "supported_systems"
+ )
+ if not supported_systems:
+ if is_system_required:
+ raise ConfigurationException("No supported systems definition provided")
+ # Create an empty system to be used in the parsing below
+ supported_systems = [cast(NamedExecutionConfig, {})]
+
+ default_user_params = config.get("user_params", {})
+
+ def merge_config(system: NamedExecutionConfig) -> Any:
+ system_name = system.get("name")
+ if not system_name and is_system_required:
+ raise ConfigurationException(
+ "Unable to read supported system definition, name is missed"
+ )
+
+ merged_config = config_type(**config)
+ merged_config["supported_systems"] = [system_name] if system_name else []
+ # merge default configuration and specific to the system
+ merged_config["commands"] = {
+ **config.get("commands", {}),
+ **system.get("commands", {}),
+ }
+
+ params = {}
+ tool_user_params = system.get("user_params", {})
+ command_names = tool_user_params.keys() | default_user_params.keys()
+ for command_name in command_names:
+ if command_name not in merged_config["commands"]:
+ continue
+
+ params_default = default_user_params.get(command_name, [])
+ params_tool = tool_user_params.get(command_name, [])
+ if not params_default or not params_tool:
+ params[command_name] = params_tool or params_default
+ if params_default and params_tool:
+ if any(not p.get("alias") for p in params_default):
+ raise ConfigurationException(
+ f"Default parameters for command {command_name} "
+ "should have aliases"
+ )
+ if any(not p.get("alias") for p in params_tool):
+ raise ConfigurationException(
+ f"{system_name} parameters for command {command_name} "
+ "should have aliases."
+ )
+
+ merged_by_alias = {
+ **{p.get("alias"): p for p in params_default},
+ **{p.get("alias"): p for p in params_tool},
+ }
+ params[command_name] = list(merged_by_alias.values())
+
+ merged_config["user_params"] = params
+ merged_config["variables"] = {
+ **config.get("variables", {}),
+ **system.get("variables", {}),
+ }
+ return merged_config
+
+ merged_configs = [merge_config(system) for system in supported_systems]
+
+ return merged_configs
diff --git a/src/mlia/backend/executor/config.py b/src/mlia/backend/executor/config.py
new file mode 100644
index 0000000..dca53da
--- /dev/null
+++ b/src/mlia/backend/executor/config.py
@@ -0,0 +1,68 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Contain definition of backend configuration."""
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Dict
+from typing import List
+from typing import TypedDict
+from typing import Union
+
+
+class UserParamConfig(TypedDict, total=False):
+ """User parameter configuration."""
+
+ name: str | None
+ default_value: str
+ values: list[str]
+ description: str
+ alias: str
+
+
+UserParamsConfig = Dict[str, List[UserParamConfig]]
+
+
+class ExecutionConfig(TypedDict, total=False):
+ """Execution configuration."""
+
+ commands: dict[str, list[str]]
+ user_params: UserParamsConfig
+ variables: dict[str, str]
+
+
+class NamedExecutionConfig(ExecutionConfig):
+ """Execution configuration with name."""
+
+ name: str
+
+
+class BaseBackendConfig(ExecutionConfig, total=False):
+ """Base backend configuration."""
+
+ name: str
+ description: str
+ config_location: Path
+ annotations: dict[str, str | list[str]]
+
+
+class ApplicationConfig(BaseBackendConfig, total=False):
+ """Application configuration."""
+
+ supported_systems: list[str]
+
+
+class ExtendedApplicationConfig(BaseBackendConfig, total=False):
+ """Extended application configuration."""
+
+ supported_systems: list[NamedExecutionConfig]
+
+
+class SystemConfig(BaseBackendConfig, total=False):
+ """System configuration."""
+
+ reporting: dict[str, dict]
+
+
+BackendItemConfig = Union[ApplicationConfig, SystemConfig]
+BackendConfig = Union[List[ExtendedApplicationConfig], List[SystemConfig]]
diff --git a/src/mlia/backend/executor/execution.py b/src/mlia/backend/executor/execution.py
new file mode 100644
index 0000000..e253b16
--- /dev/null
+++ b/src/mlia/backend/executor/execution.py
@@ -0,0 +1,342 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Application execution module."""
+from __future__ import annotations
+
+import logging
+import re
+from typing import cast
+
+from mlia.backend.executor.application import Application
+from mlia.backend.executor.application import get_application
+from mlia.backend.executor.common import Backend
+from mlia.backend.executor.common import ConfigurationException
+from mlia.backend.executor.common import Param
+from mlia.backend.executor.system import get_system
+from mlia.backend.executor.system import System
+
+logger = logging.getLogger(__name__)
+
+
+class AnotherInstanceIsRunningException(Exception):
+ """Concurrent execution error."""
+
+
+class ExecutionContext: # pylint: disable=too-few-public-methods
+ """Command execution context."""
+
+ def __init__(
+ self,
+ app: Application,
+ app_params: list[str],
+ system: System,
+ system_params: list[str],
+ ):
+ """Init execution context."""
+ self.app = app
+ self.app_params = app_params
+ self.system = system
+ self.system_params = system_params
+
+ self.param_resolver = ParamResolver(self)
+
+ self.stdout: bytearray | None = None
+ self.stderr: bytearray | None = None
+
+
+class ParamResolver:
+ """Parameter resolver."""
+
+ def __init__(self, context: ExecutionContext):
+ """Init parameter resolver."""
+ self.ctx = context
+
+ @staticmethod
+ def resolve_user_params(
+ cmd_name: str | None,
+ index_or_alias: str,
+ resolved_params: list[tuple[str | None, Param]] | None,
+ ) -> str:
+ """Resolve user params."""
+ if not cmd_name or resolved_params is None:
+ raise ConfigurationException("Unable to resolve user params")
+
+ param_value: str | None = None
+ param: Param | None = None
+
+ if index_or_alias.isnumeric():
+ i = int(index_or_alias)
+ if i not in range(len(resolved_params)):
+ raise ConfigurationException(
+ f"Invalid index {i} for user params of command {cmd_name}"
+ )
+ param_value, param = resolved_params[i]
+ else:
+ for val, par in resolved_params:
+ if par.alias == index_or_alias:
+ param_value, param = val, par
+ break
+
+ if param is None:
+ raise ConfigurationException(
+ f"No user parameter for command '{cmd_name}' with "
+ f"alias '{index_or_alias}'."
+ )
+
+ if param_value:
+ # We need to handle to cases of parameters here:
+ # 1) Optional parameters (non-positional with a name and value)
+ # 2) Positional parameters (value only, no name needed)
+ # Default to empty strings for positional arguments
+ param_name = ""
+ separator = ""
+ if param.name is not None:
+ # A valid param name means we have an optional/non-positional argument:
+ # The separator is an empty string in case the param_name
+ # has an equal sign as we have to honour it.
+ # If the parameter doesn't end with an equal sign then a
+ # space character is injected to split the parameter name
+ # and its value
+ param_name = param.name
+ separator = "" if param.name.endswith("=") else " "
+
+ return f"{param_name}{separator}{param_value}"
+
+ if param.name is None:
+ raise ConfigurationException(
+ f"Missing user parameter with alias '{index_or_alias}' for "
+ f"command '{cmd_name}'."
+ )
+
+ return param.name # flag: just return the parameter name
+
+ def resolve_commands_and_params(
+ self, backend_type: str, cmd_name: str, return_params: bool, index_or_alias: str
+ ) -> str:
+ """Resolve command or command's param value."""
+ if backend_type == "system":
+ backend = cast(Backend, self.ctx.system)
+ backend_params = self.ctx.system_params
+ else: # Application backend
+ backend = cast(Backend, self.ctx.app)
+ backend_params = self.ctx.app_params
+
+ if cmd_name not in backend.commands:
+ raise ConfigurationException(f"Command {cmd_name} not found")
+
+ if return_params:
+ params = backend.resolved_parameters(cmd_name, backend_params)
+ if index_or_alias.isnumeric():
+ i = int(index_or_alias)
+ if i not in range(len(params)):
+ raise ConfigurationException(
+ f"Invalid parameter index {i} for command {cmd_name}"
+ )
+
+ param_value = params[i][0]
+ else:
+ param_value = None
+ for value, param in params:
+ if param.alias == index_or_alias:
+ param_value = value
+ break
+
+ if not param_value:
+ raise ConfigurationException(
+ "No value for parameter with index or "
+ f"alias {index_or_alias} of command {cmd_name}."
+ )
+ return param_value
+
+ if not index_or_alias.isnumeric():
+ raise ConfigurationException(f"Bad command index {index_or_alias}")
+
+ i = int(index_or_alias)
+ commands = backend.build_command(cmd_name, backend_params, self.param_resolver)
+ if i not in range(len(commands)):
+ raise ConfigurationException(f"Invalid index {i} for command {cmd_name}")
+
+ return commands[i]
+
+ def resolve_variables(self, backend_type: str, var_name: str) -> str:
+ """Resolve variable value."""
+ if backend_type == "system":
+ backend = cast(Backend, self.ctx.system)
+ else: # Application backend
+ backend = cast(Backend, self.ctx.app)
+
+ if var_name not in backend.variables:
+ raise ConfigurationException(f"Unknown variable {var_name}")
+
+ return backend.variables[var_name]
+
+ def param_matcher(
+ self,
+ param_name: str,
+ cmd_name: str | None,
+ resolved_params: list[tuple[str | None, Param]] | None,
+ ) -> str:
+ """Regexp to resolve a param from the param_name."""
+ # this pattern supports parameter names like "application.commands.run:0" and
+ # "system.commands.run.params:0"
+ # Note: 'software' is included for backward compatibility.
+ commands_and_params_match = re.match(
+ r"(?P<type>application|software|system)[.]commands[.]"
+ r"(?P<name>\w+)"
+ r"(?P<params>[.]params|)[:]"
+ r"(?P<index_or_alias>\w+)",
+ param_name,
+ )
+
+ if commands_and_params_match:
+ backend_type, cmd_name, return_params, index_or_alias = (
+ commands_and_params_match["type"],
+ commands_and_params_match["name"],
+ commands_and_params_match["params"],
+ commands_and_params_match["index_or_alias"],
+ )
+ return self.resolve_commands_and_params(
+ backend_type, cmd_name, bool(return_params), index_or_alias
+ )
+
+ # Note: 'software' is included for backward compatibility.
+ variables_match = re.match(
+ r"(?P<type>application|software|system)[.]variables:(?P<var_name>\w+)",
+ param_name,
+ )
+ if variables_match:
+ backend_type, var_name = (
+ variables_match["type"],
+ variables_match["var_name"],
+ )
+ return self.resolve_variables(backend_type, var_name)
+
+ user_params_match = re.match(r"user_params:(?P<index_or_alias>\w+)", param_name)
+ if user_params_match:
+ index_or_alias = user_params_match["index_or_alias"]
+ return self.resolve_user_params(cmd_name, index_or_alias, resolved_params)
+
+ raise ConfigurationException(f"Unable to resolve parameter {param_name}")
+
+ def param_resolver(
+ self,
+ param_name: str,
+ cmd_name: str | None = None,
+ resolved_params: list[tuple[str | None, Param]] | None = None,
+ ) -> str:
+ """Resolve parameter value based on current execution context."""
+ # Note: 'software.*' is included for backward compatibility.
+ resolved_param = None
+ if param_name in ["application.name", "software.name"]:
+ resolved_param = self.ctx.app.name
+ elif param_name in ["application.description", "software.description"]:
+ resolved_param = self.ctx.app.description
+ elif self.ctx.app.config_location and (
+ param_name in ["application.config_dir", "software.config_dir"]
+ ):
+ resolved_param = str(self.ctx.app.config_location.absolute())
+ elif self.ctx.system is not None:
+ if param_name == "system.name":
+ resolved_param = self.ctx.system.name
+ elif param_name == "system.description":
+ resolved_param = self.ctx.system.description
+ elif param_name == "system.config_dir" and self.ctx.system.config_location:
+ resolved_param = str(self.ctx.system.config_location.absolute())
+
+ if not resolved_param:
+ resolved_param = self.param_matcher(param_name, cmd_name, resolved_params)
+ return resolved_param
+
+ def __call__(
+ self,
+ param_name: str,
+ cmd_name: str | None = None,
+ resolved_params: list[tuple[str | None, Param]] | None = None,
+ ) -> str:
+ """Resolve provided parameter."""
+ return self.param_resolver(param_name, cmd_name, resolved_params)
+
+
+def validate_parameters(
+ backend: Backend, command_names: list[str], params: list[str]
+) -> None:
+ """Check parameters passed to backend."""
+ for param in params:
+ acceptable = any(
+ backend.validate_parameter(command_name, param)
+ for command_name in command_names
+ if command_name in backend.commands
+ )
+
+ if not acceptable:
+ backend_type = "System" if isinstance(backend, System) else "Application"
+ raise ValueError(
+ f"{backend_type} parameter '{param}' not valid for "
+ f"command '{' or '.join(command_names)}'."
+ )
+
+
+def get_application_by_name_and_system(
+ application_name: str, system_name: str
+) -> Application:
+ """Get application."""
+ applications = get_application(application_name, system_name)
+ if not applications:
+ raise ValueError(
+ f"Application '{application_name}' doesn't support the "
+ f"system '{system_name}'."
+ )
+
+ if len(applications) != 1:
+ raise ValueError(
+ f"Error during getting application {application_name} for the "
+ f"system {system_name}."
+ )
+
+ return applications[0]
+
+
+def get_application_and_system(
+ application_name: str, system_name: str
+) -> tuple[Application, System]:
+ """Return application and system by provided names."""
+ system = get_system(system_name)
+ if not system:
+ raise ValueError(f"System {system_name} is not found.")
+
+ application = get_application_by_name_and_system(application_name, system_name)
+
+ return application, system
+
+
+def run_application(
+ application_name: str,
+ application_params: list[str],
+ system_name: str,
+ system_params: list[str],
+) -> ExecutionContext:
+ """Run application on the provided system."""
+ application, system = get_application_and_system(application_name, system_name)
+ validate_parameters(application, ["run"], application_params)
+ validate_parameters(system, ["run"], system_params)
+
+ ctx = ExecutionContext(
+ app=application,
+ app_params=application_params,
+ system=system,
+ system_params=system_params,
+ )
+
+ logger.debug("Generating commands to execute")
+ commands_to_run = ctx.system.build_command(
+ "run", ctx.system_params, ctx.param_resolver
+ )
+
+ for command in commands_to_run:
+ logger.debug("Running: %s", command)
+ exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command)
+
+ if exit_code != 0:
+ logger.warning("Application exited with exit code %i", exit_code)
+
+ return ctx
diff --git a/src/mlia/backend/executor/fs.py b/src/mlia/backend/executor/fs.py
new file mode 100644
index 0000000..3fce19c
--- /dev/null
+++ b/src/mlia/backend/executor/fs.py
@@ -0,0 +1,88 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module to host all file system related functions."""
+from __future__ import annotations
+
+import re
+import shutil
+from pathlib import Path
+from typing import Literal
+
+from mlia.utils.filesystem import get_mlia_resources
+
+ResourceType = Literal["applications", "systems"]
+
+
+def get_backend_resources() -> Path:
+ """Get backend resources folder path."""
+ return get_mlia_resources() / "backends"
+
+
+def get_backends_path(name: ResourceType) -> Path:
+ """Return the absolute path of the specified resource.
+
+ It uses importlib to return resources packaged with MANIFEST.in.
+ """
+ if not name:
+ raise ResourceWarning("Resource name is not provided")
+
+ resource_path = get_backend_resources() / name
+ if resource_path.is_dir():
+ return resource_path
+
+ raise ResourceWarning(f"Resource '{name}' not found.")
+
+
+def copy_directory_content(source: Path, destination: Path) -> None:
+ """Copy content of the source directory into destination directory."""
+ for item in source.iterdir():
+ src = source / item.name
+ dest = destination / item.name
+
+ if src.is_dir():
+ shutil.copytree(src, dest)
+ else:
+ shutil.copy2(src, dest)
+
+
+def remove_resource(resource_directory: str, resource_type: ResourceType) -> None:
+ """Remove resource data."""
+ resources = get_backends_path(resource_type)
+
+ resource_location = resources / resource_directory
+ if not resource_location.exists():
+ raise Exception(f"Resource {resource_directory} does not exist")
+
+ if not resource_location.is_dir():
+ raise Exception(f"Wrong resource {resource_directory}")
+
+ shutil.rmtree(resource_location)
+
+
+def remove_directory(directory_path: Path | None) -> None:
+ """Remove directory."""
+ if not directory_path or not directory_path.is_dir():
+ raise Exception("No directory path provided")
+
+ shutil.rmtree(directory_path)
+
+
+def recreate_directory(directory_path: Path | None) -> None:
+ """Recreate directory."""
+ if not directory_path:
+ raise Exception("No directory path provided")
+
+ if directory_path.exists() and not directory_path.is_dir():
+ raise Exception(
+ f"Path {str(directory_path)} does exist and it is not a directory."
+ )
+
+ if directory_path.is_dir():
+ remove_directory(directory_path)
+
+ directory_path.mkdir()
+
+
+def valid_for_filename(value: str, replacement: str = "") -> str:
+ """Replace non alpha numeric characters."""
+ return re.sub(r"[^\w.]", replacement, value, flags=re.ASCII)
diff --git a/src/mlia/backend/executor/output_consumer.py b/src/mlia/backend/executor/output_consumer.py
new file mode 100644
index 0000000..3c3b132
--- /dev/null
+++ b/src/mlia/backend/executor/output_consumer.py
@@ -0,0 +1,67 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Output consumers module."""
+from __future__ import annotations
+
+import base64
+import json
+import re
+from typing import Protocol
+from typing import runtime_checkable
+
+
+@runtime_checkable
+class OutputConsumer(Protocol):
+ """Protocol to consume output."""
+
+ def feed(self, line: str) -> bool:
+ """
+ Feed a new line to be parsed.
+
+ Return True if the line should be removed from the output.
+ """
+
+
+class Base64OutputConsumer(OutputConsumer):
+ """
+ Parser to extract base64-encoded JSON from tagged standard output.
+
+ Example of the tagged output:
+ ```
+ # Encoded JSON: {"test": 1234}
+ <metrics>eyJ0ZXN0IjogMTIzNH0</metrics>
+ ```
+ """
+
+ TAG_NAME = "metrics"
+
+ def __init__(self) -> None:
+ """Set up the regular expression to extract tagged strings."""
+ self._regex = re.compile(rf"<{self.TAG_NAME}>(.*)</{self.TAG_NAME}>")
+ self.parsed_output: list = []
+
+ def feed(self, line: str) -> bool:
+ """
+ Parse the output line and save the decoded output.
+
+ Returns True if the line contains tagged output.
+
+ Example:
+ Using the tagged output from the class docs the parser should collect
+ the following:
+ ```
+ [
+ {"test": 1234}
+ ]
+ ```
+ """
+ res_b64 = self._regex.search(line)
+ if res_b64:
+ res_json = base64.b64decode(res_b64.group(1), validate=True)
+ res = json.loads(res_json)
+ self.parsed_output.append(res)
+ # Remove this line from the output, i.e. consume it, as it
+ # does not contain any human readable content.
+ return True
+
+ return False
diff --git a/src/mlia/backend/executor/proc.py b/src/mlia/backend/executor/proc.py
new file mode 100644
index 0000000..39a0689
--- /dev/null
+++ b/src/mlia/backend/executor/proc.py
@@ -0,0 +1,191 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Processes module.
+
+This module contains all classes and functions for dealing with Linux
+processes.
+"""
+from __future__ import annotations
+
+import datetime
+import logging
+import shlex
+import signal
+import tempfile
+import time
+from pathlib import Path
+from typing import Any
+
+from sh import Command
+from sh import CommandNotFound
+from sh import ErrorReturnCode
+from sh import RunningCommand
+
+from mlia.backend.executor.fs import valid_for_filename
+
+logger = logging.getLogger(__name__)
+
+
+class CommandFailedException(Exception):
+ """Exception for failed command execution."""
+
+
+class ShellCommand:
+ """Wrapper class for shell commands."""
+
+ def run(
+ self,
+ cmd: str,
+ *args: str,
+ _cwd: Path | None = None,
+ _tee: bool = True,
+ _bg: bool = True,
+ _out: Any = None,
+ _err: Any = None,
+ _search_paths: list[Path] | None = None,
+ ) -> RunningCommand:
+ """Run the shell command with the given arguments.
+
+ There are special arguments that modify the behaviour of the process.
+ _cwd: current working directory
+ _tee: it redirects the stdout both to console and file
+ _bg: if True, it runs the process in background and the command is not
+ blocking.
+ _out: use this object for stdout redirect,
+ _err: use this object for stderr redirect,
+ _search_paths: If presented used for searching executable
+ """
+ try:
+ kwargs = {}
+ if _cwd:
+ kwargs["_cwd"] = str(_cwd)
+ command = Command(cmd, _search_paths).bake(args, **kwargs)
+ except CommandNotFound as error:
+ logging.error("Command '%s' not found", error.args[0])
+ raise error
+
+ out, err = _out, _err
+ if not _out and not _err:
+ out, err = (str(item) for item in self.get_stdout_stderr_paths(cmd))
+
+ return command(_out=out, _err=err, _tee=_tee, _bg=_bg, _bg_exc=False)
+
+ @classmethod
+ def get_stdout_stderr_paths(cls, cmd: str) -> tuple[Path, Path]:
+ """Construct and returns the paths of stdout/stderr files."""
+ timestamp = datetime.datetime.now().timestamp()
+ base_path = Path(tempfile.mkdtemp(prefix="mlia-", suffix=f"{timestamp}"))
+ base = base_path / f"{valid_for_filename(cmd, '_')}_{timestamp}"
+ stdout = base.with_suffix(".out")
+ stderr = base.with_suffix(".err")
+ try:
+ stdout.touch()
+ stderr.touch()
+ except FileNotFoundError as error:
+ logging.error("File not found: %s", error.filename)
+ raise error
+ return stdout, stderr
+
+
+def parse_command(command: str, shell: str = "bash") -> list[str]:
+ """Parse command."""
+ cmd, *args = shlex.split(command, posix=True)
+
+ if is_shell_script(cmd):
+ args = [cmd] + args
+ cmd = shell
+
+ return [cmd] + args
+
+
+def execute_command( # pylint: disable=invalid-name
+ command: str,
+ cwd: Path,
+ bg: bool = False,
+ shell: str = "bash",
+ out: Any = None,
+ err: Any = None,
+) -> RunningCommand:
+ """Execute shell command."""
+ cmd, *args = parse_command(command, shell)
+
+ search_paths = None
+ if cmd != shell and (cwd / cmd).is_file():
+ search_paths = [cwd]
+
+ return ShellCommand().run(
+ cmd, *args, _cwd=cwd, _bg=bg, _search_paths=search_paths, _out=out, _err=err
+ )
+
+
+def is_shell_script(cmd: str) -> bool:
+ """Check if command is shell script."""
+ return cmd.endswith(".sh")
+
+
+def run_and_wait(
+ command: str,
+ cwd: Path,
+ terminate_on_error: bool = False,
+ out: Any = None,
+ err: Any = None,
+) -> tuple[int, bytearray, bytearray]:
+ """
+ Run command and wait while it is executing.
+
+ Returns a tuple: (exit_code, stdout, stderr)
+ """
+ running_cmd: RunningCommand | None = None
+ try:
+ running_cmd = execute_command(command, cwd, bg=True, out=out, err=err)
+ return running_cmd.exit_code, running_cmd.stdout, running_cmd.stderr
+ except ErrorReturnCode as cmd_failed:
+ raise CommandFailedException() from cmd_failed
+ except Exception as error:
+ is_running = running_cmd is not None and running_cmd.is_alive()
+ if terminate_on_error and is_running:
+ logger.debug("Terminating ...")
+ terminate_command(running_cmd)
+
+ raise error
+
+
+def terminate_command(
+ running_cmd: RunningCommand,
+ wait: bool = True,
+ wait_period: float = 0.5,
+ number_of_attempts: int = 20,
+) -> None:
+ """Terminate running command."""
+ try:
+ running_cmd.process.signal_group(signal.SIGINT)
+ if wait:
+ for _ in range(number_of_attempts):
+ time.sleep(wait_period)
+ if not running_cmd.is_alive():
+ return
+ logger.error(
+ "Unable to terminate process %i. Sending SIGTERM...",
+ running_cmd.process.pid,
+ )
+ running_cmd.process.signal_group(signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+
+
+def print_command_stdout(command: RunningCommand) -> None:
+ """Print the stdout of a command.
+
+ The command has 2 states: running and done.
+ If the command is running, the output is taken by the running process.
+ If the command has ended its execution, the stdout is taken from stdout
+ property
+ """
+ if command.is_alive():
+ while True:
+ try:
+ print(command.next(), end="")
+ except StopIteration:
+ break
+ else:
+ print(command.stdout)
diff --git a/src/mlia/backend/executor/runner.py b/src/mlia/backend/executor/runner.py
new file mode 100644
index 0000000..2330fd9
--- /dev/null
+++ b/src/mlia/backend/executor/runner.py
@@ -0,0 +1,98 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Module for backend runner."""
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+
+from mlia.backend.executor.application import get_available_applications
+from mlia.backend.executor.application import install_application
+from mlia.backend.executor.execution import ExecutionContext
+from mlia.backend.executor.execution import run_application
+from mlia.backend.executor.system import get_available_systems
+from mlia.backend.executor.system import install_system
+
+
+@dataclass
+class ExecutionParams:
+ """Application execution params."""
+
+ application: str
+ system: str
+ application_params: list[str]
+ system_params: list[str]
+
+
+class BackendRunner:
+ """Backend runner."""
+
+ def __init__(self) -> None:
+ """Init BackendRunner instance."""
+
+ @staticmethod
+ def get_installed_systems() -> list[str]:
+ """Get list of the installed systems."""
+ return [system.name for system in get_available_systems()]
+
+ @staticmethod
+ def get_installed_applications(system: str | None = None) -> list[str]:
+ """Get list of the installed application."""
+ return [
+ app.name
+ for app in get_available_applications()
+ if system is None or app.can_run_on(system)
+ ]
+
+ def is_application_installed(self, application: str, system: str) -> bool:
+ """Return true if requested application installed."""
+ return application in self.get_installed_applications(system)
+
+ def is_system_installed(self, system: str) -> bool:
+ """Return true if requested system installed."""
+ return system in self.get_installed_systems()
+
+ def systems_installed(self, systems: list[str]) -> bool:
+ """Check if all provided systems are installed."""
+ if not systems:
+ return False
+
+ installed_systems = self.get_installed_systems()
+ return all(system in installed_systems for system in systems)
+
+ def applications_installed(self, applications: list[str]) -> bool:
+ """Check if all provided applications are installed."""
+ if not applications:
+ return False
+
+ installed_apps = self.get_installed_applications()
+ return all(app in installed_apps for app in applications)
+
+ def all_installed(self, systems: list[str], apps: list[str]) -> bool:
+ """Check if all provided artifacts are installed."""
+ return self.systems_installed(systems) and self.applications_installed(apps)
+
+ @staticmethod
+ def install_system(system_path: Path) -> None:
+ """Install system."""
+ install_system(system_path)
+
+ @staticmethod
+ def install_application(app_path: Path) -> None:
+ """Install application."""
+ install_application(app_path)
+
+ @staticmethod
+ def run_application(execution_params: ExecutionParams) -> ExecutionContext:
+ """Run requested application."""
+ ctx = run_application(
+ execution_params.application,
+ execution_params.application_params,
+ execution_params.system,
+ execution_params.system_params,
+ )
+ return ctx
+
+ @staticmethod
+ def _params(name: str, params: list[str]) -> list[str]:
+ return [p for item in [(name, param) for param in params] for p in item]
diff --git a/src/mlia/backend/executor/source.py b/src/mlia/backend/executor/source.py
new file mode 100644
index 0000000..6abc49f
--- /dev/null
+++ b/src/mlia/backend/executor/source.py
@@ -0,0 +1,207 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""Contain source related classes and functions."""
+from __future__ import annotations
+
+import os
+import shutil
+import tarfile
+from abc import ABC
+from abc import abstractmethod
+from pathlib import Path
+from tarfile import TarFile
+
+from mlia.backend.executor.common import BACKEND_CONFIG_FILE
+from mlia.backend.executor.common import ConfigurationException
+from mlia.backend.executor.common import get_backend_config
+from mlia.backend.executor.common import is_backend_directory
+from mlia.backend.executor.common import load_config
+from mlia.backend.executor.config import BackendConfig
+from mlia.backend.executor.fs import copy_directory_content
+
+
+class Source(ABC):
+ """Source class."""
+
+ @abstractmethod
+ def name(self) -> str | None:
+ """Get source name."""
+
+ @abstractmethod
+ def config(self) -> BackendConfig | None:
+ """Get configuration file content."""
+
+ @abstractmethod
+ def install_into(self, destination: Path) -> None:
+ """Install source into destination directory."""
+
+ @abstractmethod
+ def create_destination(self) -> bool:
+ """Return True if destination folder should be created before installation."""
+
+
+class DirectorySource(Source):
+ """DirectorySource class."""
+
+ def __init__(self, directory_path: Path) -> None:
+ """Create the DirectorySource instance."""
+ assert isinstance(directory_path, Path)
+ self.directory_path = directory_path
+
+ def name(self) -> str:
+ """Return name of source."""
+ return self.directory_path.name
+
+ def config(self) -> BackendConfig | None:
+ """Return configuration file content."""
+ if not is_backend_directory(self.directory_path):
+ raise ConfigurationException("No configuration file found")
+
+ config_file = get_backend_config(self.directory_path)
+ return load_config(config_file)
+
+ def install_into(self, destination: Path) -> None:
+ """Install source into destination directory."""
+ if not destination.is_dir():
+ raise ConfigurationException(f"Wrong destination {destination}.")
+
+ if not self.directory_path.is_dir():
+ raise ConfigurationException(
+ f"Directory {self.directory_path} does not exist."
+ )
+
+ copy_directory_content(self.directory_path, destination)
+
+ def create_destination(self) -> bool:
+ """Return True if destination folder should be created before installation."""
+ return True
+
+
+class TarArchiveSource(Source):
+ """TarArchiveSource class."""
+
+ def __init__(self, archive_path: Path) -> None:
+ """Create the TarArchiveSource class."""
+ assert isinstance(archive_path, Path)
+ self.archive_path = archive_path
+ self._config: BackendConfig | None = None
+ self._has_top_level_folder: bool | None = None
+ self._name: str | None = None
+
+ def _read_archive_content(self) -> None:
+ """Read various information about archive."""
+ # get source name from archive name (everything without extensions)
+ extensions = "".join(self.archive_path.suffixes)
+ self._name = self.archive_path.name.rstrip(extensions)
+
+ if not self.archive_path.exists():
+ return
+
+ with self._open(self.archive_path) as archive:
+ try:
+ config_entry = archive.getmember(BACKEND_CONFIG_FILE)
+ self._has_top_level_folder = False
+ except KeyError as error_no_config:
+ try:
+ archive_entries = archive.getnames()
+ entries_common_prefix = os.path.commonprefix(archive_entries)
+ top_level_dir = entries_common_prefix.rstrip("/")
+
+ if not top_level_dir:
+ raise RuntimeError(
+ "Archive has no top level directory"
+ ) from error_no_config
+
+ config_path = f"{top_level_dir}/{BACKEND_CONFIG_FILE}"
+
+ config_entry = archive.getmember(config_path)
+ self._has_top_level_folder = True
+ self._name = top_level_dir
+ except (KeyError, RuntimeError) as error_no_root_dir_or_config:
+ raise ConfigurationException(
+ "No configuration file found"
+ ) from error_no_root_dir_or_config
+
+ content = archive.extractfile(config_entry)
+ self._config = load_config(content)
+
+ def config(self) -> BackendConfig | None:
+ """Return configuration file content."""
+ if self._config is None:
+ self._read_archive_content()
+
+ return self._config
+
+ def name(self) -> str | None:
+ """Return name of the source."""
+ if self._name is None:
+ self._read_archive_content()
+
+ return self._name
+
+ def create_destination(self) -> bool:
+ """Return True if destination folder must be created before installation."""
+ if self._has_top_level_folder is None:
+ self._read_archive_content()
+
+ return not self._has_top_level_folder
+
+ def install_into(self, destination: Path) -> None:
+ """Install source into destination directory."""
+ if not destination.is_dir():
+ raise ConfigurationException(f"Wrong destination {destination}.")
+
+ with self._open(self.archive_path) as archive:
+ archive.extractall(destination)
+
+ def _open(self, archive_path: Path) -> TarFile:
+ """Open archive file."""
+ if not archive_path.is_file():
+ raise ConfigurationException(f"File {archive_path} does not exist.")
+
+ if archive_path.name.endswith("tar.gz") or archive_path.name.endswith("tgz"):
+ mode = "r:gz"
+ else:
+ raise ConfigurationException(f"Unsupported archive type {archive_path}.")
+
+ # The returned TarFile object can be used as a context manager (using
+ # 'with') by the calling instance.
+ return tarfile.open( # pylint: disable=consider-using-with
+ self.archive_path, mode=mode
+ )
+
+
+def get_source(source_path: Path) -> TarArchiveSource | DirectorySource:
+ """Return appropriate source instance based on provided source path."""
+ if source_path.is_file():
+ return TarArchiveSource(source_path)
+
+ if source_path.is_dir():
+ return DirectorySource(source_path)
+
+ raise ConfigurationException(f"Unable to read {source_path}.")
+
+
+def create_destination_and_install(source: Source, resource_path: Path) -> None:
+ """Create destination directory and install source.
+
+ This function is used for actual installation of system/backend New
+ directory will be created inside :resource_path: if needed If for example
+ archive contains top level folder then no need to create new directory
+ """
+ destination = resource_path
+ create_destination = source.create_destination()
+
+ if create_destination:
+ name = source.name()
+ if not name:
+ raise ConfigurationException("Unable to get source name.")
+
+ destination = resource_path / name
+ destination.mkdir()
+ try:
+ source.install_into(destination)
+ except Exception as error:
+ if create_destination:
+ shutil.rmtree(destination)
+ raise error
diff --git a/src/mlia/backend/executor/system.py b/src/mlia/backend/executor/system.py
new file mode 100644
index 0000000..a5ecf19
--- /dev/null
+++ b/src/mlia/backend/executor/system.py
@@ -0,0 +1,178 @@
+# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
+# SPDX-License-Identifier: Apache-2.0
+"""System backend module."""
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+from typing import cast
+from typing import List
+
+from mlia.backend.executor.common import Backend
+from mlia.backend.executor.common import ConfigurationException
+from mlia.backend.executor.common import get_backend_configs
+from mlia.backend.executor.common import get_backend_directories
+from mlia.backend.executor.common import load_config
+from mlia.backend.executor.common import remove_backend
+from mlia.backend.executor.config import SystemConfig
+from mlia.backend.executor.fs import get_backends_path
+from mlia.backend.executor.proc import run_and_wait
+from mlia.backend.executor.source import create_destination_and_install
+from mlia.backend.executor.source import get_source
+
+
+class System(Backend):
+ """System class."""
+
+ def __init__(self, config: SystemConfig) -> None:
+ """Construct the System class using the dictionary passed."""
+ super().__init__(config)
+
+ self._setup_reporting(config)
+
+ def _setup_reporting(self, config: SystemConfig) -> None:
+ self.reporting = config.get("reporting")
+
+ def run(self, command: str) -> tuple[int, bytearray, bytearray]:
+ """
+ Run command on the system.
+
+ Returns a tuple: (exit_code, stdout, stderr)
+ """
+ cwd = self.config_location
+ if not isinstance(cwd, Path) or not cwd.is_dir():
+ raise ConfigurationException(
+ f"System has invalid config location: {cwd}",
+ )
+
+ stdout = bytearray()
+ stderr = bytearray()
+
+ return run_and_wait(
+ command,
+ cwd=cwd,
+ terminate_on_error=True,
+ out=stdout,
+ err=stderr,
+ )
+
+ def __eq__(self, other: object) -> bool:
+ """Overload operator ==."""
+ if not isinstance(other, System):
+ return False
+
+ return super().__eq__(other) and self.name == other.name
+
+ def get_details(self) -> dict[str, Any]:
+ """Return a dictionary with all relevant information of a System."""
+ output = {
+ "type": "system",
+ "name": self.name,
+ "description": self.description,
+ "commands": self._get_command_details(),
+ "annotations": self.annotations,
+ }
+
+ return output
+
+
+def get_available_systems_directory_names() -> list[str]:
+ """Return a list of directory names for all avialable systems."""
+ return [entry.name for entry in get_backend_directories("systems")]
+
+
+def get_available_systems() -> list[System]:
+ """Return a list with all available systems."""
+ available_systems = []
+ for config_json in get_backend_configs("systems"):
+ config_entries = cast(List[SystemConfig], (load_config(config_json)))
+ for config_entry in config_entries:
+ config_entry["config_location"] = config_json.parent.absolute()
+ system = load_system(config_entry)
+ available_systems.append(system)
+
+ return sorted(available_systems, key=lambda system: system.name)
+
+
+def get_system(system_name: str) -> System:
+ """Return a system instance with the same name passed as argument."""
+ available_systems = get_available_systems()
+ for system in available_systems:
+ if system_name == system.name:
+ return system
+ raise ConfigurationException(f"System '{system_name}' not found.")
+
+
+def install_system(source_path: Path) -> None:
+ """Install new system."""
+ try:
+ source = get_source(source_path)
+ config = cast(List[SystemConfig], source.config())
+ systems_to_install = [load_system(entry) for entry in config]
+ except Exception as error:
+ raise ConfigurationException("Unable to read system definition") from error
+
+ if not systems_to_install:
+ raise ConfigurationException("No system definition found")
+
+ available_systems = get_available_systems()
+ already_installed = [s for s in systems_to_install if s in available_systems]
+ if already_installed:
+ names = [system.name for system in already_installed]
+ raise ConfigurationException(
+ f"Systems [{','.join(names)}] are already installed."
+ )
+
+ create_destination_and_install(source, get_backends_path("systems"))
+
+
+def remove_system(directory_name: str) -> None:
+ """Remove system."""
+ remove_backend(directory_name, "systems")
+
+
+def load_system(config: SystemConfig) -> System:
+ """Load system based on it's execution type."""
+ populate_shared_params(config)
+
+ return System(config)
+
+
+def populate_shared_params(config: SystemConfig) -> None:
+ """Populate command parameters with shared parameters."""
+ user_params = config.get("user_params")
+ if not user_params or "shared" not in user_params:
+ return
+
+ shared_user_params = user_params["shared"]
+ if not shared_user_params:
+ return
+
+ only_aliases = all(p.get("alias") for p in shared_user_params)
+ if not only_aliases:
+ raise ConfigurationException("All shared parameters should have aliases")
+
+ commands = config.get("commands", {})
+ for cmd_name in ["run"]:
+ command = commands.get(cmd_name)
+ if command is None:
+ commands[cmd_name] = []
+ cmd_user_params = user_params.get(cmd_name)
+ if not cmd_user_params:
+ cmd_user_params = shared_user_params
+ else:
+ only_aliases = all(p.get("alias") for p in cmd_user_params)
+ if not only_aliases:
+ raise ConfigurationException(
+ f"All parameters for command {cmd_name} should have aliases."
+ )
+ merged_by_alias = {
+ **{p.get("alias"): p for p in shared_user_params},
+ **{p.get("alias"): p for p in cmd_user_params},
+ }
+ cmd_user_params = list(merged_by_alias.values())
+
+ user_params[cmd_name] = cmd_user_params
+
+ config["commands"] = commands
+ del user_params["shared"]