# SPDX-FileCopyrightText: Copyright 2022, Arm Limited and/or its affiliates. # SPDX-License-Identifier: Apache-2.0 """Application execution module.""" import itertools import json import random import re import string import sys import time import warnings from collections import defaultdict from contextlib import contextmanager from contextlib import ExitStack from pathlib import Path from typing import Any from typing import Callable from typing import cast from typing import ContextManager from typing import Dict from typing import Generator from typing import List from typing import Optional from typing import Sequence from typing import Tuple from typing import TypedDict from typing import Union from filelock import FileLock from filelock import Timeout from aiet.backend.application import Application from aiet.backend.application import get_application from aiet.backend.common import Backend from aiet.backend.common import ConfigurationException from aiet.backend.common import DataPaths from aiet.backend.common import Param from aiet.backend.common import parse_raw_parameter from aiet.backend.common import resolve_all_parameters from aiet.backend.output_parser import Base64OutputParser from aiet.backend.output_parser import OutputParser from aiet.backend.output_parser import RegexOutputParser from aiet.backend.system import ControlledSystem from aiet.backend.system import get_system from aiet.backend.system import StandaloneSystem from aiet.backend.system import System from aiet.backend.tool import get_tool from aiet.backend.tool import Tool from aiet.utils.fs import recreate_directory from aiet.utils.fs import remove_directory from aiet.utils.fs import valid_for_filename from aiet.utils.proc import run_and_wait class AnotherInstanceIsRunningException(Exception): """Concurrent execution error.""" class ConnectionException(Exception): """Connection exception.""" class ExecutionParams(TypedDict, total=False): """Execution parameters.""" disable_locking: bool unique_build_dir: bool class ExecutionContext: """Command execution context.""" # pylint: disable=too-many-arguments,too-many-instance-attributes def __init__( self, app: Union[Application, Tool], app_params: List[str], system: Optional[System], system_params: List[str], custom_deploy_data: Optional[List[DataPaths]] = None, execution_params: Optional[ExecutionParams] = None, report_file: Optional[Path] = None, ): """Init execution context.""" self.app = app self.app_params = app_params self.custom_deploy_data = custom_deploy_data or [] self.system = system self.system_params = system_params self.execution_params = execution_params or ExecutionParams() self.report_file = report_file self.reporter: Optional[Reporter] if self.report_file: # Create reporter with output parsers parsers: List[OutputParser] = [] if system and system.reporting: # Add RegexOutputParser, if it is configured in the system parsers.append(RegexOutputParser("system", system.reporting["regex"])) # Add Base64 parser for applications parsers.append(Base64OutputParser("application")) self.reporter = Reporter(parsers=parsers) else: self.reporter = None # No reporter needed. self.param_resolver = ParamResolver(self) self._resolved_build_dir: Optional[Path] = None @property def is_deploy_needed(self) -> bool: """Check if application requires data deployment.""" if isinstance(self.app, Application): return ( len(self.app.get_deploy_data()) > 0 or len(self.custom_deploy_data) > 0 ) return False @property def is_locking_required(self) -> bool: """Return true if any form of locking required.""" return not self._disable_locking() and ( self.app.lock or (self.system is not None and self.system.lock) ) @property def is_build_required(self) -> bool: """Return true if application build required.""" return "build" in self.app.commands @property def is_unique_build_dir_required(self) -> bool: """Return true if unique build dir required.""" return self.execution_params.get("unique_build_dir", False) def build_dir(self) -> Path: """Return resolved application build dir.""" if self._resolved_build_dir is not None: return self._resolved_build_dir if ( not isinstance(self.app.config_location, Path) or not self.app.config_location.is_dir() ): raise ConfigurationException( "Application {} has wrong config location".format(self.app.name) ) _build_dir = self.app.build_dir if _build_dir: _build_dir = resolve_all_parameters(_build_dir, self.param_resolver) if not _build_dir: raise ConfigurationException( "No build directory defined for the app {}".format(self.app.name) ) if self.is_unique_build_dir_required: random_suffix = "".join( random.choices(string.ascii_lowercase + string.digits, k=7) ) _build_dir = "{}_{}".format(_build_dir, random_suffix) self._resolved_build_dir = self.app.config_location / _build_dir return self._resolved_build_dir def _disable_locking(self) -> bool: """Return true if locking should be disabled.""" return self.execution_params.get("disable_locking", False) class ParamResolver: """Parameter resolver.""" def __init__(self, context: ExecutionContext): """Init parameter resolver.""" self.ctx = context @staticmethod def resolve_user_params( cmd_name: Optional[str], index_or_alias: str, resolved_params: Optional[List[Tuple[Optional[str], Param]]], ) -> str: """Resolve user params.""" if not cmd_name or resolved_params is None: raise ConfigurationException("Unable to resolve user params") param_value: Optional[str] = None param: Optional[Param] = None if index_or_alias.isnumeric(): i = int(index_or_alias) if i not in range(len(resolved_params)): raise ConfigurationException( "Invalid index {} for user params of command {}".format(i, cmd_name) ) param_value, param = resolved_params[i] else: for val, par in resolved_params: if par.alias == index_or_alias: param_value, param = val, par break if param is None: raise ConfigurationException( "No user parameter for command '{}' with alias '{}'.".format( cmd_name, index_or_alias ) ) if param_value: # We need to handle to cases of parameters here: # 1) Optional parameters (non-positional with a name and value) # 2) Positional parameters (value only, no name needed) # Default to empty strings for positional arguments param_name = "" separator = "" if param.name is not None: # A valid param name means we have an optional/non-positional argument: # The separator is an empty string in case the param_name # has an equal sign as we have to honour it. # If the parameter doesn't end with an equal sign then a # space character is injected to split the parameter name # and its value param_name = param.name separator = "" if param.name.endswith("=") else " " return "{param_name}{separator}{param_value}".format( param_name=param_name, separator=separator, param_value=param_value, ) if param.name is None: raise ConfigurationException( "Missing user parameter with alias '{}' for command '{}'.".format( index_or_alias, cmd_name ) ) return param.name # flag: just return the parameter name def resolve_commands_and_params( self, backend_type: str, cmd_name: str, return_params: bool, index_or_alias: str ) -> str: """Resolve command or command's param value.""" if backend_type == "system": backend = cast(Backend, self.ctx.system) backend_params = self.ctx.system_params else: # Application or Tool backend backend = cast(Backend, self.ctx.app) backend_params = self.ctx.app_params if cmd_name not in backend.commands: raise ConfigurationException("Command {} not found".format(cmd_name)) if return_params: params = backend.resolved_parameters(cmd_name, backend_params) if index_or_alias.isnumeric(): i = int(index_or_alias) if i not in range(len(params)): raise ConfigurationException( "Invalid parameter index {} for command {}".format(i, cmd_name) ) param_value = params[i][0] else: param_value = None for value, param in params: if param.alias == index_or_alias: param_value = value break if not param_value: raise ConfigurationException( ( "No value for parameter with index or alias {} of command {}" ).format(index_or_alias, cmd_name) ) return param_value if not index_or_alias.isnumeric(): raise ConfigurationException("Bad command index {}".format(index_or_alias)) i = int(index_or_alias) commands = backend.build_command(cmd_name, backend_params, self.param_resolver) if i not in range(len(commands)): raise ConfigurationException( "Invalid index {} for command {}".format(i, cmd_name) ) return commands[i] def resolve_variables(self, backend_type: str, var_name: str) -> str: """Resolve variable value.""" if backend_type == "system": backend = cast(Backend, self.ctx.system) else: # Application or Tool backend backend = cast(Backend, self.ctx.app) if var_name not in backend.variables: raise ConfigurationException("Unknown variable {}".format(var_name)) return backend.variables[var_name] def param_matcher( self, param_name: str, cmd_name: Optional[str], resolved_params: Optional[List[Tuple[Optional[str], Param]]], ) -> str: """Regexp to resolve a param from the param_name.""" # this pattern supports parameter names like "application.commands.run:0" and # "system.commands.run.params:0" # Note: 'software' is included for backward compatibility. commands_and_params_match = re.match( r"(?Papplication|software|tool|system)[.]commands[.]" r"(?P\w+)" r"(?P[.]params|)[:]" r"(?P\w+)", param_name, ) if commands_and_params_match: backend_type, cmd_name, return_params, index_or_alias = ( commands_and_params_match["type"], commands_and_params_match["name"], commands_and_params_match["params"], commands_and_params_match["index_or_alias"], ) return self.resolve_commands_and_params( backend_type, cmd_name, bool(return_params), index_or_alias ) # Note: 'software' is included for backward compatibility. variables_match = re.match( r"(?Papplication|software|tool|system)[.]variables:(?P\w+)", param_name, ) if variables_match: backend_type, var_name = ( variables_match["type"], variables_match["var_name"], ) return self.resolve_variables(backend_type, var_name) user_params_match = re.match(r"user_params:(?P\w+)", param_name) if user_params_match: index_or_alias = user_params_match["index_or_alias"] return self.resolve_user_params(cmd_name, index_or_alias, resolved_params) raise ConfigurationException( "Unable to resolve parameter {}".format(param_name) ) def param_resolver( self, param_name: str, cmd_name: Optional[str] = None, resolved_params: Optional[List[Tuple[Optional[str], Param]]] = None, ) -> str: """Resolve parameter value based on current execution context.""" # Note: 'software.*' is included for backward compatibility. resolved_param = None if param_name in ["application.name", "tool.name", "software.name"]: resolved_param = self.ctx.app.name elif param_name in [ "application.description", "tool.description", "software.description", ]: resolved_param = self.ctx.app.description elif self.ctx.app.config_location and ( param_name in ["application.config_dir", "tool.config_dir", "software.config_dir"] ): resolved_param = str(self.ctx.app.config_location.absolute()) elif self.ctx.app.build_dir and ( param_name in ["application.build_dir", "tool.build_dir", "software.build_dir"] ): resolved_param = str(self.ctx.build_dir().absolute()) elif self.ctx.system is not None: if param_name == "system.name": resolved_param = self.ctx.system.name elif param_name == "system.description": resolved_param = self.ctx.system.description elif param_name == "system.config_dir" and self.ctx.system.config_location: resolved_param = str(self.ctx.system.config_location.absolute()) if not resolved_param: resolved_param = self.param_matcher(param_name, cmd_name, resolved_params) return resolved_param def __call__( self, param_name: str, cmd_name: Optional[str] = None, resolved_params: Optional[List[Tuple[Optional[str], Param]]] = None, ) -> str: """Resolve provided parameter.""" return self.param_resolver(param_name, cmd_name, resolved_params) class Reporter: """Report metrics from the simulation output.""" def __init__(self, parsers: Optional[List[OutputParser]] = None) -> None: """Create an empty reporter (i.e. no parsers registered).""" self.parsers: List[OutputParser] = parsers if parsers is not None else [] self._report: Dict[str, Any] = defaultdict(lambda: defaultdict(dict)) def parse(self, output: bytearray) -> None: """Parse output and append parsed metrics to internal report dict.""" for parser in self.parsers: # Merge metrics from different parsers (do not overwrite) self._report[parser.name]["metrics"].update(parser(output)) def get_filtered_output(self, output: bytearray) -> bytearray: """Filter the output according to each parser.""" for parser in self.parsers: output = parser.filter_out_parsed_content(output) return output def report(self, ctx: ExecutionContext) -> Dict[str, Any]: """Add static simulation info to parsed data and return the report.""" report: Dict[str, Any] = defaultdict(dict) # Add static simulation info report.update(self._static_info(ctx)) # Add metrics parsed from the output for key, val in self._report.items(): report[key].update(val) return report @staticmethod def save(report: Dict[str, Any], report_file: Path) -> None: """Save the report to a JSON file.""" with open(report_file, "w", encoding="utf-8") as file: json.dump(report, file, indent=4) @staticmethod def _compute_all_params(cli_params: List[str], backend: Backend) -> Dict[str, str]: """ Build a dict of all parameters, {name:value}. Param values taken from command line if specified, defaults otherwise. """ # map of params passed from the cli ["p1=v1","p2=v2"] -> {"p1":"v1", "p2":"v2"} app_params_map = dict(parse_raw_parameter(expr) for expr in cli_params) # a map of params declared in the application, with values taken from the CLI, # defaults otherwise all_params = { (p.alias or p.name): app_params_map.get( cast(str, p.name), cast(str, p.default_value) ) for cmd in backend.commands.values() for p in cmd.params } return cast(Dict[str, str], all_params) @staticmethod def _static_info(ctx: ExecutionContext) -> Dict[str, Any]: """Extract static simulation information from the context.""" if ctx.system is None: raise ValueError("No system available to report.") info = { "system": { "name": ctx.system.name, "params": Reporter._compute_all_params(ctx.system_params, ctx.system), }, "application": { "name": ctx.app.name, "params": Reporter._compute_all_params(ctx.app_params, ctx.app), }, } return info def validate_parameters( backend: Backend, command_names: List[str], params: List[str] ) -> None: """Check parameters passed to backend.""" for param in params: acceptable = any( backend.validate_parameter(command_name, param) for command_name in command_names if command_name in backend.commands ) if not acceptable: backend_type = "System" if isinstance(backend, System) else "Application" raise ValueError( "{} parameter '{}' not valid for command '{}'".format( backend_type, param, " or ".join(command_names) ) ) def get_application_by_name_and_system( application_name: str, system_name: str ) -> Application: """Get application.""" applications = get_application(application_name, system_name) if not applications: raise ValueError( "Application '{}' doesn't support the system '{}'".format( application_name, system_name ) ) if len(applications) != 1: raise ValueError( "Error during getting application {} for the system {}".format( application_name, system_name ) ) return applications[0] def get_application_and_system( application_name: str, system_name: str ) -> Tuple[Application, System]: """Return application and system by provided names.""" system = get_system(system_name) if not system: raise ValueError("System {} is not found".format(system_name)) application = get_application_by_name_and_system(application_name, system_name) return application, system def execute_application_command( # pylint: disable=too-many-arguments command_name: str, application_name: str, application_params: List[str], system_name: str, system_params: List[str], custom_deploy_data: List[DataPaths], ) -> None: """Execute application command. .. deprecated:: 21.12 """ warnings.warn( "Use 'run_application()' instead. Use of 'execute_application_command()' is " "deprecated and might be removed in a future release.", DeprecationWarning, ) if command_name not in ["build", "run"]: raise ConfigurationException("Unsupported command {}".format(command_name)) application, system = get_application_and_system(application_name, system_name) validate_parameters(application, [command_name], application_params) validate_parameters(system, [command_name], system_params) ctx = ExecutionContext( app=application, app_params=application_params, system=system, system_params=system_params, custom_deploy_data=custom_deploy_data, ) if command_name == "run": execute_application_command_run(ctx) else: execute_application_command_build(ctx) # pylint: disable=too-many-arguments def run_application( application_name: str, application_params: List[str], system_name: str, system_params: List[str], custom_deploy_data: List[DataPaths], report_file: Optional[Path] = None, ) -> None: """Run application on the provided system.""" application, system = get_application_and_system(application_name, system_name) validate_parameters(application, ["build", "run"], application_params) validate_parameters(system, ["build", "run"], system_params) execution_params = ExecutionParams() if isinstance(system, StandaloneSystem): execution_params["disable_locking"] = True execution_params["unique_build_dir"] = True ctx = ExecutionContext( app=application, app_params=application_params, system=system, system_params=system_params, custom_deploy_data=custom_deploy_data, execution_params=execution_params, report_file=report_file, ) with build_dir_manager(ctx): if ctx.is_build_required: execute_application_command_build(ctx) execute_application_command_run(ctx) def execute_application_command_build(ctx: ExecutionContext) -> None: """Execute application command 'build'.""" with ExitStack() as context_stack: for manager in get_context_managers("build", ctx): context_stack.enter_context(manager(ctx)) build_dir = ctx.build_dir() recreate_directory(build_dir) build_commands = ctx.app.build_command( "build", ctx.app_params, ctx.param_resolver ) execute_commands_locally(build_commands, build_dir) def execute_commands_locally(commands: List[str], cwd: Path) -> None: """Execute list of commands locally.""" for command in commands: print("Running: {}".format(command)) run_and_wait( command, cwd, terminate_on_error=True, out=sys.stdout, err=sys.stderr ) def execute_application_command_run(ctx: ExecutionContext) -> None: """Execute application command.""" assert ctx.system is not None, "System must be provided." if ctx.is_deploy_needed and not ctx.system.supports_deploy: raise ConfigurationException( "System {} does not support data deploy".format(ctx.system.name) ) with ExitStack() as context_stack: for manager in get_context_managers("run", ctx): context_stack.enter_context(manager(ctx)) print("Generating commands to execute") commands_to_run = build_run_commands(ctx) if ctx.system.connectable: establish_connection(ctx) if ctx.system.supports_deploy: deploy_data(ctx) for command in commands_to_run: print("Running: {}".format(command)) exit_code, std_output, std_err = ctx.system.run(command) if exit_code != 0: print("Application exited with exit code {}".format(exit_code)) if ctx.reporter: ctx.reporter.parse(std_output) std_output = ctx.reporter.get_filtered_output(std_output) print(std_output.decode("utf8"), end="") print(std_err.decode("utf8"), end="") if ctx.reporter: report = ctx.reporter.report(ctx) ctx.reporter.save(report, cast(Path, ctx.report_file)) def establish_connection( ctx: ExecutionContext, retries: int = 90, interval: float = 15.0 ) -> None: """Establish connection with the system.""" assert ctx.system is not None, "System is required." host, port = ctx.system.connection_details() print( "Trying to establish connection with '{}:{}' - " "{} retries every {} seconds ".format(host, port, retries, interval), end="", ) try: for _ in range(retries): print(".", end="", flush=True) if ctx.system.establish_connection(): break if isinstance(ctx.system, ControlledSystem) and not ctx.system.is_running(): print( "\n\n---------- {} execution failed ----------".format( ctx.system.name ) ) stdout, stderr = ctx.system.get_output() print(stdout) print(stderr) raise Exception("System is not running") wait(interval) else: raise ConnectionException("Couldn't connect to '{}:{}'.".format(host, port)) finally: print() def wait(interval: float) -> None: """Wait for a period of time.""" time.sleep(interval) def deploy_data(ctx: ExecutionContext) -> None: """Deploy data to the system.""" if isinstance(ctx.app, Application): # Only application can deploy data (tools can not) assert ctx.system is not None, "System is required." for item in itertools.chain(ctx.app.get_deploy_data(), ctx.custom_deploy_data): print("Deploying {} onto {}".format(item.src, item.dst)) ctx.system.deploy(item.src, item.dst) def build_run_commands(ctx: ExecutionContext) -> List[str]: """Build commands to run application.""" if isinstance(ctx.system, StandaloneSystem): return ctx.system.build_command("run", ctx.system_params, ctx.param_resolver) return ctx.app.build_command("run", ctx.app_params, ctx.param_resolver) @contextmanager def controlled_system_manager(ctx: ExecutionContext) -> Generator[None, None, None]: """Context manager used for system initialisation before run.""" system = cast(ControlledSystem, ctx.system) commands = system.build_command("run", ctx.system_params, ctx.param_resolver) pid_file_path: Optional[Path] = None if ctx.is_locking_required: file_lock_path = get_file_lock_path(ctx) pid_file_path = file_lock_path.parent / "{}.pid".format(file_lock_path.stem) system.start(commands, ctx.is_locking_required, pid_file_path) try: yield finally: print("Shutting down sequence...") print("Stopping {}... (It could take few seconds)".format(system.name)) system.stop(wait=True) print("{} stopped successfully.".format(system.name)) @contextmanager def lock_execution_manager(ctx: ExecutionContext) -> Generator[None, None, None]: """Lock execution manager.""" file_lock_path = get_file_lock_path(ctx) file_lock = FileLock(str(file_lock_path)) try: file_lock.acquire(timeout=1) except Timeout as error: raise AnotherInstanceIsRunningException() from error try: yield finally: file_lock.release() def get_file_lock_path(ctx: ExecutionContext, lock_dir: Path = Path("/tmp")) -> Path: """Get file lock path.""" lock_modules = [] if ctx.app.lock: lock_modules.append(ctx.app.name) if ctx.system is not None and ctx.system.lock: lock_modules.append(ctx.system.name) lock_filename = "" if lock_modules: lock_filename = "_".join(["middleware"] + lock_modules) + ".lock" if lock_filename: lock_filename = resolve_all_parameters(lock_filename, ctx.param_resolver) lock_filename = valid_for_filename(lock_filename) if not lock_filename: raise ConfigurationException("No filename for lock provided") if not isinstance(lock_dir, Path) or not lock_dir.is_dir(): raise ConfigurationException( "Invalid directory {} for lock files provided".format(lock_dir) ) return lock_dir / lock_filename @contextmanager def build_dir_manager(ctx: ExecutionContext) -> Generator[None, None, None]: """Build directory manager.""" try: yield finally: if ( ctx.is_build_required and ctx.is_unique_build_dir_required and ctx.build_dir().is_dir() ): remove_directory(ctx.build_dir()) def get_context_managers( command_name: str, ctx: ExecutionContext ) -> Sequence[Callable[[ExecutionContext], ContextManager[None]]]: """Get context manager for the system.""" managers = [] if ctx.is_locking_required: managers.append(lock_execution_manager) if command_name == "run": if isinstance(ctx.system, ControlledSystem): managers.append(controlled_system_manager) return managers def get_tool_by_system(tool_name: str, system_name: Optional[str]) -> Tool: """Return tool (optionally by provided system name.""" tools = get_tool(tool_name, system_name) if not tools: raise ConfigurationException( "Tool '{}' not found or doesn't support the system '{}'".format( tool_name, system_name ) ) if len(tools) != 1: raise ConfigurationException( "Please specify the system for tool {}.".format(tool_name) ) tool = tools[0] return tool def execute_tool_command( tool_name: str, tool_params: List[str], system_name: Optional[str] = None, ) -> None: """Execute the tool command locally calling the 'run' command.""" tool = get_tool_by_system(tool_name, system_name) ctx = ExecutionContext( app=tool, app_params=tool_params, system=None, system_params=[] ) commands = tool.build_command("run", tool_params, ctx.param_resolver) execute_commands_locally(commands, Path.cwd())