diff options
Diffstat (limited to 'src/mlia/backend/executor/execution.py')
-rw-r--r-- | src/mlia/backend/executor/execution.py | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/src/mlia/backend/executor/execution.py b/src/mlia/backend/executor/execution.py new file mode 100644 index 0000000..e253b16 --- /dev/null +++ b/src/mlia/backend/executor/execution.py @@ -0,0 +1,342 @@ +# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. +# SPDX-License-Identifier: Apache-2.0 +"""Application execution module.""" +from __future__ import annotations + +import logging +import re +from typing import cast + +from mlia.backend.executor.application import Application +from mlia.backend.executor.application import get_application +from mlia.backend.executor.common import Backend +from mlia.backend.executor.common import ConfigurationException +from mlia.backend.executor.common import Param +from mlia.backend.executor.system import get_system +from mlia.backend.executor.system import System + +logger = logging.getLogger(__name__) + + +class AnotherInstanceIsRunningException(Exception): + """Concurrent execution error.""" + + +class ExecutionContext: # pylint: disable=too-few-public-methods + """Command execution context.""" + + def __init__( + self, + app: Application, + app_params: list[str], + system: System, + system_params: list[str], + ): + """Init execution context.""" + self.app = app + self.app_params = app_params + self.system = system + self.system_params = system_params + + self.param_resolver = ParamResolver(self) + + self.stdout: bytearray | None = None + self.stderr: bytearray | None = None + + +class ParamResolver: + """Parameter resolver.""" + + def __init__(self, context: ExecutionContext): + """Init parameter resolver.""" + self.ctx = context + + @staticmethod + def resolve_user_params( + cmd_name: str | None, + index_or_alias: str, + resolved_params: list[tuple[str | None, Param]] | None, + ) -> str: + """Resolve user params.""" + if not cmd_name or resolved_params is None: + raise ConfigurationException("Unable to resolve user params") + + param_value: str | None = None + param: Param | None = None + + if index_or_alias.isnumeric(): + i = int(index_or_alias) + if i not in range(len(resolved_params)): + raise ConfigurationException( + f"Invalid index {i} for user params of command {cmd_name}" + ) + param_value, param = resolved_params[i] + else: + for val, par in resolved_params: + if par.alias == index_or_alias: + param_value, param = val, par + break + + if param is None: + raise ConfigurationException( + f"No user parameter for command '{cmd_name}' with " + f"alias '{index_or_alias}'." + ) + + if param_value: + # We need to handle to cases of parameters here: + # 1) Optional parameters (non-positional with a name and value) + # 2) Positional parameters (value only, no name needed) + # Default to empty strings for positional arguments + param_name = "" + separator = "" + if param.name is not None: + # A valid param name means we have an optional/non-positional argument: + # The separator is an empty string in case the param_name + # has an equal sign as we have to honour it. + # If the parameter doesn't end with an equal sign then a + # space character is injected to split the parameter name + # and its value + param_name = param.name + separator = "" if param.name.endswith("=") else " " + + return f"{param_name}{separator}{param_value}" + + if param.name is None: + raise ConfigurationException( + f"Missing user parameter with alias '{index_or_alias}' for " + f"command '{cmd_name}'." + ) + + return param.name # flag: just return the parameter name + + def resolve_commands_and_params( + self, backend_type: str, cmd_name: str, return_params: bool, index_or_alias: str + ) -> str: + """Resolve command or command's param value.""" + if backend_type == "system": + backend = cast(Backend, self.ctx.system) + backend_params = self.ctx.system_params + else: # Application backend + backend = cast(Backend, self.ctx.app) + backend_params = self.ctx.app_params + + if cmd_name not in backend.commands: + raise ConfigurationException(f"Command {cmd_name} not found") + + if return_params: + params = backend.resolved_parameters(cmd_name, backend_params) + if index_or_alias.isnumeric(): + i = int(index_or_alias) + if i not in range(len(params)): + raise ConfigurationException( + f"Invalid parameter index {i} for command {cmd_name}" + ) + + param_value = params[i][0] + else: + param_value = None + for value, param in params: + if param.alias == index_or_alias: + param_value = value + break + + if not param_value: + raise ConfigurationException( + "No value for parameter with index or " + f"alias {index_or_alias} of command {cmd_name}." + ) + return param_value + + if not index_or_alias.isnumeric(): + raise ConfigurationException(f"Bad command index {index_or_alias}") + + i = int(index_or_alias) + commands = backend.build_command(cmd_name, backend_params, self.param_resolver) + if i not in range(len(commands)): + raise ConfigurationException(f"Invalid index {i} for command {cmd_name}") + + return commands[i] + + def resolve_variables(self, backend_type: str, var_name: str) -> str: + """Resolve variable value.""" + if backend_type == "system": + backend = cast(Backend, self.ctx.system) + else: # Application backend + backend = cast(Backend, self.ctx.app) + + if var_name not in backend.variables: + raise ConfigurationException(f"Unknown variable {var_name}") + + return backend.variables[var_name] + + def param_matcher( + self, + param_name: str, + cmd_name: str | None, + resolved_params: list[tuple[str | None, Param]] | None, + ) -> str: + """Regexp to resolve a param from the param_name.""" + # this pattern supports parameter names like "application.commands.run:0" and + # "system.commands.run.params:0" + # Note: 'software' is included for backward compatibility. + commands_and_params_match = re.match( + r"(?P<type>application|software|system)[.]commands[.]" + r"(?P<name>\w+)" + r"(?P<params>[.]params|)[:]" + r"(?P<index_or_alias>\w+)", + param_name, + ) + + if commands_and_params_match: + backend_type, cmd_name, return_params, index_or_alias = ( + commands_and_params_match["type"], + commands_and_params_match["name"], + commands_and_params_match["params"], + commands_and_params_match["index_or_alias"], + ) + return self.resolve_commands_and_params( + backend_type, cmd_name, bool(return_params), index_or_alias + ) + + # Note: 'software' is included for backward compatibility. + variables_match = re.match( + r"(?P<type>application|software|system)[.]variables:(?P<var_name>\w+)", + param_name, + ) + if variables_match: + backend_type, var_name = ( + variables_match["type"], + variables_match["var_name"], + ) + return self.resolve_variables(backend_type, var_name) + + user_params_match = re.match(r"user_params:(?P<index_or_alias>\w+)", param_name) + if user_params_match: + index_or_alias = user_params_match["index_or_alias"] + return self.resolve_user_params(cmd_name, index_or_alias, resolved_params) + + raise ConfigurationException(f"Unable to resolve parameter {param_name}") + + def param_resolver( + self, + param_name: str, + cmd_name: str | None = None, + resolved_params: list[tuple[str | None, Param]] | None = None, + ) -> str: + """Resolve parameter value based on current execution context.""" + # Note: 'software.*' is included for backward compatibility. + resolved_param = None + if param_name in ["application.name", "software.name"]: + resolved_param = self.ctx.app.name + elif param_name in ["application.description", "software.description"]: + resolved_param = self.ctx.app.description + elif self.ctx.app.config_location and ( + param_name in ["application.config_dir", "software.config_dir"] + ): + resolved_param = str(self.ctx.app.config_location.absolute()) + elif self.ctx.system is not None: + if param_name == "system.name": + resolved_param = self.ctx.system.name + elif param_name == "system.description": + resolved_param = self.ctx.system.description + elif param_name == "system.config_dir" and self.ctx.system.config_location: + resolved_param = str(self.ctx.system.config_location.absolute()) + + if not resolved_param: + resolved_param = self.param_matcher(param_name, cmd_name, resolved_params) + return resolved_param + + def __call__( + self, + param_name: str, + cmd_name: str | None = None, + resolved_params: list[tuple[str | None, Param]] | None = None, + ) -> str: + """Resolve provided parameter.""" + return self.param_resolver(param_name, cmd_name, resolved_params) + + +def validate_parameters( + backend: Backend, command_names: list[str], params: list[str] +) -> None: + """Check parameters passed to backend.""" + for param in params: + acceptable = any( + backend.validate_parameter(command_name, param) + for command_name in command_names + if command_name in backend.commands + ) + + if not acceptable: + backend_type = "System" if isinstance(backend, System) else "Application" + raise ValueError( + f"{backend_type} parameter '{param}' not valid for " + f"command '{' or '.join(command_names)}'." + ) + + +def get_application_by_name_and_system( + application_name: str, system_name: str +) -> Application: + """Get application.""" + applications = get_application(application_name, system_name) + if not applications: + raise ValueError( + f"Application '{application_name}' doesn't support the " + f"system '{system_name}'." + ) + + if len(applications) != 1: + raise ValueError( + f"Error during getting application {application_name} for the " + f"system {system_name}." + ) + + return applications[0] + + +def get_application_and_system( + application_name: str, system_name: str +) -> tuple[Application, System]: + """Return application and system by provided names.""" + system = get_system(system_name) + if not system: + raise ValueError(f"System {system_name} is not found.") + + application = get_application_by_name_and_system(application_name, system_name) + + return application, system + + +def run_application( + application_name: str, + application_params: list[str], + system_name: str, + system_params: list[str], +) -> ExecutionContext: + """Run application on the provided system.""" + application, system = get_application_and_system(application_name, system_name) + validate_parameters(application, ["run"], application_params) + validate_parameters(system, ["run"], system_params) + + ctx = ExecutionContext( + app=application, + app_params=application_params, + system=system, + system_params=system_params, + ) + + logger.debug("Generating commands to execute") + commands_to_run = ctx.system.build_command( + "run", ctx.system_params, ctx.param_resolver + ) + + for command in commands_to_run: + logger.debug("Running: %s", command) + exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command) + + if exit_code != 0: + logger.warning("Application exited with exit code %i", exit_code) + + return ctx |