# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 """Application execution module.""" import logging import re from typing import cast from typing import List from typing import Optional from typing import Tuple from mlia.backend.application import Application from mlia.backend.application import get_application from mlia.backend.common import Backend from mlia.backend.common import ConfigurationException from mlia.backend.common import Param from mlia.backend.system import get_system from mlia.backend.system import System logger = logging.getLogger(__name__) class AnotherInstanceIsRunningException(Exception): """Concurrent execution error.""" class ExecutionContext: # pylint: disable=too-few-public-methods """Command execution context.""" def __init__( self, app: Application, app_params: List[str], system: System, system_params: List[str], ): """Init execution context.""" self.app = app self.app_params = app_params self.system = system self.system_params = system_params self.param_resolver = ParamResolver(self) self.stdout: Optional[bytearray] = None self.stderr: Optional[bytearray] = None class ParamResolver: """Parameter resolver.""" def __init__(self, context: ExecutionContext): """Init parameter resolver.""" self.ctx = context @staticmethod def resolve_user_params( cmd_name: Optional[str], index_or_alias: str, resolved_params: Optional[List[Tuple[Optional[str], Param]]], ) -> str: """Resolve user params.""" if not cmd_name or resolved_params is None: raise ConfigurationException("Unable to resolve user params") param_value: Optional[str] = None param: Optional[Param] = None if index_or_alias.isnumeric(): i = int(index_or_alias) if i not in range(len(resolved_params)): raise ConfigurationException( f"Invalid index {i} for user params of command {cmd_name}" ) param_value, param = resolved_params[i] else: for val, par in resolved_params: if par.alias == index_or_alias: param_value, param = val, par break if param is None: raise ConfigurationException( f"No user parameter for command '{cmd_name}' with " f"alias '{index_or_alias}'." ) if param_value: # We need to handle to cases of parameters here: # 1) Optional parameters (non-positional with a name and value) # 2) Positional parameters (value only, no name needed) # Default to empty strings for positional arguments param_name = "" separator = "" if param.name is not None: # A valid param name means we have an optional/non-positional argument: # The separator is an empty string in case the param_name # has an equal sign as we have to honour it. # If the parameter doesn't end with an equal sign then a # space character is injected to split the parameter name # and its value param_name = param.name separator = "" if param.name.endswith("=") else " " return f"{param_name}{separator}{param_value}" if param.name is None: raise ConfigurationException( f"Missing user parameter with alias '{index_or_alias}' for " f"command '{cmd_name}'." ) return param.name # flag: just return the parameter name def resolve_commands_and_params( self, backend_type: str, cmd_name: str, return_params: bool, index_or_alias: str ) -> str: """Resolve command or command's param value.""" if backend_type == "system": backend = cast(Backend, self.ctx.system) backend_params = self.ctx.system_params else: # Application backend backend = cast(Backend, self.ctx.app) backend_params = self.ctx.app_params if cmd_name not in backend.commands: raise ConfigurationException(f"Command {cmd_name} not found") if return_params: params = backend.resolved_parameters(cmd_name, backend_params) if index_or_alias.isnumeric(): i = int(index_or_alias) if i not in range(len(params)): raise ConfigurationException( f"Invalid parameter index {i} for command {cmd_name}" ) param_value = params[i][0] else: param_value = None for value, param in params: if param.alias == index_or_alias: param_value = value break if not param_value: raise ConfigurationException( ( "No value for parameter with index or " f"alias {index_or_alias} of command {cmd_name}." ) ) return param_value if not index_or_alias.isnumeric(): raise ConfigurationException(f"Bad command index {index_or_alias}") i = int(index_or_alias) commands = backend.build_command(cmd_name, backend_params, self.param_resolver) if i not in range(len(commands)): raise ConfigurationException(f"Invalid index {i} for command {cmd_name}") return commands[i] def resolve_variables(self, backend_type: str, var_name: str) -> str: """Resolve variable value.""" if backend_type == "system": backend = cast(Backend, self.ctx.system) else: # Application backend backend = cast(Backend, self.ctx.app) if var_name not in backend.variables: raise ConfigurationException(f"Unknown variable {var_name}") return backend.variables[var_name] def param_matcher( self, param_name: str, cmd_name: Optional[str], resolved_params: Optional[List[Tuple[Optional[str], Param]]], ) -> str: """Regexp to resolve a param from the param_name.""" # this pattern supports parameter names like "application.commands.run:0" and # "system.commands.run.params:0" # Note: 'software' is included for backward compatibility. commands_and_params_match = re.match( r"(?Papplication|software|system)[.]commands[.]" r"(?P\w+)" r"(?P[.]params|)[:]" r"(?P\w+)", param_name, ) if commands_and_params_match: backend_type, cmd_name, return_params, index_or_alias = ( commands_and_params_match["type"], commands_and_params_match["name"], commands_and_params_match["params"], commands_and_params_match["index_or_alias"], ) return self.resolve_commands_and_params( backend_type, cmd_name, bool(return_params), index_or_alias ) # Note: 'software' is included for backward compatibility. variables_match = re.match( r"(?Papplication|software|system)[.]variables:(?P\w+)", param_name, ) if variables_match: backend_type, var_name = ( variables_match["type"], variables_match["var_name"], ) return self.resolve_variables(backend_type, var_name) user_params_match = re.match(r"user_params:(?P\w+)", param_name) if user_params_match: index_or_alias = user_params_match["index_or_alias"] return self.resolve_user_params(cmd_name, index_or_alias, resolved_params) raise ConfigurationException(f"Unable to resolve parameter {param_name}") def param_resolver( self, param_name: str, cmd_name: Optional[str] = None, resolved_params: Optional[List[Tuple[Optional[str], Param]]] = None, ) -> str: """Resolve parameter value based on current execution context.""" # Note: 'software.*' is included for backward compatibility. resolved_param = None if param_name in ["application.name", "software.name"]: resolved_param = self.ctx.app.name elif param_name in ["application.description", "software.description"]: resolved_param = self.ctx.app.description elif self.ctx.app.config_location and ( param_name in ["application.config_dir", "software.config_dir"] ): resolved_param = str(self.ctx.app.config_location.absolute()) elif self.ctx.system is not None: if param_name == "system.name": resolved_param = self.ctx.system.name elif param_name == "system.description": resolved_param = self.ctx.system.description elif param_name == "system.config_dir" and self.ctx.system.config_location: resolved_param = str(self.ctx.system.config_location.absolute()) if not resolved_param: resolved_param = self.param_matcher(param_name, cmd_name, resolved_params) return resolved_param def __call__( self, param_name: str, cmd_name: Optional[str] = None, resolved_params: Optional[List[Tuple[Optional[str], Param]]] = None, ) -> str: """Resolve provided parameter.""" return self.param_resolver(param_name, cmd_name, resolved_params) def validate_parameters( backend: Backend, command_names: List[str], params: List[str] ) -> None: """Check parameters passed to backend.""" for param in params: acceptable = any( backend.validate_parameter(command_name, param) for command_name in command_names if command_name in backend.commands ) if not acceptable: backend_type = "System" if isinstance(backend, System) else "Application" raise ValueError( f"{backend_type} parameter '{param}' not valid for " f"command '{' or '.join(command_names)}'." ) def get_application_by_name_and_system( application_name: str, system_name: str ) -> Application: """Get application.""" applications = get_application(application_name, system_name) if not applications: raise ValueError( f"Application '{application_name}' doesn't support the " f"system '{system_name}'." ) if len(applications) != 1: raise ValueError( f"Error during getting application {application_name} for the " f"system {system_name}." ) return applications[0] def get_application_and_system( application_name: str, system_name: str ) -> Tuple[Application, System]: """Return application and system by provided names.""" system = get_system(system_name) if not system: raise ValueError(f"System {system_name} is not found.") application = get_application_by_name_and_system(application_name, system_name) return application, system def run_application( application_name: str, application_params: List[str], system_name: str, system_params: List[str], ) -> ExecutionContext: """Run application on the provided system.""" application, system = get_application_and_system(application_name, system_name) validate_parameters(application, ["run"], application_params) validate_parameters(system, ["run"], system_params) ctx = ExecutionContext( app=application, app_params=application_params, system=system, system_params=system_params, ) logger.debug("Generating commands to execute") commands_to_run = ctx.system.build_command( "run", ctx.system_params, ctx.param_resolver ) for command in commands_to_run: logger.debug("Running: %s", command) exit_code, ctx.stdout, ctx.stderr = ctx.system.run(command) if exit_code != 0: logger.warning("Application exited with exit code %i", exit_code) return ctx