aboutsummaryrefslogtreecommitdiff
path: root/src/mlia/backend/executor
diff options
context:
space:
mode:
authorDmitrii Agibov <dmitrii.agibov@arm.com>2023-01-27 09:12:50 +0000
committerBenjamin Klimczak <benjamin.klimczak@arm.com>2023-02-08 15:25:11 +0000
commit3e3dcb9bd5abb88adcd85b4f89e8a81e7f6fa293 (patch)
tree020eee6abef093113de5b49c135c915c37173843 /src/mlia/backend/executor
parent836efd40317a397761ec8b66e3f4398faac43ad0 (diff)
downloadmlia-3e3dcb9bd5abb88adcd85b4f89e8a81e7f6fa293.tar.gz
MLIA-595 Remove old backend configuration mechanism
- Remove old backend configuration code - Install backends into directory ~/.mlia - Rename targets/backends in registry to make it consistent across codebase. Change-Id: I9c8b012fe863280f1c692940c0dcad3ef638aaae
Diffstat (limited to 'src/mlia/backend/executor')
-rw-r--r--src/mlia/backend/executor/__init__.py3
-rw-r--r--src/mlia/backend/executor/application.py170
-rw-r--r--src/mlia/backend/executor/common.py517
-rw-r--r--src/mlia/backend/executor/config.py68
-rw-r--r--src/mlia/backend/executor/execution.py342
-rw-r--r--src/mlia/backend/executor/fs.py88
-rw-r--r--src/mlia/backend/executor/output_consumer.py67
-rw-r--r--src/mlia/backend/executor/proc.py191
-rw-r--r--src/mlia/backend/executor/runner.py98
-rw-r--r--src/mlia/backend/executor/source.py207
-rw-r--r--src/mlia/backend/executor/system.py178
11 files changed, 0 insertions, 1929 deletions
diff --git a/src/mlia/backend/executor/__init__.py b/src/mlia/backend/executor/__init__.py
deleted file mode 100644
index 3d60372..0000000
--- a/src/mlia/backend/executor/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Backend module."""
diff --git a/src/mlia/backend/executor/application.py b/src/mlia/backend/executor/application.py
deleted file mode 100644
index 738ac4e..0000000
--- a/src/mlia/backend/executor/application.py
+++ /dev/null
@@ -1,170 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Application backend module."""
-from __future__ import annotations
-
-import re
-from pathlib import Path
-from typing import Any
-from typing import cast
-from typing import List
-
-from mlia.backend.executor.common import Backend
-from mlia.backend.executor.common import ConfigurationException
-from mlia.backend.executor.common import get_backend_configs
-from mlia.backend.executor.common import get_backend_directories
-from mlia.backend.executor.common import load_application_configs
-from mlia.backend.executor.common import load_config
-from mlia.backend.executor.common import remove_backend
-from mlia.backend.executor.config import ApplicationConfig
-from mlia.backend.executor.config import ExtendedApplicationConfig
-from mlia.backend.executor.fs import get_backends_path
-from mlia.backend.executor.source import create_destination_and_install
-from mlia.backend.executor.source import get_source
-
-
-def get_available_application_directory_names() -> list[str]:
- """Return a list of directory names for all available applications."""
- return [entry.name for entry in get_backend_directories("applications")]
-
-
-def get_available_applications() -> list[Application]:
- """Return a list with all available applications."""
- available_applications = []
- for config_json in get_backend_configs("applications"):
- config_entries = cast(List[ExtendedApplicationConfig], load_config(config_json))
- for config_entry in config_entries:
- config_entry["config_location"] = config_json.parent.absolute()
- applications = load_applications(config_entry)
- available_applications += applications
-
- return sorted(available_applications, key=lambda application: application.name)
-
-
-def get_application(
- application_name: str, system_name: str | None = None
-) -> list[Application]:
- """Return a list of application instances with provided name."""
- return [
- application
- for application in get_available_applications()
- if application.name == application_name
- and (not system_name or application.can_run_on(system_name))
- ]
-
-
-def install_application(source_path: Path) -> None:
- """Install application."""
- try:
- source = get_source(source_path)
- config = cast(List[ExtendedApplicationConfig], source.config())
- applications_to_install = [
- s for entry in config for s in load_applications(entry)
- ]
- except Exception as error:
- raise ConfigurationException("Unable to read application definition") from error
-
- if not applications_to_install:
- raise ConfigurationException("No application definition found")
-
- available_applications = get_available_applications()
- already_installed = [
- s for s in applications_to_install if s in available_applications
- ]
- if already_installed:
- names = {application.name for application in already_installed}
- raise ConfigurationException(
- f"Applications [{','.join(names)}] are already installed."
- )
-
- create_destination_and_install(source, get_backends_path("applications"))
-
-
-def remove_application(directory_name: str) -> None:
- """Remove application directory."""
- remove_backend(directory_name, "applications")
-
-
-def get_unique_application_names(system_name: str | None = None) -> list[str]:
- """Extract a list of unique application names of all application available."""
- return list(
- {
- application.name
- for application in get_available_applications()
- if not system_name or application.can_run_on(system_name)
- }
- )
-
-
-class Application(Backend):
- """Class for representing a single application component."""
-
- def __init__(self, config: ApplicationConfig) -> None:
- """Construct a Application instance from a dict."""
- super().__init__(config)
-
- self.supported_systems = config.get("supported_systems", [])
-
- def __eq__(self, other: object) -> bool:
- """Overload operator ==."""
- if not isinstance(other, Application):
- return False
-
- return (
- super().__eq__(other)
- and self.name == other.name
- and set(self.supported_systems) == set(other.supported_systems)
- )
-
- def can_run_on(self, system_name: str) -> bool:
- """Check if the application can run on the system passed as argument."""
- return system_name in self.supported_systems
-
- def get_details(self) -> dict[str, Any]:
- """Return dictionary with information about the Application instance."""
- output = {
- "type": "application",
- "name": self.name,
- "description": self.description,
- "supported_systems": self.supported_systems,
- "commands": self._get_command_details(),
- }
-
- return output
-
- def remove_unused_params(self) -> None:
- """Remove unused params in commands.
-
- After merging default and system related configuration application
- could have parameters that are not being used in commands. They
- should be removed.
- """
- for command in self.commands.values():
- indexes_or_aliases = [
- m
- for cmd_str in command.command_strings
- for m in re.findall(r"{user_params:(?P<index_or_alias>\w+)}", cmd_str)
- ]
-
- only_aliases = all(not item.isnumeric() for item in indexes_or_aliases)
- if only_aliases:
- used_params = [
- param
- for param in command.params
- if param.alias in indexes_or_aliases
- ]
- command.params = used_params
-
-
-def load_applications(config: ExtendedApplicationConfig) -> list[Application]:
- """Load application.
-
- Application configuration could contain different parameters/commands for different
- supported systems. For each supported system this function will return separate
- Application instance with appropriate configuration.
- """
- configs = load_application_configs(config, ApplicationConfig)
- applications = [Application(cfg) for cfg in configs]
- for application in applications:
- application.remove_unused_params()
- return applications
diff --git a/src/mlia/backend/executor/common.py b/src/mlia/backend/executor/common.py
deleted file mode 100644
index 48dbd4a..0000000
--- a/src/mlia/backend/executor/common.py
+++ /dev/null
@@ -1,517 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Contain all common functions for the backends."""
-from __future__ import annotations
-
-import json
-import logging
-import re
-from abc import ABC
-from collections import Counter
-from pathlib import Path
-from typing import Any
-from typing import Callable
-from typing import cast
-from typing import Final
-from typing import IO
-from typing import Iterable
-from typing import Match
-from typing import NamedTuple
-from typing import Pattern
-
-from mlia.backend.executor.config import BackendConfig
-from mlia.backend.executor.config import BaseBackendConfig
-from mlia.backend.executor.config import NamedExecutionConfig
-from mlia.backend.executor.config import UserParamConfig
-from mlia.backend.executor.config import UserParamsConfig
-from mlia.backend.executor.fs import get_backends_path
-from mlia.backend.executor.fs import remove_resource
-from mlia.backend.executor.fs import ResourceType
-
-
-BACKEND_CONFIG_FILE: Final[str] = "backend-config.json"
-
-
-class ConfigurationException(Exception):
- """Configuration exception."""
-
-
-def get_backend_config(dir_path: Path) -> Path:
- """Get path to backendir configuration file."""
- return dir_path / BACKEND_CONFIG_FILE
-
-
-def get_backend_configs(resource_type: ResourceType) -> Iterable[Path]:
- """Get path to the backend configs for provided resource_type."""
- return (
- get_backend_config(entry) for entry in get_backend_directories(resource_type)
- )
-
-
-def get_backend_directories(resource_type: ResourceType) -> Iterable[Path]:
- """Get path to the backend directories for provided resource_type."""
- return (
- entry
- for entry in get_backends_path(resource_type).iterdir()
- if is_backend_directory(entry)
- )
-
-
-def is_backend_directory(dir_path: Path) -> bool:
- """Check if path is backend's configuration directory."""
- return dir_path.is_dir() and get_backend_config(dir_path).is_file()
-
-
-def remove_backend(directory_name: str, resource_type: ResourceType) -> None:
- """Remove backend with provided type and directory_name."""
- if not directory_name:
- raise Exception("No directory name provided")
-
- remove_resource(directory_name, resource_type)
-
-
-def load_config(config: Path | IO[bytes] | None) -> BackendConfig:
- """Return a loaded json file."""
- if config is None:
- raise Exception("Unable to read config")
-
- if isinstance(config, Path):
- with config.open() as json_file:
- return cast(BackendConfig, json.load(json_file))
-
- return cast(BackendConfig, json.load(config))
-
-
-def parse_raw_parameter(parameter: str) -> tuple[str, str | None]:
- """Split the parameter string in name and optional value.
-
- It manages the following cases:
- --param=1 -> --param, 1
- --param 1 -> --param, 1
- --flag -> --flag, None
- """
- data = re.split(" |=", parameter)
- if len(data) == 1:
- param_name = data[0]
- param_value = None
- else:
- param_name = " ".join(data[0:-1])
- param_value = data[-1]
- return param_name, param_value
-
-
-class DataPaths(NamedTuple):
- """DataPaths class."""
-
- src: Path
- dst: str
-
-
-class Backend(ABC):
- """Backend class."""
-
- # pylint: disable=too-many-instance-attributes
-
- def __init__(self, config: BaseBackendConfig):
- """Initialize backend."""
- name = config.get("name")
- if not name:
- raise ConfigurationException("Name is empty")
-
- self.name = name
- self.description = config.get("description", "")
- self.config_location = config.get("config_location")
- self.variables = config.get("variables", {})
- self.annotations = config.get("annotations", {})
-
- self._parse_commands_and_params(config)
-
- def validate_parameter(self, command_name: str, parameter: str) -> bool:
- """Validate the parameter string against the application configuration.
-
- We take the parameter string, extract the parameter name/value and
- check them against the current configuration.
- """
- param_name, param_value = parse_raw_parameter(parameter)
- valid_param_name = valid_param_value = False
-
- command = self.commands.get(command_name)
- if not command:
- raise AttributeError(f"Unknown command: '{command_name}'")
-
- # Iterate over all available parameters until we have a match.
- for param in command.params:
- if self._same_parameter(param_name, param):
- valid_param_name = True
- # This is a non-empty list
- if param.values:
- # We check if the value is allowed in the configuration
- valid_param_value = param_value in param.values
- else:
- # In this case we don't validate the value and accept
- # whatever we have set.
- valid_param_value = True
- break
-
- return valid_param_name and valid_param_value
-
- def __eq__(self, other: object) -> bool:
- """Overload operator ==."""
- if not isinstance(other, Backend):
- return False
-
- return (
- self.name == other.name
- and self.description == other.description
- and self.commands == other.commands
- )
-
- def __repr__(self) -> str:
- """Represent the Backend instance by its name."""
- return self.name
-
- def _parse_commands_and_params(self, config: BaseBackendConfig) -> None:
- """Parse commands and user parameters."""
- self.commands: dict[str, Command] = {}
-
- commands = config.get("commands")
- if commands:
- params = config.get("user_params")
-
- for command_name in commands.keys():
- command_params = self._parse_params(params, command_name)
- command_strings = [
- self._substitute_variables(cmd)
- for cmd in commands.get(command_name, [])
- ]
- self.commands[command_name] = Command(command_strings, command_params)
-
- def _substitute_variables(self, str_val: str) -> str:
- """Substitute variables in string.
-
- Variables is being substituted at backend's creation stage because
- they could contain references to other params which will be
- resolved later.
- """
- if not str_val:
- return str_val
-
- var_pattern: Final[Pattern] = re.compile(r"{variables:(?P<var_name>\w+)}")
-
- def var_value(match: Match) -> str:
- var_name = match["var_name"]
- if var_name not in self.variables:
- raise ConfigurationException(f"Unknown variable {var_name}")
-
- return self.variables[var_name]
-
- return var_pattern.sub(var_value, str_val)
-
- @classmethod
- def _parse_params(
- cls, params: UserParamsConfig | None, command: str
- ) -> list[Param]:
- if not params:
- return []
-
- return [cls._parse_param(p) for p in params.get(command, [])]
-
- @classmethod
- def _parse_param(cls, param: UserParamConfig) -> Param:
- """Parse a single parameter."""
- name = param.get("name")
- if name is not None and not name:
- raise ConfigurationException("Parameter has an empty 'name' attribute.")
- values = param.get("values", None)
- default_value = param.get("default_value", None)
- description = param.get("description", "")
- alias = param.get("alias")
-
- return Param(
- name=name,
- description=description,
- values=values,
- default_value=default_value,
- alias=alias,
- )
-
- def _get_command_details(self) -> dict:
- command_details = {
- command_name: command.get_details()
- for command_name, command in self.commands.items()
- }
- return command_details
-
- def _get_user_param_value(self, user_params: list[str], param: Param) -> str | None:
- """Get the user-specified value of a parameter."""
- for user_param in user_params:
- user_param_name, user_param_value = parse_raw_parameter(user_param)
- if user_param_name == param.name:
- warn_message = (
- "The direct use of parameter name is deprecated"
- " and might be removed in the future.\n"
- f"Please use alias '{param.alias}' instead of "
- "'{user_param_name}' to provide the parameter."
- )
- logging.warning(warn_message)
-
- if self._same_parameter(user_param_name, param):
- return user_param_value
-
- return None
-
- @staticmethod
- def _same_parameter(user_param_name_or_alias: str, param: Param) -> bool:
- """Compare user parameter name with param name or alias."""
- # Strip the "=" sign in the param_name. This is needed just for
- # comparison with the parameters passed by the user.
- # The equal sign needs to be honoured when re-building the
- # parameter back.
- param_name = None if not param.name else param.name.rstrip("=")
- return user_param_name_or_alias in [param_name, param.alias]
-
- def resolved_parameters(
- self, command_name: str, user_params: list[str]
- ) -> list[tuple[str | None, Param]]:
- """Return list of parameters with values."""
- result: list[tuple[str | None, Param]] = []
- command = self.commands.get(command_name)
- if not command:
- return result
-
- for param in command.params:
- value = self._get_user_param_value(user_params, param)
- if not value:
- value = param.default_value
- result.append((value, param))
-
- return result
-
- def build_command(
- self,
- command_name: str,
- user_params: list[str],
- param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str],
- ) -> list[str]:
- """
- Return a list of executable command strings.
-
- Given a command and associated parameters, returns a list of executable command
- strings.
- """
- command = self.commands.get(command_name)
- if not command:
- raise ConfigurationException(
- f"Command '{command_name}' could not be found."
- )
-
- commands_to_run = []
-
- params_values = self.resolved_parameters(command_name, user_params)
- for cmd_str in command.command_strings:
- cmd_str = resolve_all_parameters(
- cmd_str, param_resolver, command_name, params_values
- )
- commands_to_run.append(cmd_str)
-
- return commands_to_run
-
-
-class Param:
- """Class for representing a generic application parameter."""
-
- def __init__( # pylint: disable=too-many-arguments
- self,
- name: str | None,
- description: str,
- values: list[str] | None = None,
- default_value: str | None = None,
- alias: str | None = None,
- ) -> None:
- """Construct a Param instance."""
- if not name and not alias:
- raise ConfigurationException(
- "Either name, alias or both must be set to identify a parameter."
- )
- self.name = name
- self.values = values
- self.description = description
- self.default_value = default_value
- self.alias = alias
-
- def get_details(self) -> dict:
- """Return a dictionary with all relevant information of a Param."""
- return {key: value for key, value in self.__dict__.items() if value}
-
- def __eq__(self, other: object) -> bool:
- """Overload operator ==."""
- if not isinstance(other, Param):
- return False
-
- return (
- self.name == other.name
- and self.values == other.values
- and self.default_value == other.default_value
- and self.description == other.description
- )
-
-
-class Command:
- """Class for representing a command."""
-
- def __init__(
- self, command_strings: list[str], params: list[Param] | None = None
- ) -> None:
- """Construct a Command instance."""
- self.command_strings = command_strings
-
- if params:
- self.params = params
- else:
- self.params = []
-
- self._validate()
-
- def _validate(self) -> None:
- """Validate command."""
- if not self.params:
- return
-
- aliases = [param.alias for param in self.params if param.alias is not None]
- repeated_aliases = [
- alias for alias, count in Counter(aliases).items() if count > 1
- ]
-
- if repeated_aliases:
- raise ConfigurationException(
- f"Non-unique aliases {', '.join(repeated_aliases)}"
- )
-
- both_name_and_alias = [
- param.name
- for param in self.params
- if param.name in aliases and param.name != param.alias
- ]
- if both_name_and_alias:
- raise ConfigurationException(
- f"Aliases {', '.join(both_name_and_alias)} could not be used "
- "as parameter name."
- )
-
- def get_details(self) -> dict:
- """Return a dictionary with all relevant information of a Command."""
- output = {
- "command_strings": self.command_strings,
- "user_params": [param.get_details() for param in self.params],
- }
- return output
-
- def __eq__(self, other: object) -> bool:
- """Overload operator ==."""
- if not isinstance(other, Command):
- return False
-
- return (
- self.command_strings == other.command_strings
- and self.params == other.params
- )
-
-
-def resolve_all_parameters(
- str_val: str,
- param_resolver: Callable[[str, str, list[tuple[str | None, Param]]], str],
- command_name: str | None = None,
- params_values: list[tuple[str | None, Param]] | None = None,
-) -> str:
- """Resolve all parameters in the string."""
- if not str_val:
- return str_val
-
- param_pattern: Final[Pattern] = re.compile(r"{(?P<param_name>[\w.:]+)}")
- while param_pattern.findall(str_val):
- str_val = param_pattern.sub(
- lambda m: param_resolver(
- m["param_name"], command_name or "", params_values or []
- ),
- str_val,
- )
- return str_val
-
-
-def load_application_configs(
- config: Any,
- config_type: type[Any],
- is_system_required: bool = True,
-) -> Any:
- """Get one config for each system supported by the application.
-
- The configuration could contain different parameters/commands for different
- supported systems. For each supported system this function will return separate
- config with appropriate configuration.
- """
- merged_configs = []
- supported_systems: list[NamedExecutionConfig] | None = config.get(
- "supported_systems"
- )
- if not supported_systems:
- if is_system_required:
- raise ConfigurationException("No supported systems definition provided")
- # Create an empty system to be used in the parsing below
- supported_systems = [cast(NamedExecutionConfig, {})]
-
- default_user_params = config.get("user_params", {})
-
- def merge_config(system: NamedExecutionConfig) -> Any:
- system_name = system.get("name")
- if not system_name and is_system_required:
- raise ConfigurationException(
- "Unable to read supported system definition, name is missed"
- )
-
- merged_config = config_type(**config)
- merged_config["supported_systems"] = [system_name] if system_name else []
- # merge default configuration and specific to the system
- merged_config["commands"] = {
- **config.get("commands", {}),
- **system.get("commands", {}),
- }
-
- params = {}
- tool_user_params = system.get("user_params", {})
- command_names = tool_user_params.keys() | default_user_params.keys()
- for command_name in command_names:
- if command_name not in merged_config["commands"]:
- continue
-
- params_default = default_user_params.get(command_name, [])
- params_tool = tool_user_params.get(command_name, [])
- if not params_default or not params_tool:
- params[command_name] = params_tool or params_default
- if params_default and params_tool:
- if any(not p.get("alias") for p in params_default):
- raise ConfigurationException(
- f"Default parameters for command {command_name} "
- "should have aliases"
- )
- if any(not p.get("alias") for p in params_tool):
- raise ConfigurationException(
- f"{system_name} parameters for command {command_name} "
- "should have aliases."
- )
-
- merged_by_alias = {
- **{p.get("alias"): p for p in params_default},
- **{p.get("alias"): p for p in params_tool},
- }
- params[command_name] = list(merged_by_alias.values())
-
- merged_config["user_params"] = params
- merged_config["variables"] = {
- **config.get("variables", {}),
- **system.get("variables", {}),
- }
- return merged_config
-
- merged_configs = [merge_config(system) for system in supported_systems]
-
- return merged_configs
diff --git a/src/mlia/backend/executor/config.py b/src/mlia/backend/executor/config.py
deleted file mode 100644
index dca53da..0000000
--- a/src/mlia/backend/executor/config.py
+++ /dev/null
@@ -1,68 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Contain definition of backend configuration."""
-from __future__ import annotations
-
-from pathlib import Path
-from typing import Dict
-from typing import List
-from typing import TypedDict
-from typing import Union
-
-
-class UserParamConfig(TypedDict, total=False):
- """User parameter configuration."""
-
- name: str | None
- default_value: str
- values: list[str]
- description: str
- alias: str
-
-
-UserParamsConfig = Dict[str, List[UserParamConfig]]
-
-
-class ExecutionConfig(TypedDict, total=False):
- """Execution configuration."""
-
- commands: dict[str, list[str]]
- user_params: UserParamsConfig
- variables: dict[str, str]
-
-
-class NamedExecutionConfig(ExecutionConfig):
- """Execution configuration with name."""
-
- name: str
-
-
-class BaseBackendConfig(ExecutionConfig, total=False):
- """Base backend configuration."""
-
- name: str
- description: str
- config_location: Path
- annotations: dict[str, str | list[str]]
-
-
-class ApplicationConfig(BaseBackendConfig, total=False):
- """Application configuration."""
-
- supported_systems: list[str]
-
-
-class ExtendedApplicationConfig(BaseBackendConfig, total=False):
- """Extended application configuration."""
-
- supported_systems: list[NamedExecutionConfig]
-
-
-class SystemConfig(BaseBackendConfig, total=False):
- """System configuration."""
-
- reporting: dict[str, dict]
-
-
-BackendItemConfig = Union[ApplicationConfig, SystemConfig]
-BackendConfig = Union[List[ExtendedApplicationConfig], List[SystemConfig]]
diff --git a/src/mlia/backend/executor/execution.py b/src/mlia/backend/executor/execution.py
deleted file mode 100644
index e253b16..0000000
--- a/src/mlia/backend/executor/execution.py
+++ /dev/null
@@ -1,342 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Application execution module."""
-from __future__ import annotations
-
-import logging
-import re
-from typing import cast
-
-from mlia.backend.executor.application import Application
-from mlia.backend.executor.application import get_application
-from mlia.backend.executor.common import Backend
-from mlia.backend.executor.common import ConfigurationException
-from mlia.backend.executor.common import Param
-from mlia.backend.executor.system import get_system
-from mlia.backend.executor.system import System
-
-logger = logging.getLogger(__name__)
-
-
-class AnotherInstanceIsRunningException(Exception):
- """Concurrent execution error."""
-
-
-class ExecutionContext: # pylint: disable=too-few-public-methods
- """Command execution context."""
-
- def __init__(
- self,
- app: Application,
- app_params: list[str],
- system: System,
- system_params: list[str],
- ):
- """Init execution context."""
- self.app = app
- self.app_params = app_params
- self.system = system
- self.system_params = system_params
-
- self.param_resolver = ParamResolver(self)
-
- self.stdout: bytearray | None = None
- self.stderr: bytearray | None = None
-
-
-class ParamResolver:
- """Parameter resolver."""
-
- def __init__(self, context: ExecutionContext):
- """Init parameter resolver."""
- self.ctx = context
-
- @staticmethod
- def resolve_user_params(
- cmd_name: str | None,
- index_or_alias: str,
- resolved_params: list[tuple[str | None, Param]] | None,
- ) -> str:
- """Resolve user params."""
- if not cmd_name or resolved_params is None:
- raise ConfigurationException("Unable to resolve user params")
-
- param_value: str | None = None
- param: Param | None = None
-
- if index_or_alias.isnumeric():
- i = int(index_or_alias)
- if i not in range(len(resolved_params)):
- raise ConfigurationException(
- f"Invalid index {i} for user params of command {cmd_name}"
- )
- param_value, param = resolved_params[i]
- else:
- for val, par in resolved_params:
- if par.alias == index_or_alias:
- param_value, param = val, par
- break
-
- if param is None:
- raise ConfigurationException(
- f"No user parameter for command '{cmd_name}' with "
- f"alias '{index_or_alias}'."
- )
-
- if param_value:
- # We need to handle to cases of parameters here:
- # 1) Optional parameters (non-positional with a name and value)
- # 2) Positional parameters (value only, no name needed)
- # Default to empty strings for positional arguments
- param_name = ""
- separator = ""
- if param.name is not None:
- # A valid param name means we have an optional/non-positional argument:
- # The separator is an empty string in case the param_name
- # has an equal sign as we have to honour it.
- # If the parameter doesn't end with an equal sign then a
- # space character is injected to split the parameter name
- # and its value
- param_name = param.name
- separator = "" if param.name.endswith("=") else " "
-
- return f"{param_name}{separator}{param_value}"
-
- if param.name is None:
- raise ConfigurationException(
- f"Missing user parameter with alias '{index_or_alias}' for "
- f"command '{cmd_name}'."
- )
-
- return param.name # flag: just return the parameter name
-
- def resolve_commands_and_params(
- self, backend_type: str, cmd_name: str, return_params: bool, index_or_alias: str
- ) -> str:
- """Resolve command or command's param value."""
- if backend_type == "system":
- backend = cast(Backend, self.ctx.system)
- backend_params = self.ctx.system_params
- else: # Application backend
- backend = cast(Backend, self.ctx.app)
- backend_params = self.ctx.app_params
-
- if cmd_name not in backend.commands:
- raise ConfigurationException(f"Command {cmd_name} not found")
-
- if return_params:
- params = backend.resolved_parameters(cmd_name, backend_params)
- if index_or_alias.isnumeric():
- i = int(index_or_alias)
- if i not in range(len(params)):
- raise ConfigurationException(
- f"Invalid parameter index {i} for command {cmd_name}"
- )
-
- param_value = params[i][0]
- else:
- param_value = None
- for value, param in params:
- if param.alias == index_or_alias:
- param_value = value
- break
-
- if not param_value:
- raise ConfigurationException(
- "No value for parameter with index or "
- f"alias {index_or_alias} of command {cmd_name}."
- )
- return param_value
-
- if not index_or_alias.isnumeric():
- raise ConfigurationException(f"Bad command index {index_or_alias}")
-
- i = int(index_or_alias)
- commands = backend.build_command(cmd_name, backend_params, self.param_resolver)
- if i not in range(len(commands)):
- raise ConfigurationException(f"Invalid index {i} for command {cmd_name}")
-
- return commands[i]
-
- def resolve_variables(self, backend_type: str, var_name: str) -> str:
- """Resolve variable value."""
- if backend_type == "system":
- backend = cast(Backend, self.ctx.system)
- else: # Application backend
- backend = cast(Backend, self.ctx.app)
-
- if var_name not in backend.variables:
- raise ConfigurationException(f"Unknown variable {var_name}")
-
- return backend.variables[var_name]
-
- def param_matcher(
- self,
- param_name: str,
- cmd_name: str | None,
- resolved_params: list[tuple[str | None, Param]] | None,
- ) -> str:
- """Regexp to resolve a param from the param_name."""
- # this pattern supports parameter names like "application.commands.run:0" and
- # "system.commands.run.params:0"
- # Note: 'software' is included for backward compatibility.
- commands_and_params_match = re.match(
- r"(?P<type>application|software|system)[.]commands[.]"
- r"(?P<name>\w+)"
- r"(?P<params>[.]params|)[:]"
- r"(?P<index_or_alias>\w+)",
- param_name,
- )
-
- if commands_and_params_match:
- backend_type, cmd_name, return_params, index_or_alias = (
- commands_and_params_match["type"],
- commands_and_params_match["name"],
- commands_and_params_match["params"],
- commands_and_params_match["index_or_alias"],
- )
- return self.resolve_commands_and_params(
- backend_type, cmd_name, bool(return_params), index_or_alias
- )
-
- # Note: 'software' is included for backward compatibility.
- variables_match = re.match(
- r"(?P<type>application|software|system)[.]variables:(?P<var_name>\w+)",
- param_name,
- )
- if variables_match:
- backend_type, var_name = (
- variables_match["type"],
- variables_match["var_name"],
- )
- return self.resolve_variables(backend_type, var_name)
-
- user_params_match = re.match(r"user_params:(?P<index_or_alias>\w+)", param_name)
- if user_params_match:
- index_or_alias = user_params_match["index_or_alias"]
- return self.resolve_user_params(cmd_name, index_or_alias, resolved_params)
-
- raise ConfigurationException(f"Unable to resolve parameter {param_name}")
-
- def param_resolver(
- self,
- param_name: str,
- cmd_name: str | None = None,
- resolved_params: list[tuple[str | None, Param]] | None = None,
- ) -> str:
- """Resolve parameter value based on current execution context."""
- # Note: 'software.*' is included for backward compatibility.
- resolved_param = None
- if param_name in ["application.name", "software.name"]:
- resolved_param = self.ctx.app.name
- elif param_name in ["application.description", "software.description"]:
- resolved_param = self.ctx.app.description
- elif self.ctx.app.config_location and (
- param_name in ["application.config_dir", "software.config_dir"]
- ):
- resolved_param = str(self.ctx.app.config_location.absolute())
- elif self.ctx.system is not None:
- if param_name == "system.name":
- resolved_param = self.ctx.system.name
- elif param_name == "system.description":
- resolved_param = self.ctx.system.description
- elif param_name == "system.config_dir" and self.ctx.system.config_location:
- resolved_param = str(self.ctx.system.config_location.absolute())
-
- if not resolved_param:
- resolved_param = self.param_matcher(param_name, cmd_name, resolved_params)
- return resolved_param
-
- def __call__(
- self,
- param_name: str,
- cmd_name: str | None = None,
- resolved_params: list[tuple[str | None, Param]] | None = None,
- ) -> str:
- """Resolve provided parameter."""
- return self.param_resolver(param_name, cmd_name, resolved_params)
-
-
-def validate_parameters(
- backend: Backend, command_names: list[str], params: list[str]
-) -> None:
- """Check parameters passed to backend."""
- for param in params:
- acceptable = any(
- backend.validate_parameter(command_name, param)
- for command_name in command_names
- if command_name in backend.commands
- )
-
- if not acceptable:
- backend_type = "System" if isinstance(backend, System) else "Application"
- raise ValueError(
- f"{backend_type} parameter '{param}' not valid for "
- f"command '{' or '.join(command_names)}'."
- )
-
-
-def get_application_by_name_and_system(
- application_name: str, system_name: str
-) -> Application:
- """Get application."""
- applications = get_application(application_name, system_name)
- if not applications:
- raise ValueError(
- f"Application '{application_name}' doesn't support the "
- f"system '{system_name}'."
- )
-
- if len(applications) != 1:
- raise ValueError(
- f"Error during getting application {application_name} for the "
- f"system {system_name}."
- )
-
- return applications[0]
-
-
-def get_application_and_system(
- application_name: str, system_name: str
-) -> tuple[Application, System]:
- """Return application and system by provided names."""
- system = get_system(system_name)
- if not system:
- raise ValueError(f"System {system_name} is not found.")
-
- application = get_application_by_name_and_system(application_name, system_name)
-
- return application, system
-
-
-def run_application(
- application_name: str,
- application_params: list[str],
- system_name: str,
- system_params: list[str],
-) -> ExecutionContext:
- """Run application on the provided system."""
- application, system = get_application_and_system(application_name, system_name)
- validate_parameters(application, ["run"], application_params)
- validate_parameters(system, ["run"], system_params)
-
- ctx = ExecutionContext(
- app=application,
- app_params=application_params,
- system=system,
- system_params=system_params,
- )
-
- logger.debug("Generating commands to execute")
- commands_to_run = ctx.system.build_command(
- "run", ctx.system_params, ctx.param_resolver
- )
-
- for command in commands_to_run:
- logger.debug("Running: %s", command)
- exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command)
-
- if exit_code != 0:
- logger.warning("Application exited with exit code %i", exit_code)
-
- return ctx
diff --git a/src/mlia/backend/executor/fs.py b/src/mlia/backend/executor/fs.py
deleted file mode 100644
index 3fce19c..0000000
--- a/src/mlia/backend/executor/fs.py
+++ /dev/null
@@ -1,88 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Module to host all file system related functions."""
-from __future__ import annotations
-
-import re
-import shutil
-from pathlib import Path
-from typing import Literal
-
-from mlia.utils.filesystem import get_mlia_resources
-
-ResourceType = Literal["applications", "systems"]
-
-
-def get_backend_resources() -> Path:
- """Get backend resources folder path."""
- return get_mlia_resources() / "backends"
-
-
-def get_backends_path(name: ResourceType) -> Path:
- """Return the absolute path of the specified resource.
-
- It uses importlib to return resources packaged with MANIFEST.in.
- """
- if not name:
- raise ResourceWarning("Resource name is not provided")
-
- resource_path = get_backend_resources() / name
- if resource_path.is_dir():
- return resource_path
-
- raise ResourceWarning(f"Resource '{name}' not found.")
-
-
-def copy_directory_content(source: Path, destination: Path) -> None:
- """Copy content of the source directory into destination directory."""
- for item in source.iterdir():
- src = source / item.name
- dest = destination / item.name
-
- if src.is_dir():
- shutil.copytree(src, dest)
- else:
- shutil.copy2(src, dest)
-
-
-def remove_resource(resource_directory: str, resource_type: ResourceType) -> None:
- """Remove resource data."""
- resources = get_backends_path(resource_type)
-
- resource_location = resources / resource_directory
- if not resource_location.exists():
- raise Exception(f"Resource {resource_directory} does not exist")
-
- if not resource_location.is_dir():
- raise Exception(f"Wrong resource {resource_directory}")
-
- shutil.rmtree(resource_location)
-
-
-def remove_directory(directory_path: Path | None) -> None:
- """Remove directory."""
- if not directory_path or not directory_path.is_dir():
- raise Exception("No directory path provided")
-
- shutil.rmtree(directory_path)
-
-
-def recreate_directory(directory_path: Path | None) -> None:
- """Recreate directory."""
- if not directory_path:
- raise Exception("No directory path provided")
-
- if directory_path.exists() and not directory_path.is_dir():
- raise Exception(
- f"Path {str(directory_path)} does exist and it is not a directory."
- )
-
- if directory_path.is_dir():
- remove_directory(directory_path)
-
- directory_path.mkdir()
-
-
-def valid_for_filename(value: str, replacement: str = "") -> str:
- """Replace non alpha numeric characters."""
- return re.sub(r"[^\w.]", replacement, value, flags=re.ASCII)
diff --git a/src/mlia/backend/executor/output_consumer.py b/src/mlia/backend/executor/output_consumer.py
deleted file mode 100644
index 3c3b132..0000000
--- a/src/mlia/backend/executor/output_consumer.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Output consumers module."""
-from __future__ import annotations
-
-import base64
-import json
-import re
-from typing import Protocol
-from typing import runtime_checkable
-
-
-@runtime_checkable
-class OutputConsumer(Protocol):
- """Protocol to consume output."""
-
- def feed(self, line: str) -> bool:
- """
- Feed a new line to be parsed.
-
- Return True if the line should be removed from the output.
- """
-
-
-class Base64OutputConsumer(OutputConsumer):
- """
- Parser to extract base64-encoded JSON from tagged standard output.
-
- Example of the tagged output:
- ```
- # Encoded JSON: {"test": 1234}
- <metrics>eyJ0ZXN0IjogMTIzNH0</metrics>
- ```
- """
-
- TAG_NAME = "metrics"
-
- def __init__(self) -> None:
- """Set up the regular expression to extract tagged strings."""
- self._regex = re.compile(rf"<{self.TAG_NAME}>(.*)</{self.TAG_NAME}>")
- self.parsed_output: list = []
-
- def feed(self, line: str) -> bool:
- """
- Parse the output line and save the decoded output.
-
- Returns True if the line contains tagged output.
-
- Example:
- Using the tagged output from the class docs the parser should collect
- the following:
- ```
- [
- {"test": 1234}
- ]
- ```
- """
- res_b64 = self._regex.search(line)
- if res_b64:
- res_json = base64.b64decode(res_b64.group(1), validate=True)
- res = json.loads(res_json)
- self.parsed_output.append(res)
- # Remove this line from the output, i.e. consume it, as it
- # does not contain any human readable content.
- return True
-
- return False
diff --git a/src/mlia/backend/executor/proc.py b/src/mlia/backend/executor/proc.py
deleted file mode 100644
index 39a0689..0000000
--- a/src/mlia/backend/executor/proc.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Processes module.
-
-This module contains all classes and functions for dealing with Linux
-processes.
-"""
-from __future__ import annotations
-
-import datetime
-import logging
-import shlex
-import signal
-import tempfile
-import time
-from pathlib import Path
-from typing import Any
-
-from sh import Command
-from sh import CommandNotFound
-from sh import ErrorReturnCode
-from sh import RunningCommand
-
-from mlia.backend.executor.fs import valid_for_filename
-
-logger = logging.getLogger(__name__)
-
-
-class CommandFailedException(Exception):
- """Exception for failed command execution."""
-
-
-class ShellCommand:
- """Wrapper class for shell commands."""
-
- def run(
- self,
- cmd: str,
- *args: str,
- _cwd: Path | None = None,
- _tee: bool = True,
- _bg: bool = True,
- _out: Any = None,
- _err: Any = None,
- _search_paths: list[Path] | None = None,
- ) -> RunningCommand:
- """Run the shell command with the given arguments.
-
- There are special arguments that modify the behaviour of the process.
- _cwd: current working directory
- _tee: it redirects the stdout both to console and file
- _bg: if True, it runs the process in background and the command is not
- blocking.
- _out: use this object for stdout redirect,
- _err: use this object for stderr redirect,
- _search_paths: If presented used for searching executable
- """
- try:
- kwargs = {}
- if _cwd:
- kwargs["_cwd"] = str(_cwd)
- command = Command(cmd, _search_paths).bake(args, **kwargs)
- except CommandNotFound as error:
- logging.error("Command '%s' not found", error.args[0])
- raise error
-
- out, err = _out, _err
- if not _out and not _err:
- out, err = (str(item) for item in self.get_stdout_stderr_paths(cmd))
-
- return command(_out=out, _err=err, _tee=_tee, _bg=_bg, _bg_exc=False)
-
- @classmethod
- def get_stdout_stderr_paths(cls, cmd: str) -> tuple[Path, Path]:
- """Construct and returns the paths of stdout/stderr files."""
- timestamp = datetime.datetime.now().timestamp()
- base_path = Path(tempfile.mkdtemp(prefix="mlia-", suffix=f"{timestamp}"))
- base = base_path / f"{valid_for_filename(cmd, '_')}_{timestamp}"
- stdout = base.with_suffix(".out")
- stderr = base.with_suffix(".err")
- try:
- stdout.touch()
- stderr.touch()
- except FileNotFoundError as error:
- logging.error("File not found: %s", error.filename)
- raise error
- return stdout, stderr
-
-
-def parse_command(command: str, shell: str = "bash") -> list[str]:
- """Parse command."""
- cmd, *args = shlex.split(command, posix=True)
-
- if is_shell_script(cmd):
- args = [cmd] + args
- cmd = shell
-
- return [cmd] + args
-
-
-def execute_command( # pylint: disable=invalid-name
- command: str,
- cwd: Path,
- bg: bool = False,
- shell: str = "bash",
- out: Any = None,
- err: Any = None,
-) -> RunningCommand:
- """Execute shell command."""
- cmd, *args = parse_command(command, shell)
-
- search_paths = None
- if cmd != shell and (cwd / cmd).is_file():
- search_paths = [cwd]
-
- return ShellCommand().run(
- cmd, *args, _cwd=cwd, _bg=bg, _search_paths=search_paths, _out=out, _err=err
- )
-
-
-def is_shell_script(cmd: str) -> bool:
- """Check if command is shell script."""
- return cmd.endswith(".sh")
-
-
-def run_and_wait(
- command: str,
- cwd: Path,
- terminate_on_error: bool = False,
- out: Any = None,
- err: Any = None,
-) -> tuple[int, bytearray, bytearray]:
- """
- Run command and wait while it is executing.
-
- Returns a tuple: (exit_code, stdout, stderr)
- """
- running_cmd: RunningCommand | None = None
- try:
- running_cmd = execute_command(command, cwd, bg=True, out=out, err=err)
- return running_cmd.exit_code, running_cmd.stdout, running_cmd.stderr
- except ErrorReturnCode as cmd_failed:
- raise CommandFailedException() from cmd_failed
- except Exception as error:
- is_running = running_cmd is not None and running_cmd.is_alive()
- if terminate_on_error and is_running:
- logger.debug("Terminating ...")
- terminate_command(running_cmd)
-
- raise error
-
-
-def terminate_command(
- running_cmd: RunningCommand,
- wait: bool = True,
- wait_period: float = 0.5,
- number_of_attempts: int = 20,
-) -> None:
- """Terminate running command."""
- try:
- running_cmd.process.signal_group(signal.SIGINT)
- if wait:
- for _ in range(number_of_attempts):
- time.sleep(wait_period)
- if not running_cmd.is_alive():
- return
- logger.error(
- "Unable to terminate process %i. Sending SIGTERM...",
- running_cmd.process.pid,
- )
- running_cmd.process.signal_group(signal.SIGTERM)
- except ProcessLookupError:
- pass
-
-
-def print_command_stdout(command: RunningCommand) -> None:
- """Print the stdout of a command.
-
- The command has 2 states: running and done.
- If the command is running, the output is taken by the running process.
- If the command has ended its execution, the stdout is taken from stdout
- property
- """
- if command.is_alive():
- while True:
- try:
- print(command.next(), end="")
- except StopIteration:
- break
- else:
- print(command.stdout)
diff --git a/src/mlia/backend/executor/runner.py b/src/mlia/backend/executor/runner.py
deleted file mode 100644
index 2330fd9..0000000
--- a/src/mlia/backend/executor/runner.py
+++ /dev/null
@@ -1,98 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Module for backend runner."""
-from __future__ import annotations
-
-from dataclasses import dataclass
-from pathlib import Path
-
-from mlia.backend.executor.application import get_available_applications
-from mlia.backend.executor.application import install_application
-from mlia.backend.executor.execution import ExecutionContext
-from mlia.backend.executor.execution import run_application
-from mlia.backend.executor.system import get_available_systems
-from mlia.backend.executor.system import install_system
-
-
-@dataclass
-class ExecutionParams:
- """Application execution params."""
-
- application: str
- system: str
- application_params: list[str]
- system_params: list[str]
-
-
-class BackendRunner:
- """Backend runner."""
-
- def __init__(self) -> None:
- """Init BackendRunner instance."""
-
- @staticmethod
- def get_installed_systems() -> list[str]:
- """Get list of the installed systems."""
- return [system.name for system in get_available_systems()]
-
- @staticmethod
- def get_installed_applications(system: str | None = None) -> list[str]:
- """Get list of the installed application."""
- return [
- app.name
- for app in get_available_applications()
- if system is None or app.can_run_on(system)
- ]
-
- def is_application_installed(self, application: str, system: str) -> bool:
- """Return true if requested application installed."""
- return application in self.get_installed_applications(system)
-
- def is_system_installed(self, system: str) -> bool:
- """Return true if requested system installed."""
- return system in self.get_installed_systems()
-
- def systems_installed(self, systems: list[str]) -> bool:
- """Check if all provided systems are installed."""
- if not systems:
- return False
-
- installed_systems = self.get_installed_systems()
- return all(system in installed_systems for system in systems)
-
- def applications_installed(self, applications: list[str]) -> bool:
- """Check if all provided applications are installed."""
- if not applications:
- return False
-
- installed_apps = self.get_installed_applications()
- return all(app in installed_apps for app in applications)
-
- def all_installed(self, systems: list[str], apps: list[str]) -> bool:
- """Check if all provided artifacts are installed."""
- return self.systems_installed(systems) and self.applications_installed(apps)
-
- @staticmethod
- def install_system(system_path: Path) -> None:
- """Install system."""
- install_system(system_path)
-
- @staticmethod
- def install_application(app_path: Path) -> None:
- """Install application."""
- install_application(app_path)
-
- @staticmethod
- def run_application(execution_params: ExecutionParams) -> ExecutionContext:
- """Run requested application."""
- ctx = run_application(
- execution_params.application,
- execution_params.application_params,
- execution_params.system,
- execution_params.system_params,
- )
- return ctx
-
- @staticmethod
- def _params(name: str, params: list[str]) -> list[str]:
- return [p for item in [(name, param) for param in params] for p in item]
diff --git a/src/mlia/backend/executor/source.py b/src/mlia/backend/executor/source.py
deleted file mode 100644
index 6abc49f..0000000
--- a/src/mlia/backend/executor/source.py
+++ /dev/null
@@ -1,207 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""Contain source related classes and functions."""
-from __future__ import annotations
-
-import os
-import shutil
-import tarfile
-from abc import ABC
-from abc import abstractmethod
-from pathlib import Path
-from tarfile import TarFile
-
-from mlia.backend.executor.common import BACKEND_CONFIG_FILE
-from mlia.backend.executor.common import ConfigurationException
-from mlia.backend.executor.common import get_backend_config
-from mlia.backend.executor.common import is_backend_directory
-from mlia.backend.executor.common import load_config
-from mlia.backend.executor.config import BackendConfig
-from mlia.backend.executor.fs import copy_directory_content
-
-
-class Source(ABC):
- """Source class."""
-
- @abstractmethod
- def name(self) -> str | None:
- """Get source name."""
-
- @abstractmethod
- def config(self) -> BackendConfig | None:
- """Get configuration file content."""
-
- @abstractmethod
- def install_into(self, destination: Path) -> None:
- """Install source into destination directory."""
-
- @abstractmethod
- def create_destination(self) -> bool:
- """Return True if destination folder should be created before installation."""
-
-
-class DirectorySource(Source):
- """DirectorySource class."""
-
- def __init__(self, directory_path: Path) -> None:
- """Create the DirectorySource instance."""
- assert isinstance(directory_path, Path)
- self.directory_path = directory_path
-
- def name(self) -> str:
- """Return name of source."""
- return self.directory_path.name
-
- def config(self) -> BackendConfig | None:
- """Return configuration file content."""
- if not is_backend_directory(self.directory_path):
- raise ConfigurationException("No configuration file found")
-
- config_file = get_backend_config(self.directory_path)
- return load_config(config_file)
-
- def install_into(self, destination: Path) -> None:
- """Install source into destination directory."""
- if not destination.is_dir():
- raise ConfigurationException(f"Wrong destination {destination}.")
-
- if not self.directory_path.is_dir():
- raise ConfigurationException(
- f"Directory {self.directory_path} does not exist."
- )
-
- copy_directory_content(self.directory_path, destination)
-
- def create_destination(self) -> bool:
- """Return True if destination folder should be created before installation."""
- return True
-
-
-class TarArchiveSource(Source):
- """TarArchiveSource class."""
-
- def __init__(self, archive_path: Path) -> None:
- """Create the TarArchiveSource class."""
- assert isinstance(archive_path, Path)
- self.archive_path = archive_path
- self._config: BackendConfig | None = None
- self._has_top_level_folder: bool | None = None
- self._name: str | None = None
-
- def _read_archive_content(self) -> None:
- """Read various information about archive."""
- # get source name from archive name (everything without extensions)
- extensions = "".join(self.archive_path.suffixes)
- self._name = self.archive_path.name.rstrip(extensions)
-
- if not self.archive_path.exists():
- return
-
- with self._open(self.archive_path) as archive:
- try:
- config_entry = archive.getmember(BACKEND_CONFIG_FILE)
- self._has_top_level_folder = False
- except KeyError as error_no_config:
- try:
- archive_entries = archive.getnames()
- entries_common_prefix = os.path.commonprefix(archive_entries)
- top_level_dir = entries_common_prefix.rstrip("/")
-
- if not top_level_dir:
- raise RuntimeError(
- "Archive has no top level directory"
- ) from error_no_config
-
- config_path = f"{top_level_dir}/{BACKEND_CONFIG_FILE}"
-
- config_entry = archive.getmember(config_path)
- self._has_top_level_folder = True
- self._name = top_level_dir
- except (KeyError, RuntimeError) as error_no_root_dir_or_config:
- raise ConfigurationException(
- "No configuration file found"
- ) from error_no_root_dir_or_config
-
- content = archive.extractfile(config_entry)
- self._config = load_config(content)
-
- def config(self) -> BackendConfig | None:
- """Return configuration file content."""
- if self._config is None:
- self._read_archive_content()
-
- return self._config
-
- def name(self) -> str | None:
- """Return name of the source."""
- if self._name is None:
- self._read_archive_content()
-
- return self._name
-
- def create_destination(self) -> bool:
- """Return True if destination folder must be created before installation."""
- if self._has_top_level_folder is None:
- self._read_archive_content()
-
- return not self._has_top_level_folder
-
- def install_into(self, destination: Path) -> None:
- """Install source into destination directory."""
- if not destination.is_dir():
- raise ConfigurationException(f"Wrong destination {destination}.")
-
- with self._open(self.archive_path) as archive:
- archive.extractall(destination)
-
- def _open(self, archive_path: Path) -> TarFile:
- """Open archive file."""
- if not archive_path.is_file():
- raise ConfigurationException(f"File {archive_path} does not exist.")
-
- if archive_path.name.endswith("tar.gz") or archive_path.name.endswith("tgz"):
- mode = "r:gz"
- else:
- raise ConfigurationException(f"Unsupported archive type {archive_path}.")
-
- # The returned TarFile object can be used as a context manager (using
- # 'with') by the calling instance.
- return tarfile.open( # pylint: disable=consider-using-with
- self.archive_path, mode=mode
- )
-
-
-def get_source(source_path: Path) -> TarArchiveSource | DirectorySource:
- """Return appropriate source instance based on provided source path."""
- if source_path.is_file():
- return TarArchiveSource(source_path)
-
- if source_path.is_dir():
- return DirectorySource(source_path)
-
- raise ConfigurationException(f"Unable to read {source_path}.")
-
-
-def create_destination_and_install(source: Source, resource_path: Path) -> None:
- """Create destination directory and install source.
-
- This function is used for actual installation of system/backend New
- directory will be created inside :resource_path: if needed If for example
- archive contains top level folder then no need to create new directory
- """
- destination = resource_path
- create_destination = source.create_destination()
-
- if create_destination:
- name = source.name()
- if not name:
- raise ConfigurationException("Unable to get source name.")
-
- destination = resource_path / name
- destination.mkdir()
- try:
- source.install_into(destination)
- except Exception as error:
- if create_destination:
- shutil.rmtree(destination)
- raise error
diff --git a/src/mlia/backend/executor/system.py b/src/mlia/backend/executor/system.py
deleted file mode 100644
index a5ecf19..0000000
--- a/src/mlia/backend/executor/system.py
+++ /dev/null
@@ -1,178 +0,0 @@
-# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates.
-# SPDX-License-Identifier: Apache-2.0
-"""System backend module."""
-from __future__ import annotations
-
-from pathlib import Path
-from typing import Any
-from typing import cast
-from typing import List
-
-from mlia.backend.executor.common import Backend
-from mlia.backend.executor.common import ConfigurationException
-from mlia.backend.executor.common import get_backend_configs
-from mlia.backend.executor.common import get_backend_directories
-from mlia.backend.executor.common import load_config
-from mlia.backend.executor.common import remove_backend
-from mlia.backend.executor.config import SystemConfig
-from mlia.backend.executor.fs import get_backends_path
-from mlia.backend.executor.proc import run_and_wait
-from mlia.backend.executor.source import create_destination_and_install
-from mlia.backend.executor.source import get_source
-
-
-class System(Backend):
- """System class."""
-
- def __init__(self, config: SystemConfig) -> None:
- """Construct the System class using the dictionary passed."""
- super().__init__(config)
-
- self._setup_reporting(config)
-
- def _setup_reporting(self, config: SystemConfig) -> None:
- self.reporting = config.get("reporting")
-
- def run(self, command: str) -> tuple[int, bytearray, bytearray]:
- """
- Run command on the system.
-
- Returns a tuple: (exit_code, stdout, stderr)
- """
- cwd = self.config_location
- if not isinstance(cwd, Path) or not cwd.is_dir():
- raise ConfigurationException(
- f"System has invalid config location: {cwd}",
- )
-
- stdout = bytearray()
- stderr = bytearray()
-
- return run_and_wait(
- command,
- cwd=cwd,
- terminate_on_error=True,
- out=stdout,
- err=stderr,
- )
-
- def __eq__(self, other: object) -> bool:
- """Overload operator ==."""
- if not isinstance(other, System):
- return False
-
- return super().__eq__(other) and self.name == other.name
-
- def get_details(self) -> dict[str, Any]:
- """Return a dictionary with all relevant information of a System."""
- output = {
- "type": "system",
- "name": self.name,
- "description": self.description,
- "commands": self._get_command_details(),
- "annotations": self.annotations,
- }
-
- return output
-
-
-def get_available_systems_directory_names() -> list[str]:
- """Return a list of directory names for all avialable systems."""
- return [entry.name for entry in get_backend_directories("systems")]
-
-
-def get_available_systems() -> list[System]:
- """Return a list with all available systems."""
- available_systems = []
- for config_json in get_backend_configs("systems"):
- config_entries = cast(List[SystemConfig], (load_config(config_json)))
- for config_entry in config_entries:
- config_entry["config_location"] = config_json.parent.absolute()
- system = load_system(config_entry)
- available_systems.append(system)
-
- return sorted(available_systems, key=lambda system: system.name)
-
-
-def get_system(system_name: str) -> System:
- """Return a system instance with the same name passed as argument."""
- available_systems = get_available_systems()
- for system in available_systems:
- if system_name == system.name:
- return system
- raise ConfigurationException(f"System '{system_name}' not found.")
-
-
-def install_system(source_path: Path) -> None:
- """Install new system."""
- try:
- source = get_source(source_path)
- config = cast(List[SystemConfig], source.config())
- systems_to_install = [load_system(entry) for entry in config]
- except Exception as error:
- raise ConfigurationException("Unable to read system definition") from error
-
- if not systems_to_install:
- raise ConfigurationException("No system definition found")
-
- available_systems = get_available_systems()
- already_installed = [s for s in systems_to_install if s in available_systems]
- if already_installed:
- names = [system.name for system in already_installed]
- raise ConfigurationException(
- f"Systems [{','.join(names)}] are already installed."
- )
-
- create_destination_and_install(source, get_backends_path("systems"))
-
-
-def remove_system(directory_name: str) -> None:
- """Remove system."""
- remove_backend(directory_name, "systems")
-
-
-def load_system(config: SystemConfig) -> System:
- """Load system based on it's execution type."""
- populate_shared_params(config)
-
- return System(config)
-
-
-def populate_shared_params(config: SystemConfig) -> None:
- """Populate command parameters with shared parameters."""
- user_params = config.get("user_params")
- if not user_params or "shared" not in user_params:
- return
-
- shared_user_params = user_params["shared"]
- if not shared_user_params:
- return
-
- only_aliases = all(p.get("alias") for p in shared_user_params)
- if not only_aliases:
- raise ConfigurationException("All shared parameters should have aliases")
-
- commands = config.get("commands", {})
- for cmd_name in ["run"]:
- command = commands.get(cmd_name)
- if command is None:
- commands[cmd_name] = []
- cmd_user_params = user_params.get(cmd_name)
- if not cmd_user_params:
- cmd_user_params = shared_user_params
- else:
- only_aliases = all(p.get("alias") for p in cmd_user_params)
- if not only_aliases:
- raise ConfigurationException(
- f"All parameters for command {cmd_name} should have aliases."
- )
- merged_by_alias = {
- **{p.get("alias"): p for p in shared_user_params},
- **{p.get("alias"): p for p in cmd_user_params},
- }
- cmd_user_params = list(merged_by_alias.values())
-
- user_params[cmd_name] = cmd_user_params
-
- config["commands"] = commands
- del user_params["shared"]